from ..particle_tracker import ParticleTracker
import numpy as np
from ..trajectory import Trajectory
from ..frames.frames import Frames
from scipy.signal import find_peaks
[docs]class ShortestPathFinder:
"""
Class for finding shortest path between points in a set of frames.
Parameters
----------
frames: np.array
The frames in which trajectories are to be found. The shape of the np.array should be (nFrames,xPixels). The intensity of the frames should be normalised according to
:math:`I_n = (I-I_{min})/(I_{max}-I_{min})`, where :math:`I` is the intensity of the frames, :math:`I_{min}`, :math:`I_{max}` are the global intensity minima and maxima of the
frames.
time: np.array
The corresponding time of each frame.
automatic_update: bool
Choose if the class should update itself when changing properties.
Attributes
----------
frames
time
boxcar_width
integration_radius_of_intensity_peaks
particle_detection_threshold
particle_positions
static_points
start_point
end_point
"""
def __init__(self, frames, time, automatic_update=True):
ParticleTracker._validate_class_arguments(frames, time, automatic_update)
self._Frames = Frames(frames=frames, time=time, automatic_update=automatic_update)
self._automatic_update = automatic_update
self._integration_radius_of_intensity_peaks = 1
self._particle_detection_threshold = 0
self._averaged_intensity = frames
self._particle_positions = [None] * self.frames.shape[0]
self._cost_matrix = []
self._association_matrix = []
self._start_point = (0, 0)
self._end_point = (0, 0)
self._static_points = []
self._shortest_path = None
self._trajectory = None
self._cost_coefficients = np.array([0.1, 1, 0], dtype=np.float32)
@property
def frames(self):
"""
np.array:
The frames which the particle tracker tries to find trajectories in. If the property boxcar_width!=0 it will return the smoothed frames.
"""
return self._Frames.frames
@property
def boxcar_width(self):
"""
int:
Number of values used in the boxcar averaging of the frames.
"""
return self._Frames.boxcar_width
@boxcar_width.setter
def boxcar_width(self, width):
if not width == self._Frames.boxcar_width:
self._Frames.boxcar_width = width
if self._automatic_update:
self._find_particle_positions()
self._update_shortest_path()
@property
def integration_radius_of_intensity_peaks(self):
"""
int:
Number of pixels used when integrating the intensity peaks. No particles closer than twice this value will be found. If two peaks are found within twice this value,
the one with highest intensity moment will be kept.
"""
return self._integration_radius_of_intensity_peaks
@integration_radius_of_intensity_peaks.setter
def integration_radius_of_intensity_peaks(self, radius):
if type(radius) is not int:
raise TypeError('Attribute integration_radius_of_intensity_peaks should be of type int')
if not -1 < radius <= self.frames.shape[1] / 2:
raise ValueError(
'Attribute integration_radius_of_intensity_peaks should be a positive integer less or equal the half of the number of pixels in each frame.')
if not radius == self._integration_radius_of_intensity_peaks:
self._integration_radius_of_intensity_peaks = radius
if self._automatic_update:
self._find_particle_positions()
self._update_shortest_path()
@property
def particle_detection_threshold(self):
"""
float:
Defines the threshold value for finding intensity peaks. Local maximas below this threshold will not be
considered as particles.
"""
return self._particle_detection_threshold
@particle_detection_threshold.setter
def particle_detection_threshold(self, threshold):
if not (type(threshold) == int or type(threshold) == float):
raise TypeError('Attribute particle_detection_threshold should be a numerical value between 0 and 1.')
if not 0 <= threshold <= 1:
raise ValueError('Attribute particle_detection_threshold should be a value between 0 and 1.')
if not threshold == self._particle_detection_threshold:
self._particle_detection_threshold = threshold
if self._automatic_update:
self._find_particle_positions()
self._update_shortest_path()
@property
def start_point(self):
"""
tuple:
(frame_index, position_index), The start point of the path you want to find.
"""
return self._start_point
@start_point.setter
def start_point(self, start_point):
if type(start_point) not in [list, tuple, np.ndarray]:
raise TypeError('Start point must be list, tuple or np.array with form (frame_index, position)')
else:
if type(start_point[0]) not in [int, np.int16, np.int32, np.int64]:
raise TypeError('frame_index must be an integer')
if start_point[0] < 0 or start_point[0] > self._Frames.time.shape[0] - 1:
raise ValueError('First value in start point must be an integer between 0 and self.time.shape[0] - 1')
elif start_point[1] < 0 or start_point[1] > self.frames.shape[1] - 1:
raise ValueError(
'Second value in start point must be an integer between 0 and self.frames.shape[1] - 1')
if not (start_point[0] == self._start_point[0] and start_point[1] == self._start_point[1]):
self._start_point = (start_point[0], start_point[1])
if self._automatic_update:
self._find_particle_positions()
self._update_shortest_path()
@property
def end_point(self):
"""
tuple:
(frame_index, position_index), The end point of the path you want to find.
"""
return self._end_point
@end_point.setter
def end_point(self, end_point):
if type(end_point) not in [list, tuple, np.ndarray]:
raise TypeError('End point must be list, tuple or np.array with form (frame_index, position_index)')
else:
if type(end_point[0]) not in [int, np.int16, np.int32, np.int64]:
raise TypeError('frame_index must be an integer.')
if end_point[0] < 0 or end_point[0] > self._Frames.time.shape[0] - 1:
raise ValueError('First value in end point must be an integer between 0 and self.time.shape[0] - 1')
elif end_point[1] < 0 or end_point[1] > self.frames.shape[1] - 1:
raise ValueError('Second value in end point must be an integer between 0 and self.frames.shape[1] - 1')
if not (end_point[0] == self._end_point[0] and end_point[1] == self._end_point[1]):
self._end_point = (end_point[0], end_point[1])
if self._automatic_update:
self._find_particle_positions()
self._update_shortest_path()
@property
def static_points(self):
return self._static_points
@static_points.setter
def static_points(self, static_points):
if type(static_points) not in [list, np.ndarray]:
raise TypeError('End point must be list, or np.array with form [(frame_index, position_index), ...]')
self._static_points = static_points
@property
def particle_positions(self):
"""
np.array:
Numpy array with all particle positions on the form `np.array((nParticles,), dtype=[('frame_index', np.int16),
('time', np.float32),('integer_position', np.int16), ('refined_position', np.float32)])`
"""
return self._particle_positions
@property
def shortest_path(self):
"""
dict:
The shortest path between the start and end point, defined by the cost function. Cost, length and association matrix.
"""
return self._shortest_path
@property
def trajectory(self):
"""
trajectory:
The shortest path between the start and end point represented as a trajectory.
"""
return self._trajectory
[docs] def plot_all_frames(self, ax=None, **kwargs):
"""
ax: matplotlib axes instance
The axes which you want the frames to plotted on. If none is provided a new instance will be created.
**kwargs:
Plot settings, any settings which can be used in matplotlib.pyplot.imshow method.
Returns
-------
matplotlib axes instance
Returns the axes input argument or creates and returns a new instance of an matplotlib axes object.
"""
if ax is None:
ax = plt.axes()
ax.imshow(self.frames, **kwargs)
return ax
[docs] def change_cost_coefficients(self, a=1, b=1, c=1):
"""
Change the coefficients of the cost function :math:`c(p_1,p_2) = a\\cdot (x_{p_1} - x_{p_2})^2 + b \\cdot (m_0(p_1)-m_0(p_2))^2 + b \\cdot (m_2(p_1)-m_2(p_2))^2)`
a: float
b: float
c: float
"""
new_cost_coefficients = np.array([a, b, c], dtype=np.float32)
if np.array_equal(new_cost_coefficients, np.array([0, 0, 0])):
raise ValueError('All cost coefficients can\'t be zero')
if not np.array_equal(new_cost_coefficients, self._cost_coefficients):
self._cost_coefficients = new_cost_coefficients
if self._automatic_update:
self._update_shortest_path()
[docs] def plot_moments(self, ax=None, **kwargs):
"""
ax: matplotlib axes instance
The axes which you want the frames to plotted on. If none is provided a new instance will be created.
**kwargs:
Plot settings, any settings which can be used in matplotlib.pyplot.scatter method.
Returns
-------
matplotlib axes instance
Returns the axes input argument or creates and returns a new instance of an matplotlib axes object.
"""
if ax is None:
ax = plt.axes()
zeroth_order_moments = [m for moments in self._zeroth_order_moments for m in moments]
second_order_moments = [m for moments in self._second_order_moments for m in moments]
ax.scatter(zeroth_order_moments, second_order_moments, **kwargs)
ax.set_xlabel('$m_0$')
ax.set_ylabel('$m_2$')
return ax
@property
def _intensity_of_interest(self):
return self.frames[self.start_point[0]:self.end_point[0] + 1]
@property
def _time_of_interest(self):
return self._Frames.time[self.start_point[0]:self.end_point[0] + 1]
def _find_particle_positions(self):
if self.start_point and self.end_point and self.start_point[0] < self.end_point[0]:
self._find_initial_particle_positions()
self._refine_particle_positions()
self._add_static_points()
def _add_static_points(self):
for position in self._static_points:
self._particle_positions[position[0] - self.start_point[0]] = [position[1]]
def _refine_particle_positions(self):
if self._integration_radius_of_intensity_peaks == 0:
return
for frame_index, positions in enumerate(self._particle_positions):
for index, position in enumerate(positions):
if position == 0 or position + 1 == self._intensity_of_interest.shape[1]:
continue
elif position < self.integration_radius_of_intensity_peaks:
integration_radius = position
elif position > self._intensity_of_interest.shape[1] - self._integration_radius_of_intensity_peaks - 1:
integration_radius = self._intensity_of_interest.shape[1] - position
else:
integration_radius = self.integration_radius_of_intensity_peaks
intensity = self._intensity_of_interest[frame_index][
int(position - integration_radius):int(position + integration_radius + 1)]
intensity = intensity - np.min(intensity)
self._particle_positions[frame_index][index] = position + ParticleTracker._calculate_center_of_mass(
intensity) - integration_radius
def _find_initial_particle_positions(self):
self._particle_positions = [None] * (self.end_point[0] - self.start_point[0] + 1)
for index, frame in enumerate(self._intensity_of_interest):
self._particle_positions[index] = self._find_local_maximas_larger_than_threshold(frame)
self._particle_positions[0] = np.array([self.start_point[1]], dtype=np.float32)
self._particle_positions[-1] = np.array([self.end_point[1]], dtype=np.float32)
def _initialise_association_and_cost_matrix(self):
number_of_frames = len(self._particle_positions) - 1
self._association_matrix = [[] for _ in range(number_of_frames)]
self._cost_matrix = [[] for _ in range(number_of_frames)]
for frame_index in range(0, number_of_frames):
self._association_matrix[frame_index] = np.zeros(
(len(self._particle_positions[frame_index]), len(self._particle_positions[frame_index + 1])),
dtype=bool
)
self._cost_matrix[frame_index] = np.zeros(
(len(self._particle_positions[frame_index]), len(self._particle_positions[frame_index + 1])),
dtype=np.float32
)
def _calculate_cost_matrix(self):
for frame_index, _ in enumerate(self._cost_matrix):
for particle_index, _ in enumerate(self._cost_matrix[frame_index]):
for future_particle_index, _ in enumerate(self._cost_matrix[frame_index][particle_index]):
if frame_index == 0:
self._cost_matrix[frame_index][particle_index][
future_particle_index] = self._calculate_linking_cost(
frame_index,
particle_index,
frame_index + 1,
future_particle_index
)
else:
self._cost_matrix[frame_index][particle_index][future_particle_index] = (
self._calculate_linking_cost(
frame_index,
particle_index,
frame_index + 1,
future_particle_index
) +
np.amin(self._cost_matrix[frame_index - 1].T[particle_index])
)
def _calculate_linking_cost(self, frame_index, particle_index, future_frame_index, future_particle_index):
return (
self._cost_coefficients[0] * (
self.particle_positions[frame_index][particle_index] -
self.particle_positions[future_frame_index][future_particle_index]
) ** 2 +
self._cost_coefficients[1] * (
self._zeroth_order_moments[frame_index][particle_index] -
self._zeroth_order_moments[future_frame_index][future_particle_index]
) ** 2 +
self._cost_coefficients[2] * (
self._second_order_moments[frame_index][particle_index] -
self._second_order_moments[future_frame_index][future_particle_index]
) ** 2
)
def _update_shortest_path(self):
if self.start_point and self.end_point and self.start_point[0] < self.end_point[0]:
self._calculate_particle_moments()
self._initialise_association_and_cost_matrix()
self._calculate_cost_matrix()
self._create_trajectory_from_cost_matrix()
def _calculate_particle_moments(self):
self._zeroth_order_moments = [None] * (self.end_point[0] - self.start_point[0] + 1)
self._second_order_moments = [None] * (self.end_point[0] - self.start_point[0] + 1)
for frame_index, positions in enumerate(self._particle_positions):
self._zeroth_order_moments[frame_index] = np.array(
[self._calculate_zeroth_order_intensity_moment(position, frame_index + self.start_point[0]) for position
in positions], dtype=np.float64)
self._second_order_moments[frame_index] = np.array(
[self._calculate_second_order_intensity_moment(position, frame_index + self.start_point[0]) for position
in positions], dtype=np.float64)
def _create_trajectory_from_cost_matrix(self):
particle_indices = []
prev_lowest_cost_index = 0
for frame_index in range(len(self._cost_matrix)):
lowest_cost_index = np.where(
self._cost_matrix[-frame_index - 1].T[prev_lowest_cost_index] == np.amin(
self._cost_matrix[-frame_index - 1].T[prev_lowest_cost_index])
)[0][0]
particle_indices.append(
lowest_cost_index
)
prev_lowest_cost_index = lowest_cost_index
particle_positions = np.empty((len(particle_indices[::-1]),),
dtype=[('frame_index', np.int16), ('time', np.float32), ('position', np.float32),
('zeroth_order_moment', np.float32), ('second_order_moment', np.float32)])
self._trajectory = Trajectory()
for frame_index, particle_index in enumerate(particle_indices[::-1]):
particle_positions[frame_index]['frame_index'] = frame_index + self.start_point[0]
particle_positions[frame_index]['time'] = self._Frames.time[frame_index + self.start_point[0]]
particle_positions[frame_index]['position'] = self._particle_positions[frame_index][particle_index]
particle_positions[frame_index]['zeroth_order_moment'] = self._zeroth_order_moments[frame_index][
particle_index]
particle_positions[frame_index]['second_order_moment'] = self._second_order_moments[frame_index][
particle_index]
self._trajectory.particle_positions = particle_positions
def _calculate_second_order_intensity_moment(self, position, frame_index):
position = int(round(position))
if self._integration_radius_of_intensity_peaks == 0:
return 0
elif position == 0:
if self._integration_radius_of_intensity_peaks == 1:
return 2 * self.frames[frame_index, 1] / self._calculate_zeroth_order_intensity_moment(position,
frame_index)
else:
second_order_index_array = np.arange(0, self._integration_radius_of_intensity_peaks + 1) ** 2
return (
2 * np.dot(self.frames[frame_index, :self.integration_radius_of_intensity_peaks + 1],
second_order_index_array)
) / self._calculate_zeroth_order_intensity_moment(position, frame_index)
elif position == self.frames.shape[1] - 1:
if self._integration_radius_of_intensity_peaks == 1:
return 2 * self.frames[frame_index, -2] / self._calculate_zeroth_order_intensity_moment(position,
frame_index)
else:
second_order_index_array = np.arange(-self._integration_radius_of_intensity_peaks, 0) ** 2
return 2 * np.dot(self.frames[frame_index, -self._integration_radius_of_intensity_peaks - 1:-1],
second_order_index_array) / self._calculate_zeroth_order_intensity_moment(position,
frame_index)
elif position < self._integration_radius_of_intensity_peaks:
w = self._integration_radius_of_intensity_peaks - position
if w == 1:
second_order_index_array = np.arange(-position, position + 1) ** 2
return (
(
np.dot(self.frames[frame_index, :2 * position + 1], second_order_index_array) +
2 * self._integration_radius_of_intensity_peaks ** 2 * self.frames[
frame_index, position + self._integration_radius_of_intensity_peaks]
) / self._calculate_zeroth_order_intensity_moment(position, frame_index)
)
else:
second_order_index_array = np.arange(-position, position + 1) ** 2
second_order_index_array_big = np.arange(position + 1,
position + self._integration_radius_of_intensity_peaks) ** 2
return (
np.dot(second_order_index_array, self.frames[frame_index, :2 * position + 1]) +
2 * np.dot(second_order_index_array_big,
self.frames[frame_index,
2 * position + 1:2 * position + self._integration_radius_of_intensity_peaks])
) / self._calculate_zeroth_order_intensity_moment(position, frame_index)
elif position > self.frames.shape[1] - 1 - self._integration_radius_of_intensity_peaks:
w = self._integration_radius_of_intensity_peaks - (self.frames.shape[1] - position - 1)
if w == 1:
second_order_index_array = np.arange(-(self.frames.shape[1] - 1 - position),
self.frames.shape[1] - position) ** 2
return (
np.dot(second_order_index_array,
self.frames[frame_index, 2 * position - self.frames.shape[1] + 1:])
+ 2 * self._integration_radius_of_intensity_peaks ** 2 * self.frames[
frame_index, position - self._integration_radius_of_intensity_peaks]
) / self._calculate_zeroth_order_intensity_moment(position, frame_index)
else:
second_order_index_array = np.arange(-(self.frames.shape[1] - 1 - position),
self.frames.shape[1] - position) ** 2
second_order_index_array_big = np.arange(- self._integration_radius_of_intensity_peaks,
-(self.frames.shape[1] - position) + 1) ** 2
return (
np.dot(second_order_index_array,
self.frames[frame_index, -2 * (self.frames.shape[1] - position) + 1:]) +
2 * np.dot(second_order_index_array_big,
self.frames[frame_index,
position - self._integration_radius_of_intensity_peaks:-2 * (
self.frames.shape[1] - position) + 1])
) / self._calculate_zeroth_order_intensity_moment(position, frame_index)
else:
w = self._integration_radius_of_intensity_peaks
second_order_index_array = np.arange(-w, w + 1) ** 2
return np.dot(self.frames[frame_index, position - w:position + w + 1],
second_order_index_array) / self._calculate_zeroth_order_intensity_moment(position,
frame_index)
def _calculate_zeroth_order_intensity_moment(self, position, frame_index):
position = int(round(position))
if self._integration_radius_of_intensity_peaks == 0:
return self.frames[frame_index, position]
elif position == 0:
if self._integration_radius_of_intensity_peaks == 1:
return self.frames[frame_index, position] + 2 * self.frames[frame_index, position + 1]
else:
return (
self.frames[frame_index, position] +
2 * np.sum(
self.frames[frame_index, position + 1:position + 1 + self._integration_radius_of_intensity_peaks])
)
elif position == self.frames.shape[1] - 1:
if self._integration_radius_of_intensity_peaks == 1:
return self.frames[frame_index, position] + 2 * self.frames[frame_index, position - 1]
else:
return (
self.frames[frame_index, position] +
2 * np.sum(
self.frames[frame_index, position - self._integration_radius_of_intensity_peaks:position])
)
elif position < self._integration_radius_of_intensity_peaks:
w = self._integration_radius_of_intensity_peaks - position
if w == 1:
return np.sum(self.frames[frame_index, :2 * position + 1]) + 2 * self.frames[
frame_index, position + w + 1]
else:
return (
np.sum(self.frames[frame_index, :2 * position + 1]) +
2 * np.sum(self.frames[frame_index, 2 * position + 1:2 * position + w + 1])
)
elif position > self.frames.shape[1] - 1 - self._integration_radius_of_intensity_peaks:
w = self._integration_radius_of_intensity_peaks - (self.frames.shape[1] - position - 1)
if w == 1:
return (
np.sum(self.frames[frame_index, -2 * (self.frames.shape[1] - position) + 1:]) +
2 * self.frames[frame_index, -2 * (self.frames.shape[1] - position)]
)
else:
return (
np.sum(self.frames[frame_index, -2 * (self.frames.shape[1] - position) + 1:]) +
2 * np.sum(
self.frames[frame_index, position - self._integration_radius_of_intensity_peaks:position - 1])
)
else:
w = self._integration_radius_of_intensity_peaks
return np.sum(self.frames[frame_index, position - w:position + w + 1])
def _find_local_maximas_larger_than_threshold(self, y):
local_maximas, _ = find_peaks(y, height=self.particle_detection_threshold,
distance=2 * self.integration_radius_of_intensity_peaks)
return local_maximas.astype(np.float32)
@staticmethod
def _validate_class_arguments(frames, time, automatic_update):
ParticleTracker._test_if_frames_have_correct_format(frames)
ParticleTracker._test_if_time_has_correct_format(time)
ParticleTracker._test_if_time_and_frames_has_same_length(time, frames)
ParticleTracker._test_if_automatic_update_has_correct_format(automatic_update)
@staticmethod
def _test_if_frames_have_correct_format(frames):
if type(frames) is not np.ndarray:
raise TypeError('Class argument frames not of type np.ndarray')
if not (len(frames.shape) == 2 and frames.shape[0] > 1 and frames.shape[1] > 2):
raise ValueError(
'Class argument frames need to be of shape (nFrames,nPixels) with nFrames > 1 and nPixels >2')
if not (np.max(frames.flatten()) == 1 and np.min(frames.flatten()) == 0):
raise ValueError(
'Class argument frames not normalised. Max value of frames should be 1 and min value should be 0.')
return True
@staticmethod
def _test_if_time_has_correct_format(time):
if type(time) is not np.ndarray:
raise TypeError('Class argument frames not of type np.ndarray')
if not (len(time.shape) == 1 and time.shape[0] > 1):
raise ValueError('Class argument time need to be of shape (nFrames,) with nFrames > 1.')
if not all(np.diff(time) > 0):
raise ValueError('Class argument time not increasing monotonically.')
return True
@staticmethod
def _test_if_time_and_frames_has_same_length(time, frames):
if not time.shape[0] == frames.shape[0]:
raise ValueError('Class arguments time and frames does not of equal length.')
return True
@staticmethod
def _test_if_automatic_update_has_correct_format(automatic_update):
if not type(automatic_update) == bool:
raise ValueError('Class argument automatic_update must be True or False.')
return True