nanopyx.core.transform.convolution

  1import warnings
  2import numpy as np
  3
  4
  5try:
  6    from numba import njit
  7except ImportError:
  8    print("Optional dependency Numba is not installed. Numba implementations will be ignored.")
  9
 10    def njit(*args, **kwargs):
 11        def wrapper(func):
 12            warnings.warn(f"Numba is not installed. Using pure python for {func.__name__}")
 13            return func
 14
 15        return wrapper
 16
 17try:
 18    import cupy as cp
 19    from cupyx.scipy.signal import convolve2d as cupyx_convolve
 20except ImportError:
 21    print("Cupy implementation is not available. Make sure you have the right version of Cupy and CUDA installed.")
 22
 23try:
 24    import dask.array as da
 25
 26except ImportError:
 27    print("Optional dependency Dask is not installed. Dask implementations will be ignored.")
 28
 29try:
 30    from dask_image.ndfilters import convolve as dask_convolve
 31except ImportError:
 32    print("Optional dependecy Dask_image is not installed. Implementations using it will be ignored.")
 33
 34
 35# def check_array(image: np.ndarray):
 36#     """
 37#     Check the given image and ensure it meets the required conditions.
 38
 39#     Parameters:
 40#         image (numpy.ndarray): The image to be checked.
 41
 42#     Returns:
 43#         numpy.ndarray: The checked and potentially modified image.
 44
 45#     Raises:
 46#         TypeError: If the image is not of type numpy.ndarray.
 47#         ValueError: If the image is not 2D or 3D.
 48#     """
 49#     image = np.asarray(image)
 50#     if type(image) is not np.ndarray:
 51#         raise TypeError("Image must be of type np.ndarray")
 52#     if image.ndim != 3:
 53#         raise ValueError("Image must be 2D")
 54#     if image.dtype != np.float32:
 55#         image = image.astype(np.float32, copy=False)
 56#     return image
 57
 58
 59def convolution2D_python(image: np.ndarray, kernel: np.ndarray):
 60    nFrames = image.shape[0]
 61    nRows = image.shape[1]
 62    nCols = image.shape[2]
 63
 64    nRows_kernel = kernel.shape[0]
 65    nCols_kernel = kernel.shape[1]
 66
 67    center_r = (nRows_kernel-1) // 2
 68    center_c = (nCols_kernel-1) // 2
 69
 70    acc = 0.0
 71
 72    conv_out = np.zeros((nFrames, nRows, nCols), dtype=np.float32)
 73    
 74    for f in range(nFrames):
 75        for r in range(nRows):
 76            for c in range(nCols):
 77                acc = 0
 78                for kr in range(nRows_kernel):
 79                    for kc in range(nCols_kernel):
 80                        local_row = min(max(r+(kr-center_r), 0), nRows-1)
 81                        local_col = min(max(c+(kc-center_c), 0), nCols-1)
 82                        acc = acc + kernel[kr, kc] * image[f,local_row, local_col]
 83                conv_out[f,r, c] = acc
 84
 85    return conv_out
 86
 87
 88def convolution2D_transonic(image, kernel):
 89    try:
 90        import transonic
 91        from ._transonic import convolution2D
 92        return convolution2D(image, kernel)
 93    except ModuleNotFoundError:
 94        print("Transonic is not installed, defaulting to Python")
 95        return convolution2D_python(image, kernel)
 96    except ImportError:
 97        print("Transonic is not installed, defaulting to Python")
 98        return convolution2D_python(image, kernel)
 99
