""" Implements classes to analyze outputs from exhaustive template matching.
Copyright (c) 2023 European Molecular Biology Laboratory
Author: Valentin Maurer <valentin.maurer@embl-hamburg.de>
"""
from functools import wraps
from abc import ABC, abstractmethod
from typing import Tuple, List, Dict, Generator
import numpy as np
from skimage.feature import peak_local_max
from skimage.registration._phase_cross_correlation import _upsampled_dft
from ._utils import score_to_cart
from ..backends import backend as be
from ..matching_utils import split_shape
from ..types import BackendArray, NDArray
from ..rotations import euler_to_rotationmatrix
__all__ = [
"PeakCaller",
"PeakCallerSort",
"PeakCallerMaximumFilter",
"PeakCallerFast",
"PeakCallerRecursiveMasking",
"PeakCallerScipy",
"PeakClustering",
"filter_points",
"filter_points_indices",
]
PeakType = Tuple[BackendArray, BackendArray]
def _filter_bucket(
    coordinates: BackendArray, min_distance: Tuple[float], scores: BackendArray = None
) -> BackendArray:
    # Approximate non-maximum suppression: hash each coordinate into a grid
    # bucket of edge length min_distance and retain one point per bucket.
    coordinates = be.subtract(coordinates, be.min(coordinates, axis=0))
    bucket_indices = be.astype(be.divide(coordinates, min_distance), int)
    # Mixed-radix multiplier mapping d-dimensional bucket indices to a
    # unique flat index per bucket.
    multiplier = be.power(
        be.max(bucket_indices, axis=0) + 1, be.arange(bucket_indices.shape[1])
    )
    bucket_indices = be.multiply(bucket_indices, multiplier, out=bucket_indices)
    flattened_indices = be.sum(bucket_indices, axis=1)
    if scores is not None:
        _, inverse_indices = be.unique(flattened_indices, return_inverse=True)
        # Avoid bucket index overlap
        # Normalize scores to [0, 1) so subtracting them from the integer
        # bucket rank sorts by bucket first, best score first within a bucket.
        scores = be.subtract(scores, be.min(scores))
        scores = be.divide(scores, be.max(scores) + 0.1, out=scores)
        scores = be.subtract(inverse_indices, scores)
        indices = be.argsort(scores)
        sorted_buckets = inverse_indices[indices]
        # First element of each run of equal buckets is that bucket's best peak.
        mask = sorted_buckets[1:] != sorted_buckets[:-1]
        mask = be.concatenate((be.full((1,), fill_value=1, dtype=mask.dtype), mask))
        return indices[mask]
    # Without scores, keep the first occurrence per bucket in original order.
    _, unique_indices = be.unique(flattened_indices, return_index=True)
    return unique_indices[be.argsort(unique_indices)]
def filter_points_indices(
    coordinates: BackendArray,
    min_distance: float,
    bucket_cutoff: int = 1e5,
    batch_dims: Tuple[int] = None,
    scores: BackendArray = None,
) -> BackendArray:
    """
    Determine indices of points spaced at least ``min_distance`` apart.

    Parameters
    ----------
    coordinates : BackendArray
        Point coordinates (n, d).
    min_distance : float
        Minimum allowed pairwise distance.
    bucket_cutoff : int, optional
        Point count from which the approximate bucket method is used, 1e5.
    batch_dims : tuple of int, optional
        Dimensions treated as independent batches.
    scores : BackendArray, optional
        Scores (n,); higher-scoring points suppress their neighbors first.

    Returns
    -------
    BackendArray
        Indices of retained points.
    """
    if min_distance <= 0:
        return be.arange(coordinates.shape[0])
    n_coords = coordinates.shape[0]
    if n_coords == 0:
        return ()
    if batch_dims is not None:
        # Scale batch dimensions so points from different batch elements can
        # never fall within min_distance of one another.
        coordinates_new = be.zeros(coordinates.shape, coordinates.dtype)
        coordinates_new[:] = coordinates
        coordinates_new[..., batch_dims] = be.astype(
            coordinates[..., batch_dims] * (2 * min_distance), coordinates_new.dtype
        )
        coordinates = coordinates_new
    if isinstance(coordinates, np.ndarray) and n_coords < bucket_cutoff:
        # Exact sequential scan; only this branch needs the compiled extension.
        from ..extensions import find_candidate_indices

        sorted_indices = None
        if scores is not None:
            # Visit highest scores first so they suppress their neighborhood.
            sorted_indices = np.argsort(-scores)
            coordinates = coordinates[sorted_indices]
        indices = find_candidate_indices(coordinates, min_distance)
        if sorted_indices is not None:
            return sorted_indices[indices]
        # Previously the computed indices were discarded here and an
        # inconsistent O(n^2) fallback ran instead; return them directly.
        return indices
    # Large inputs or non-numpy backends use the approximate bucket method.
    return _filter_bucket(coordinates, min_distance, scores)
