diff --git a/navis/core/neuronlist.py b/navis/core/neuronlist.py index 38f0c140..b4fa006b 100644 --- a/navis/core/neuronlist.py +++ b/navis/core/neuronlist.py @@ -1010,6 +1010,26 @@ def unmix(self): """ return {t: self.__class__([n for n in self.neurons if isinstance(n, t)]) for t in self.types} + + + def unique_nodes(self) -> core.TreeNeuron: + """Return neuronlist with unique node IDs""" + st_node = 1 + for sk in self: + keys = list(sk.nodes['node_id']) + values = list(range(st_node,st_node+len(keys))) + res = {keys[i]: values[i] for i in range(len(keys))} + sk.nodes.replace({"node_id": res}, inplace=True) + sk.nodes.replace({"parent_id": res}, inplace=True) + st_node += len(keys) + return self + + def to_TreeNeuron(self) -> core.TreeNeuron: + """Return single treeneuron object.""" + skels = self.unique_nodes() + skels = core.TreeNeuron(self.nodes) + skels.nodes.drop(columns=['neuron'], inplace=True) + return skels class _IdIndexer(): diff --git a/navis/io/__init__.py b/navis/io/__init__.py index f3c84254..3dc6a28c 100644 --- a/navis/io/__init__.py +++ b/navis/io/__init__.py @@ -17,7 +17,7 @@ from .precomputed_io import read_precomputed, write_precomputed from .hdf_io import read_h5, write_h5, inspect_h5 from .rda_io import read_rda -from .nmx_io import read_nmx, read_nml +from .nmx_io import read_nml, read_nmx, write_nml, write_nmx from .mesh_io import read_mesh, write_mesh from .tiff_io import read_tiff from .pq_io import read_parquet, write_parquet, scan_parquet @@ -29,6 +29,7 @@ 'read_precomputed', 'write_precomputed', 'read_tiff', 'read_rda', - 'read_nmx', 'read_nml', + 'read_nml', 'write_nml', + 'read_nmx', 'write_nmx', 'read_mesh', 'write_mesh', 'read_parquet', 'write_parquet', 'scan_parquet'] diff --git a/navis/io/nmx_io.py b/navis/io/nmx_io.py index 9d11392f..825de312 100644 --- a/navis/io/nmx_io.py +++ b/navis/io/nmx_io.py @@ -12,19 +12,24 @@ # GNU General Public License for more details. import io +import os import networkx as nx import pandas as pd import xml.etree.ElementTree as ET -from typing import Union, Dict, Optional, Any, IO, Iterable +from pathlib import Path + + +from typing import Union, Dict, Optional, Any, IO, Iterable, List from zipfile import ZipFile from .. import config, core from . import base +from .swc_io import make_swc_table, read_swc, write_swc -__all__ = ["read_nmx", "read_nml"] +__all__ = ["read_nmx", "read_nml", "write_nmx", "write_nml", "swc_to_nml", "nml_to_swc"] # Set up logging logger = config.get_logger(__name__) @@ -57,9 +62,7 @@ def __init__( 'radius': float_, } - def read_buffer( - self, f: IO, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.TreeNeuron': + def read_buffer(self, f: IO, attrs: Optional[Dict[str, Any]] = None) -> 'core.TreeNeuron': """Read .nml buffer into a TreeNeuron. NML files are XML-encoded files containing data for a single neuron. @@ -77,10 +80,11 @@ def read_buffer( """ return self.read_nml(f.read(), attrs=attrs) + def read_nml( self, f: IO, attrs: Optional[Dict[str, Any]] = None ) -> 'core.TreeNeuron': - """Read .nml buffer into a TreeNeuron. + """Read .nml buffers into a NeuronList. NML files are XML files containing a single neuron. @@ -101,6 +105,7 @@ def read_nml( f = io.StringIO(f) root = ET.parse(f).getroot() + nl = core.NeuronList(None) # Copy the attributes dict for element in root: if element.tag == 'thing': @@ -111,19 +116,21 @@ def read_nml( nodes.rename({'id': 'node_id'}, axis=1, inplace=True) nodes = nodes.astype({k: v for k, v in self._dtypes.items() if k in nodes.columns}) - G = nx.Graph() - G.add_edges_from(edges.values) - tree = nx.bfs_tree(G, list(G.nodes)[0]) - edges = pd.DataFrame(list(tree.edges), columns=['source', 'target']) - nodes['parent_id'] = edges.set_index('target').reindex(nodes.node_id.values).source.values - nodes['parent_id'] = nodes.parent_id.fillna(-1).astype(self._dtypes['node_id']) - nodes.sort_values('node_id', inplace=True) + G = nx.Graph() + G.add_edges_from(edges.values) + tree = nx.bfs_tree(G, list(G.nodes)[0]) + edges = pd.DataFrame(list(tree.edges), columns=['source', 'target']) + nodes['parent_id'] = edges.set_index('target').reindex(nodes.node_id.values).source.values + nodes['parent_id'] = nodes.parent_id.fillna(-1).astype(self._dtypes['node_id']) + nodes.sort_values('node_id', inplace=True) - return core.TreeNeuron( - nodes, - **(self._make_attributes({'name': 'NML', 'origin': 'nml'}, attrs)) - ) + nl.append(core.NeuronList(nodes)) + + for key,value in attrs.items(): + nl.set_neuron_attributes(value,key) + return nl + class NMXReader(NMLReader): """This is a version of the NML file reader that reads from zipped archives.""" @@ -296,3 +303,103 @@ def read_nml(f: Union[str, pd.DataFrame, Iterable], include_subdirs=include_subdirs) return neurons + + +def write_nml(x, filepath, return_node_map=False, single_file=True): + """Write TreeNeuron(s) to NML. + Follows the format described + `here `_. + Parameters + ---------- + x : TreeNeuron | Dotprops | NeuronList + filepath : str | pathlib.Path | list thereof + It will generate a single NML file in chosen filepath(see also ``filepath``). + ``Filepath`` must be a folder. + See Also + -------- + :func:`navis.read_nml` + Import skeleton from NML files. + """ + + if single_file==True: + if filepath.endswith(".nml") == False: + raise ValueError('For a single nml file, the filepath needs to end with .nml') + + if single_file==False: + if os.path.isdir(filepath) == False: + raise ValueError('For multiple nml files, an existing directory must be provided') + + # Format datatypes + x = core.NeuronList(x) + if x.type[0] == 'navis.Dotprops': + x = x.to_skeleton() + else: + x = x.unique_nodes() + + root = ET.Element('things') + # Parameters section + parameters = ET.SubElement(root, 'parameters') + offset = ET.SubElement(parameters, 'offset', x='0', y='0', z='0') + scale = ET.SubElement(parameters, 'scale', x='1', y='1', z='1') + + for ind,sk in enumerate(x): + + if single_file == False: + root = ET.Element('things') + # Parameters section + parameters = ET.SubElement(root, 'parameters') + offset = ET.SubElement(parameters, 'offset', x='0', y='0', z='0') + scale = ET.SubElement(parameters, 'scale', x='1', y='1', z='1') + + # This neuron + thing = ET.SubElement(root, 'thing', id=str(ind+1)) + thing.attrib["name"] = str(ind+1) + thing.attrib.update({"color.r": '0.0', + "color.g": '0.0', + "color.b": '1.0', + "color.a": '1.0'}) + + nodes = ET.SubElement(thing, 'nodes') + edges = ET.SubElement(thing, 'edges') + + for index,row in sk.nodes.iterrows(): + node = ET.SubElement(nodes, 'node') + node.attrib.update({"id": str(int(row["node_id"])), + "radius": str(row["radius"]), + "x": str(row["x"]), + "y": str(row["y"]), + "z": str(row["z"])}) + if row["parent_id"] != -1: + edge = ET.SubElement(edges, 'edge', source=str(int(row["parent_id"])), + target=str(int(row["node_id"]))) + + if single_file == False: + with open(filepath+str(ind)+".nml", 'wb') as file: + tree = ET.ElementTree(root) + ET.indent(tree, space=" ", level=0) + tree.write(file) + file.close() + + if single_file == True: + with open(filepath, 'wb') as file: + tree = ET.ElementTree(root) + ET.indent(tree, space=" ", level=0) + tree.write(file) + file.close() + +def write_nmx(): + """ + TODO: Generate NMX files (collection of NML files) + """ + raise NotImplementedError("Not yet implemented") + +def swc_to_nml(filepath: Union[str, Path], + outpath: Union[str, Path]): + skels = read_swc(filepath) + for sk in core.NeuronList(skels): + write_nml(sk,os.path.join(outpath, sk.id + ".nml")) + +def nml_to_swc(filepath: Union[str, Path], + outpath: Union[str, Path]): + sk = read_nml(filepath) + write_swc(sk,outpath)