diff --git a/MANIFEST.in b/MANIFEST.in index fb17c8c..e1fc8c1 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -4,7 +4,7 @@ include setup.cfg include LICENSE.md include pyproject.toml -recursive-include plasmapy_nei *.pyx *.c *.pxd +recursive-include plasmapy_nei *.pyx *.c *.pxd *.h5 recursive-include docs * recursive-include licenses * recursive-include cextern * diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 3f77ddd..821b212 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -51,7 +51,7 @@ jobs: # Only Upload to PyPI on tags ${{ if startsWith(variables['Build.SourceBranch'], 'refs/tags/') }}: pypi_connection_name : 'pypi_endpoint' - test_extras: 'dev' + test_extras: 'test' test_command: 'pytest --pyargs plasmapy_nei' submodules: false targets: diff --git a/plasmapy_nei/__init__.py b/plasmapy_nei/__init__.py index 6d5f8ec..4c37b70 100644 --- a/plasmapy_nei/__init__.py +++ b/plasmapy_nei/__init__.py @@ -4,15 +4,13 @@ try: from .version import __version__ + del version except Exception as exc: warnings.warn("Unable to import __version__") -else: - del version finally: del warnings from . import eigen from . import nei -# Then you can be explicit to control what ends up in the namespace, -__all__ = ["eigen"] +__all__ = ["eigen", "nei"] diff --git a/plasmapy_nei/eigen/__init__.py b/plasmapy_nei/eigen/__init__.py index 77f744f..6844556 100644 --- a/plasmapy_nei/eigen/__init__.py +++ b/plasmapy_nei/eigen/__init__.py @@ -1 +1,3 @@ """Classes for accessing eigentables for ionization and recombination rates.""" + +from .eigenclass import EigenData diff --git a/plasmapy_nei/eigen/eigenclass.py b/plasmapy_nei/eigen/eigenclass.py new file mode 100644 index 0000000..43b4919 --- /dev/null +++ b/plasmapy_nei/eigen/eigenclass.py @@ -0,0 +1,354 @@ +""" +Contains the class to provide access to eigenvalue data for the +ionization and recombination rates. +""" + +__all__ = ["EigenData"] + +import h5py +import numpy as np +from numpy import linalg as LA +import pkg_resources +import warnings + +try: + from plasmapy.particles import Particle, particle_input +except (ImportError, ModuleNotFoundError): + from plasmapy.atomic import Particle, particle_input + +max_atomic_number = 30 # TODO: double check if this is the correct number + + +def _get_equilibrium_charge_states(ioniz_rate, recomb_rate, natom): + """ + Compute the equilibrium charge state distribution for the + temperature specified using ionization and recombination rates. + + Parameters + ---------- + """ + nstates = natom + 1 + conce = np.zeros(nstates) + f = np.zeros(nstates + 1) + c = np.zeros(nstates + 1) + r = np.zeros(nstates + 1) + + # The start index is 1. + for i in range(nstates): + c[i + 1] = ioniz_rate[i] + r[i + 1] = recomb_rate[i] + + f[1] = 1.0 + f[2] = c[1] * f[1] / r[2] + + # simplest for hydrogen + if natom == 1: + f[1] = 1.0 / (1.0 + c[1] / r[2]) + f[2] = c[1] * f[1] / r[2] + conce[0:2] = f[1:3] + return conce + + # for other elements + + for k in range(2, natom): + f[k + 1] = (-c[k - 1] * f[k - 1] + (c[k] + r[k]) * f[k]) / r[k + 1] + + f[natom + 1] = c[natom] * f[natom] / r[natom + 1] + + f[1] = 1.0 / np.sum(f) + + f[2] = c[1] * f[1] / r[2] + + for k in range(2, natom): + f[k + 1] = (-c[k - 1] * f[k - 1] + (c[k] + r[k]) * f[k]) / r[k + 1] + + f[natom + 1] = c[natom] * f[natom] / r[natom + 1] + + conce[0:nstates] = f[1 : nstates + 1] + return conce + + +class EigenData: + """ + Provides access to ionization and recombination rate data. + + Parameters + ---------- + element : particle-like + Representation of the element to access data for. + + Examples + -------- + >>> eigendata = EigenData("He") + >>> eigendata.nstates + 3 + """ + + def _validate_element(self, element): + + # The following might not be needed if the check is in @particle_input + if not element.is_category(require="element", exclude=["ion", "isotope"]): + raise ValueError(f"{element} is not an element") + + if element.atomic_number > max_atomic_number: + raise ValueError("Need an element") + + self._element = element + + def _load_data(self): + """ + Retrieve data from the HDF5 file containing ionization and + recombination rates. + """ + filename = pkg_resources.resource_filename( + "plasmapy_nei", + "data/ionrecomb_rate.h5", # from Chianti database, version 8.07 + ) + + # TODO: enable different HDF5 files + + with h5py.File(filename, "r") as file: + self._temperature_grid = file["te_gird"][:] # TODO: fix typo in HDF5 file + self._ntemp = len(self.temperature_grid) + c_ori = file["ioniz_rate"][:] + r_ori = file["recomb_rate"][:] + + c_rate = np.zeros((self.ntemp, self.nstates)) + r_rate = np.zeros((self.ntemp, self.nstates)) + for temperature_index in range(self.ntemp): + for i in range(self.nstates - 1): + c_rate[temperature_index, i] = c_ori[ + i, self.element.atomic_number - 1, temperature_index + ] + for i in range(1, self.nstates): + r_rate[temperature_index, i] = r_ori[ + i - 1, self.element.atomic_number - 1, temperature_index + ] + + self._ionization_rate = np.ndarray( + shape=(self.ntemp, self.nstates), dtype=np.float64 + ) + self._recombination_rate = np.ndarray( + shape=(self.ntemp, self.nstates), dtype=np.float64 + ) + self._equilibrium_states = np.ndarray( + shape=(self.ntemp, self.nstates), dtype=np.float64 + ) + self._eigenvalues = np.ndarray( + shape=(self.ntemp, self.nstates), dtype=np.float64 + ) + + self._eigenvectors = np.ndarray( + shape=(self.ntemp, self.nstates, self.nstates), dtype=np.float64, + ) + + self._eigenvector_inverses = np.ndarray( + shape=(self.ntemp, self.nstates, self.nstates), dtype=np.float64, + ) + + self._ionization_rate[:, :] = c_rate[:, :] + self._recombination_rate[:, :] = r_rate[:, :] + + # Define the coefficients matrix A. The first dimension is + # for elements, and the second number of equations. + + nequations = self.nstates + coefficients_matrix = np.zeros( + shape=(self.nstates, nequations), dtype=np.float64 + ) + + # Enter temperature loop over the whole temperature grid + + for temperature_index in range(self.ntemp): + + # Ionization and recombination rate at Te(temperature_index) + carr = c_rate[temperature_index, :] + rarr = r_rate[temperature_index, :] + + eqi = _get_equilibrium_charge_states( + carr, rarr, self.element.atomic_number, + ) + + for ion in range(1, self.nstates - 1): + coefficients_matrix[ion, ion - 1] = carr[ion - 1] + coefficients_matrix[ion, ion] = -(carr[ion] + rarr[ion]) + coefficients_matrix[ion, ion + 1] = rarr[ion + 1] + + # The first row + coefficients_matrix[0, 0] = -carr[0] + coefficients_matrix[0, 1] = rarr[1] + + # The last row + coefficients_matrix[self.nstates - 1, self.nstates - 2] = carr[ + self.nstates - 2 + ] + coefficients_matrix[self.nstates - 1, self.nstates - 1] = -rarr[ + self.nstates - 1 + ] + + # Compute eigenvalues and eigenvectors + eigenvalues, eigenvectors = LA.eig(coefficients_matrix) + + # Rearrange the eigenvalues. + idx = np.argsort(eigenvalues) + eigenvalues = eigenvalues[idx] + eigenvectors = eigenvectors[:, idx] + + # Compute inverse of eigenvectors + v_inverse = LA.inv(eigenvectors) + + # transpose the order to as same as the Fortran Version + eigenvectors = eigenvectors.transpose() + v_inverse = v_inverse.transpose() + + # Save eigenvalues and eigenvectors into arrays + for j in range(self.nstates): + self._eigenvalues[temperature_index, j] = eigenvalues[j] + self._equilibrium_states[temperature_index, j] = eqi[j] + for i in range(self.nstates): + self._eigenvectors[temperature_index, i, j] = eigenvectors[i, j] + self._eigenvector_inverses[temperature_index, i, j] = v_inverse[ + i, j + ] + + @particle_input + def __init__(self, element: Particle): + + self._validate_element(element) + self._load_data() + + def _get_temperature_index(self, T_e): # TODO: extract this to a function + """Return the temperature index closest to a particular temperature.""" + T_e_array = self._temperature_grid + + # Check the temperature range + T_e_grid_max = np.amax(T_e_array) + T_e_grid_min = np.amin(T_e_array) + + if T_e == T_e_grid_max: + return self._ntemp - 1 + elif T_e == T_e_grid_min: + return 0 + elif T_e > T_e_grid_max: + warnings.warn( + f"Temperature exceeds the temperature grid " + f"boundary: temperature index will be reset " + f"to {self._ntemp - 1}", + UserWarning, + ) + return self._ntemp - 1 + elif T_e < T_e_grid_min: + warnings.warn( + f"Temperature is below the temperature grid " + f"boundary: temperature index will be reset to " + f"0.", + UserWarning, + ) + return 0 + + # TODO: Add a test to check that the temperature grid is monotonic + + res = np.where(T_e_array >= T_e) + res_ind = res[0] + index = res_ind[0] + dte_l = abs(T_e - T_e_array[index - 1]) # re-check the neighbor point + dte_r = abs(T_e - T_e_array[index]) + if dte_l <= dte_r: + index = index - 1 + return index + + @property + def temperature(self): + return self._temperature + + @property + def temperature(self, T_e): + self._temperature = T_e + self._te_index = self._get_temperature_index(T_e) + + @property + def temperature_grid(self): + return self._temperature_grid + + @property + def element(self) -> Particle: + """The element corresponding to an instance of this class.""" + return self._element + + @property + def nstates(self) -> int: + """Number of charge states corresponding to the element.""" + return self.element.atomic_number + 1 + + @property + def ntemp(self) -> int: + """Number of points in ``temperature_grid``.""" + return self._ntemp + + @property + def ionization_rate(self): + return self._ionization_rate + + @property + def recombination_rate(self): + return self._recombination_rate + + def eigenvalues(self, T_e=None, T_e_index=None): + """ + Returns the eigenvalues for the ionization and recombination + rates for the temperature specified in the class. + """ + if T_e_index: + return self._eigenvalues[T_e_index, :] + elif T_e: + T_e_index = self._get_temperature_index(T_e) + return self._eigenvalues[T_e_index, :] + elif self.temperature: + return self._eigenvalues[self._te_index, :] + else: + raise AttributeError("The temperature has not been set.") + + def eigenvectors(self, T_e=None, T_e_index=None): + """ + Returns the eigenvectors for the ionization and recombination + rates for the temperature specified in the class. + """ + if T_e_index: + return self._eigenvectors[T_e_index, :, :] + elif T_e: + T_e_index = self._get_temperature_index(T_e) + return self._eigenvectors[T_e_index, :, :] + elif self.temperature: + return self._eigenvectors[self._te_index, :, :] + else: + raise AttributeError("The temperature has not been set.") + + def eigenvector_inverses(self, T_e=None, T_e_index=None): + """ + Returns the inverses of the eigenvectors for the ionization and + recombination rates for the temperature specified in the class. + """ + if T_e_index: + return self._eigenvector_inverses[T_e_index, :, :] + elif T_e: + T_e_index = self._get_temperature_index(T_e) + return self._eigenvector_inverses[T_e_index, :, :] + elif self.temperature: + return self._eigenvector_inverses[self._te_index, :, :] + else: + raise AttributeError("The temperature has not been set.") + + def equilibrium_state(self, T_e=None, T_e_index=None): + """ + Return the equilibrium charge state distribution for the + temperature specified in the class. + """ + if T_e_index: + return self._equilibrium_states[T_e_index, :] + elif T_e: + T_e_index = self._get_temperature_index(T_e) + return self._equilibrium_states[T_e_index, :] + elif self.temperature: + return self._equilibrium_states[self._te_index, :] + else: + raise AttributeError("The temperature has not been set.") diff --git a/plasmapy_nei/eigen/tests/test_eigen.py b/plasmapy_nei/eigen/tests/test_eigen.py deleted file mode 100644 index 1d2d124..0000000 --- a/plasmapy_nei/eigen/tests/test_eigen.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Tests for eigentables""" - - -def test_import(): - """Test that the subpackage can be imported.""" - import plasmapy_nei.eigen diff --git a/plasmapy_nei/eigen/tests/test_eigenclass.py b/plasmapy_nei/eigen/tests/test_eigenclass.py new file mode 100644 index 0000000..18db8dd --- /dev/null +++ b/plasmapy_nei/eigen/tests/test_eigenclass.py @@ -0,0 +1,135 @@ +"""Tests of the class to store package data.""" + +from plasmapy import atomic +from plasmapy_nei.eigen import EigenData +import pytest +import numpy as np + + +@pytest.mark.parametrize("atomic_numb", np.arange(1, 30)) +def test_instantiation(atomic_numb): + try: + element_symbol = atomic.atomic_symbol(int(atomic_numb)) + EigenData(element=element_symbol) + except Exception as exc: + raise Exception( + f"Problem with instantiation for atomic number={atomic_numb}." + ) from exc + + +def time_advance_solver_for_testing(natom, te, ne, dt, f0, table): + """Testing function for performing time advance calculations""" + + common_index = table._get_temperature_index(te) + evals = table.eigenvalues( + T_e_index=common_index + ) # find eigenvalues on the chosen Te node + evect = table.eigenvectors(T_e_index=common_index) + evect_invers = table.eigenvector_inverses(T_e_index=common_index) + + # define the temperary diagonal matrix + diagona_evals = np.zeros((natom + 1, natom + 1)) + for ii in range(0, natom + 1): + diagona_evals[ii, ii] = np.exp(evals[ii] * dt * ne) + + # matrix operation + matrix_1 = np.dot(diagona_evals, evect) + matrix_2 = np.dot(evect_invers, matrix_1) + + # get ionic fraction at (time+dt) + ft = np.dot(f0, matrix_2) + + # re-check the smallest value + minconce = 1.0e-15 + for ii in np.arange(0, natom + 1, dtype=np.int): + if abs(ft[ii]) <= minconce: + ft[ii] = 0.0 + return ft + + +@pytest.mark.parametrize("natom", [1, 2, 6, 7, 8]) +def test_reachequlibrium_state(natom): + """ + Starting the random initial distribution, the charge states will reach + to equilibrium cases after a long time. + In this test, we set the ionization and recombination rates at + Te0=2.0e6 K and plasma density ne0=1.0e+7. A random charge states + distribution will be finally closed to equilibrium distribution at + 2.0e6K. + """ + # + # Initial conditions, set plasma temperature, density and dt + # + element_symbol = atomic.atomic_symbol(int(natom)) + te0 = 1.0e6 + ne0 = 1.0e8 + + # Start from any ionizaiont states, e.g., Te = 4.0d4 K, + time = 0 + table = EigenData(element=natom) + f0 = table.equilibrium_state(T_e=4.0e4) + + print("START test_reachequlibrium_state:") + print(f"time_sta = ", time) + print(f"INI: ", f0) + print(f"Sum(f0) = ", np.sum(f0)) + + # After time + dt: + dt = 1.0e7 + ft = time_advance_solver_for_testing(natom, te0, ne0, time + dt, f0, table) + + print(f"time_end = ", time + dt) + print(f"NEI:", ft) + print(f"Sum(ft) = ", np.sum(ft)) + print(f"EI :", table.equilibrium_state(T_e=te0)) + print("End Test.\n") + + assert np.isclose(np.sum(ft), 1), "np.sum(ft) is not approximately 1" + assert np.isclose(np.sum(f0), 1), "np.sum(f0) is not approximately 1" + assert np.allclose(ft, table.equilibrium_state(T_e=te0)) + + +def test_reachequlibrium_state_multisteps(natom=8): + """ + Starting the random initial distribution, the charge states will reach + to equilibrium cases after a long time (multiple steps). + In this test, we set the ionization and recombination rates at + Te0=2.0e6 K and plasma density ne0=1.0e+7. A random charge states + distribution will be finally closed to equilibrium distribution at + 2.0e6K. + """ + # + # Initial conditions, set plasma temperature, density and dt + # + te0 = 1.0e6 # unit: K + ne0 = 1.0e8 # unit: cm^-3 + + # Start from any ionization state, e.g., Te = 4.0d4 K, + time = 0 + table = EigenData(element=natom) + f0 = table.equilibrium_state(T_e=4.0e4) + + # print(f"time_sta = ", time) + # print(f"INI: ", f0) + # print(f"Sum(f0) = ", np.sum(f0)) + + # After time + dt: + dt = 100000.0 # unit: second + + # Enter the time loop: + for it in range(100): + ft = time_advance_solver_for_testing(natom, te0, ne0, time + dt, f0, table) + f0 = np.copy(ft) + time = time + dt + + # print(f"time_end = ", time + dt) + # print(f"NEI:", ft) + # print(f"Sum(ft) = ", np.sum(ft)) + + assert np.isclose(np.sum(ft), 1) + + # print(f"EI :", table.equilibrium_state(T_e=te0)) + # print("End Test.\n") + + +# TODO: Test that appropriate exceptions are raised for invalid inputs diff --git a/plasmapy_nei/example_subpkg/__init__.py b/plasmapy_nei/example_subpkg/__init__.py deleted file mode 100644 index 621b0a7..0000000 --- a/plasmapy_nei/example_subpkg/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -This is the docstring for the examplesubpkg package. Normally you would -have whatever.py files in this directory implementing some modules, but this -is just an example sub-package, so it doesn't actually do anything. -""" diff --git a/plasmapy_nei/example_subpkg/data/.gitignore b/plasmapy_nei/example_subpkg/data/.gitignore deleted file mode 100644 index e69de29..0000000 diff --git a/plasmapy_nei/example_subpkg/tests/__init__.py b/plasmapy_nei/example_subpkg/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/plasmapy_nei/nei/__init__.py b/plasmapy_nei/nei/__init__.py index da06f7b..239364c 100644 --- a/plasmapy_nei/nei/__init__.py +++ b/plasmapy_nei/nei/__init__.py @@ -1,3 +1,3 @@ """Classes that perform non-equilibrium ionization modeling""" -from plasmapy_nei.nei.nei import NEI +from plasmapy_nei.nei.nei import NEI, NEIError diff --git a/plasmapy_nei/nei/nei.py b/plasmapy_nei/nei/nei.py index 0111d02..43bb4ea 100644 --- a/plasmapy_nei/nei/nei.py +++ b/plasmapy_nei/nei/nei.py @@ -1,7 +1,1367 @@ """Contains classes to represent non-equilibrium ionization simulations.""" -__all__ = ["NEI"] +__all__ = ["NEI", "NEIError"] -class NEI: +import numpy as np +from typing import Union, Optional, List, Dict, Callable +import astropy.units as u +import plasmapy as pl +from scipy import interpolate, optimize +from plasmapy_nei.eigen import EigenData +from plasmapy.atomic import IonizationStates +import warnings + + + +# TODO: Allow this to keep track of velocity and position too, and +# eventually to have density and temperature be able to be functions of +# position. (and more complicated expressions for density and +# temperature too) + +# TODO: Expand Simulation docstring + + +# TODO: Include the methods in the original Visualize class which is a +# subclass of NEI in the NEI-modeling/NEI repo. These were deleted +# temporarily to make it possible to get the NEI class itself +# adapted into this package. + + +# TODO: In this file and test_nei.py, there are a few places with +# initial.ionic_fractions.keys(), where initial is an instance +# of IonizationStates. This workaround exists because I forgot +# to put in an `elements` attribute in IonizationStates, and +# should be corrected. + +class NEIError(Exception): pass + + +class Simulation: + """ + Results from a non-equilibrium ionization simulation. + + Parameters + ---------- + initial: plasmapy.atomic.IonizationStates + The `IonizationStates` instance representing the ionization + states of different elements and plasma properties as the + initial conditions. + + n_init: ~astropy.units.quantity + The initial number density scaling factor. + + T_e_init: ~astropy.units.quantity + The initial electron temperature. + + max_steps: int + The maximum number of time steps that the simulation can take + before stopping. + + time_start: ~astropy.units.Quantity + The time at the start of the simulation. + + """ + + def __init__( + self, + initial: IonizationStates, + n_init: u.Quantity, + T_e_init: u.Quantity, + max_steps: int, + time_start: u.Quantity, + ): + + self._elements = list(initial.ionic_fractions.keys()) + self._abundances = initial.abundances + self._max_steps = max_steps + + self._nstates = { + elem: pl.atomic.atomic_number(elem) + 1 for elem in self.elements + } + + self._ionic_fractions = { + elem: np.full((max_steps + 1, self.nstates[elem]), np.nan, dtype=np.float64) + for elem in self.elements + } + + self._number_densities = { + elem: np.full((max_steps + 1, self.nstates[elem]), np.nan, dtype=np.float64) + * u.cm ** -3 + for elem in self.elements + } + + self._n_elem = { + elem: np.full(max_steps + 1, np.nan) * u.cm ** -3 for elem in self.elements + } + + self._n_e = np.full(max_steps + 1, np.nan) * u.cm ** -3 + self._T_e = np.full(max_steps + 1, np.nan) * u.K + self._time = np.full(max_steps + 1, np.nan) * u.s + + self._index = 0 + + self._assign( + new_time=time_start, + new_ionfracs=initial.ionic_fractions, + new_n=n_init, + new_T_e=T_e_init, + ) + + def _assign( + self, + new_time: u.Quantity, + new_ionfracs: Dict[str, np.ndarray], + new_n: u.Quantity, + new_T_e: u.Quantity, + ): + """ + Store results from a time step of a non-equilibrium ionization + time advance in the `~plasmapy_nei.classes.NEI` class. + + Parameters + ---------- + new_time: ~astropy.units.Quantity + The time associated with this time step. + + new_ionfracs: dict + The new ionization fractions for this time step. The keys + of this `dict` are the atomic symbols of the elements being + tracked, and with the corresponding value being an + `~numpy.ndarray` representing the ionic fractions. Each + element's array must have a length of the atomic number plus + one, and be normalized to one with all values between zero + and one. + + new_n: ~astropy.units.Quantity + The new number density scaling factor for this time step. + The number densities of each ionic species will be the + product of this scaling factor, the element's abundance, and + the ionic fraction given in `new_ionfracs`. + + new_T_e: ~astropy.units.Quantity + The new electron temperature. + + """ + + try: + index = self._index + elements = self.elements + self._time[index] = new_time + self._T_e[index] = new_T_e + + for elem in elements: + self._ionic_fractions[elem][index, :] = new_ionfracs[elem][:] + + # Calculate elemental and ionic number densities + n_elem = {elem: new_n * self.abundances[elem] for elem in elements} + number_densities = { + elem: n_elem[elem] * new_ionfracs[elem] for elem in elements + } + + # Calculate the electron number density + n_e = 0.0 * u.cm ** -3 + for elem in elements: + integer_charges = np.linspace( + 0, self.nstates[elem] - 1, self.nstates[elem] + ) + n_e += np.sum(number_densities[elem] * integer_charges) + + # Assign densities + self._n_e[index] = n_e + for elem in elements: + self._n_elem[elem][index] = n_elem[elem] + self._number_densities[elem][index, :] = number_densities[elem] + + except Exception as exc: + raise NEIError( + f"Unable to assign parameters to Simulation instance " + f"for index {index} at time = {new_time}. The " + f"parameters are new_n = {new_n}, new_T_e = {new_T_e}, " + f"and new_ionic_fractions = {new_ionfracs}." + ) from exc + finally: + self._index += 1 + + def _cleanup(self): + """ + Clean up this class after the simulation is complete. + + This method removes the excess elements from each array that + did not end up getting used for a time step in the simulation + and sets the `last_step` attribute. + + """ + nsteps = self._index + + self._n_e = self._n_e[0:nsteps] + self._T_e = self._T_e[0:nsteps] + self._time = self._time[0:nsteps] + + for element in self.elements: + self._ionic_fractions[element] = self._ionic_fractions[element][0:nsteps, :] + self._number_densities[element] = self._number_densities[element][ + 0:nsteps, : + ] + + self._last_step = nsteps - 1 + + self._index = None + + @property + def max_steps(self) -> int: + """ + The maximum number of time steps allowed for this simulation. + """ + return self._max_steps + + @property + def last_step(self) -> int: + """ + The time index of the last step. + """ + return self._last_step + + @property + def nstates(self) -> Dict[str, int]: + """ + Return the dictionary containing atomic symbols as keys and the + number of ionic species for the corresponding element as the + value. + """ + return self._nstates + + @property + def elements(self) -> List[str]: + """The elements modeled by this simulation.""" + return self._elements + + @property + def abundances(self) -> Dict[str, float]: + """ + The relative elemental abundances of the elements modeled in + this simulation. + + The keys are the atomic symbols and the values are a `float` + representing that element's elemental abundance. + + """ + return self._abundances + + @property + def ionic_fractions(self) -> Dict[str, np.ndarray]: + """ + Return the ionic fractions over the course of the simulation. + + The keys of this dictionary are atomic symbols. The values are + 2D arrays where the first index refers to the time step and the + second index refers to the integer charge. + + """ + return self._ionic_fractions + + @property + def number_densities(self) -> Dict[str, u.Quantity]: + """ + Return the number densities over the course of the simulation. + + The keys of `number_densities` are atomic symbols. The values + are 2D arrays with units of number density where the first index + refers to the time step and the second index is the integer + charge. + + """ + return self._number_densities + + @property + def n_elem(self) -> Dict[str, u.Quantity]: + """ + The number densities of each element over the course of the + simulation. + + The keys of `n_elem` are atomic symbols. The values are 1D + arrays with units of number density where the index refers to + the time step. + + """ + return self._n_elem + + @property + def n_e(self) -> u.Quantity: + """ + The electron number density over the course of the simulation in + units of number density. + + The index of this array corresponds to the time step. + """ + return self._n_e + + @property + def T_e(self) -> u.Quantity: + """ + The electron temperature over the course of the simulation in + kelvin. + + The index of this array corresponds to the time step. + """ + return self._T_e + + @property + def time(self) -> u.Quantity: + """ + The time for each time step over the course of the simulation + in units of seconds. + """ + return self._time + + +class NEI: + r""" + Perform and analyze a non-equilibrium ionization simulation. + + Parameters + ---------- + inputs + + T_e: `~astropy.units.Quantity` or `callable` + The electron temperature, which may be a constant, an array of + temperatures corresponding to the times in `time_input`, or a + function that yields the temperature as a function of time. + + n: `~astropy.units.Quantity` or `callable` + The number density multiplicative factor. The number density of + each element will be `n` times the abundance given in + `abundances`. For example, if `abundance['H'] = 1`, then this + will correspond to the number density of hydrogen (including + neutral hydrogen and protons). This factor may be a constant, + an array of number densities over time, or a function that + yields a number density as a function of time. + + time_input: ~astropy.units.Quantity, optional + An array containing the times associated with `n` and `T_e` in + units of time. + + time_start: ~astropy.units.Quantity, optional + The start time for the simulation. If density and/or + temperature are given by arrays, then this argument must be + greater than `time_input[0]`. If this argument is not supplied, + then `time_start` defaults to `time_input[0]` (if given) and + zero seconds otherwise. + + time_max: ~astropy.units.Quantity + The maximum time for the simulation. If density and/or + temperature are given by arrays, then this argument must be less + than `time_input[-1]`. + + max_steps: `int` + The maximum number of time steps to be taken during a + simulation. + + dt: `~astropy.units.Quantity` + The time step. If `adapt_dt` is `False`, then `dt` is the time + step for the whole simulation. + + dt_max: `~astropy.units.Quantity` + The maximum time step to be used with an adaptive time step. + + dt_min: `~astropy.units.Quantity` + The minimum time step to be used with an adaptive time step. + + adapt_dt: `bool` + If `True`, change the time step based on the characteristic + ionization and recombination time scales and change in + temperature. Not yet implemented. + + safety_factor: `float` or `int` + A multiplicative factor to multiply by the time step when + `adapt_dt` is `True`. Lower values improve accuracy, whereas + higher values reduce computational time. Not yet implemented. + + tol: float + The absolute tolerance to be used in comparing ionic fractions. + + verbose: bool, optional + A flag stating whether or not to print out information for every + time step. Setting `verbose` to `True` is useful for testing. + Defaults to `False`. + + abundances: dict + + Examples + -------- + + >>> import numpy as np + >>> import astropy.units as u + + >>> inputs = {'H': [0.9, 0.1], 'He': [0.9, 0.099, 0.001]} + >>> abund = {'H': 1, 'He': 0.085} + >>> n = np.array([1e9, 1e8]) * u.cm ** -3 + >>> T_e = np.array([10000, 40000]) * u.K + >>> time = np.array([0, 300]) * u.s + >>> dt = 0.25 * u.s + + The initial conditions can be accessed using the initial attribute. + + >>> sim = NEI(inputs=inputs, abundances=abund, n=n, T_e=T_e, time_input=time, adapt_dt=False, dt=dt) + + After having inputted all of the necessary information, we can run + the simulation. + + >>> results = sim.simulate() + + The initial results are stored in the `initial` attribute. + + >>> sim.initial.ionic_fractions['H'] + array([0.9, 0.1]) + + The final results can be access with the `final` attribute. + + >>> sim.final.ionic_fractions['H'] + array([0.16665179, 0.83334821]) + >>> sim.final.ionic_fractions['He'] + array([0.88685261, 0.11218358, 0.00096381]) + >>> sim.final.T_e + + + Both `initial` and `final` are instances of the `IonizationStates` + class. + + Notes + ----- + The ionization and recombination rates are from Chianti version + 8.x. These rates include radiative and dielectronic recombination. + Photoionization is not included. + + """ + + def __init__( + self, + inputs, + abundances: Union[Dict, str] = None, + T_e: Union[Callable, u.Quantity] = None, + n: Union[Callable, u.Quantity] = None, + time_input: u.Quantity = None, + time_start: u.Quantity = None, + time_max: u.Quantity = None, + max_steps: Union[int, np.integer] = 10000, + tol: Union[int, float] = 1e-15, + dt: u.Quantity = None, + dt_max: u.Quantity = np.inf * u.s, + dt_min: u.Quantity = 0 * u.s, + adapt_dt: bool = None, + safety_factor: Union[int, float] = 1, + verbose: bool = False, + ): + + try: + + self.time_input = time_input + self.time_start = time_start + self.time_max = time_max + self.T_e_input = T_e + self.n_input = n + self.max_steps = max_steps + self.dt_input = dt + + if self.dt_input is None: + self._dt = self.time_max / max_steps + else: + self._dt = self.dt_input + + self.dt_min = dt_min + self.dt_max = dt_max + self.adapt_dt = adapt_dt + self.safety_factor = safety_factor + self.verbose = verbose + + T_e_init = self.electron_temperature(self.time_start) + n_init = self.hydrogen_number_density(self.time_start) + + self.initial = IonizationStates( + inputs=inputs, + abundances=abundances, + T_e=T_e_init, + n=n_init, + tol=tol, + ) + + self.tol = tol + + # TODO: Update IonizationStates in PlasmaPy to have elements attribute + + self.elements = list(self.initial.ionic_fractions.keys()) + + if "H" not in self.elements: + raise NEIError("Must have H in elements") + + self.abundances = self.initial.abundances + + self._EigenDataDict = { + element: EigenData(element) for element in self.elements + } + + if self.T_e_input is not None and not isinstance(inputs, dict): + for element in self.initial.ionic_fractions.keys(): + self.initial.ionic_fractions[element] = self.EigenDataDict[ + element + ].equilibrium_state(T_e_init.value) + + self._temperature_grid = self._EigenDataDict[ + self.elements[0] + ].temperature_grid + + self._get_temperature_index = self._EigenDataDict[ + self.elements[0] + ]._get_temperature_index + + except Exception: + raise NEIError( + f"Unable to create NEI instance for:\n" + f" inputs = {inputs}\n" + f" abundances = {abundances}\n" + f" T_e = {T_e}\n" + f" n = {n}\n" + f" time_input = {time_input}\n" + f" time_start = {time_start}\n" + f" time_max = {time_max}\n" + f" max_steps = {max_steps}\n" + ) + + def equil_ionic_fractions( + self, T_e: u.Quantity = None, time: u.Quantity = None, + ) -> Dict[str, np.ndarray]: + """ + Return the equilibrium ionic fractions for a temperature or at + a given time. + + Parameters + ---------- + T_e: ~astropy.units.Quantity, optional + The electron temperature in units that can be converted to + kelvin. + + time: ~astropy.units.Quantity, optional + The time in units that can be converted to seconds. + + Returns + ------- + equil_ionfracs: dict + The equilibrium ionic fractions for the elements contained + within this class + + Notes + ----- + Only one of `T_e` and `time` may be included as an argument. If + neither `T_e` or `time` is provided and the temperature for the + simulation is given by a constant, the this method will assume + that `T_e` is the temperature of the simulation. + + """ + + if T_e is not None and time is not None: + raise NEIError("Only one of T_e and time may be an argument.") + + if T_e is None and time is None: + if self.T_e_input.isscalar: + T_e = self.T_e_input + else: + raise NEIError + + try: + T_e = T_e.to(u.K) if T_e is not None else None + time = time.to(u.s) if time is not None else None + except Exception as exc: + raise NEIError("Invalid input to equilibrium_ionic_fractions.") from exc + + if time is not None: + T_e = self.electron_temperature(time) + + if not T_e.isscalar: + raise NEIError("Need scalar input for equil_ionic_fractions.") + + equil_ionfracs = {} + for element in self.elements: + equil_ionfracs[element] = self.EigenDataDict[element].equilibrium_state( + T_e.value + ) + + return equil_ionfracs + + @property + def elements(self) -> List[str]: + return self._elements + + @elements.setter + def elements(self, elements): + # TODO: Update this + self._elements = elements + + @property + def abundances(self) -> Dict[str, Union[float, int]]: + """Return the abundances.""" + return self._abundances + + @abundances.setter + def abundances(self, abund: Dict[Union[str, int], Union[float, int]]): + + # TODO: Update initial, etc. when abundances is updated. The + # checks within IonizationStates will also be checks for + + # TODO: Update initial and other attributes when abundances is + # updated. + + self._abundances = abund + + @property + def tol(self) -> float: + """ + The tolerance for comparisons between different ionization + states. + """ + return self._tol + + @tol.setter + def tol(self, value: Union[float, int]): + try: + value = float(value) + except Exception as exc: + raise TypeError("Invalid tolerance.") + if not 0 <= value < 1: + raise ValueError("Need 0 <= tol < 1.") + self._tol = value + + @property + def time_input(self) -> Optional[u.Quantity]: + return self._time_input + + @time_input.setter + def time_input(self, times: Optional[u.Quantity]): + if times is None: + self._time_input = None + elif isinstance(times, u.Quantity): + if times.isscalar: + raise ValueError("time_input must be an array.") + try: + times = times.to(u.s) + except u.UnitConversionError: + raise u.UnitsError("time_input must have units of seconds.") from None + if not np.all(times[1:] > times[:-1]): + raise ValueError("time_input must monotonically increase.") + self._time_input = times + else: + raise TypeError("Invalid time_input.") + + @property + def time_start(self) -> u.Quantity: + """The start time of the simulation.""" + return self._time_start + + @time_start.setter + def time_start(self, time: u.Quantity): + if time is None: + self._time_start = 0.0 * u.s + elif isinstance(time, u.Quantity): + if not time.isscalar: + raise ValueError("time_start must be a scalar") + try: + time = time.to(u.s) + except u.UnitConversionError: + raise u.UnitsError("time_start must have units of seconds") from None + if ( + hasattr(self, "_time_max") + and self._time_max is not None + and self._time_max <= time + ): + raise ValueError("Need time_start < time_max.") + if self.time_input is not None and self.time_input.min() > time: + raise ValueError("time_start must be less than min(time_input)") + self._time_start = time + else: + raise TypeError("Invalid time_start.") from None + + @property + def time_max(self) -> u.Quantity: + """The maximum time allowed for the simulation.""" + return self._time_max + + @time_max.setter + def time_max(self, time: u.Quantity): + if time is None: + self._time_max = ( + self.time_input[-1] if self.time_input is not None else np.inf * u.s + ) + elif isinstance(time, u.Quantity): + if not time.isscalar: + raise ValueError("time_max must be a scalar") + try: + time = time.to(u.s) + except u.UnitConversionError: + raise u.UnitsError("time_max must have units of seconds") from None + if ( + hasattr(self, "_time_start") + and self._time_start is not None + and self._time_start >= time + ): + raise ValueError("time_max must be greater than time_start") + self._time_max = time + else: + raise TypeError("Invalid time_max.") from None + + @property + def adapt_dt(self) -> Optional[bool]: + """ + Return `True` if the time step is set to be adaptive, `False` + if the time step is set to not be adapted, and `None` if this + attribute was not set. + """ + return self._adapt_dt + + @adapt_dt.setter + def adapt_dt(self, choice: Optional[bool]): + if choice is None: + self._adapt_dt = True if self.dt_input is None else False + elif choice is True or choice is False: + self._adapt_dt = choice + else: + raise TypeError("Invalid value for adapt_dt") + + @property + def dt_input(self) -> Optional[u.Quantity]: + """Return the inputted time step.""" + return self._dt_input + + @dt_input.setter + def dt_input(self, dt: Optional[u.Quantity]): + if dt is None: + self._dt_input = None + elif isinstance(dt, u.Quantity): + try: + dt = dt.to(u.s) + if dt > 0 * u.s: + self._dt_input = dt + except (AttributeError, u.UnitConversionError): + raise NEIError("Invalid dt.") + + @property + def dt_min(self) -> u.Quantity: + return self._dt_min + + @dt_min.setter + def dt_min(self, value: u.Quantity): + if not isinstance(value, u.Quantity): + raise TypeError("dt_min must be a Quantity.") + try: + value = value.to(u.s) + except u.UnitConversionError as exc: + raise u.UnitConversionError("Invalid units for dt_min.") + if ( + hasattr(self, "_dt_input") + and self.dt_input is not None + and self.dt_input < value + ): + raise ValueError("dt_min cannot exceed the inputted time step.") + if hasattr(self, "_dt_max") and self.dt_max < value: + raise ValueError("dt_min cannot exceed dt_max.") + self._dt_min = value + + @property + def dt_max(self) -> u.Quantity: + return self._dt_max + + @dt_max.setter + def dt_max(self, value: u.Quantity): + if not isinstance(value, u.Quantity): + raise TypeError("dt_max must be a Quantity.") + try: + value = value.to(u.s) + except u.UnitConversionError as exc: + raise u.UnitConversionError("Invalid units for dt_max.") + if ( + hasattr(self, "_dt_input") + and self.dt_input is not None + and self.dt_input > value + ): + raise ValueError("dt_max cannot be less the inputted time step.") + if hasattr(self, "_dt_min") and self.dt_min > value: + raise ValueError("dt_min cannot exceed dt_max.") + self._dt_max = value + + @property + def safety_factor(self): + """ + The multiplicative factor that the time step is to be multiplied + by when using an adaptive time step. + """ + return self._safety_factor + + @safety_factor.setter + def safety_factor(self, value): + if not isinstance(value, (float, np.float64, np.integer, int)): + raise TypeError + if 1e-3 <= value <= 1e3: + self._safety_factor = value + else: + raise NEIError("Invalid safety factor.") + + @property + def verbose(self) -> bool: + """ + Return `True` if verbose output during a simulation is + requested, and `False` otherwise. + """ + return self._verbose + + @verbose.setter + def verbose(self, choice: bool): + if choice is True or choice is False: + self._verbose = choice + else: + raise TypeError("Invalid choice for verbose.") + + @u.quantity_input + def in_time_interval(self, time: u.s, buffer: u.s = 1e-9 * u.s): + """ + Return `True` if the `time` is between `time_start - buffer` and + `time_max + buffer` , and `False` otherwise. + + Raises + ------ + TypeError + If `time` or `buffer` is not a `~astropy.units.Quantity` + + `~astropy.units.UnitsError` + If `time` or `buffer` is not in units of time. + + """ + return self.time_start - buffer <= time <= self.time_max + buffer + + @property + def max_steps(self) -> int: + """ + The maximum number of steps that a simulation will be allowed + to take. + """ + return self._max_steps + + @max_steps.setter + def max_steps(self, n: int): + if isinstance(n, (int, np.integer)) and 0 < n <= 1000000: + self._max_steps = n + else: + raise TypeError( + "max_steps must be an integer with 0 < max_steps <= " "1000000" + ) + + @property + def T_e_input(self) -> Union[u.Quantity, Callable]: + """ + The temperature input. + """ + return self._T_e_input + + @T_e_input.setter + def T_e_input(self, T_e: Optional[Union[Callable, u.Quantity]]): + """Set the input electron temperature.""" + if isinstance(T_e, u.Quantity): + try: + T_e = T_e.to(u.K, equivalencies=u.temperature_energy()) + except u.UnitConversionError: + raise u.UnitsError("Invalid electron temperature.") from None + if T_e.isscalar: + self._T_e_input = T_e + self._electron_temperature = lambda time: T_e + else: + if self._time_input is None: + raise TypeError("Must define time_input prior to T_e for an array.") + time_input = self.time_input + if len(time_input) != len(T_e): + raise ValueError("len(T_e) not equal to len(time_input).") + f = interpolate.interp1d( + time_input.value, + T_e.value, + bounds_error=False, + fill_value="extrapolate", + ) + self._electron_temperature = lambda time: f(time.value) * u.K + self._T_e_input = T_e + elif callable(T_e): + if self.time_start is not None: + try: + T_e(self.time_start).to(u.K) + T_e(self.time_max).to(u.K) + except Exception: + raise ValueError("Invalid electron temperature function.") + self._T_e_input = T_e + self._electron_temperature = T_e + elif T_e is None: + self._electron_temperature = lambda: None + else: + raise TypeError("Invalid T_e") + + def electron_temperature(self, time: u.Quantity) -> u.Quantity: + try: + if not self.in_time_interval(time): + warnings.warn( + f"{time} is not in the simulation time interval:" + f"[{self.time_start}, {self.time_max}]. " + f"May be extrapolating temperature." + ) + T_e = self._electron_temperature(time.to(u.s)) + if np.isnan(T_e) or np.isinf(T_e) or T_e < 0 * u.K: + raise NEIError(f"T_e = {T_e} at time = {time}.") + return T_e + except Exception as exc: + raise NEIError( + f"Unable to calculate a valid electron temperature " f"for time {time}" + ) from exc + + @property + def n_input(self) -> u.Quantity: + """The number density factor input.""" + if "H" in self.elements: + return self._n_input + else: + raise ValueError + + @n_input.setter + def n_input(self, n: u.Quantity): + if isinstance(n, u.Quantity): + try: + n = n.to(u.cm ** -3) + except u.UnitConversionError: + raise u.UnitsError("Invalid hydrogen density.") + if n.isscalar: + self._n_input = n + self.hydrogen_number_density = lambda time: n + else: + if self._time_input is None: + raise TypeError("Must define time_input prior to n for an array.") + time_input = self.time_input + if len(time_input) != len(n): + raise ValueError("len(n) is not equal to len(time_input).") + f = interpolate.interp1d( + time_input.value, + n.value, + bounds_error=False, + fill_value="extrapolate", + ) + self._hydrogen_number_density = lambda time: f(time.value) * u.cm ** -3 + self._n_input = n + elif callable(n): + if self.time_start is not None: + try: + n(self.time_start).to(u.cm ** -3) + n(self.time_max).to(u.cm ** -3) + except Exception: + raise ValueError("Invalid number density function.") + self._n_input = n + self._hydrogen_number_density = n + elif n is None: + self._hydrogen_number_density = lambda: None + else: + raise TypeError("Invalid n.") + + def hydrogen_number_density(self, time: u.Quantity) -> u.Quantity: + try: + time = time.to(u.s) + except (AttributeError, u.UnitsError): + raise NEIError("Invalid time in hydrogen_density") + return self._hydrogen_number_density(time) + + @property + def EigenDataDict(self) -> Dict[str, EigenData]: + """ + Return a `dict` containing `~nei.class + """ + return self._EigenDataDict + + @property + def initial(self) -> IonizationStates: + """ + Return the ionization states of the plasma at the beginning of + the simulation. + """ + return self._initial + + @initial.setter + def initial(self, initial_states: Optional[IonizationStates]): + if isinstance(initial_states, IonizationStates): + self._initial = initial_states + self._elements = initial_states.ionic_fractions.keys() # TODO IonizationStates + elif initial_states is None: + self._ionstates = None + else: + raise TypeError("Expecting an IonizationStates instance.") + + @property + def results(self) -> Simulation: + """ + Return the `~nei.classes.Simulation` class instance that + corresponds to the simulation results. + + """ + try: + return self._results + except Exception: + raise AttributeError("The simulation has not yet been performed.") + + @property + def final(self) -> IonizationStates: + """ + Return the ionization states of the plasma at the end of the + simulation. + """ + try: + return self._final + except AttributeError: + raise NEIError("The simulation has not yet been performed.") from None + + def _initialize_simulation(self): + + self._results = Simulation( + initial=self.initial, + n_init=self.hydrogen_number_density(self.time_start), + T_e_init=self.electron_temperature(self.time_start), + max_steps=self.max_steps, + time_start=self.time_start, + ) + self._old_time = self.time_start.to(u.s) + self._new_time = self.time_start.to(u.s) + + def simulate(self) -> Simulation: + """ + Perform a non-equilibrium ionization simulation. + + Returns + ------- + results: ~nei.classes.Simulation + The results from the simulation (which are also stored in + the `results` attribute of the `~nei.classes.NEI` instance + this method was called from. + + """ + + self._initialize_simulation() + + for step in range(self.max_steps): + try: + self.set_timestep() + self.time_advance() + except StopIteration: + break + except Exception as exc: + raise NEIError(f"Unable to complete simulation.") from exc + + self._finalize_simulation() + + # Is there a way to use the inspect package or something similar + # to only return self.results if it is in an expression where + + return self.results + + def _finalize_simulation(self): + self._results._cleanup() + + final_ionfracs = { + element: self.results.ionic_fractions[element][-1, :] + for element in self.elements + } + + self._final = IonizationStates( + inputs=final_ionfracs, + abundances=self.abundances, + n=np.sum( + self.results.number_densities["H"][-1, :] + ), # modify this later?, + T_e=self.results.T_e[-1], + tol=1e-6, + ) + + if not np.isclose(self.time_max / u.s, self.results.time[-1] / u.s): + warnings.warn( + f"The simulation ended at {self.results.time[-1]}, " + f"which is prior to time_max = {self.time_max}." + ) + + def _set_adaptive_timestep(self): + """Adapt the time step.""" + + t = self._new_time if hasattr(self, "_new_time") else self.t_start + + # We need to guess the timestep in order to narrow down what the + # timestep should be. If we are in the middle of a simulation, + # we can use the old timestep as a reasonable guess. If we are + # simulation, then we can either use the inputted timestep or + # estimate it from other inputs. + + dt_guess = ( + self._dt + if self._dt + else self._dt_input + if self._dt_input + else self.time_max / self.max_steps + ) + + # Make sure that dt_guess does not lead to a time that is out + # of the domain. + + dt_guess = dt_guess if t + dt_guess <= self.time_max - t else self.time_max - t + + # The temperature may start out exactly at the boundary of a + # bin, so we check what bin it is in just slightly after to + # figure out which temperature bin the plasma is entering. + + T = self.electron_temperature(t + 1e-9 * dt_guess) + + # Find the boundaries to the temperature bin. + + index = self._get_temperature_index(T.to(u.K).value) + T_nearby = np.array(self._temperature_grid[index - 1 : index + 2]) * u.K + T_boundary = (T_nearby[0:-1] + T_nearby[1:]) / 2 + + # In order to use Brent's method, we must bound the root's + # location. Functions may change sharply or slowly, so we test + # different times that are logarithmically spaced to find the + # first one that is outside of the boundary. + + dt_spread = ( + np.geomspace(1e-9 * dt_guess.value, (self.time_max - t).value, num=100) + * u.s + ) + time_spread = t + dt_spread + T_spread = [self.electron_temperature(time) for time in time_spread] + in_range = [T_boundary[0] <= temp <= T_boundary[1] for temp in T_spread] + + # If all of the remaining temperatures are in the same bin, then + # the temperature will be roughly constant for the rest of the + # simulation. Take one final long time step, unless it exceeds + # dt_max. + + if all(in_range): + new_dt = self.time_max - t + self._dt = new_dt if new_dt <= self.dt_max else self.dt_max + return + + # Otherwise, we need to find the first index in the spread that + # corresponds to a temperature outside of the temperature bin + # for this time step. + + first_false_index = in_range.index(False) + + # We need to figure out if the temperature is dropping so that + # it crosses the low temperature boundary of the bin, or if it + # is rising so that it crosses the high temperature of the bin. + + T_first_outside = self.electron_temperature(time_spread[first_false_index]) + + if T_first_outside >= T_boundary[1]: + boundary_index = 1 + elif T_first_outside <= T_boundary[0]: + boundary_index = 0 + + # Select the values for the time step in the spread just before + # and after the temperature leaves the temperature bin as bounds + # for the root finding method. + + dt_bounds = (dt_spread[first_false_index - 1 : first_false_index + 1]).value + + # Define a function for the difference between the temperature + # and the temperature boundary as a function of the value of the + # time step. + + T_val = lambda dtval: ( + self.electron_temperature(t + dtval * u.s) - T_boundary[boundary_index] + ).value + + # Next we find the root. This method should succeed as long as + # the root is bracketed by dt_bounds. Because astropy.units is + # not fully compatible with SciPy, we temporarily drop units and + # then reattach them. + + try: + new_dt = ( + optimize.brentq(T_val, *dt_bounds, xtol=1e-14, maxiter=1000, disp=True,) + * u.s + ) + except Exception as exc: + raise NEIError(f"Unable to find new dt at t = {t}") from exc + else: + if np.isnan(new_dt.value): + raise NEIError(f"new_dt = {new_dt}") + + # Enforce that the time step is in the interval [dt_min, dt_max]. + + if new_dt < self.dt_min: + new_dt = self.dt_min + elif new_dt > self.dt_max: + new_dt = self.dt_max + + # Store the time step as a private attribute so that it can be + # used in the time advance. + + self._dt = new_dt.to(u.s) + + def set_timestep(self, dt: u.Quantity = None): + """ + Set the time step for the next non-equilibrium ionization time + advance. + + Parameters + ---------- + dt: ~astropy.units.Quantity, optional + The time step to be used for the next time advance. + + Notes + ----- + If `dt` is not `None`, then the time step will be set to `dt`. + + If `dt` is not set and the `adapt_dt` attribute of an + `~nei.classes.NEI` instance is `True`, then this method will + calculate the time step corresponding to how long it will be + until the temperature rises or drops into the next temperature + bin. If this time step is between `dtmin` and `dtmax`, then + + If `dt` is not set and the `adapt_dt` attribute is `False`, then + this method will set the time step as what was inputted to the + `~nei.classes.NEI` class upon instantiation in the `dt` argument + or through the `~nei.classes.NEI` class's `dt_input` attribute. + + Raises + ------ + ~nei.classes.NEIError + If the time step cannot be set, for example if the `dt` + argument is invalid or the time step cannot be adapted. + + """ + + if dt is not None: + # Allow the time step to set as an argument to this method. + try: + dt = dt.to(u.s) + except Exception as exc: + raise NEIError(f"{dt} is not a valid time step.") from exc + finally: + self._dt = dt + elif self.adapt_dt: + try: + self._set_adaptive_timestep() + except Exception as exc: + raise NEIError("Unable to adapt the time step.") from exc + elif self.dt_input is not None: + self._dt = self.dt_input + else: + raise NEIError("Unable to set the time step.") + + self._old_time = self._new_time + self._new_time = self._old_time + self._dt + + if self._new_time > self.time_max: + self._new_time = self.time_max + self._dt = self._new_time - self._old_time + + def time_advance(self): + """ + Advance the simulation by one time step. + """ + # TODO: Expand docstring and include equations! + + # TODO: Fully implement units into this. + + step = self.results._index + T_e = self.results.T_e[step - 1].value + n_e = self.results.n_e[step - 1].value # set average + dt = self._dt.value + + if self.verbose: + print(f"step={step} T_e={T_e} n_e={n_e} dt={dt}") + + new_ionic_fractions = {} + + try: + for elem in self.elements: + nstates = self.results.nstates[elem] + f0 = self.results._ionic_fractions[elem][self.results._index - 1, :] + + evals = self.EigenDataDict[elem].eigenvalues(T_e=T_e) + evect = self.EigenDataDict[elem].eigenvectors(T_e=T_e) + evect_inverse = self.EigenDataDict[elem].eigenvector_inverses(T_e=T_e) + + diagonal_evals = np.zeros((nstates, nstates), dtype=np.float64) + for ii in range(0, nstates): + diagonal_evals[ii, ii] = np.exp(evals[ii] * dt * n_e) + + matrix_1 = np.dot(diagonal_evals, evect) + matrix_2 = np.dot(evect_inverse, matrix_1) + + ft = np.dot(f0, matrix_2) + + # Due to truncation errors in the solutions in the + # eigenvalues and eigenvectors, there is a chance that + # very slightly negative ionic fractions will arise. + # These are not natural and will make the code grumpy. + # For these reasons, the ionic fractions will be very + # slightly unnormalized. We set negative ionic + # fractions to zero and renormalize. + + ft[np.where(ft < 0.0)] = 0.0 + new_ionic_fractions[elem] = ft / np.sum(ft) + + except Exception as exc: + raise NEIError(f"Unable to do time advance for {elem}") from exc + else: + + new_time = self.results.time[self.results._index - 1] + self._dt + self.results._assign( + new_time=new_time, + new_ionfracs=new_ionic_fractions, + new_T_e=self.electron_temperature(new_time), + new_n=self.hydrogen_number_density(new_time), + ) + + if new_time >= self.time_max or np.isclose(new_time.value, self.time_max.value): + raise StopIteration + + def save(self, filename: str = "nei.h5"): + """ + Save the `~nei.classes.NEI` instance to an HDF5 file. Not + implemented. + """ + raise NotImplementedError + + def index_to_time(self, index): + """ + Returns the time value or array given the index/indices + + Parameters + ------ + index: array-like + A value or array of values representing the index of + the time array created by the simulation + + Returns + ------ + get_time: astropy.units.Quantity + The time value associated with index input(s) + """ + + return self.results.time[index] + + def time_to_index(self, time): + """ + Returns the closest index value or array for the given time(s) + + Parameters + ------ + time: array-like, + A value or array of values representing the values of + the time array created by the simulation + + Returns + ------ + index: int or array-like, + The index value associated with the time input(s) + """ + index = (np.abs(self.results.time.value - time)).argmin() + + return index diff --git a/plasmapy_nei/nei/tests/test_nei.py b/plasmapy_nei/nei/tests/test_nei.py index 1ae83bc..1f53944 100644 --- a/plasmapy_nei/nei/tests/test_nei.py +++ b/plasmapy_nei/nei/tests/test_nei.py @@ -1,6 +1,306 @@ """Tests for non-equilibrium ionization modeling classes.""" +import astropy.units as u +from plasmapy.atomic import IonizationStates, particle_symbol +from plasmapy_nei.nei import NEI +from plasmapy_nei.eigen import EigenData +import numpy as np +import pytest -def test_import(): - """Minimal test that importing this works.""" - from plasmapy_nei.nei import NEI +inputs_dict = {"H": [0.9, 0.1], "He": [0.5, 0.3, 0.2]} +abundances = {"H": 1, "He": 0.1} + +time_array = np.array([0, 800]) * u.s +T_e_array = np.array([4e4, 6e4]) * u.K +n_array = np.array([1e9, 5e8]) * u.cm ** -3 + + +tests = { + "basic": { + "inputs": inputs_dict, + "abundances": abundances, + "T_e": T_e_array, + "n": n_array, + "time_input": time_array, + "time_start": 0 * u.s, + "time_max": 800 * u.s, + "max_steps": 1, + "dt": 800 * u.s, + "adapt_dt": False, + "verbose": True, + }, + "T_e constant": { + "inputs": inputs_dict, + "abundances": abundances, + "T_e": 1 * u.MK, + "n": n_array, + "time_input": time_array, + "time_start": 0 * u.s, + "time_max": 800 * u.s, + "dt": 100 * u.s, + "max_steps": 2, + "adapt_dt": False, + "verbose": True, + }, + "n_e constant": { + "inputs": inputs_dict, + "abundances": abundances, + "T_e": T_e_array, + "n": 1e9 * u.cm ** -3, + "time_input": time_array, + "time_start": 0 * u.s, + "time_max": 800 * u.s, + "max_steps": 2, + "adapt_dt": False, + "dt": 100 * u.s, + "verbose": True, + }, + "T_e function": { + "inputs": inputs_dict, + "abundances": abundances, + "T_e": lambda time: 1e4 * (1 + time / u.s) * u.K, + "n": 1e15 * u.cm ** -3, + "time_max": 800 * u.s, + "max_steps": 2, + "dt": 100 * u.s, + "adapt_dt": False, + "verbose": True, + }, + "n function": { + "inputs": inputs_dict, + "abundances": abundances, + "T_e": 6e4 * u.K, + "n": lambda time: 1e9 * (1 + time / u.s) * u.cm ** -3, + "time_start": 0 * u.s, + "time_max": 800 * u.s, + "adapt_dt": False, + "dt": 200 * u.s, + "verbose": True, + "max_steps": 4, + }, + "equil test cool": { + "inputs": ["H", "He", "N"], + "abundances": {"H": 1, "He": 0.1, "C": 1e-4, "N": 1e-4, "O": 1e-4, "Fe": 1e-4}, + "T_e": 10001.0 * u.K, + "n": 1e13 * u.cm ** -3, + "time_max": 2e6 * u.s, + "tol": 1e-9, + "adapt_dt": False, + "dt": 5e5 * u.s, + "max_steps": 4, + "verbose": True, + }, + "equil test hot": { + "inputs": ["H", "He", "C"], + "abundances": { + "H": 1, + "He": 0.1, + "C": 1e-4, + "N": 1e-4, + "O": 1e-4, + "Fe": 1e-4, + "S": 2e-6, + }, + "T_e": 7e6 * u.K, + "n": 1e9 * u.cm ** -3, + "time_max": 1e8 * u.s, + "dt": 5e7 * u.s, + "max_steps": 3, + "adapt_dt": False, + "verbose": True, + }, + "equil test start far out of equil": { + "inputs": { + "H": [0.99, 0.01], + "He": [0.5, 0.0, 0.5], + "O": [0.2, 0, 0.2, 0, 0.2, 0, 0.2, 0, 0.2], + }, + "abundances": {"H": 1, "He": 0.1, "O": 1e-4}, + "T_e": 3e6 * u.K, + "n": 1e9 * u.cm ** -3, + "dt": 1e6 * u.s, + "time_start": 0 * u.s, + "time_max": 1e6 * u.s, + "adapt_dt": False, + "verbose": True, + "max_steps": 2, + }, + "adapt dt": { + "inputs": ["H", "He"], + "abundances": {"H": 1, "He": 0.1}, + "T_e": lambda t: u.K * (1e6 + 1.3e4 * np.sin(t.value)), + "n": 1e10 * u.cm ** -3, + "max_steps": 300, + "time_start": 0 * u.s, + "time_max": 2 * np.pi * u.s, + "adapt_dt": True, + }, +} + +test_names = list(tests.keys()) + + +class TestNEI: + @classmethod + def setup_class(cls): + cls.instances = {} + + @pytest.mark.parametrize("test_name", test_names) + def test_instantiate(self, test_name): + try: + instance = NEI(**tests[test_name]) + self.instances[test_name] = instance + except Exception as exc: + raise Exception(f"Problem with test {test_name}") from exc + + @pytest.mark.parametrize("test_name", test_names) + def test_time_start(self, test_name): + instance = self.instances[test_name] + if "time_start" in tests[test_name].keys(): + assert tests[test_name]["time_start"] == instance.time_start + elif "time_input" in tests[test_name].keys(): + assert tests[test_name]["time_input"].min() == instance.time_start + else: + assert instance.time_start == 0 * u.s + + @pytest.mark.parametrize("test_name", test_names) + def test_time_max(self, test_name): + instance = self.instances[test_name] + if "time_max" in tests[test_name].keys(): + assert tests[test_name]["time_max"] == instance.time_max + + @pytest.mark.parametrize("test_name", test_names) + def test_initial_type(self, test_name): + instance = self.instances[test_name] + assert isinstance(instance.initial, IonizationStates) + + @pytest.mark.parametrize("test_name", test_names) + def test_n_input(self, test_name): + actual = self.instances[test_name].n_input + expected = tests[test_name]["n"] + if isinstance(expected, u.Quantity) and not expected.isscalar: + assert all(expected == actual) + else: + assert expected == actual + + @pytest.mark.parametrize("test_name", test_names) + def test_T_e_input(self, test_name): + actual = self.instances[test_name].T_e_input + expected = tests[test_name]["T_e"] + if isinstance(expected, u.Quantity) and not expected.isscalar: + assert all(expected == actual) + else: + assert expected == actual + + @pytest.mark.parametrize("test_name", test_names) + def test_electron_temperature(self, test_name): + instance = self.instances[test_name] + T_e_input = instance.T_e_input + if isinstance(T_e_input, u.Quantity): + if T_e_input.isscalar: + assert instance.electron_temperature(instance.time_start) == T_e_input + assert instance.electron_temperature(instance.time_max) == T_e_input + else: + for time, T_e in zip(instance.time_input, T_e_input): + try: + T_e_func = instance.electron_temperature(time) + except Exception: + raise ValueError("Unable to find T_e from electron_temperature") + + assert np.isclose(T_e.value, T_e_func.value) + if callable(T_e_input): + assert instance.T_e_input( + instance.time_start + ) == instance.electron_temperature(instance.time_start) + + @pytest.mark.parametrize( + "test_name", + [ + test_name + for test_name in test_names + if isinstance(tests[test_name]["inputs"], dict) + ], + ) + def test_initial_ionfracs(self, test_name): + original_inputs = tests[test_name]["inputs"] + original_elements = original_inputs.keys() + + for element in original_elements: + assert np.allclose( + original_inputs[element], + self.instances[test_name].initial.ionic_fractions[ + particle_symbol(element) + ], + ) + + @pytest.mark.parametrize("test_name", test_names) + def test_simulate(self, test_name): + try: + self.instances[test_name].simulate() + except Exception as exc: + raise ValueError(f"Unable to simulate for test: {test_name}") from exc + + @pytest.mark.parametrize("test_name", test_names) + def test_simulation_end(self, test_name): + instance = self.instances[test_name] + time = instance.results.time + end_time = time[-1] + max_steps = instance.max_steps + + if np.isnan(end_time.value): + raise Exception("End time is NaN.") + + got_to_max_steps = len(time) == instance.max_steps + 1 + got_to_time_max = np.isclose(time[-1].value, instance.time_max.value) + + if time.isscalar or len(time) == 1: + raise Exception(f"The only element in results.time is {time}") + + if not got_to_max_steps and not got_to_time_max: + print(f"time = {time}") + print(f"max_steps = {max_steps}") + print(f"time_max = {instance.time_max}") + raise Exception("Problem with end time.") + + @pytest.mark.parametrize( + "test_name", [test_name for test_name in test_names if "equil" in test_name] + ) + def test_equilibration(self, test_name): + """ + Test that equilibration works. + """ + instance = self.instances[test_name] + T_e = instance.T_e_input + assert ( + isinstance(T_e, u.Quantity) and T_e.isscalar + ), "This test can only be used for cases where T_e is constant." + equil_dict = instance.equil_ionic_fractions(T_e) + for element in instance.elements: + assert np.allclose( + equil_dict[element], instance.results.ionic_fractions[element][-1, :] + ) + + @pytest.mark.parametrize("test_name", test_names) + def test_initial_results(self, test_name): + initial = self.instances[test_name].initial + results = self.instances[test_name].results + # Make sure that the elements are equal to each other + assert initial.ionic_fractions.keys() == results.ionic_fractions.keys() + assert initial.abundances == results.abundances + for elem in initial.ionic_fractions.keys(): # TODO: enable initial.elements + assert np.allclose( + results.ionic_fractions[elem][0, :], initial.ionic_fractions[elem] + ) + + @pytest.mark.parametrize("test_name", test_names) + def test_final_results(self, test_name): + # initial = self.instances[test_name].initial + final = self.instances[test_name].final + results = self.instances[test_name].results + # Make sure the elements are equal to each other + assert final.ionic_fractions.keys() == results.ionic_fractions.keys() + assert final.abundances == results.abundances + for elem in final.ionic_fractions.keys(): + assert np.allclose( + results.ionic_fractions[elem][-1, :], final.ionic_fractions[elem] + ) diff --git a/setup.cfg b/setup.cfg index 0d2cd85..e261197 100644 --- a/setup.cfg +++ b/setup.cfg @@ -17,6 +17,7 @@ setup_requires = install_requires = astropy plasmapy + h5py [options.entry_points] diff --git a/setup.py b/setup.py index afca96e..6fdd635 100755 --- a/setup.py +++ b/setup.py @@ -20,5 +20,6 @@ "write_to": os.path.join("plasmapy_nei", "version.py"), "write_to_template": VERSION_TEMPLATE, }, - install_requires=["plasmapy>=0.3.1", "numpy>=1.16", "astropy>=3.2"], + install_requires=["plasmapy>=0.3.1", "numpy>=1.16", "astropy>=3.2", "h5py"], + include_package_data=True, ) diff --git a/tox.ini b/tox.ini index 219de85..155c403 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] envlist = - py{37,38}-test + py{37}-test build_docs codestyle isolated_build = True @@ -27,6 +27,7 @@ description = deps = # The following indicates which extras_require from setup.cfg will be installed plasmapy + h5py extras = test alldeps: all