def filter_points(
    coordinates: NDArray, min_distance: Tuple[int], batch_dims: Tuple[int] = None
) -> BackendArray:
    """
    Return ``coordinates`` thinned to points at least ``min_distance`` apart.

    Parameters
    ----------
    coordinates : NDArray
        Point coordinates (n, d).
    min_distance : tuple of int
        Minimum allowed pairwise distance.
    batch_dims : tuple of int, optional
        Dimensions treated as independent batches.

    Returns
    -------
    BackendArray
        Retained coordinates.
    """
    # batch_dims must be passed by keyword: the third positional parameter of
    # filter_points_indices is bucket_cutoff, not batch_dims.
    unique_indices = filter_points_indices(
        coordinates, min_distance, batch_dims=batch_dims
    )
    return coordinates[unique_indices]
def batchify(shape: Tuple[int], batch_dims: Tuple[int] = None) -> List:
    """
    Yield ``(slices, offsets)`` pairs iterating over all batch elements.

    Each ``slices`` tuple selects one element along every batch dimension
    and everything along the remaining dimensions; ``offsets`` holds the
    corresponding per-dimension start index (0 for non-batch dimensions).
    Without ``batch_dims`` a single all-encompassing selector is yielded.
    """
    if batch_dims is None:
        yield (tuple(slice(None) for _ in shape), tuple(0 for _ in shape))
        return None
    # Cartesian product of indices over the batch dimensions, built
    # iteratively in batch_dims order.
    combos = [()]
    for dim in batch_dims:
        combos = [prefix + (idx,) for prefix in combos for idx in range(shape[dim])]
    for combo in combos:
        position = dict(zip(batch_dims, combo))
        slices, offsets = [], []
        for axis in range(len(shape)):
            if axis in position:
                start = position[axis]
                slices.append(slice(start, start + 1))
                offsets.append(start)
            else:
                slices.append(slice(None))
                offsets.append(0)
        yield (tuple(slices), tuple(offsets))
class PeakCaller(ABC):
"""
Base class for peak calling algorithms.
Parameters
----------
shape : tuple of int
Score space shape. Used to determine dimension of peak calling problem.
num_peaks : int, optional
Number of candidate peaks to consider.
min_distance : int, optional
Minimum distance between peaks, 1 by default
min_boundary_distance : int, optional
Minimum distance to array boundaries, 0 by default.
min_score : float, optional
Minimum score from which to consider peaks.
max_score : float, optional
Maximum score upon which to consider peaks.
batch_dims : int, optional
Peak calling batch dimensions.
**kwargs
Optional keyword arguments.
Raises
------
ValueError
If num_peaks is less than or equal to zero.
If min_distances is less than zero.
"""
    def __init__(
        self,
        shape: Tuple[int],
        num_peaks: int = 1000,
        min_distance: int = 1,
        min_boundary_distance: int = 0,
        min_score: float = None,
        max_score: float = None,
        batch_dims: Tuple[int] = None,
        shm_handler: object = None,
        **kwargs,
    ):
        if num_peaks <= 0:
            raise ValueError("num_peaks has to be larger than 0.")
        if min_distance < 0:
            raise ValueError("min_distance has to be non-negative.")
        if min_boundary_distance < 0:
            raise ValueError("min_boundary_distance has to be non-negative.")
        ndim = len(shape)
        # Fixed-size peak store; -1 translations mark rows not yet filled.
        self.translations = be.full(
            (num_peaks, ndim), fill_value=-1, dtype=be._int_dtype
        )
        # Rotations are initialized to identity matrices.
        self.rotations = be.full(
            (num_peaks, ndim, ndim), fill_value=0, dtype=be._float_dtype
        )
        for i in range(ndim):
            self.rotations[:, i, i] = 1.0
        self.scores = be.full((num_peaks,), fill_value=0, dtype=be._float_dtype)
        self.details = be.full((num_peaks,), fill_value=0, dtype=be._float_dtype)
        self.num_peaks = int(num_peaks)
        self.min_distance = int(min_distance)
        self.min_boundary_distance = int(min_boundary_distance)
        self.batch_dims = batch_dims
        if batch_dims is not None:
            self.batch_dims = tuple(int(x) for x in self.batch_dims)
        self.min_score, self.max_score = min_score, max_score
        # Postprocessing arguments
        self.fourier_shift = kwargs.get("fourier_shift", None)
        self.convolution_mode = kwargs.get("convolution_mode", None)
        self.targetshape = kwargs.get("targetshape", None)
        self.templateshape = kwargs.get("templateshape", None)
