Source code for pynucastro.networks.python_network

"""Support modules to write a pure python reaction network ODE source."""

import io
import sys
import types
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from scipy.integrate import solve_ivp

from pynucastro.constants import constants
from pynucastro.networks.rate_collection import RateCollection
from pynucastro.nucdata import Composition
from pynucastro.rates import ApproximateRate, ModifiedRate
from pynucastro.screening import get_screening_func, get_screening_pair_set


[docs] class PythonNetwork(RateCollection): """A pure python reaction network. This can create a python module as a file that contains everything needed to evaluate the reaction rates and construct the righthand side and Jacobian functions. """
[docs] def full_ydot_string(self, nucleus, indent=""): """Construct a string containing the python code for dY(nucleus)/dt by considering every reaction that involves nucleus, adding terms that result in its creation and subtracting terms representing its destruction. Parameters ---------- nucleus : Nucleus The nucleus we are constructing the time derivative of. indent : str A string that will be prepended to each line of the output, typically consisting of just spaces representing the amount of indent desired. Returns ------- str """ ostr = "" if not self.nuclei_consumed[nucleus] + self.nuclei_produced[nucleus]: # this captures an inert nucleus ostr += f"{indent}dYdt[j{nucleus.raw}] = 0.0\n\n" else: ostr += f"{indent}dYdt[j{nucleus.raw}] = (\n" for ipair, rp in enumerate(self.nuclei_rate_pairs[nucleus]): # when we are working with rate pairs, one or more of the # rates may be missing. We also have not clearly separated # them into creation / destruction, so we'll figure that out rlist = [r for r in [rp.forward, rp.reverse] if r is not None] ostr += f"{indent} " if len(rlist) > 1: ostr += "( " for rate in rlist: c_reac = rate.reactant_count(nucleus) c_prod = rate.product_count(nucleus) c = c_prod - c_reac if c == 1: ostr += f"+{rate.ydot_string_py()} " elif c == -1: ostr += f"-{rate.ydot_string_py()} " else: ostr += f"+ {c}*{rate.ydot_string_py()} " if len(rlist) > 1: ostr += ")" if ipair < len(self.nuclei_rate_pairs[nucleus]) - 1: ostr += " +" ostr = ostr.rstrip() + "\n" ostr += f"{indent} )\n\n" return ostr
[docs] def full_jacobian_element_string(self, ydot_i_nucleus, y_j_nucleus, indent=""): """Construct a string containing the python code for a single element of the Jacobian, dYdot(ydot_i_nucleus)/dY(y_j_nucleus) Parameters ---------- ydot_i_nucleus: Nucleus The nucleus representing the dY/dt term we are differentiating. This is the row of the Jacobian. ydot_j_nucleus: Nucleus The nucleus we are differentiating with respect to. This is the column of the Jacobian. indent : str A string that will be prepended to each line of the output, typically consisting of just spaces representing the amount of indent desired. Returns ------- str """ # this is the jac(i,j) string idx_str = f"jac[j{ydot_i_nucleus.raw}, j{y_j_nucleus.raw}]" ostr = "" if not self.nuclei_consumed[ydot_i_nucleus] + self.nuclei_produced[ydot_i_nucleus]: # this covers the case where a nucleus is not created or # destroyed in the entire network, but is just passive ostr += f"{indent}{idx_str} = 0.0\n\n" else: ostr += f"{indent}{idx_str} = (\n" rate_terms_str = "" for r in self.nuclei_consumed[ydot_i_nucleus]: c = r.reactant_count(ydot_i_nucleus) jac_str = r.jacobian_string_py(y_j_nucleus) if jac_str == "": continue if c == 1: rate_terms_str += f"{indent} -{jac_str}\n" else: rate_terms_str += f"{indent} -{c}*{jac_str}\n" for r in self.nuclei_produced[ydot_i_nucleus]: c = r.product_count(ydot_i_nucleus) jac_str = r.jacobian_string_py(y_j_nucleus) if jac_str == "": continue if c == 1: rate_terms_str += f"{indent} +{jac_str}\n" else: rate_terms_str += f"{indent} +{c}*{jac_str}\n" if rate_terms_str == "": return "" ostr += rate_terms_str ostr += f"{indent} )\n\n" return ostr
[docs] def screening_string(self, indent=""): """Create a string containing the python code that sets up the screening (PlasmaState) and calls the screening function on every set of reactants in our network. This computes log(screening) terms and store them in local variables. Parameters ---------- indent : str A string that will be prepended to each line of the output, typically consisting of just spaces representing the amount of indent desired. Returns ------- str """ ostr = "" screening_pair_set = get_screening_pair_set(self.get_rates()) # Initialize log_scor to 0.0 for n1, n2 in screening_pair_set: screen_var = f"log_scor_{n1}_{n2}" ostr += f"{indent}{screen_var} = 0.0\n" # Check if we're doing screening, return early if not screening if not self.do_screening: return ostr ostr += "\n" ostr += f"{indent}if screen_func is not None:\n" indent += " " ostr += f"{indent}plasma_state = PlasmaState(T, rho, Y, Z)\n\n" for n1, n2 in screening_pair_set: screen_var = f"log_scor_{n1}_{n2}" ostr += f"{indent}scn_fac = ScreenFactors({n1.Z}, {n1.A}, {n2.Z}, {n2.A})\n" ostr += f"{indent}{screen_var} = screen_func(plasma_state, scn_fac)\n" return ostr
[docs] def rates_string(self, indent=""): """Create the python code that calls the evaluation function for each rate. Typically this is of the form ``name(rate_eval, ...)``, where ``rate_eval`` is a container class in the output network that stores the rate values. This also calls ``screening_string()`` after the main rates are evaluated but before the approximate rates are constructed. Parameters ---------- indent : str A string that will be prepended to each line of the output, typically consisting of just spaces representing the amount of indent desired. Returns ------- str """ def format_rate_call(r, use_tf=True): args = ["rate_eval"] if use_tf: args.append("tf") else: args.append("T") if r.rate_eval_needs_rho: args.append("rho=rho") if r.rate_eval_needs_comp: args.append("Y=Y") if r.screening_pairs: screen_terms = [f"log_scor_{r1}_{r2}" for r1, r2 in r.screening_pairs] args.append("log_scor=" + " + ".join(screen_terms)) return f"{indent}{r.fname}({', '.join(args)})\n" ostr = "" # Precompute screening terms. Note here we compute log_screening ostr += self.screening_string(indent=indent) ostr += "\n" ostr += f"{indent}# reaclib rates\n" for r in self.reaclib_rates: ostr += format_rate_call(r) if self.tabular_rates: ostr += f"\n{indent}# tabular rates\n" for r in self.tabular_rates: ostr += format_rate_call(r, use_tf=False) if self.temperature_tabular_rates: ostr += f"\n{indent}# temperature tabular rates\n" for r in self.temperature_tabular_rates: ostr += format_rate_call(r, use_tf=False) if self.starlib_rates: ostr += f"\n{indent}# starlib rates\n" for r in self.starlib_rates: ostr += format_rate_call(r, use_tf=False) if self.custom_rates: ostr += f"\n{indent}# custom rates\n" for r in self.custom_rates: ostr += format_rate_call(r) # modified rates will have their own screening, # either using the original rate or any modified # form. Therefore we call them before applying # screening factors. if self.modified_rates: ostr += f"\n{indent}# modified rates\n" for r in self.modified_rates: ostr += format_rate_call(r) # Derived rate should go last (before approx rates) # since the inverse rate should be evaluated first. if self.derived_rates: ostr += f"\n{indent}# derived rates\n" for r in self.derived_rates: ostr += format_rate_call(r) if self.approx_rates: ostr += f"\n{indent}# approximate rates\n" for r in self.approx_rates: ostr += format_rate_call(r) return ostr
def _write_network(self, outfile: str | Path = None): """Create the entire python code representing this network. This includes defining the nuclei properties, evaluating partition functions, defining the ``RateEval`` class, defining the tabular rate data, writing the functions that evaluate each of the rates, and then calling the functions that construct the RHS and Jacobian. Parameters ---------- outfile : str, Path The name of the output file to write to. If this is ``None``, then the output is written to stdout """ # pylint: disable=arguments-differ if outfile is None: of = sys.stdout close_file = False elif hasattr(outfile, "write"): # already a file-like object of = outfile close_file = False else: outfile = Path(outfile) of = outfile.open("w") close_file = True indent = 4*" " of.write("import numba\n") of.write("import numpy as np\n") of.write("from pynucastro.constants import constants\n") of.write("from numba.experimental import jitclass\n\n") of.write("from pynucastro.rates import (TableIndex, TableInterpolator, TabularRate,\n") of.write(" TempTableInterpolator, TemperatureTabularRate,\n") of.write(" Tfactors)\n") of.write("from pynucastro.screening import PlasmaState, ScreenFactors\n\n") # integer keys for i, n in enumerate(self.unique_nuclei): of.write(f"j{n.raw} = {i}\n") of.write(f"nnuc = {len(self.unique_nuclei)}\n\n") # nuclei properties of.write("A = np.zeros((nnuc), dtype=np.int32)\n\n") for n in self.unique_nuclei: of.write(f"A[j{n.raw}] = {n.A}\n") of.write("\n") of.write("Z = np.zeros((nnuc), dtype=np.int32)\n\n") for n in self.unique_nuclei: of.write(f"Z[j{n.raw}] = {n.Z}\n") # we'll compute the masses here in erg of.write("\n") of.write("# masses in ergs\n") of.write("mass = np.zeros((nnuc), dtype=np.float64)\n\n") for n in self.unique_nuclei: mass = n.A_nuc * constants.m_u_MeV_C18 * constants.MeV2erg of.write(f"mass[j{n.raw}] = {mass}\n") of.write("\n") of.write("names = []\n") for n in self.unique_nuclei: name = n.short_spec_name if name != "n": name = name.capitalize() of.write(f"names.append(\"{name}\")\n") of.write("\n") of.write("def to_composition(Y):\n") of.write(f'{indent}''"""Convert an array of molar fractions to a Composition object."""\n') of.write(f'{indent}'"from pynucastro import Composition, Nucleus\n") of.write(f'{indent}'"nuclei = [Nucleus.from_cache(name) for name in names]\n") of.write(f'{indent}'"comp = Composition(nuclei)\n") of.write(f'{indent}'"for i, nuc in enumerate(nuclei):\n") of.write(f'{indent*2}'"comp.X[nuc] = Y[i] * A[i]\n") of.write(f'{indent}'"return comp\n\n") of.write("\n") of.write("def energy_release(dY):\n") of.write(f'{indent}''"""return the energy release in erg/g (/s if dY is actually dY/dt)"""\n') of.write(f'{indent}'"enuc = 0.0\n") of.write(f'{indent}'"for i, y in enumerate(dY):\n") of.write(f'{indent*2}'"enuc += y * mass[i]\n") of.write(f'{indent}'"enuc *= -1*constants.N_A\n") of.write(f'{indent}'"return enuc\n\n") # partition function data (if needed) nuclei_pfs = self.get_nuclei_needing_partition_functions() for n in nuclei_pfs: of.write(f"{n}_temp_array = np.array({list(n.partition_function.T9_points)})\n") of.write(f"{n}_log_pf_array = np.array({list(n.partition_function.log_pf_data)})\n") of.write("\n") # rate_eval class of.write("@jitclass([\n") for r in self.all_rates: of.write(f'{indent}("{r.fname}", numba.float64),\n') of.write("])\n") of.write("class RateEval:\n") of.write(f"{indent}def __init__(self):\n") for r in self.all_rates: of.write(f"{indent*2}self.{r.fname} = np.nan\n") of.write("\n") # tabular rate data if self.tabular_rates: of.write("# note: we cannot make the TableInterpolator global, since numba doesn't like global jitclass\n") for r in self.tabular_rates: of.write(f"# load data for {r.rid}\n") of.write(f"{r.fname}_info = (\n") of.write(f" {r.table_rhoy_lines}, # table_rhoy_lines\n") of.write(f" {r.table_temp_lines}, # table_temp_lines\n") of.write(" # tabular_data_table\n") of.write(f" np.array({r.tabular_data_table.tolist()})\n") of.write(")\n\n") # temperature tabular / starlib rate data if self.temperature_tabular_rates + self.starlib_rates: of.write("# note: we cannot make the TempTableInterpolator global, since numba doesn't like global jitclass\n") for r in self.temperature_tabular_rates + self.starlib_rates: of.write(f"# temperature / rate tabulation for {r.rid}\n") log_temp_str = np.array2string(r.log_t9_data, max_line_width=70, precision=17, separator=", ") of.write(f"{r.fname}_log_t9_data = np.array(\n") for line in log_temp_str.split("\n"): of.write(f" {line}\n") of.write(" )\n") log_rate_str = np.array2string(r.log_rate_data, max_line_width=70, precision=17, separator=", ") of.write(f"{r.fname}_log_rate_data = np.array(\n") for line in log_rate_str.split("\n"): of.write(f" {line}\n") of.write(" )\n") of.write(f"{r.fname}_info = ({r.fname}_log_t9_data, {r.fname}_log_rate_data)\n\n") # Ye helper function of.write("@numba.njit()\n") of.write("def ye(Y):\n") of.write(f"{indent}return np.sum(Z * Y)/np.sum(A * Y)\n\n") # the functions to evaluate the temperature dependence of the rates _rate_func_written = [] for r in self.rates: if isinstance(r, ApproximateRate): # write out the function string for all of the rates we depend on for cr in r.get_child_rates(): if cr in _rate_func_written: continue of.write(cr.function_string_py()) _rate_func_written.append(cr) # now write out the function that computes the # approximate rate of.write(r.function_string_py()) elif isinstance(r, ModifiedRate): orig_rate = r.original_rate if r in _rate_func_written: continue of.write(orig_rate.function_string_py()) _rate_func_written.append(orig_rate) # now write out the function that computes the # modified rate of.write(r.function_string_py()) _rate_func_written.append(r) else: if r in _rate_func_written: continue of.write(r.function_string_py()) _rate_func_written.append(r) # the rhs() function of.write("def rhs(t, Y, rho, T, screen_func=None):\n") of.write(f"{indent}return rhs_eq(t, Y, rho, T, screen_func)\n\n") of.write("@numba.njit()\n") of.write("def rhs_eq(t, Y, rho, T, screen_func):\n\n") # get the rates of.write(f"{indent}tf = Tfactors(T)\n") of.write(f"{indent}rate_eval = RateEval()\n\n") of.write(self.rates_string(indent=indent)) of.write("\n") of.write(f"{indent}dYdt = np.zeros((nnuc), dtype=np.float64)\n\n") # now make the RHSs for n in self.unique_nuclei: of.write(self.full_ydot_string(n, indent=indent)) of.write(f"{indent}return dYdt\n\n") # the jacobian() function of.write("def jacobian(t, Y, rho, T, screen_func=None):\n") of.write(f"{indent}return jacobian_eq(t, Y, rho, T, screen_func)\n\n") of.write("@numba.njit()\n") of.write("def jacobian_eq(t, Y, rho, T, screen_func):\n\n") # get the rates of.write(f"{indent}tf = Tfactors(T)\n") of.write(f"{indent}rate_eval = RateEval()\n\n") of.write(self.rates_string(indent=indent)) of.write("\n") of.write(f"{indent}jac = np.zeros((nnuc, nnuc), dtype=np.float64)\n\n") # now fill each Jacobian element for n_i in self.unique_nuclei: for n_j in self.unique_nuclei: of.write(self.full_jacobian_element_string(n_i, n_j, indent=indent)) of.write(f"{indent}return jac\n") if close_file: of.close()
[docs] def integrate_network(self, tmax, rho, T, Y0=None, screen_method=None, initial_comp="uniform", rtol=1e-8, atol=1e-8): """Integrate the network to tmax given (rho, T, Y0) using SciPy's solve_ivp() with BDF method. Parameters ---------- tmax: float final integration time. rho : float density used to integrate the network T : float temperature used to integrate the network Y0 : numpy.ndarray initial molar abundance of the nuclei. If not provided, the initial composition is initialized according to `initial_comp` screen_method : str name of the screening function used to evaluate rates when integrating the network. Valid choices are: `screen5`, `chugunov_2007`, `chugunov_2009`, `potekhin_1998`, and `debye_huckel`. If `None`, no screening is applied. initial_comp : str different modes to use to set up the initial composition if Y0 is None. Valid choices are: `uniform`, `random`, and `solar`. rtol : float relative tolerance for SciPy's solve_ivp() atol : float absolute tolerance for SciPy's solve_ivp() Returns ------- sol : object Solution returned by :func:`scipy.integrate.solve_ivp`. """ # Write the network module as a string f = io.StringIO() self.write_network(outfile=f) network_code = f.getvalue() # Create a new in-memory module called `network` network = types.ModuleType("network") # Execute the code inside the module namespace exec(network_code, network.__dict__) # pylint: disable=exec-used # Get RHS and Jacobian. Use getattr to avoid pylint warning. rhs = getattr(network, "rhs") jacobian = getattr(network, "jacobian") # Get the appropriate screening function screen_func = get_screening_func(screen_method) # Setup the initial molar abundance if Y0 is None if Y0 is None: if initial_comp is None: raise ValueError("Valid initial compositions are ['uniform', 'random', 'solar']") comp = Composition(self.unique_nuclei, init=initial_comp) ys = comp.get_molar() Y0 = np.array([ys[nuc] for nuc in self.unique_nuclei]) # Integrate using SciPy's solve_ivp() using BDF method -- good for stiff system. sol = solve_ivp(rhs, [0, tmax], Y0, method="BDF", dense_output=True, args=(rho, T, screen_func), rtol=rtol, atol=atol, jac=jacobian) return sol
[docs] def plot_evolution(self, sol, tmin=None, tmax=None, ymin=None, ymax=None, size=(800, 600), dpi=100, X_cutoff_value=None, label_size=14, legend_size=10, three_level_style=False, outfile=None): """Plot the time evolution of nuclei mass fractions using the solution returned by SciPy's solve_ivp(). Parameters ---------- sol : object Solution object returned by :func:`scipy.integrate.solve_ivp`. The array `sol.y` is assumed to contain the molar abundances, `Y_i`, ordered consistently with `unique_nuclei`. tmin : float Minimum time shown on the x-axis. If `None`, the first value of `sol.t` is used. tmax : float Maximum time shown on the x-axis. If `None`, the last value of `sol.t` is used. ymin : float Minimum mass fraction shown on the y-axis. If `None`, use the Matplotlib autoscaled value. ymax : float Maximum mass fraction shown on the y-axis. If `None`, use the Matplotlib autoscaled value. The autoscaled value is capped at 1.2 dpi : int dots per inch used with size to set output image size size : (tuple, list) (width, height) of the plot in pixels X_cutoff_value : float Minimum peak mass fraction required for a nucleus to be plotted. label_size : int Font size for axis labels. legend_size : int Font size for the legend. three_level_style : bool If `True`, use three-level linestyle and linewidth based on the peak mass fraction to help distinguish different curves. If `False`, all curves use the same line style and linewidth. outfile : str output name of the plot (extension determines the type) Returns ------- matplotlib.figure.Figure """ fig, ax = plt.subplots(figsize=(size[0]/dpi, size[1]/dpi)) for i, nuc in enumerate(self.unique_nuclei): X = sol.y[i, :] * nuc.A max_X = X.max() if X_cutoff_value is None and ymin is not None: X_cutoff_value = ymin if X_cutoff_value is not None and max_X <= X_cutoff_value: continue # Set linestyle and linewidth lw = 1.5 ls = "-" if three_level_style: # Set 3 levels of visual levels depending on maximum mass fraction lw = 1 ls = "--" if max_X > 0.5: lw = 2.5 ls = "-" elif max_X > 0.01: lw = 1.5 ls = "-" ax.loglog(sol.t, X, lw=lw, ls=ls, label=rf"X(${nuc.pretty}$)") if tmin is None: tmin = sol.t[0] if tmax is None: tmax = sol.t[-1] # Auto set number of legend column ncol = max(1, len(ax.lines) // 8 + 1) ax.set_xlim(tmin, tmax) ax.set_ylim(ymin, ymax) cur_ymin, cur_ymax = ax.get_ylim() if ymax is None and cur_ymax > 1.2: # Make sure the autoscaled ymax is not greater than 1.2 cur_ymax = 1.2 ax.set_ylim(cur_ymin, cur_ymax) ax.set_xlabel("time [s]", fontsize=label_size) ax.set_ylabel("X", fontsize=label_size) ax.legend(loc="best", fontsize=legend_size, ncol=ncol) ax.grid(ls=":") fig.tight_layout() if outfile is not None: fig.savefig(outfile, dpi=dpi) return fig