nanopyx.data.download
```python
import os
import shutil
import tempfile
import zipfile
from urllib.request import ProxyHandler, build_opener, install_opener

import numpy as np
import yaml
from gdown import download as gdrive_download
from onedrivedownloader import download as onedrive_download
from ..core.io.downloader import download

from ..core.io.zip_image_loader import ZipTiffIterator
from .examples import get_path as get_examples_path


class ExampleDataManager:
    _base_path = get_examples_path()
    _temp_dir = os.path.join(tempfile.gettempdir(), "nanopyx_data")
    _to_download_path = None

    def __init__(self, to_download_path: str = None):
        """
        Helper class for downloading example test data

        :param to_download_path: path to download the data to. If to_download_path is None, a temporary directory
            will be created. Note that it will not be automatically deleted.
        :type to_download_path: str, optional
        :raises ValueError: If to_download_path is not None and does not exist

        To clear downloads use self._clear_download()
        """

        # Set download path
        if to_download_path is None:
            self._to_download_path = self._temp_dir
        else:
            self._to_download_path = to_download_path

        # Lets check on how many examples we have available
        self._datasets = []
        for path in os.listdir(self._base_path):
            full_path = os.path.join(self._base_path, path)
            info_file_path = os.path.join(full_path, "info.yaml")
            if os.path.isdir(full_path) and os.path.exists(info_file_path):
                info_data = None
                with open(os.path.join(info_file_path), "r") as f:
                    # Load the YAML contents
                    info_data = yaml.load(f, Loader=yaml.FullLoader)

                info = {
                    "info_path": info_file_path,
                    "thumbnail_path": os.path.join(self._base_path, path, "thumbnail.jpg"),
                    "tiff_sequence_path": None,
                }
                tiff_sequence_path = os.path.join(self._to_download_path, path, "tiff_sequence.zip")
                if os.path.exists(tiff_sequence_path):
                    info["tiff_sequence_path"] = tiff_sequence_path

                for key in info_data:
                    info[key] = info_data[key]

                info["shape"] = tuple([int(v) for v in info["data_shape"].split(",")])

                info["dtype"] = np.dtype(info["data_dtype"])

                self._datasets.append(info)

        # Fix agent
        proxy = ProxyHandler({})
        opener = build_opener(proxy)
        opener.addheaders = [
            (
                "User-Agent",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/603.1.30"
                + " (KHTML, like Gecko) Version/10.1 Safari/603.1.30",
            )
        ]
        install_opener(opener)

    def list_datasets(self) -> tuple:
        """
        :return: list of dataset labels
        """
        return sorted([dataset["label"] for dataset in self._datasets])

    def list_datasets_nickname(self) -> tuple:
        """
        :return: list of dataset nicknames
        """
        return [(dataset["nickname"], dataset["label"]) for dataset in self._datasets]

    def get_dataset_info(self, dataset_name: str) -> dict:
        """
        :param dataset_name: can be a dataset label or nickname
        :type dataset_name: str
        :return: dictionary with information about the dataset
        """

        for dataset in self._datasets:
            if dataset_name in (dataset["label"], dataset["nickname"]):
                return dataset
        raise ValueError(f"{dataset_name} not found in example datasets")

    def _download(self, url, file_path, download_type=None, unzip=False):
        if os.path.exists(file_path):  # or os.path.exists(os.path.splitext(file_path)[0]):
            # raise Warning(f"already exists, no need to download: {file_path}")
            return

        if not os.path.exists(self._temp_dir):
            os.mkdir(self._temp_dir)

        base_path = os.path.split(file_path)[0]
        if not os.path.exists(base_path):
            os.mkdir(base_path)

        if download_type == "onedrive":
            onedrive_download(url, file_path, unzip=unzip, clean=True)
        elif download_type == "gdrive":
            gdrive_download(url, file_path, fuzzy=True, quiet=False)
        else:
            download(url, file_path)

    def _copy_auxiliary_files(self, info: dict):
        if not os.path.exists(self._to_download_path):
            os.mkdir(self._to_download_path)

        path = os.path.join(self._to_download_path, info["label"])
        if not os.path.exists(path):
            os.mkdir(path)

        # thumbnail_path = os.path.join(path, "thumbnail.jpg")
        # if not os.path.exists(thumbnail_path):
        #     shutil.copyfile(info["thumbnail_path"], thumbnail_path)

        info_path = os.path.join(path, "info.yaml")
        if not os.path.exists(info_path):
            shutil.copyfile(info["info_path"], info_path)

    def download_tiff_sequence(self, dataset_name: str) -> str:
        """
        Downloads the tiff sequence and returns the path to the zip file

        :param dataset_name: can be a dataset label or nickname
        :type dataset_name: str
        :return: path to the zip file
        """
        info = self.get_dataset_info(dataset_name)
        path = os.path.join(self._to_download_path, info["label"])

        file_path = os.path.join(path, "tiff_sequence.zip")
        url = info["tiff_sequence_url"]
        download_type = info["tiff_sequence_url_type"]

        self._copy_auxiliary_files(info)
        self._download(url, file_path, download_type)
        info["tiff_sequence_path"] = file_path

        return file_path

    def is_downloaded(self, dataset_name: str) -> bool:
        """
        :param dataset_name: can be a dataset label or nickname
        :type dataset_name: str
        :return: True if the dataset is downloaded
        """
        info = self.get_dataset_info(dataset_name)
        return info["tiff_sequence_path"] is not None

    def get_ZipTiffIterator(self, dataset_name: str, as_ndarray: bool = False) -> ZipTiffIterator:
        """
        Downloads the tiff sequence and returns the ZipTiffIterator

        :param dataset_name: can be a dataset label or nickname
        :type dataset_name: str
        :param as_ndarray: if True, returns a numpy array instead of a ZipTiffIterator
        :type as_ndarray: bool
        :return: ZipTiffIterator or numpy array
        """
        self._show_citation_notice(dataset_name)
        file_path = self.download_tiff_sequence(dataset_name)
        try:
            zti = ZipTiffIterator(file_path)
        except zipfile.BadZipFile:
            self.clear_downloads()
            # try once more
            file_path = self.download_tiff_sequence(dataset_name)
            zti = ZipTiffIterator(file_path)
        if not as_ndarray:
            return zti
        else:
            arr = np.asarray(zti)
            zti.close()
            return arr

    def get_thumbnail(self, dataset_name: str) -> str:
        """
        Returns the path to the thumbnail

        :param dataset_name: can be a dataset label or nickname
        :type dataset_name: str
        :return: path to the thumbnail
        """
        info = self.get_dataset_info(dataset_name)
        return info["thumbnail_path"]

    def clear_downloads(self):
        """
        Deletes all downloaded datasets
        """
        if os.path.exists(self._temp_dir):
            shutil.rmtree(self._temp_dir)

    def _show_citation_notice(self, dataset_name: str):
        info = self.get_dataset_info(dataset_name)
        if info["reference"] not in [None, ""]:
            print(
                f"If you find the '{dataset_name}' dataset useful, please cite: "
                + f"{info['reference']} - {info['reference_doi']}"
            )
```
class ExampleDataManager:
ExampleDataManager(to_download_path: str = None)
Helper class for downloading example test data
Parameters
- to_download_path: path to download the data to. If to_download_path is None, a temporary directory will be created. Note that it will not be automatically deleted.
Raises
- ValueError: If to_download_path is not None and does not exist
To clear downloads use clear_downloads()
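A minimal usage sketch; the custom download directory shown is a hypothetical path, not one shipped with the package:

```python
from nanopyx.data.download import ExampleDataManager

# Default: downloads go to a "nanopyx_data" folder inside the system temp directory
manager = ExampleDataManager()

# Alternatively, point downloads at a directory of your choosing (hypothetical path)
manager = ExampleDataManager(to_download_path="/path/to/my_nanopyx_data")
```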
def list_datasets(self) -> tuple:
Returns
sorted list of dataset labels
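A quick sketch of listing what is available and picking a dataset to use with the methods below; the actual labels depend on the examples bundled with your installation:

```python
from nanopyx.data.download import ExampleDataManager

manager = ExampleDataManager()
labels = manager.list_datasets()   # sorted dataset labels
print(labels)
dataset_name = labels[0]           # any label (or nickname) works with the methods below
```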
def list_datasets_nickname(self) -> tuple:
Returns
list of (nickname, label) tuples, one per dataset
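A short sketch of unpacking the (nickname, label) pairs:

```python
from nanopyx.data.download import ExampleDataManager

manager = ExampleDataManager()
for nickname, label in manager.list_datasets_nickname():
    print(f"{nickname} -> {label}")
```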
def get_dataset_info(self, dataset_name: str) -> dict:
Parameters
- dataset_name: can be a dataset label or nickname
Returns
dictionary with information about the dataset
Raises
- ValueError: if dataset_name does not match any dataset label or nickname
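A sketch of reading a few of the fields assembled in __init__ from each dataset's info.yaml; which additional keys exist depends on the bundled metadata:

```python
from nanopyx.data.download import ExampleDataManager

manager = ExampleDataManager()
name = manager.list_datasets()[0]        # pick any available label or nickname
info = manager.get_dataset_info(name)

print(info["shape"], info["dtype"])      # parsed from data_shape / data_dtype
print(info["thumbnail_path"])            # bundled thumbnail.jpg
```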
def download_tiff_sequence(self, dataset_name: str) -> str:
Downloads the tiff sequence and returns the path to the zip file
Parameters
- dataset_name: can be a dataset label or nickname
Returns
path to the zip file
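A sketch of triggering the download; this requires network access and, depending on the dataset metadata, fetches from OneDrive, Google Drive, or a plain URL via _download:

```python
import os
from nanopyx.data.download import ExampleDataManager

manager = ExampleDataManager()
name = manager.list_datasets()[0]

zip_path = manager.download_tiff_sequence(name)  # no-op if the zip already exists
print(zip_path, os.path.exists(zip_path))
```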
def is_downloaded(self, dataset_name: str) -> bool:
Parameters
- dataset_name: can be a dataset label or nickname
Returns
True if the dataset's tiff sequence has already been downloaded
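A typical check-before-download pattern (sketch):

```python
from nanopyx.data.download import ExampleDataManager

manager = ExampleDataManager()
name = manager.list_datasets()[0]

if not manager.is_downloaded(name):
    manager.download_tiff_sequence(name)  # records tiff_sequence_path in the dataset info
```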
def get_ZipTiffIterator(self, dataset_name: str, as_ndarray: bool = False) -> nanopyx.core.io.zip_image_loader.ZipTiffIterator:
Downloads the tiff sequence and returns the ZipTiffIterator
Parameters
- dataset_name: can be a dataset label or nickname
- as_ndarray: if True, returns a numpy array instead of a ZipTiffIterator
Returns
ZipTiffIterator or numpy array
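A sketch of loading a dataset either fully into memory or lazily; the first use requires network access, and a citation notice is printed when the dataset defines a reference:

```python
from nanopyx.data.download import ExampleDataManager

manager = ExampleDataManager()
name = manager.list_datasets()[0]

# Load the whole tiff sequence into memory as a numpy array
stack = manager.get_ZipTiffIterator(name, as_ndarray=True)
print(stack.shape, stack.dtype)

# Or keep the lazy ZipTiffIterator and close it when done
zti = manager.get_ZipTiffIterator(name)
# ... read frames from zti ...
zti.close()
```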
def get_thumbnail(self, dataset_name: str) -> str:
Returns the path to the thumbnail
Parameters
- dataset_name: can be a dataset label or nickname
Returns
path to the thumbnail
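The thumbnail ships with the package, so no download is needed (sketch):

```python
from nanopyx.data.download import ExampleDataManager

manager = ExampleDataManager()
name = manager.list_datasets()[0]
print(manager.get_thumbnail(name))  # path to the bundled thumbnail.jpg
```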