def __iter__(self) -> Generator:
"""
Returns a generator to list objects containing translation,
rotation, score and details of a given candidate.
"""
self.peak_list = [
be.to_cpu_array(self.translations),
be.to_cpu_array(self.rotations),
be.to_cpu_array(self.scores),
be.to_cpu_array(self.details),
]
yield from self.peak_list
    def _get_peak_mask(self, peaks: BackendArray, scores: BackendArray) -> BackendArray:
        """
        Build a boolean mask over ``peaks`` retaining candidates that satisfy
        boundary-distance and score constraints; ``None`` if nothing passes.
        """
        if not len(peaks):
            return None
        valid_peaks = be.full((peaks.shape[0],), fill_value=1) == 1
        if self.min_boundary_distance > 0:
            upper_limit = be.subtract(
                be.to_backend_array(scores.shape), self.min_boundary_distance
            )
            # (n, d) per-dimension boundary compliance.
            valid_peaks = be.multiply(
                peaks < upper_limit,
                peaks >= self.min_boundary_distance,
            )
            if self.batch_dims is not None:
                # Batch dimensions are exempt from the boundary constraint.
                valid_peaks[..., self.batch_dims] = True
            # Peak is valid only if every dimension is within bounds.
            valid_peaks = be.sum(valid_peaks, axis=1) == peaks.shape[1]
        # Score thresholds and nan removal
        peak_scores = scores[tuple(peaks.T)]
        # x == x is False for NaN, dropping peaks with undefined scores.
        valid_peaks = be.multiply(peak_scores == peak_scores, valid_peaks)
        if self.min_score is not None:
            valid_peaks = be.multiply(peak_scores >= self.min_score, valid_peaks)
        if self.max_score is not None:
            valid_peaks = be.multiply(peak_scores <= self.max_score, valid_peaks)
        if be.sum(valid_peaks) == 0:
            return None
        # Ensure consistent upper limit of input peaks for _update step
        if (
            be.sum(valid_peaks) > self.num_peaks
            or peak_scores.shape[0] > self.num_peaks
        ):
            # Keep 2 * num_peaks candidates to give _update slack for its
            # distance-based filtering.
            peak_indices = self._top_peaks(
                peaks, scores=peak_scores * valid_peaks, num_peaks=2 * self.num_peaks
            )
            valid_peaks = be.full(peak_scores.shape, 0, bool)
            valid_peaks[peak_indices] = True
            # Re-apply score thresholds to the rebuilt mask.
            if self.min_score is not None:
                valid_peaks = be.multiply(peak_scores >= self.min_score, valid_peaks)
            if self.max_score is not None:
                valid_peaks = be.multiply(peak_scores <= self.max_score, valid_peaks)
        if valid_peaks.shape[0] != peaks.shape[0]:
            return None
        return valid_peaks
    def _apply_over_batch(func):
        """
        Decorator applying ``func`` once per batch subset of ``scores``,
        yielding the result of each invocation.
        """
        @wraps(func)
        def wrapper(self, scores, rotation_matrix, **kwargs):
            # batchify yields a slice selector and its offset per batch element.
            for subset, batch_offset in batchify(scores.shape, self.batch_dims):
                yield func(
                    self,
                    scores=scores[subset],
                    rotation_matrix=rotation_matrix,
                    batch_offset=batch_offset,
                    **kwargs,
                )
        return wrapper
    @_apply_over_batch
    def _call_peaks(self, scores, rotation_matrix, batch_offset=None, **kwargs):
        """
        Invoke :py:meth:`PeakCaller.call_peaks` on one batch subset and map
        the returned peaks back to global coordinates.
        """
        peak_positions, peak_details = self.call_peaks(
            scores=scores,
            rotation_matrix=rotation_matrix,
            min_score=self.min_score,
            max_score=self.max_score,
            batch_offset=batch_offset,
            **kwargs,
        )
        if peak_positions is None:
            return None, None
        peak_positions = be.to_backend_array(peak_positions)
        if batch_offset is not None:
            # Shift batch-local coordinates to their position in the full array.
            batch_offset = be.to_backend_array(batch_offset)
            peak_positions = be.add(peak_positions, batch_offset, out=peak_positions)
        peak_positions = be.astype(peak_positions, int)
        return peak_positions, peak_details
    def __call__(self, scores: BackendArray, rotation_matrix: BackendArray, **kwargs):
        """
        Update the internal parameter store based on input array.

        Parameters
        ----------
        scores : BackendArray
            Score space data.
        rotation_matrix : BackendArray
            Rotation matrix used to obtain the score array.
        **kwargs
            Optional keyword arguments passed to :py:meth:`PeakCaller.call_peaks`.
        """
        for ret in self._call_peaks(scores=scores, rotation_matrix=rotation_matrix):
            peak_positions, peak_details = ret
            if peak_positions is None:
                continue
            valid_peaks = self._get_peak_mask(peaks=peak_positions, scores=scores)
            if valid_peaks is None:
                continue
            peak_positions = peak_positions[valid_peaks]
            peak_scores = scores[tuple(peak_positions.T)]
            if peak_details is not None:
                peak_details = peak_details[valid_peaks]
                # peak_details, peak_scores = peak_scores, -peak_details
            else:
                # -1 marks peaks without caller-provided details.
                peak_details = be.full(peak_scores.shape, fill_value=-1)
            # Replicate the current rotation matrix, one per retained peak.
            rotations = be.repeat(
                rotation_matrix.reshape(1, *rotation_matrix.shape),
                peak_positions.shape[0],
                axis=0,
            )
            self._update(
                peak_positions=peak_positions,
                peak_details=peak_details,
                peak_scores=peak_scores,
                rotations=rotations,
            )
        return None
    @abstractmethod
    def call_peaks(self, scores: BackendArray, **kwargs) -> PeakType:
        """
        Call peaks in the score space.

        Parameters
        ----------
        scores : BackendArray
            Score array.
        **kwargs : dict
            Optional keyword arguments passed to underlying implementations.

        Returns
        -------
        Tuple[BackendArray, BackendArray]
            Array of peak coordinates and peak details. Implementations may
            return ``None`` coordinates to indicate no peaks were found.
        """