100
101@njit(cache=True, parallel=True)
102def convolution2D_numba(image, kernel):
103    nFrames = image.shape[0]
104    nRows = image.shape[1]
105    nCols = image.shape[2]
106
107    nRows_kernel = kernel.shape[0]
108    nCols_kernel = kernel.shape[1]
109
110    center_r = (nRows_kernel-1) // 2
111    center_c = (nCols_kernel-1) // 2
112
113    acc = 0.0
114
115    conv_out = np.zeros((nFrames, nRows, nCols), dtype=np.float32)
116
117    for f in range(nFrames):
118        for r in range(nRows):
119            for c in range(nCols):
120                acc = 0
121                for kr in range(nRows_kernel):
122                    for kc in range(nCols_kernel):
123                        local_row = min(max(r+(kr-center_r), 0), nRows-1)
124                        local_col = min(max(c+(kc-center_c), 0), nCols-1)
125                        acc = acc + kernel[kr, kc] * image[f,local_row, local_col]
126                conv_out[f,r, c] = acc
127
128    return conv_out
129
130
131def convolution2D_dask(image, kernel):
132
133    conv_out = np.zeros_like(image)
134    for i in range(image.shape[0]):
135        conv_out[i] = dask_convolve(da.from_array(image[i]), da.from_array(kernel))
136    return conv_out
137
138
139def convolution2D_cuda(image, kernel):
140    with cp.cuda.Device(0):
141        output = cp.asnumpy(cupyx_convolve(cp.asarray(image), cp.asarray(kernel), mode="same", boundary="symm"))
142    return output
def convolution2D_python(image: numpy.ndarray, kernel: numpy.ndarray):
60def convolution2D_python(image: np.ndarray, kernel: np.ndarray):
61    nFrames = image.shape[0]
62    nRows = image.shape[1]
63    nCols = image.shape[2]
64
65    nRows_kernel = kernel.shape[0]
66    nCols_kernel = kernel.shape[1]
67
68    center_r = (nRows_kernel-1) // 2
69    center_c = (nCols_kernel-1) // 2
70
71    acc = 0.0
72
73    conv_out = np.zeros((nFrames, nRows, nCols), dtype=np.float32)
74    
75    for f in range(nFrames):
76        for r in range(nRows):
77            for c in range(nCols):
78                acc = 0
79                for kr in range(nRows_kernel):
80                    for kc in range(nCols_kernel):
81                        local_row = min(max(r+(kr-center_r), 0), nRows-1)
82                        local_col = min(max(c+(kc-center_c), 0), nCols-1)
83                        acc = acc + kernel[kr, kc] * image[f,local_row, local_col]
84                conv_out[f,r, c] = acc
85
86    return conv_out
def convolution2D_transonic(image, kernel):
89def convolution2D_transonic(image, kernel):
90    try:
91        import transonic
92        from ._transonic import convolution2D
93        return convolution2D(image, kernel)
94    except ModuleNotFoundError:
95        print("Transonic is not installed, defaulting to Python")
96        return convolution2D_python(image, kernel)
97    except ImportError:
98        print("Transonic is not installed, defaulting to Python")
99        return convolution2D_python(image, kernel)
@njit(cache=True, parallel=True)
def convolution2D_numba(image, kernel):
102@njit(cache=True, parallel=True)
103def convolution2D_numba(image, kernel):
104    nFrames = image.shape[0]
105    nRows = image.shape[1]
106    nCols = image.shape[2]
107
108    nRows_kernel = kernel.shape[0]
109    nCols_kernel = kernel.shape[1]
110
111    center_r = (nRows_kernel-1) // 2
112    center_c = (nCols_kernel-1) // 2
113
114    acc = 0.0
115
116    conv_out = np.zeros((nFrames, nRows, nCols), dtype=np.float32)
117
118    for f in range(nFrames):
119        for r in range(nRows):
120            for c in range(nCols):
121                acc = 0
122                for kr in range(nRows_kernel):
123                    for kc in range(nCols_kernel):
124                        local_row = min(max(r+(kr-center_r), 0), nRows-1)
125                        local_col = min(max(c+(kc-center_c), 0), nCols-1)
126                        acc = acc + kernel[kr, kc] * image[f,local_row, local_col]
127                conv_out[f,r, c] = acc
128
129    return conv_out
def convolution2D_dask(image, kernel):
132def convolution2D_dask(image, kernel):
133
134    conv_out = np.zeros_like(image)
135    for i in range(image.shape[0]):
136        conv_out[i] = dask_convolve(da.from_array(image[i]), da.from_array(kernel))
137    return conv_out
def convolution2D_cuda(image, kernel):
140def convolution2D_cuda(image, kernel):
141    with cp.cuda.Device(0):
142        output = cp.asnumpy(cupyx_convolve(cp.asarray(image), cp.asarray(kernel), mode="same", boundary="symm"))
143    return output