Source code for tme.backends.cupy_backend

""" Backend using cupy for template matching.

    Copyright (c) 2023 European Molecular Biology Laboratory

    Author: Valentin Maurer <valentin.maurer@embl-hamburg.de>
"""
import warnings
from importlib.util import find_spec
from contextlib import contextmanager
from typing import Tuple, Callable, List

import numpy as np

from .npfftw_backend import NumpyFFTWBackend
from ..types import CupyArray, NDArray, shm_type

PLAN_CACHE = {}
TEXTURE_CACHE = {}


[docs] class CupyBackend(NumpyFFTWBackend): """ A cupy-based matching backend. """ def __init__( self, float_dtype: type = None, complex_dtype: type = None, int_dtype: type = None, overflow_safe_dtype: type = None, **kwargs, ): import cupy as cp from cupyx.scipy.fft import get_fft_plan from cupyx.scipy.ndimage import affine_transform from cupyx.scipy.ndimage import maximum_filter float_dtype = cp.float32 if float_dtype is None else float_dtype complex_dtype = cp.complex64 if complex_dtype is None else complex_dtype int_dtype = cp.int32 if int_dtype is None else int_dtype if overflow_safe_dtype is None: overflow_safe_dtype = cp.float32 super().__init__( array_backend=cp, float_dtype=float_dtype, complex_dtype=complex_dtype, int_dtype=int_dtype, overflow_safe_dtype=overflow_safe_dtype, ) self.get_fft_plan = get_fft_plan self.affine_transform = affine_transform self.maximum_filter = maximum_filter itype = f"int{self.datatype_bytes(int_dtype) * 8}" ftype = f"float{self.datatype_bytes(float_dtype) * 8}" self._max_score_over_rotations = self._array_backend.ElementwiseKernel( f"{ftype} internal_scores, {ftype} scores, {itype} rot_index", f"{ftype} out1, {itype} rotations", "if (internal_scores < scores) {out1 = scores; rotations = rot_index;}", "max_score_over_rotations", ) self.norm_scores = cp.ElementwiseKernel( f"{ftype} arr, {ftype} exp_sq, {ftype} sq_exp, {ftype} n_obs, {ftype} eps", f"{ftype} out", """ // tmp1 = E(X)^2; tmp2 = E(X^2) float tmp1 = sq_exp / n_obs; float tmp2 = exp_sq / n_obs; tmp1 *= tmp1; tmp2 = sqrt(max(tmp2 - tmp1, 0.0)); // out = (tmp2 < eps) ? 0.0 : arr / (tmp2 * n_obs); tmp1 = arr; if (tmp2 < eps){ tmp1 = 0; } tmp2 *= n_obs; out = tmp1 / tmp2; """, "norm_scores", ) self.texture_available = find_spec("voltools") is not None
[docs] def to_backend_array(self, arr: NDArray) -> CupyArray: current_device = self._array_backend.cuda.device.get_device_id() if ( isinstance(arr, self._array_backend.ndarray) and arr.device.id == current_device ): return arr return self._array_backend.asarray(arr)
[docs] def to_numpy_array(self, arr: CupyArray) -> NDArray: return self._array_backend.asnumpy(arr)
[docs] def to_cpu_array(self, arr: NDArray) -> NDArray: return self.to_numpy_array(arr)
[docs] def from_sharedarr(self, arr: CupyArray) -> CupyArray: return arr
[docs] @staticmethod def to_sharedarr(arr: CupyArray, shared_memory_handler: type = None) -> shm_type: return arr
[docs] def zeros(self, *args, **kwargs): return self._array_backend.zeros(*args, **kwargs)
[docs] def unravel_index(self, indices, shape): return self._array_backend.unravel_index(indices=indices, dims=shape)
[docs] def unique(self, ar, axis=None, *args, **kwargs): if axis is None: return self._array_backend.unique(ar=ar, axis=axis, *args, **kwargs) warnings.warn("Axis argument not yet supported in CupY, falling back to NumPy.") ret = np.unique(ar=self.to_numpy_array(ar), axis=axis, *args, **kwargs) if not isinstance(ret, tuple): return self.to_backend_array(ret) return tuple(self.to_backend_array(k) for k in ret)
[docs] def build_fft( self, fast_shape: Tuple[int], fast_ft_shape: Tuple[int], real_dtype: type, complex_dtype: type, inverse_fast_shape: Tuple[int] = None, **kwargs, ) -> Tuple[Callable, Callable]: import cupyx.scipy.fft as cufft cache = self._array_backend.fft.config.get_plan_cache() current_device = self._array_backend.cuda.device.get_device_id() previous_transform = [fast_shape, fast_ft_shape] if current_device in PLAN_CACHE: previous_transform = PLAN_CACHE[current_device] real_diff, cmplx_diff = True, True if len(fast_shape) == len(previous_transform[0]): real_diff = fast_shape == previous_transform[0] if len(fast_ft_shape) == len(previous_transform[1]): cmplx_diff = fast_ft_shape == previous_transform[1] if real_diff or cmplx_diff: cache.clear() def rfftn(arr: CupyArray, out: CupyArray) -> CupyArray: return cufft.rfftn(arr, s=fast_shape) def irfftn(arr: CupyArray, out: CupyArray) -> CupyArray: return cufft.irfftn(arr, s=fast_shape) PLAN_CACHE[current_device] = [fast_shape, fast_ft_shape] return rfftn, irfftn
[docs] def compute_convolution_shapes( self, arr1_shape: Tuple[int], arr2_shape: Tuple[int] ) -> Tuple[List[int], List[int], List[int]]: from cupyx.scipy.fft import next_fast_len convolution_shape = [int(x + y - 1) for x, y in zip(arr1_shape, arr2_shape)] fast_shape = [next_fast_len(x, real=True) for x in convolution_shape] fast_ft_shape = list(fast_shape[:-1]) + [fast_shape[-1] // 2 + 1] return convolution_shape, fast_shape, fast_ft_shape
[docs] def max_filter_coordinates(self, score_space, min_distance: Tuple[int]): score_box = tuple(min_distance for _ in range(score_space.ndim)) max_filter = self.maximum_filter(score_space, size=score_box, mode="constant") max_filter = max_filter == score_space peaks = self._array_backend.array(self._array_backend.nonzero(max_filter)).T return peaks
# The default methods in Cupy were oddly slow
[docs] def var(self, a, *args, **kwargs): out = a - self._array_backend.mean(a, *args, **kwargs) self._array_backend.square(out, out) out = self._array_backend.mean(out, *args, **kwargs) return out
[docs] def std(self, a, *args, **kwargs): out = self.var(a, *args, **kwargs) return self._array_backend.sqrt(out)
def _get_texture(self, arr: CupyArray, order: int = 3, prefilter: bool = False): key = id(arr) if key in TEXTURE_CACHE: return TEXTURE_CACHE[key] from voltools import StaticVolume # Only keep template and potential corresponding mask in cache if len(TEXTURE_CACHE) >= 2: TEXTURE_CACHE.clear() interpolation = "filt_bspline" if order == 1: interpolation = "linear" elif order == 3 and not prefilter: interpolation = "bspline" current_device = self._array_backend.cuda.device.get_device_id() TEXTURE_CACHE[key] = StaticVolume( arr, interpolation=interpolation, device=f"gpu:{current_device}" ) return TEXTURE_CACHE[key] def _rigid_transform( self, data: CupyArray, matrix: CupyArray, output: CupyArray, prefilter: bool, order: int, cache: bool = False, ) -> None: out_slice = tuple(slice(0, stop) for stop in data.shape) if data.ndim == 3 and cache and self.texture_available: # Device memory pool (should) come to rescue performance temp = self.empty(data.shape, data.dtype) texture = self._get_texture(data, order=order, prefilter=prefilter) texture.affine(transform_m=matrix, profile=False, output=temp) output[out_slice] = temp return None self.affine_transform( input=data, matrix=matrix, mode="constant", output=output[out_slice], order=order, prefilter=prefilter, )
[docs] def get_available_memory(self) -> int: with self._array_backend.cuda.Device(): free_memory, _ = self._array_backend.cuda.runtime.memGetInfo() return free_memory
[docs] @contextmanager def set_device(self, device_index: int): with self._array_backend.cuda.Device(device_index): yield
[docs] def device_count(self) -> int: return self._array_backend.cuda.runtime.getDeviceCount()
[docs] def max_score_over_rotations( self, scores: CupyArray, max_scores: CupyArray, rotations: CupyArray, rotation_index: int, ) -> Tuple[CupyArray, CupyArray]: return self._max_score_over_rotations( max_scores, scores, rotation_index, max_scores, rotations, )