@classmethod
def merge(cls, candidates=List[List], **kwargs) -> Tuple:
"""
Merge multiple instances of :py:class:`PeakCaller`.
Parameters
----------
candidates : list of lists
Obtained by invoking list on the generator returned by __iter__.
**kwargs
Optional keyword arguments.
Returns
-------
Tuple
Tuple of translation, rotation, score and details of candidates.
"""
if "shape" not in kwargs:
kwargs["shape"] = tuple(1 for _ in range(candidates[0][0].shape[1]))
base = cls(**kwargs)
for candidate in candidates:
if len(candidate) == 0:
continue
peak_positions, rotations, peak_scores, peak_details = candidate
base._update(
peak_positions=be.to_backend_array(peak_positions),
peak_details=be.to_backend_array(peak_details),
peak_scores=be.to_backend_array(peak_scores),
rotations=be.to_backend_array(rotations),
offset=kwargs.get("offset", None),
)
return tuple(base)
    @staticmethod
    def oversample_peaks(
        scores: BackendArray, peak_positions: BackendArray, oversampling_factor: int = 8
    ):
        """
        Refines peaks positions in the corresponding score space.

        Parameters
        ----------
        scores : BackendArray
            The d-dimensional array representing the score space.
        peak_positions : BackendArray
            An array of shape (n, d) containing the peak coordinates
            to be refined, where n is the number of peaks and d is the
            dimensionality of the score space.
        oversampling_factor : int, optional
            The oversampling factor for Fourier transforms. Defaults to 8.

        Returns
        -------
        BackendArray
            An array of shape (n, d) containing the refined subpixel
            coordinates of the peaks.

        Notes
        -----
        Floating point peak positions are determined by oversampling the
        scores around peak_positions. The accuracy
        of refinement scales with 1 / oversampling_factor.

        References
        ----------
        .. [1]  https://scikit-image.org/docs/stable/api/skimage.registration.html
        .. [2]  Manuel Guizar-Sicairos, Samuel T. Thurman, and
                James R. Fienup, “Efficient subpixel image registration
                algorithms,” Optics Letters 33, 156-158 (2008).
                DOI:10.1364/OL.33.000156
        """
        # Computation is performed on CPU via numpy regardless of backend.
        scores = be.to_numpy_array(scores)
        peak_positions = be.to_numpy_array(peak_positions)
        # Snap positions to the oversampling grid (mirrors skimage's
        # phase_cross_correlation upsampled refinement).
        peak_positions = np.round(
            np.divide(
                np.multiply(peak_positions, oversampling_factor), oversampling_factor
            )
        )
        upsampled_region_size = np.ceil(np.multiply(oversampling_factor, 1.5))
        dftshift = np.round(np.divide(upsampled_region_size, 2.0))
        sample_region_offset = np.subtract(
            dftshift, np.multiply(peak_positions, oversampling_factor)
        )
        scores_ft = np.fft.fftn(scores).conj()
        for index in range(sample_region_offset.shape[0]):
            # Evaluate an oversampled DFT in a small window around each peak.
            cross_correlation_upsampled = _upsampled_dft(
                data=scores_ft,
                upsampled_region_size=upsampled_region_size,
                upsample_factor=oversampling_factor,
                axis_offsets=sample_region_offset[index],
            ).conj()
            maxima = np.unravel_index(
                np.argmax(np.abs(cross_correlation_upsampled)),
                cross_correlation_upsampled.shape,
            )
            # Convert window-local maxima into a subpixel shift.
            maxima = np.divide(np.subtract(maxima, dftshift), oversampling_factor)
            peak_positions[index] = np.add(peak_positions[index], maxima)
        peak_positions = be.to_backend_array(peak_positions)
        return peak_positions
    def _top_peaks(self, positions, scores, num_peaks: int = None):
        """
        Return indices of the ``num_peaks`` highest ``scores``, computed per
        batch element when :py:attr:`PeakCaller.batch_dims` is set.
        """
        num_peaks = be.size(scores) if not num_peaks else num_peaks
        if self.batch_dims is None:
            top_n = min(be.size(scores), num_peaks)
            top_scores, *_ = be.topk_indices(scores, top_n)
            return top_scores
        # Not very performant but fairly robust
        # Mixed-radix hash of batch coordinates into one flat index per peak.
        batch_indices = positions[..., self.batch_dims]
        batch_indices = be.subtract(batch_indices, be.min(batch_indices, axis=0))
        multiplier = be.power(
            be.max(batch_indices, axis=0) + 1,
            be.arange(batch_indices.shape[1]),
        )
        batch_indices = be.multiply(batch_indices, multiplier, out=batch_indices)
        batch_indices = be.sum(batch_indices, axis=1)
        unique_indices, batch_counts = be.unique(batch_indices, return_counts=True)
        total_indices = be.arange(scores.shape[0])
        # Group peak indices by batch element.
        batch_indices = [total_indices[batch_indices == x] for x in unique_indices]
        # Top-k within each batch element, concatenated across batches.
        top_scores = be.concatenate(
            [
                total_indices[indices][
                    be.topk_indices(scores[indices], min(y, num_peaks))
                ]
                for indices, y in zip(batch_indices, batch_counts)
            ]
        )
        return top_scores
    def _update(
        self,
        peak_positions: BackendArray,
        peak_details: BackendArray,
        peak_scores: BackendArray,
        rotations: BackendArray,
        offset: BackendArray = None,
    ):
        """
        Update internal parameter store.

        Parameters
        ----------
        peak_positions : BackendArray
            Position of peaks (n, d).
        peak_details : BackendArray
            Details of each peak (n, ).
        peak_scores: BackendArray
            Score at each peak (n,).
        rotations: BackendArray
            Rotation at each peak (n, d, d).
        offset : BackendArray, optional
            Translation offset, e.g. from splitting, (d, ).
        """
        if offset is not None:
            offset = be.astype(be.to_backend_array(offset), peak_positions.dtype)
            peak_positions = be.add(peak_positions, offset, out=peak_positions)
        # Merge new candidates with the current store and re-filter jointly.
        positions = be.concatenate((self.translations, peak_positions))
        rotations = be.concatenate((self.rotations, rotations))
        scores = be.concatenate((self.scores, peak_scores))
        details = be.concatenate((self.details, peak_details))
        # topk filtering after distances yields more distributed peak calls
        distance_order = filter_points_indices(
            coordinates=positions,
            min_distance=self.min_distance,
            batch_dims=self.batch_dims,
            scores=scores,
        )
        top_scores = self._top_peaks(
            positions[distance_order, :], scores[distance_order], self.num_peaks
        )
        # Compose distance filter and top-k selection into one index set.
        final_order = distance_order[top_scores]
        self.translations = positions[final_order, :]
        self.rotations = rotations[final_order, :]
        self.scores = scores[final_order]
        self.details = details[final_order]
    def _postprocess(self, **kwargs):
        """
        Convert stored peak translations via :py:func:`score_to_cart`,
        dropping peaks flagged as invalid by the conversion.
        """
        if not len(self.translations):
            return self
        positions, valid_peaks = score_to_cart(self.translations, **kwargs)
        self.translations = positions[valid_peaks]
        self.rotations = self.rotations[valid_peaks]
        self.scores = self.scores[valid_peaks]
        self.details = self.details[valid_peaks]
        return self
