Source code for particle_tracker_one_d.trajectory.trajectory

import numpy as np
import matplotlib.pyplot as plt


[docs]class Trajectory: """ Object that describes a trajectory. With functions for checking if the trajectory describes real diffusion, convenient plotting and calculations of diffusion coefficients. Parameters ---------- pixel_width: float Defines the length one pixel corresponds to. This value will be used when calculating diffusion coefficients. Default is 1. Attributes ---------- pixel_width particle_positions: np.array Numpy array with all particle positions in the trajectory on the form `np.array((nParticles,), dtype=[('frame_index', np.int16), ('time', np.float32),('position', np.int16),('zeroth_order_moment', np.float32),('second_order_moment', np.float32)])` """ def __init__(self, pixel_width=1): self.particle_positions = np.empty((0,), dtype=[('frame_index', np.int16), ('time', np.float32), ('position', np.float32), ('zeroth_order_moment', np.float32), ('second_order_moment', np.float32)]) self.pixel_width = pixel_width def __add__(self, other): new_trajectory = Trajectory(pixel_width=self.pixel_width) if self.pixel_width != other.pixel_width: raise ValueError('Pixel width must be equal when adding trajectories together.') elif self.particle_positions.shape[0] == 0 and other._particle_positions.shape[0] == 0: return new_trajectory elif self.particle_positions.shape == (0,): new_trajectory.particle_positions = other._particle_positions elif other._particle_positions.shape == (0,): new_trajectory.particle_positions = self.particle_positions elif other._particle_positions[0]['frame_index'] == self.particle_positions[0]['frame_index']: raise ValueError('Both trajectories cant start at same frame index.') elif other._particle_positions.shape[0] == 1 and self.particle_positions.shape[0] == 1: if other._particle_positions['frame_index'][0] < self.particle_positions['frame_index'][0]: new_trajectory.particle_positions = np.append(other._particle_positions, self.particle_positions) else: new_trajectory.particle_positions = np.append(self.particle_positions, other._particle_positions) elif other._particle_positions['frame_index'][0] < self.particle_positions['frame_index'][0]: index = np.where( other._particle_positions['frame_index'] < self.particle_positions[0]['frame_index'] ) new_trajectory.particle_positions = np.append(other._particle_positions[index], self.particle_positions) elif self.particle_positions['frame_index'][0] < other._particle_positions['frame_index'][0]: index = np.where( self.particle_positions['frame_index'] < other._particle_positions[0]['frame_index'] ) new_trajectory.particle_positions = np.append(self.particle_positions[index], other._particle_positions) return new_trajectory @property def velocities(self): """ np.array: The velocities the particle moves at. """ if self.length > 1: return self._calculate_particle_velocities() return 0 @property def density(self): """ float: How dense the trajectory is in time. Returns self.length/(self.particle_positions['frame_index'][-1]-self.particle_positions['frame_index'][0]). """ if self.length == 0 or self.length == 1: return 1 return self.length / (1 + self.particle_positions['frame_index'][-1] - self.particle_positions['frame_index'][0]) @property def length(self): """ int: The length of the trajectory. Returns self.particle_postions.shape[0] """ return self.particle_positions.shape[0]
[docs] def overlaps_with(self, trajectory): """ Check if the trajectories overlaps trajectory: Trajectory to compare with. If both trajectories has any identical elements will return true otherwise false. Returns ------- bool """ if self.length == 0 or trajectory.length == 0: return False for p in trajectory.particle_positions: for p2 in self.particle_positions: if (p['frame_index'] == p2['frame_index']) and (p['position'] == p2['position']): return True return False
[docs] def split(self, trajectory): """ If two trajectories overlaps, this function will split them into three or more non overlapping trajectories. trajectory: Returns ------- list Returns a list with the new trajectories """ if ( (self.particle_positions[0]['frame_index'] > trajectory.particle_positions[-1]['frame_index']) or (self.particle_positions[-1]['frame_index'] < trajectory.particle_positions[0]['frame_index']) ): return [self, trajectory] p1 = self.particle_positions.copy() p2 = trajectory.particle_positions.copy() new_positions = [] while (p1.shape[0] > 0) or (p2.shape[0] > 0): if (p1.shape[0] == 0) and (p2.shape[0] > 0): new_positions.append(p2.copy()) break elif (p2.shape[0] == 0) and (p1.shape[0] > 0): new_positions.append(p1.copy()) break elif p1[0] == p2[0]: index = self._find_index_of_first_not_equal_element(p1, p2) new_positions.append(p1[:index].copy()) p1 = p1[index:] p2 = p2[index:] else: index1, index2 = self._find_last_index_where_no_overlaps_occurs(p1, p2) new_positions.append(p1[:index1 + 1].copy()) new_positions.append(p2[:index2 + 1].copy()) p1 = p1[index1 + 1:] p2 = p2[index2 + 1:] new_trajectories = [] for p in new_positions: if p.shape[0] > 0: t = Trajectory(pixel_width=self.pixel_width) t.particle_positions = p new_trajectories.append(t) return new_trajectories
[docs] def plot_trajectory(self, x='frame_index', y='position', ax=None, **kwargs): """ Plots the trajectory using the frame index and the particle position in pixels. x: str 'frame_index', 'time', 'position' (default), 'zeroth_order_moment', 'second_order_moment' choose the x-axis value y: str 'frame_index' (default), 'time', 'position', 'zeroth_order_moment', 'second_order_moment' choose the y-axis value ax: matplotlib axes instance The axes which you want the frames to plotted on. If none is provided a new instance will be created. **kwargs: Plot settings, any settings which can be used in matplotlib.pyplot.plot method. Returns ------- matplotlib axes instance Returns the axes input argument or creates and returns a new instance of a matplotlib axes object. """ if ax is None: ax = plt.axes() ax.plot(self.particle_positions[x], self.particle_positions[y], **kwargs) return ax
[docs] def plot_velocity_auto_correlation(self, ax=None, **kwargs): """ Plots the particle velocity auto correlation function which can be used for examining if the trajectory describes free diffusion. ax: matplotlib axes instance The axes which you want the frames to plotted on. If none is provided a new instance will be created. **kwargs: Plot settings, any settings which can be used in matplotlib.pyplot.plot method. Returns ------- matplotlib axes instance Returns the axes input argument or creates and returns a new instance of a matplotlib axes object. """ if ax is None: ax = plt.axes() ax.acorr(self.velocities, **kwargs) return ax
[docs] def calculate_mean_square_displacement_function(self): """ Calculate the average squared displacements for different time steps. Returns ------- time: np.array The time corresponding to the mean squared displacements. msd: np.array The mean squared displacements of the trajectory. """ mean_square_displacements = np.zeros((self.particle_positions['frame_index'][-1] - self.particle_positions['frame_index'][0] + 1,), dtype=[('msd', np.float32), ('nr_of_values', np.int16)]) times = np.arange(0, self.particle_positions['frame_index'][-1] - self.particle_positions['frame_index'][0] + 1, dtype=np.float32) * self._calculate_time_step() for first_index, first_position in enumerate(self.particle_positions[:-1]): for second_index, second_position in enumerate(self.particle_positions[first_index + 1:]): index_difference = second_position['frame_index'] - first_position['frame_index'] mean_square_displacements['msd'][index_difference] += ((second_position['position'] - first_position['position']) * self.pixel_width) ** 2 mean_square_displacements['nr_of_values'][index_difference] += 1 for index, msd in enumerate(mean_square_displacements): if mean_square_displacements['nr_of_values'][index] != 0: mean_square_displacements['msd'][index] = msd['msd'] / mean_square_displacements['nr_of_values'][index].astype(np.float32) non_zeros_indices = np.nonzero(mean_square_displacements['nr_of_values']) return times[non_zeros_indices], mean_square_displacements['msd'][non_zeros_indices]
def _append_position(self, particle_position): self.particle_positions = np.append(self.particle_positions, particle_position, axis=0) def _position_exists_in_trajectory(self, particle_position): for p in self.particle_positions: if np.array_equal(p, particle_position): return True def _calculate_particle_velocities(self): time_steps = np.diff(self.particle_positions['time']) position_steps = np.diff(self.particle_positions['position'] * self.pixel_width) return position_steps / time_steps @staticmethod def _remove_non_unique_values(array): return np.unique(array) @staticmethod def _sort_values_low_to_high(array): return np.sort(array)
[docs] def calculate_diffusion_coefficient_from_mean_square_displacement_function(self, fit_range=None): """ Fits a straight line to the mean square displacement function and calculates the diffusion coefficient from the gradient of the line. The mean squared displacement of the particle position is proportional to :math:`2Dt` where :math:`D` is the diffusion coefficient and :math:`t` is the time. fit_range: list, None (default) Define the range of the fit, the data for the fit will be `time[fit_range[0]:fit_range[1]`` and `mean_squared_displacement[fit_range[0]:fit_range[1]]`. Returns ------- diffusion_coefficient: float error: float """ time, mean_square_displacement = self.calculate_mean_square_displacement_function() if fit_range is None: polynomial_coefficients, error_estimate = self._fit_straight_line_to_data(time, mean_square_displacement) else: polynomial_coefficients, error_estimate = self._fit_straight_line_to_data(time[fit_range[0]:fit_range[1]], mean_square_displacement[fit_range[0]:fit_range[1]]) return polynomial_coefficients[0] / 2, error_estimate[0] / 2
@staticmethod def _fit_straight_line_to_data(x, y): polynomial_coefficients, covariance_matrix = np.polyfit(x, y, 1, cov=True) error_estimate = [np.sqrt(covariance_matrix[0, 0]), np.sqrt(covariance_matrix[1, 1])] return polynomial_coefficients, error_estimate
[docs] def calculate_diffusion_coefficient_using_covariance_based_estimator(self, R=None): """ Unbiased estimator of the diffusion coefficient. More info at `https://www.nature.com/articles/nmeth.2904`. If the motion blur coefficient is entered a variance estimate is also calculated. R: float, motion blur coefficient Returns ------- diffusion_coefficient: float variance_estimate: float """ squared_displacements = [] covariance_term = [] for index, first_position in enumerate(self.particle_positions[:-2]): second_position = self.particle_positions[index + 1] third_position = self.particle_positions[index + 2] if first_position['frame_index'] - second_position['frame_index'] == -1 and first_position['frame_index'] - third_position['frame_index'] == -2: squared_displacements.append((self.pixel_width * (second_position['position'] - first_position['position'])) ** 2) covariance_term.append((second_position['position'] - first_position['position']) * ( third_position['position'] - second_position['position']) * self.pixel_width ** 2) time_step = self._calculate_time_step() number_of_points_used = len(squared_displacements) diffusion_coefficient = np.mean(squared_displacements) / (2 * time_step) + np.mean(covariance_term) / time_step if R is not None: localisation_error = R * np.mean(squared_displacements) + (2 * R - 1) * np.mean(covariance_term) epsilon = localisation_error ** 2 / (diffusion_coefficient * time_step) - 2 * R variance_estimate = diffusion_coefficient ** 2 * ((6 + 4 * epsilon + 2 * epsilon ** 2) / number_of_points_used + 4 * (1 + epsilon) ** 2 / (number_of_points_used ** 2)) return diffusion_coefficient, variance_estimate return diffusion_coefficient
def _calculate_time_step(self): return (self.particle_positions['time'][1] - self.particle_positions['time'][0]) / (self.particle_positions['frame_index'][1] - self.particle_positions['frame_index'][0]) @staticmethod def _find_index_of_first_not_equal_element(p1, p2): n = 0 length_of_shortest_trajectory = min(p1.shape[0], p2.shape[0]) while (n < length_of_shortest_trajectory) and (p1[n] == p2[n]): n += 1 return n @staticmethod def _find_last_index_where_no_overlaps_occurs(p1, p2): if p1[0] == p2[0]: return None, None elif (p1.shape[0] == 1) and (p2.shape[0] == 1): return 0, 0 elif p1.shape[0] == 1: n2 = 0 while n2 < (p2.shape[0] - 1) and (p1[0] != p2[n2]): n2 += 1 if p1[0] == p2[n2]: return None, n2 - 1 else: return 0, n2 elif p2.shape[0] == 1: n1 = 0 while n1 < (p1.shape[0] - 1) and (p1[n1] != p2[0]): n1 += 1 if p1[n1] == p2[0]: return n1 - 1, None else: return n1, 0 else: n1 = 0 n2 = 0 while ((n1 < (p1.shape[0] - 1)) or (n2 < (p2.shape[0] - 1))) and (p1[n1] != p2[n2]): if ( (p1[n1]['frame_index'] < p2[n2]['frame_index']) and (n1 < (p1.shape[0] - 1)) ): n1 += 1 elif ( (p2[n2]['frame_index'] < p1[n1]['frame_index']) and (n2 < (p2.shape[0] - 1)) ): n2 += 1 elif ( (p2[n2]['frame_index'] == p1[n1]['frame_index']) and (n1 < (p1.shape[0]) - 1) and (n2 < (p2.shape[0]) - 1) ): n1 += 1 n2 += 1 elif n1 < (p1.shape[0] - 1): n1 += 1 elif n2 < (p2.shape[0] - 1): n2 += 1 if p1[n1] == p2[n2]: return n1 - 1, n2 - 1 return n1, n2