diff --git a/navis/io/base.py b/navis/io/base.py index 4dfd4edb..a8242c29 100644 --- a/navis/io/base.py +++ b/navis/io/base.py @@ -29,12 +29,14 @@ from typing import List, Union, Iterable, Dict, Optional, Any, IO from typing_extensions import Literal from zipfile import ZipFile, ZipInfo +from ftplib import FTP from .. import config, utils, core try: import zlib import zipfile + compression = zipfile.ZIP_DEFLATED except ImportError: compression = zipfile.ZIP_STORED @@ -77,7 +79,7 @@ class Writer: def __init__(self, write_func, ext): assert callable(write_func) if ext: - assert isinstance(ext, str) and ext.startswith('.') + assert isinstance(ext, str) and ext.startswith(".") self.write_func = write_func self.ext = ext @@ -87,7 +89,9 @@ def write_single(self, x, filepath, **kwargs): try: as_str = os.fspath(filepath) except TypeError: - raise ValueError(f'`filepath` must be str or pathlib.Path, got "{type(filepath)}"') + raise ValueError( + f'`filepath` must be str or pathlib.Path, got "{type(filepath)}"' + ) # Format filename (e.g. "{neuron.name}.swc") formatted_str = as_str.format(neuron=x) @@ -103,11 +107,11 @@ def write_single(self, x, filepath, **kwargs): # If not specified, generate filename if self.ext and not str(filepath).endswith(self.ext): - filepath = filepath / f'{x.id}{self.ext}' + filepath = filepath / f"{x.id}{self.ext}" # Make sure the parent directory exists if not filepath.parent.exists(): - raise ValueError(f'Parent folder {filepath.parent} must exist.') + raise ValueError(f"Parent folder {filepath.parent} must exist.") # Track the path we put this (and presumably all other files in) self.path = Path(filepath) @@ -126,39 +130,50 @@ def write_many(self, x, filepath, **kwargs): if not is_filename or is_single or is_formattable: filepath = [filepath] * len(x) else: - raise ValueError('`filepath` must either be a folder, a ' - 'formattable filepath or a list of filepaths' - 'when saving multiple neurons.') + raise ValueError( + "`filepath` must either be a folder, a " + "formattable filepath or a list of filepaths" + "when saving multiple neurons." + ) if len(filepath) != len(x): - raise ValueError(f'Got {len(filepath)} file names for ' - f'{len(x)} neurons.') + raise ValueError( + f"Got {len(filepath)} file names for " f"{len(x)} neurons." + ) # At this point filepath is iterable filepath: Iterable[str] - for n, f in config.tqdm(zip(x, filepath), disable=config.pbar_hide, - leave=config.pbar_leave, total=len(x), - desc='Writing'): + for n, f in config.tqdm( + zip(x, filepath), + disable=config.pbar_hide, + leave=config.pbar_leave, + total=len(x), + desc="Writing", + ): self.write_single(n, filepath=f, **kwargs) def write_zip(self, x, filepath, **kwargs): """Write files to zip.""" filepath = Path(filepath).expanduser() # Parse pattern, if given - pattern = '{neuron.id}' + (self.ext if self.ext else '') - if '@' in str(filepath): - pattern, filename = filepath.name.split('@') + pattern = "{neuron.id}" + (self.ext if self.ext else "") + if "@" in str(filepath): + pattern, filename = filepath.name.split("@") filepath = filepath.parent / filename # Make sure we have an iterable x = core.NeuronList(x) - with ZipFile(filepath, mode='w') as zf: + with ZipFile(filepath, mode="w") as zf: # Context-manager will remove temporary directory and its contents with tempfile.TemporaryDirectory() as tempdir: - for n in config.tqdm(x, disable=config.pbar_hide, - leave=config.pbar_leave, total=len(x), - desc='Writing'): + for n in config.tqdm( + x, + disable=config.pbar_hide, + leave=config.pbar_leave, + total=len(x), + desc="Writing", + ): # Save to temporary file f = None try: @@ -167,8 +182,11 @@ def write_zip(self, x, filepath, **kwargs): # Write to temporary file self.write_single(n, filepath=f, **kwargs) # Add file to zip - zf.write(f, arcname=pattern.format(neuron=n), - compress_type=compression) + zf.write( + f, + arcname=pattern.format(neuron=n), + compress_type=compression, + ) except BaseException: raise finally: @@ -184,7 +202,7 @@ def write_zip(self, x, filepath, **kwargs): def write_any(self, x, filepath, **kwargs): """Write any to file. Default entry point.""" # If target is a zipfile - if isinstance(filepath, (str, Path)) and str(filepath).endswith('.zip'): + if isinstance(filepath, (str, Path)) and str(filepath).endswith(".zip"): return self.write_zip(x, filepath=filepath, **kwargs) elif isinstance(x, core.NeuronList): return self.write_many(x, filepath=filepath, **kwargs) @@ -237,15 +255,14 @@ def __init__( if self.file_ext.startswith("*"): raise ValueError('File extension must be ".ext", not "*.ext"') - def files_in_dir(self, - dpath: Path, - include_subdirs: bool = DEFAULT_INCLUDE_SUBDIRS - ) -> Iterable[Path]: + def files_in_dir( + self, dpath: Path, include_subdirs: bool = DEFAULT_INCLUDE_SUBDIRS + ) -> Iterable[Path]: """List files to read in directory.""" if not isinstance(dpath, Path): dpath = Path(dpath) dpath = dpath.expanduser() - pattern = '*' + pattern = "*" if include_subdirs: pattern = os.path.join("**", pattern) @@ -285,9 +302,7 @@ def _make_attributes( Arbitrary string-keyed attributes. """ return merge_dicts( - dict( - created_at=str(datetime.datetime.now()) - ), + dict(created_at=str(datetime.datetime.now())), self.attrs, *dicts, **kwargs, @@ -295,7 +310,7 @@ def _make_attributes( def read_buffer( self, f: IO, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read buffer into a single neuron. Parameters @@ -309,12 +324,13 @@ def read_buffer( ------- core.NeuronObject """ - raise NotImplementedError('Reading from buffer not implemented for ' - f'{type(self)}') + raise NotImplementedError( + "Reading from buffer not implemented for " f"{type(self)}" + ) def read_file_path( self, fpath: os.PathLike, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read single file from path into a neuron. Parameters @@ -338,11 +354,12 @@ def read_file_path( raise ValueError(f"Error reading file {p}") from e def read_from_zip( - self, files: Union[str, List[str]], + self, + files: Union[str, List[str]], zippath: os.PathLike, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' - ) -> 'core.NeuronList': + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": """Read given files from a zip into a NeuronList. Typically not used directly but via `read_zip()` dispatcher. @@ -367,17 +384,16 @@ def read_from_zip( files = utils.make_iterable(files) neurons = [] - with ZipFile(p, 'r') as zip: + with ZipFile(p, "r") as zip: for file in files: # Note the `file` is of type zipfile.ZipInfo here props = self.parse_filename(file.orig_filename) - props['origin'] = str(p) + props["origin"] = str(p) try: - n = self.read_bytes(zip.read(file), - merge_dicts(props, attrs)) + n = self.read_bytes(zip.read(file), merge_dicts(props, attrs)) neurons.append(n) except BaseException: - if on_error == 'ignore': + if on_error == "ignore": logger.warning(f'Failed to read "{file.filename}" from zip.') else: raise @@ -385,12 +401,13 @@ def read_from_zip( return core.NeuronList(neurons) def read_zip( - self, fpath: os.PathLike, + self, + fpath: os.PathLike, parallel="auto", limit: Optional[int] = None, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' - ) -> 'core.NeuronList': + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": """Read files from a zip into a NeuronList. This is a dispatcher for `.read_from_zip`. @@ -412,22 +429,25 @@ def read_zip( """ fpath = Path(fpath).expanduser() - read_fn = partial(self.read_from_zip, - zippath=fpath, attrs=attrs, - on_error=on_error) - neurons = parallel_read_archive(read_fn=read_fn, - fpath=fpath, - file_ext=self.is_valid_file, - limit=limit, - parallel=parallel) + read_fn = partial( + self.read_from_zip, zippath=fpath, attrs=attrs, on_error=on_error + ) + neurons = parallel_read_archive( + read_fn=read_fn, + fpath=fpath, + file_ext=self.is_valid_file, + limit=limit, + parallel=parallel, + ) return core.NeuronList(neurons) def read_from_tar( - self, files: Union[str, List[str]], + self, + files: Union[str, List[str]], tarpath: os.PathLike, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' - ) -> 'core.NeuronList': + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": """Read given files from a tar into a NeuronList. Typically not used directly but via `read_tar()` dispatcher. @@ -452,17 +472,18 @@ def read_from_tar( files = utils.make_iterable(files) neurons = [] - with tarfile.open(p, 'r') as tf: + with tarfile.open(p, "r") as tf: for file in files: # Note the `file` is of type tarfile.TarInfo here - props = self.parse_filename(file.name.split('/')[-1]) - props['origin'] = str(p) + props = self.parse_filename(file.name.split("/")[-1]) + props["origin"] = str(p) try: - n = self.read_bytes(tf.extractfile(file).read(), - merge_dicts(props, attrs)) + n = self.read_bytes( + tf.extractfile(file).read(), merge_dicts(props, attrs) + ) neurons.append(n) except BaseException: - if on_error == 'ignore': + if on_error == "ignore": logger.warning(f'Failed to read "{file.filename}" from tar.') else: raise @@ -470,12 +491,13 @@ def read_from_tar( return core.NeuronList(neurons) def read_tar( - self, fpath: os.PathLike, + self, + fpath: os.PathLike, parallel="auto", limit: Optional[int] = None, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' - ) -> 'core.NeuronList': + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": """Read files from a tar archive into a NeuronList. This is a dispatcher for `.read_from_tar`. @@ -497,23 +519,134 @@ def read_tar( """ fpath = Path(fpath).expanduser() - read_fn = partial(self.read_from_tar, - tarpath=fpath, attrs=attrs, - on_error=on_error) - neurons = parallel_read_archive(read_fn=read_fn, - fpath=fpath, - file_ext=self.is_valid_file, - limit=limit, - parallel=parallel) + read_fn = partial( + self.read_from_tar, tarpath=fpath, attrs=attrs, on_error=on_error + ) + neurons = parallel_read_archive( + read_fn=read_fn, + fpath=fpath, + file_ext=self.is_valid_file, + limit=limit, + parallel=parallel, + ) + return core.NeuronList(neurons) + + def read_ftp( + self, + url, + parallel="auto", + limit: Optional[int] = None, + attrs: Optional[Dict[str, Any]] = None, + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": + """Read files from an FTP server. + + This is a dispatcher for `.read_from_tar`. + + Parameters + ---------- + url : str + Can be the path to a single file or a directory. + limit : int, optional + Limit the number of files read from this directory. + attrs : dict or None + Arbitrary attributes to include in the TreeNeuron. + on_error : 'ignore' | 'raise' + What do do when error is encountered. + + Returns + ------- + core.NeuronList + + """ + # Remove the ftp:// prefix + url = url.replace("ftp://", "") + + # Split into server and path + server, path = url.split("/", 1) + + # Check if server contains a port + if ":" in server: + server, port = server.split(":") + port = int(port) + else: + port = 21 # default port + + read_fn = partial(self.read_from_ftp, attrs=attrs, on_error=on_error) + neurons = parallel_read_ftp( + read_fn=read_fn, + server=server, + port=port, + path=path, + file_ext=self.is_valid_file, + limit=limit, + parallel=parallel, + ) + return core.NeuronList(neurons) + + def read_from_ftp( + self, + files: Union[str, List[str]], + ftp: FTP, + attrs: Optional[Dict[str, Any]] = None, + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": + """Read given files from an FTP server into a NeuronList. + + Typically not used directly but via `read_ftp()` dispatcher. + + Parameters + ---------- + files : tarfile.TarInfo | list thereof + Files inside the tar file to read. + ftp : ftplib.FTP | "GLOBAL" + The FTP client. This should already be connected, logged in + and in the correct directory. If "GLOBAL", we will look for a + `_FTP` global variable. + attrs : dict or None + Arbitrary attributes to include in the TreeNeuron. + on_error : 'ignore' | 'raise' + What do do when error is encountered. + + Returns + ------- + core.NeuronList + + """ + if ftp == "GLOBAL": + if "_FTP" not in globals(): + raise ValueError("No global FTP connection found.") + ftp = _FTP + + files = utils.make_iterable(files) + + neurons = [] + for file in files: + # Read the file into a bytes + with io.BytesIO() as f: + ftp.retrbinary("RETR " + file, f.write) + f.seek(0) + props = self.parse_filename(file) + props["origin"] = f"{ftp.host}:{ftp.port}{ftp.pwd()}/{file}" + try: + n = self.read_buffer(f, merge_dicts(props, attrs)) + neurons.append(n) + except BaseException: + if on_error == "ignore": + logger.warning(f'Failed to read "{file}" from FTP.') + else: + raise + return core.NeuronList(neurons) def read_directory( - self, path: os.PathLike, + self, + path: os.PathLike, include_subdirs=DEFAULT_INCLUDE_SUBDIRS, parallel="auto", limit: Optional[int] = None, - attrs: Optional[Dict[str, Any]] = None - ) -> 'core.NeuronList': + attrs: Optional[Dict[str, Any]] = None, + ) -> "core.NeuronList": """Read directory of files into a NeuronList. Parameters @@ -544,7 +677,7 @@ def read_directory( def read_url( self, url: str, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read file from URL into a neuron. Parameters @@ -569,16 +702,13 @@ def read_url( # the wrong format. with requests.get(url, stream=False) as r: r.raise_for_status() - props = self.parse_filename(url.split('/')[-1]) - props['origin'] = url - return self.read_buffer( - io.BytesIO(r.content), - merge_dicts(props, attrs) - ) + props = self.parse_filename(url.split("/")[-1]) + props["origin"] = url + return self.read_buffer(io.BytesIO(r.content), merge_dicts(props, attrs)) def read_string( self, s: str, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read single string into a Neuron. Parameters @@ -594,13 +724,12 @@ def read_string( """ sio = io.StringIO(s) return self.read_buffer( - sio, - merge_dicts({'name': self.name_fallback, 'origin': 'string'}, attrs) + sio, merge_dicts({"name": self.name_fallback, "origin": "string"}, attrs) ) def read_bytes( self, s: str, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read bytes into a Neuron. Parameters @@ -616,13 +745,12 @@ def read_bytes( """ sio = io.BytesIO(s) return self.read_buffer( - sio, - merge_dicts({'name': self.name_fallback, 'origin': 'string'}, attrs) + sio, merge_dicts({"name": self.name_fallback, "origin": "string"}, attrs) ) def read_dataframe( self, nodes: pd.DataFrame, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Convert a DataFrame into a neuron. Parameters @@ -635,12 +763,13 @@ def read_dataframe( ------- core.BaseNeuron """ - raise NotImplementedError('Reading DataFrames not implemented for ' - f'{type(self)}') + raise NotImplementedError( + "Reading DataFrames not implemented for " f"{type(self)}" + ) def read_any_single( self, obj, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Attempt to convert an arbitrary object into a neuron. Parameters @@ -660,7 +789,7 @@ def read_any_single( if isinstance(obj, pd.DataFrame): return self.read_dataframe(obj, attrs) if isinstance(obj, os.PathLike): - if str(obj).endswith('.zip'): + if str(obj).endswith(".zip"): return self.read_zip(obj, attrs=attrs) elif ".tar" in str(obj): return self.read_tar(obj, attrs=attrs) @@ -669,17 +798,17 @@ def read_any_single( # See if this might be a file (make sure to expand user) if os.path.isfile(os.path.expanduser(obj)): p = Path(obj).expanduser() - if p.suffix == '.zip': + if p.suffix == ".zip": return self.read_zip(p, attrs=attrs) return self.read_file_path(p, attrs) if obj.startswith("http://") or obj.startswith("https://"): return self.read_url(obj, attrs) + if obj.startswith("ftp://"): + return self.read_ftp(obj, attrs=attrs) return self.read_string(obj, attrs) if isinstance(obj, bytes): return self.read_bytes(obj, attrs) - raise ValueError( - f"Could not read neuron from object of type '{type(obj)}'" - ) + raise ValueError(f"Could not read neuron from object of type '{type(obj)}'") def read_any_multi( self, @@ -687,7 +816,7 @@ def read_any_multi( include_subdirs=DEFAULT_INCLUDE_SUBDIRS, parallel="auto", attrs: Optional[Dict[str, Any]] = None, - ) -> 'core.NeuronList': + ) -> "core.NeuronList": """Attempt to convert an arbitrary object into a NeuronList, potentially in parallel. @@ -730,7 +859,7 @@ def read_any_multi( if ( isinstance(parallel, str) - and parallel.lower() == 'auto' + and parallel.lower() == "auto" and len(new_objs) < 200 ): parallel = False @@ -746,7 +875,7 @@ def read_any( parallel="auto", limit=None, attrs: Optional[Dict[str, Any]] = None, - ) -> 'core.NeuronObject': + ) -> "core.NeuronObject": """Attempt to read an arbitrary object into a neuron. Parameters @@ -770,17 +899,19 @@ def read_any( except TypeError: pass try: - if os.path.isfile(os.path.expanduser(obj)) and str(obj).endswith('.zip'): + if os.path.isfile(os.path.expanduser(obj)) and str(obj).endswith( + ".zip" + ): return self.read_zip(obj, parallel, limit, attrs) if os.path.isfile(os.path.expanduser(obj)) and ".tar" in str(obj): return self.read_tar(obj, parallel, limit, attrs) + if isinstance(obj, str) and obj.startswith("ftp://"): + return self.read_ftp(obj, parallel, limit, attrs) except TypeError: pass return self.read_any_single(obj, attrs) - def parse_filename( - self, filename: str - ) -> dict: + def parse_filename(self, filename: str) -> dict: """Extract properties from filename according to specified formatter. Parameters @@ -800,7 +931,7 @@ def parse_filename( fmt = re.escape(self.fmt) # Unescape { and } - fmt = fmt.replace('\\{', '{').replace('\\}', '}') + fmt = fmt.replace("\\{", "{").replace("\\}", "}") # Replace all e.g. {name} with {.*} prop_names = [] @@ -814,37 +945,36 @@ def parse_filename( if not match: raise ValueError(f'Unable to match "{self.fmt}" to filename "{filename}"') - props = {'file': filename} + props = {"file": filename} for i, prop in enumerate(prop_names): - for p in prop.split(','): + for p in prop.split(","): # Ignore empty ("{}") if not p: continue # If datatype was specified if ":" in p: - p, dt = p.split(':') + p, dt = p.split(":") props[p] = match.group(i + 1) - if dt == 'int': + if dt == "int": props[p] = int(props[p]) - elif dt == 'float': + elif dt == "float": props[p] = float(props[p]) - elif dt == 'bool': + elif dt == "bool": props[p] = bool(props[p]) - elif dt == 'str': + elif dt == "str": props[p] = str(props[p]) else: - raise ValueError(f'Unable to interpret datatype "{dt}" ' - f'for property {p}') + raise ValueError( + f'Unable to interpret datatype "{dt}" ' f"for property {p}" + ) else: props[p] = match.group(i + 1) return props - def _extract_connectors( - self, nodes: pd.DataFrame - ) -> Optional[pd.DataFrame]: + def _extract_connectors(self, nodes: pd.DataFrame) -> Optional[pd.DataFrame]: """Infer outgoing/incoming connectors from data. Parameters @@ -859,7 +989,7 @@ def _extract_connectors( return -def parallel_read(read_fn, objs, parallel="auto") -> List['core.NeuronList']: +def parallel_read(read_fn, objs, parallel="auto") -> List["core.NeuronList"]: """Read neurons from some objects with the given reader function, potentially in parallel. @@ -885,15 +1015,15 @@ def parallel_read(read_fn, objs, parallel="auto") -> List['core.NeuronList']: prog = partial( config.tqdm, - desc='Importing', + desc="Importing", total=length, disable=config.pbar_hide, - leave=config.pbar_leave + leave=config.pbar_leave, ) if ( isinstance(parallel, str) - and parallel.lower() == 'auto' + and parallel.lower() == "auto" and not isinstance(length, type(None)) and length < 200 ): @@ -915,10 +1045,9 @@ def parallel_read(read_fn, objs, parallel="auto") -> List['core.NeuronList']: return neurons -def parallel_read_archive(read_fn, fpath, file_ext, - limit=None, - parallel="auto", - ignore_hidden=True) -> List['core.NeuronList']: +def parallel_read_archive( + read_fn, fpath, file_ext, limit=None, parallel="auto", ignore_hidden=True +) -> List["core.NeuronList"]: """Read neurons from a archive (zip or tar), potentially in parallel. Reader function must be picklable. @@ -955,38 +1084,38 @@ def parallel_read_archive(read_fn, fpath, file_ext, p = Path(fpath) to_read = [] - if p.name.endswith('.zip'): - with ZipFile(p, 'r') as zip: + if p.name.endswith(".zip"): + with ZipFile(p, "r") as zip: for i, file in enumerate(zip.filelist): - fname = file.filename.split('/')[-1] - if ignore_hidden and fname.startswith('._'): + fname = file.filename.split("/")[-1] + if ignore_hidden and fname.startswith("._"): continue if callable(file_ext): if file_ext(file): to_read.append(file) - elif file_ext == '*': + elif file_ext == "*": to_read.append(file) elif file_ext and fname.endswith(file_ext): to_read.append(file) - elif '.' not in file.filename: + elif "." not in file.filename: to_read.append(file) if isinstance(limit, int) and i >= limit: break - elif '.tar' in p.name: # can be ".tar", "tar.gz" or "tar.bz" - with tarfile.open(p, 'r') as tf: + elif ".tar" in p.name: # can be ".tar", "tar.gz" or "tar.bz" + with tarfile.open(p, "r") as tf: for i, file in enumerate(tf): - fname = file.name.split('/')[-1] - if ignore_hidden and fname.startswith('._'): + fname = file.name.split("/")[-1] + if ignore_hidden and fname.startswith("._"): continue if callable(file_ext): if file_ext(file): to_read.append(file) - elif file_ext == '*': + elif file_ext == "*": to_read.append(file) elif file_ext and fname.endswith(file_ext): to_read.append(file) - elif '.' not in file.filename: + elif "." not in file.filename: to_read.append(file) if isinstance(limit, int) and i >= limit: @@ -997,17 +1126,13 @@ def parallel_read_archive(read_fn, fpath, file_ext, prog = partial( config.tqdm, - desc='Importing', + desc="Importing", total=len(to_read), disable=config.pbar_hide, - leave=config.pbar_leave + leave=config.pbar_leave, ) - if ( - isinstance(parallel, str) - and parallel.lower() == 'auto' - and len(to_read) < 200 - ): + if isinstance(parallel, str) and parallel.lower() == "auto" and len(to_read) < 200: parallel = False if parallel: @@ -1026,6 +1151,148 @@ def parallel_read_archive(read_fn, fpath, file_ext, return neurons +def parallel_read_ftp( + read_fn, + server, + port, + path, + file_ext, + limit=None, + parallel="auto", + ignore_hidden=True, +) -> List["core.NeuronList"]: + """Read neurons from an FTP server, potentially in parallel. + + Reader function must be picklable. + + Parameters + ---------- + read_fn : Callable + server : str + FTP server address. + port : int + FTP server port. + path : str + Path to directory containing files or single file. + file_ext : str | callable + File extension to search for - e.g. ".swc". `None` or `''` + are interpreted as looking for filenames without extension. + To include all files use `'*'`. Can also be callable that + accepts a filename and returns True or False depending on + if it should be included. + limit : int, optional + Limit the number of files read from this directory. + parallel : str | bool | int + "auto" or True for n_cores // 2, otherwise int for number of + jobs, or false for serial. + ignore_hidden : bool + Archives zipped on OSX can end up containing a + `__MACOSX` folder with files that mirror the name of other + files. For example if there is a `123456.swc` in the archive + you might also find a `__MACOSX/._123456.swc`. Reading the + latter will result in an error. If ignore_hidden=True + we will simply ignore all file that starts with "._". + + Returns + ------- + core.NeuronList + + """ + # Check if this is a single file + is_single_file = False + if "*" not in path: + if isinstance(file_ext, str) and path.endswith(file_ext): + is_single_file = True + elif callable(file_ext) and file_ext(path.rsplit("/", 1)[1]): + is_single_file = True + + if is_single_file: + path, fname = path.rsplit("/", 1) + to_read = [fname] + else: + pattern = "" + # Check if path contains a "*." pattern - e.g. something like "*_raw.swc" + if "*" in path: + path, fname = path.rsplit("/", 1) + pattern = fname + + # Remove leading / + if path.startswith("/"): + path = path[1:] + + # First check content + with FTP() as ftp: + ftp.connect(server, port) # connect to server + ftp.login() # anonymous login + ftp.cwd(path) # change to path + + # Read content + content = [] + ftp.retrlines(f"LIST {pattern}", content.append) + + # Parse content into filenames + to_read = [] + for line in content: + if not line: + continue + file = line.split()[-1].strip() + + if callable(file_ext): + if file_ext(file): + to_read.append(file) + elif file_ext == "*": + to_read.append(file) + elif file_ext and fname.endswith(file_ext): + to_read.append(file) + + if isinstance(limit, int) and len(to_read) >= limit: + break + + if isinstance(limit, list): + to_read = [f for f in to_read if f in limit] + + prog = partial( + config.tqdm, + desc="Loading", + total=len(to_read), + disable=config.pbar_hide, + leave=config.pbar_leave, + ) + + if isinstance(parallel, str) and parallel.lower() == "auto" and len(to_read) < 200: + parallel = False + + if parallel: + # Do not swap this as `isinstance(True, int)` returns `True` + if isinstance(parallel, (bool, str)): + n_cores = max(1, os.cpu_count() // 2) + else: + n_cores = int(parallel) + + with mp.Pool( + processes=n_cores, initializer=_ftp_pool_init, initargs=(server, port, path) + ) as pool: + results = pool.imap(partial(read_fn, ftp="GLOBAL"), to_read) + neurons = list(prog(results)) + else: + with FTP() as ftp: + ftp.connect(server, port) + ftp.login() + ftp.cwd(path) + + neurons = [read_fn(file, ftp=ftp) for file in prog(to_read)] + + return neurons + + +def _ftp_pool_init(server, port, path): + global _FTP + _FTP = FTP() + _FTP.connect(server, port) + _FTP.login() + _FTP.cwd(path) + + def parse_precision(precision: Optional[int]): """Convert bit width into int and float dtypes. @@ -1051,5 +1318,5 @@ def parse_precision(precision: Optional[int]): return (INT_DTYPES[precision], FLOAT_DTYPES[precision]) except KeyError: raise ValueError( - f'Unknown precision {precision}. Expected on of the following: 16, 32 (default), 64 or None' + f"Unknown precision {precision}. Expected on of the following: 16, 32 (default), 64 or None" ) diff --git a/navis/io/swc_io.py b/navis/io/swc_io.py index 5a0c65d9..b6524381 100644 --- a/navis/io/swc_io.py +++ b/navis/io/swc_io.py @@ -368,12 +368,16 @@ def read_swc(f: Union[str, pd.DataFrame, Iterable], >>> s = navis.read_swc('skeletons.zip') # doctest: +SKIP - Sample first 100 SWC files a zip archive: + Sample first 100 SWC files in a zip archive: >>> s = navis.read_swc('skeletons.zip', limit=100) # doctest: +SKIP + Read first all SWC files an ftp folder: + + >>> s = navis.read_swc('ftp://server:port/path/to/swc/') # doctest: +SKIP + """ - # SwcReader will try its best to read whatever you throw at it - with limit + # SwcReader will try its best to read whatever you throw at it - with limited # sanity checks. For example: if you misspell a filepath, it will assume # that it's a SWC string (because anything that's a string but doesn't # point to an existing file or a folder MUST be a SWC) which will lead to