class PeakCallerSort(PeakCaller):
    """
    A :py:class:`PeakCaller` subclass that selects the ``num_peaks``
    highest scores in the array.
    """

    def call_peaks(self, scores: BackendArray, **kwargs) -> PeakType:
        """Return coordinates of the ``num_peaks`` largest scores."""
        flattened = scores.reshape(-1)
        n_top = min(self.num_peaks, be.size(flattened))
        top_indices, *_ = be.topk_indices(flattened, n_top)
        peak_coordinates = be.stack(be.unravel_index(top_indices, scores.shape))
        return be.transpose(peak_coordinates), None
class PeakCallerMaximumFilter(PeakCaller):
    """
    Find local maxima via a maximum filter followed by a distance
    constraint, akin to :obj:`skimage.feature.peak_local_max`.
    """

    def call_peaks(self, scores: BackendArray, **kwargs) -> PeakType:
        """Return local-maximum coordinates spaced by ``min_distance``."""
        peaks = be.max_filter_coordinates(scores, self.min_distance)
        return peaks, None
class PeakCallerFast(PeakCaller):
    """
    Subdivides the score space into squares with edge length ``min_distance``
    and determines the maximum value for each. In a second pass, all local
    maxima that are not the local maxima in a ``min_distance`` square centered
    around them are removed.
    """

    def call_peaks(self, scores: BackendArray, **kwargs) -> PeakType:
        """
        Call peaks via per-tile maxima followed by neighborhood validation.

        Returns
        -------
        Tuple[BackendArray, BackendArray]
            Peak coordinates and ``None`` details; ``(None, None)`` when no
            candidate tiles exist.
        """
        # First pass: maximum of every min_distance-sized tile.
        splits = {i: x // self.min_distance for i, x in enumerate(scores.shape)}
        slices = split_shape(scores.shape, splits)
        coordinates = be.to_backend_array(
            [
                be.unravel_index(be.argmax(scores[subvol]), scores[subvol].shape)
                for subvol in slices
            ]
        )
        offset = be.to_backend_array(
            [tuple(x.start for x in subvol) for subvol in slices]
        )
        be.add(coordinates, offset, out=coordinates)
        if coordinates.shape[0] == 0:
            # Match the two-element contract _call_peaks unpacks; a bare
            # None would raise on tuple unpacking.
            return None, None
        coordinates = coordinates[be.argsort(-scores[tuple(coordinates.T)])]
        # Second pass: keep candidates that are maxima within the
        # min_distance neighborhood centered on them.
        starts = be.maximum(coordinates - self.min_distance, 0)
        stops = be.minimum(coordinates + self.min_distance, scores.shape)
        slices_list = [
            tuple(slice(*coord) for coord in zip(start_row, stop_row))
            for start_row, stop_row in zip(starts, stops)
        ]
        keep = [
            score_subvol >= be.max(scores[subvol])
            for subvol, score_subvol in zip(slices_list, scores[tuple(coordinates.T)])
        ]
        coordinates = coordinates[keep,]
        return coordinates, None
class PeakCallerRecursiveMasking(PeakCaller):
"""
Identifies peaks iteratively by selecting the top score and masking
a region around it.
"""
def call_peaks(
self,
scores: BackendArray,
rotation_matrix: BackendArray,
mask: BackendArray = None,
min_score: float = None,
rotations: BackendArray = None,
rotation_mapping: Dict = None,
**kwargs,
) -> PeakType:
"""
Call peaks in the score space.
Parameters
----------
scores : BackendArray
Data array of scores.
rotation_matrix : BackendArray
Rotation matrix.
mask : BackendArray, optional
Mask array, by default None.
rotations : BackendArray, optional
Rotation space array, by default None.
rotation_mapping : Dict optional
Dictionary mapping values in rotations to Euler angles.
By default None
min_score : float
Minimum score value to consider. If provided, superseeds limit given
by :py:attr:`PeakCaller.num_peaks`.
Returns
-------
Tuple[BackendArray, BackendArray]
Array of peak coordinates and peak details.
Notes
-----
By default, scores are masked using a box with edge length self.min_distance.
If mask is provided, elements around each peak will be multiplied by the mask
values. If rotations and rotation_mapping is provided, the respective
rotation will be applied to the mask, otherwise rotation_matrix is used.
"""
coordinates, masking_function = [], self._mask_scores_rotate
if mask is None:
masking_function = self._mask_scores_box
shape = tuple(self.min_distance for _ in range(scores.ndim))
mask = be.zeros(shape, dtype=be._float_dtype)
rotated_template = be.zeros(mask.shape, dtype=mask.dtype)
peak_limit = self.num_peaks
if min_score is not None:
peak_limit = be.size(scores)
else:
min_score = be.min(scores) - 1
scores_copy = be.zeros(scores.shape, dtype=scores.dtype)
scores_copy[:] = scores
while True:
be.argmax(scores_copy)
peak = be.unravel_index(
indices=be.argmax(scores_copy), shape=scores_copy.shape
)
if scores_copy[tuple(peak)] < min_score:
break
coordinates.append(peak)
current_rotation_matrix = self._get_rotation_matrix(
peak=peak,
rotation_space=rotations,
rotation_mapping=rotation_mapping,
rotation_matrix=rotation_matrix,
)
masking_function(
scores=scores_copy,
rotation_matrix=current_rotation_matrix,
peak=peak,
mask=mask,
rotated_template=rotated_template,
)
if len(coordinates) >= peak_limit:
break
peaks = be.to_backend_array(coordinates)
return peaks, None
    @staticmethod
    def _get_rotation_matrix(
        peak: BackendArray,
        rotation_space: BackendArray,
        rotation_mapping: BackendArray,
        rotation_matrix: BackendArray,
    ) -> BackendArray:
        """
        Get rotation matrix based on peak and rotation data.

        Parameters
        ----------
        peak : BackendArray
            Peak coordinates.
        rotation_space : BackendArray
            Rotation space array.
        rotation_mapping : Dict
            Dictionary mapping values in rotation_space to Euler angles.
        rotation_matrix : BackendArray
            Current rotation matrix.

        Returns
        -------
        BackendArray
            Rotation matrix.
        """
        if rotation_space is None or rotation_mapping is None:
            return rotation_matrix
        # Look up the rotation assigned to this position in rotation space.
        rotation = rotation_mapping[rotation_space[tuple(peak)]]
        # TODO: Newer versions of rotation mapping contain rotation matrices not angles
        if rotation.ndim != 2:
            # Mapping holds Euler angles; convert to a rotation matrix.
            rotation = be.to_backend_array(
                euler_to_rotationmatrix(be.to_numpy_array(rotation))
            )
        return rotation
@staticmethod
def _mask_scores_box(
scores: BackendArray, peak: BackendArray, mask: BackendArray, **kwargs: Dict
) -> None:
"""
Mask scores in a box around a peak.
Parameters
----------
scores : BackendArray
Data array of scores.
peak : BackendArray
Peak coordinates.
mask : BackendArray
Mask array.
"""
start = be.maximum(be.subtract(peak, mask.shape), 0)
stop = be.minimum(be.add(peak, mask.shape), scores.shape)
start, stop = be.astype(start, int), be.astype(stop, int)
coords = tuple(slice(*pos) for pos in zip(start, stop))
scores[coords] = 0
return None
    @staticmethod
    def _mask_scores_rotate(
        scores: BackendArray,
        peak: BackendArray,
        mask: BackendArray,
        rotated_template: BackendArray,
        rotation_matrix: BackendArray,
        **kwargs: Dict,
    ) -> None:
        """
        Mask scores using mask rotation around a peak.

        Parameters
        ----------
        scores : BackendArray
            Data array of scores, modified in place.
        peak : BackendArray
            Peak coordinates.
        mask : BackendArray
            Mask array.
        rotated_template : BackendArray
            Empty array to write mask rotations to.
        rotation_matrix : BackendArray
            Rotation matrix.
        """
        # Center the mask on the peak; right pad absorbs odd mask shapes.
        left_pad = be.divide(mask.shape, 2).astype(int)
        right_pad = be.add(left_pad, be.mod(mask.shape, 2).astype(int))
        score_start = be.subtract(peak, left_pad)
        score_stop = be.add(peak, right_pad)
        # Clip the mask window wherever it extends beyond the score array,
        # tracking the clipped amount on the template side.
        template_start = be.subtract(be.maximum(score_start, 0), score_start)
        template_stop = be.subtract(score_stop, be.minimum(score_stop, scores.shape))
        template_stop = be.subtract(mask.shape, template_stop)
        score_start = be.maximum(score_start, 0)
        score_stop = be.minimum(score_stop, scores.shape)
        score_start = be.astype(score_start, int)
        score_stop = be.astype(score_stop, int)
        template_start = be.astype(template_start, int)
        template_stop = be.astype(template_stop, int)
        coords_score = tuple(slice(*pos) for pos in zip(score_start, score_stop))
        coords_template = tuple(
            slice(*pos) for pos in zip(template_start, template_stop)
        )
        rotated_template.fill(0)
        be.rigid_transform(
            arr=mask, rotation_matrix=rotation_matrix, order=1, out=rotated_template
        )
        # Zero scores where the rotated mask is occupied; 0.1 thresholds
        # interpolation spill from order=1 resampling.
        scores[coords_score] = be.multiply(
            scores[coords_score], (rotated_template[coords_template] <= 0.1)
        )
        return None
class PeakCallerScipy(PeakCaller):
    """
    Peak calling using :obj:`skimage.feature.peak_local_max` to compute local maxima.
    """

    def call_peaks(
        self, scores: BackendArray, min_score: float = None, **kwargs
    ) -> PeakType:
        """Call peaks via peak_local_max, restoring squeezed singleton axes."""
        scores = be.to_numpy_array(scores)
        # With a score threshold the number of peaks is unbounded.
        max_peaks = self.num_peaks if min_score is None else np.inf
        kept_axes = tuple(axis for axis, size in enumerate(scores.shape) if size != 1)
        local_maxima = peak_local_max(
            np.squeeze(scores),
            num_peaks=max_peaks,
            min_distance=self.min_distance,
            threshold_abs=min_score,
        )
        # Re-insert squeezed dimensions as zero coordinates.
        expanded = np.zeros((local_maxima.shape[0], scores.ndim), local_maxima.dtype)
        expanded[..., kept_axes] = local_maxima[:]
        return be.to_backend_array(expanded), None
class PeakClustering(PeakCallerSort):
    """
    Use DBScan clustering to identify more reliable peaks.
    """

    def __init__(
        self,
        num_peaks: int = 1000,
        **kwargs,
    ):
        # Distance filtering is delegated to the clustering in merge.
        kwargs["min_distance"] = 0
        super().__init__(num_peaks=num_peaks, **kwargs)

    @classmethod
    def merge(cls, **kwargs) -> NDArray:
        """
        Merge multiple instances of Analyzer.

        Parameters
        ----------
        **kwargs
            Optional keyword arguments passed to :py:meth:`PeakCaller.merge`.

        Returns
        -------
        NDArray
            NDArray of candidates.
        """
        from sklearn.cluster import DBSCAN
        from ..extensions import max_index_by_label

        peaks, rotations, scores, details = super().merge(**kwargs)
        # Use the merged scores directly; the previous implementation
        # overwrote them with the third coordinate column of peaks.
        scores = np.asarray(scores)
        clusters = DBSCAN(eps=np.finfo(float).eps, min_samples=8).fit(peaks)
        labels = clusters.labels_.astype(int)
        # Highest-scoring member represents each cluster.
        label_max = max_index_by_label(labels=labels, scores=scores)
        if -1 in label_max:
            # Drop DBSCAN's noise label.
            _ = label_max.pop(-1)
        representatives = set(label_max.values())
        keep = np.array(
            [index in representatives for index in range(peaks.shape[0])]
        )
        peaks = peaks[keep,]
        rotations = rotations[keep,]
        scores = scores[keep]
        details = details[keep]
        return peaks, rotations, scores, details