Source code for pynucastro.rates.rate

"""
Classes and methods to interface with files storing rate data.
"""

import io
import math
import os
import warnings
from collections import Counter
from enum import Enum

import matplotlib.pyplot as plt
import numpy as np

try:
    import numba
    from numba.experimental import jitclass
except ImportError:
    numba = None
    import functools

    # no-op jitclass placeholder
    def jitclass(cls_or_spec=None, spec=None):
        if (cls_or_spec is not None and
            spec is None and
                not isinstance(cls_or_spec, type)):
            # Used like
            # @jitclass([("x", intp)])
            # class Foo:
            #     ...
            spec = cls_or_spec
            cls_or_spec = None

        def wrap(cls):
            # this copies the function name and docstring to the wrapper function
            @functools.wraps(cls)
            def wrapper(*args, **kwargs):
                return cls(*args, **kwargs)
            return wrapper

        if cls_or_spec is None:
            return wrap
        return wrap(cls_or_spec)


from pynucastro.constants import constants
from pynucastro.nucdata import Nucleus, UnsupportedNucleus

_pynucastro_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
_pynucastro_rates_dir = os.path.join(_pynucastro_dir, 'library')
_pynucastro_tabular_dir = os.path.join(_pynucastro_rates_dir, 'tabular')
_pynucastro_suzuki_dir = os.path.join(_pynucastro_tabular_dir, 'suzuki')
_pynucastro_langanke_dir = os.path.join(_pynucastro_tabular_dir, 'langanke')


[docs] def get_rates_dir(): return _pynucastro_rates_dir
[docs] def get_tabular_dir(): return _pynucastro_tabular_dir
[docs] class RateFileError(Exception): """An error occurred while trying to read a Rate from a file."""
[docs] def load_rate(rfile=None): """Try to load a rate of any type. :raises: :class:`.RateFileError`, :class:`.UnsupportedNucleus` """ rate: Rate try: rate = TabularRate(rfile=rfile) except (RateFileError, UnsupportedNucleus): rate = ReacLibRate(rfile=rfile) return rate
def _find_rate_file(ratename): """locate the Reaclib or tabular rate or library file given its name. Return None if the file cannot be located, otherwise return its path.""" # check to see if the rate file is in the working dir or # is already the full path x = ratename if os.path.isfile(x): return os.path.realpath(x) # check to see if the rate file is in pynucastro/library x = os.path.join(_pynucastro_rates_dir, ratename) if os.path.isfile(x): return os.path.realpath(x) # check to see if the rate file is in pynucastro/library/tabular x = os.path.join(_pynucastro_tabular_dir, ratename) if os.path.isfile(x): return os.path.realpath(x) # check to see if the rate file is in pynucastro/library/tabular/suzuki x = os.path.join(_pynucastro_suzuki_dir, ratename) if os.path.isfile(x): return os.path.realpath(x) # check to see if the rate file is in pynucastro/library/tabular/langanke x = os.path.join(_pynucastro_langanke_dir, ratename) if os.path.isfile(x): return os.path.realpath(x) # notify user we can't find the file raise RateFileError(f'File {ratename!r} not found in the working directory, {_pynucastro_rates_dir}, or {_pynucastro_tabular_dir}') if numba is not None: Tfactor_spec = [ ('T9', numba.float64), ('T9i', numba.float64), ('T913', numba.float64), ('T913i', numba.float64), ('T953', numba.float64), ('lnT9', numba.float64) ] else: Tfactor_spec = []
[docs] @jitclass(Tfactor_spec) class Tfactors: """ precompute temperature factors for speed :param float T: input temperature (Kelvin) :var T9: T / 1.e9 K :var T9i: 1.0 / T9 :var T913i: 1.0 / T9 ** (1/3) :var T913: T9 ** (1/3) :var T953: T9 ** (5/3) :var lnT9: log(T9) """ def __init__(self, T): """ return the Tfactors object. Here, T is temperature in Kelvin """ self.T9 = T/1.e9 self.T9i = 1.0/self.T9 self.T913i = self.T9i**(1./3.) self.T913 = self.T9**(1./3.) self.T953 = self.T9**(5./3.) self.lnT9 = np.log(self.T9) @property def array(self): """return t factors as array in order of lambda function""" return np.array([1, self.T9i, self.T913i, self.T913, self.T9, self.T953, self.lnT9])
[docs] class SingleSet: """ a set in Reaclib is one piece of a rate, in the form lambda = exp[ a_0 + sum_{i=1}^5 a_i T_9**(2i-5)/3 + a_6 log T_9] A single rate in Reaclib can be composed of multiple sets :param a: the coefficients of the exponential fit :param labelprops: a collection of flags that classify a ReacLib rate """ def __init__(self, a, labelprops): """here a is iterable (e.g., list or numpy array), storing the coefficients, a0, ..., a6 """ self.a = a self.labelprops = labelprops self.label = None self.resonant = None self.weak = None self.reverse = None self._update_label_properties() def _update_label_properties(self): """ Set label and flags indicating Set is resonant, weak, or reverse. """ assert isinstance(self.labelprops, str) assert len(self.labelprops) == 6 self.label = self.labelprops[0:4] self.resonant = self.labelprops[4] == 'r' self.weak = self.labelprops[4] == 'w' self.reverse = self.labelprops[5] == 'v' def __eq__(self, other): """ Determine whether two SingleSet objects are equal to each other. """ x = True for ai, aj in zip(self.a, other.a): x = x and (ai == aj) x = x and (self.label == other.label) x = x and (self.resonant == other.resonant) x = x and (self.weak == other.weak) x = x and (self.reverse == other.reverse) return x
[docs] def f(self): """ return a function for rate(tf) where tf is a Tfactors object """ return lambda tf: np.exp(self.a[0] + self.a[1]*tf.T9i + self.a[2]*tf.T913i + self.a[3]*tf.T913 + self.a[4]*tf.T9 + self.a[5]*tf.T953 + self.a[6]*tf.lnT9)
[docs] def dfdT(self): """ return a function for this dratedT(tf), where tf is a Tfactors object """ # we have lambda = exp(f(T_9)) # so dlambda/dT9 = lambda * df/dT9 # and dlambda/dT = dlambda/dT9 / 1.e9 return lambda tf: self.f()(tf) * (-self.a[1] * tf.T9i * tf.T9i + -(1./3.) * self.a[2] * tf.T913i * tf.T9i + (1./3.) * self.a[3] * tf.T913i * tf.T913i + self.a[4] + (5./3.) * self.a[5] * tf.T913 * tf.T913 + self.a[6] * tf.T9i) / 1.e9
[docs] def set_string_py(self, prefix="set", plus_equal=False): """ return a string containing the python code for this set """ if plus_equal: string = f"{prefix} += np.exp( " else: string = f"{prefix} = np.exp( " string += f" {self.a[0]}" if not self.a[1] == 0.0: string += f" + {self.a[1]}*tf.T9i" if not self.a[2] == 0.0: string += f" + {self.a[2]}*tf.T913i" if not self.a[3] == 0.0: string += f" + {self.a[3]}*tf.T913" if not (self.a[4] == 0.0 and self.a[5] == 0.0 and self.a[6] == 0.0): indent = len(prefix)*" " string += f"\n{indent} " if not self.a[4] == 0.0: string += f" + {self.a[4]}*tf.T9" if not self.a[5] == 0.0: string += f" + {self.a[5]}*tf.T953" if not self.a[6] == 0.0: string += f" + {self.a[6]}*tf.lnT9" string += ")" return string
[docs] def set_string_cxx(self, prefix="set", plus_equal=False, with_exp=True): """ return a string containing the C++ code for this set """ if plus_equal: string = f"{prefix} += " else: string = f"{prefix} = " if with_exp: string += "std::exp( " string += f" {self.a[0]}" if not self.a[1] == 0.0: string += f" + {self.a[1]} * tfactors.T9i" if not self.a[2] == 0.0: string += f" + {self.a[2]} * tfactors.T913i" if not self.a[3] == 0.0: string += f" + {self.a[3]} * tfactors.T913" if not (self.a[4] == 0.0 and self.a[5] == 0.0 and self.a[6] == 0.0): indent = len(prefix)*" " string += f"\n{indent} " if not self.a[4] == 0.0: string += f" + {self.a[4]} * tfactors.T9" if not self.a[5] == 0.0: string += f" + {self.a[5]} * tfactors.T953" if not self.a[6] == 0.0: string += f" + {self.a[6]} * tfactors.lnT9" if with_exp: string += ");" else: string += ";" if all(q == 0.0 for q in self.a[1:]): string += "\namrex::ignore_unused(tfactors);" return string
[docs] def dln_set_string_dT9_cxx(self, prefix="dset_dT", plus_equal=False): """ return a string containing the C++ code for d/dT9 ln(set) """ if plus_equal: string = f"{prefix} += " else: string = f"{prefix} = " if all(q == 0.0 for q in self.a[1:]): string += "0.0;" return string if not self.a[1] == 0.0: string += f" {-self.a[1]} * tfactors.T9i * tfactors.T9i" if not self.a[2] == 0.0: string += f" + -(1.0/3.0) * {self.a[2]} * tfactors.T943i" if not self.a[3] == 0.0: string += f" + (1.0/3.0) * {self.a[3]} * tfactors.T923i" if not (self.a[4] == 0.0 and self.a[5] == 0.0 and self.a[6] == 0.0): indent = len(prefix)*" " string += f"\n{indent} " if not self.a[4] == 0.0: string += f" + {self.a[4]}" if not self.a[5] == 0.0: string += f" + (5.0/3.0) * {self.a[5]} * tfactors.T923" if not self.a[6] == 0.0: string += f" + {self.a[6]} * tfactors.T9i" string += ";" return string
[docs] class Rate: """The base reaction rate class. Most rate types will subclass this and extend to their particular format. """ def __init__(self, reactants=None, products=None, Q=None, weak_type="", label="generic"): """a generic Rate class that acts as a base class for specific sources. Here we only specify the reactants and products and Q value""" if reactants: self.reactants = Nucleus.cast_list(reactants) else: self.reactants = [] if products: self.products = Nucleus.cast_list(products) else: self.products = [] self.label = label # the fname is used when writing the code to evaluate the rate reactants_str = '_'.join([repr(nuc) for nuc in self.reactants]) products_str = '_'.join([repr(nuc) for nuc in self.products]) self.fname = f'{reactants_str}__{products_str}__{label}' if Q is None: self._set_q() else: self.Q = Q self.weak_type = weak_type self._set_rhs_properties() self._set_screening() self._set_print_representation() self.tabular = False self.reverse = None def __repr__(self): return self.string def __hash__(self): return hash(self.__repr__()) def __eq__(self, other): """ Determine whether two Rate objects are equal. They are equal if they contain identical reactants and products""" x = True x = x and (self.reactants == other.reactants) x = x and (self.products == other.products) return x def __lt__(self, other): """sort such that lightest reactants come first, and then look at products""" # this sort will make two nuclei with the same A be in order of Z # (assuming there are no nuclei with A > 999 # we want to compare based on the heaviest first, so we reverse self_react_sorted = sorted(self.reactants, key=lambda x: 1000*x.A + x.Z, reverse=True) other_react_sorted = sorted(other.reactants, key=lambda x: 1000*x.A + x.Z, reverse=True) if self_react_sorted != other_react_sorted: # reactants are different, so now we can check them for srn, orn in zip(self_react_sorted, other_react_sorted): if not srn == orn: return srn < orn else: # reactants are the same, so consider products self_prod_sorted = sorted(self.products, key=lambda x: 1000*x.A + x.Z, reverse=True) other_prod_sorted = sorted(other.products, key=lambda x: 1000*x.A + x.Z, reverse=True) for spn, opn in zip(self_prod_sorted, other_prod_sorted): if not spn == opn: return spn < opn # if we made it here, then the rates are the same return True def _set_q(self): """set the Q value of the reaction (in MeV)""" # from the binding energy of the nuclei, Q = -B_reactants + B_products # but note that nucbind is the binding energy *per* nucleon, so we need # to multiply by the number of nucleons self.Q = 0 for n in self.reactants: self.Q += -n.A * n.nucbind for n in self.products: self.Q += n.A * n.nucbind def _set_print_representation(self): # string is output to the terminal, rid is used as a dict key, # and pretty_string is latex # some rates will have no nuclei particles (e.g. gamma) on the left or # right -- we'll try to infer those here lhs_other = [] rhs_other = [] self.string = "" self.rid = "" self.pretty_string = r"$" # put p, n, and alpha second treactants = [] for n in self.reactants: if n.raw not in ["p", "he4", "n"]: treactants.insert(0, n) else: treactants.append(n) # figure out if there are any non-nuclei present # for the moment, we just handle strong rates # there should be the same number of protons on each side and # the same number of neutrons on each side strong_test = sum(n.Z for n in self.reactants) == sum(n.Z for n in self.products) and \ sum(n.A for n in self.reactants) == sum(n.A for n in self.products) if strong_test: if len(self.products) == 1: rhs_other.append("gamma") else: # this is a weak rate if self.weak_type == "electron_capture": # we assume that all the tabular rates are electron capture for now # we expect an electron on the left -- let's make sure # the charge on the left should be +1 the charge on the right assert sum(n.Z for n in self.reactants) == sum(n.Z for n in self.products) + 1 lhs_other.append("e-") rhs_other.append("nu") elif self.weak_type == "beta_decay": # we expect an electron on the right assert sum(n.Z for n in self.reactants) + 1 == sum(n.Z for n in self.products) rhs_other.append("e-") rhs_other.append("nubar") elif "_pos_" in self.weak_type: # we expect a positron on the right -- let's make sure assert sum(n.Z for n in self.reactants) == sum(n.Z for n in self.products) + 1 rhs_other.append("e+") rhs_other.append("nu") elif "_neg_" in self.weak_type: # we expect an electron on the right -- let's make sure assert sum(n.Z for n in self.reactants) + 1 == sum(n.Z for n in self.products) rhs_other.append("e-") rhs_other.append("nubar") else: # we need to figure out what the rate is. We'll assume that it is # not an electron capture if sum(n.Z for n in self.reactants) == sum(n.Z for n in self.products) + 1: rhs_other.append("e+") rhs_other.append("nu") elif sum(n.Z for n in self.reactants) + 1 == sum(n.Z for n in self.products): rhs_other.append("e-") rhs_other.append("nubar") for n, r in enumerate(treactants): self.string += f"{r.c()}" self.rid += f"{r}" self.pretty_string += fr"{r.pretty}" if not n == len(self.reactants)-1: self.string += " + " self.rid += " + " self.pretty_string += r" + " for o in lhs_other: if o == "e-": self.string += " + e⁻" self.pretty_string += r" + \mathrm{e}^-" self.string += " ⟶ " self.rid += " --> " self.pretty_string += r" \rightarrow " for n, p in enumerate(self.products): self.string += f"{p.c()}" self.rid += f"{p}" self.pretty_string += fr"{p.pretty}" if not n == len(self.products)-1: self.string += " + " self.rid += " + " self.pretty_string += r" + " for o in rhs_other: if o == "gamma": self.string += " + 𝛾" self.pretty_string += r"+ \gamma" elif o == "nu": self.string += " + 𝜈" self.pretty_string += r"+ \nu_e" elif o == "nubar": self.string += " + 𝜈" self.pretty_string += r"+ \bar{\nu}_e" if o == "e-": self.string += " + e⁻" self.pretty_string += r" + \mathrm{e}^-" if o == "e+": self.string += " + e⁺" self.pretty_string += r" + \mathrm{e}^+" self.pretty_string += r"$" def _set_rhs_properties(self): """ compute statistical prefactor and density exponent from the reactants. """ self.prefactor = 1.0 # this is 1/2 for rates like a + a (double counting) self.inv_prefactor = 1 for r in set(self.reactants): self.inv_prefactor = self.inv_prefactor * math.factorial(self.reactants.count(r)) self.prefactor = self.prefactor/float(self.inv_prefactor) self.dens_exp = len(self.reactants)-1 if self.weak_type == 'electron_capture': self.dens_exp = self.dens_exp + 1 def _set_screening(self): """ determine if this rate is eligible for screening and the nuclei to use. """ # Tells if this rate is eligible for screening # using screenz.f90 provided by StarKiller Microphysics. # If not eligible for screening, set to None # If eligible for screening, then # Rate.ion_screen is a 2-element (3 for 3-alpha) list of Nucleus objects for screening self.ion_screen = [] nucz = [q for q in self.reactants if q.Z != 0] if len(nucz) > 1: nucz.sort(key=lambda x: x.Z) self.ion_screen = [] self.ion_screen.append(nucz[0]) self.ion_screen.append(nucz[1]) if len(nucz) == 3: self.ion_screen.append(nucz[2]) # if the rate is a reverse rate (defined as Q < 0), then we # might actually want to compute the screening based on the # reactants of the forward rate that was used in the detailed # balance. Rate.symmetric_screen is what should be used in # the screening in this case self.symmetric_screen = [] if self.Q < 0: nucz = [q for q in self.products if q.Z != 0] if len(nucz) > 1: nucz.sort(key=lambda x: x.Z) self.symmetric_screen = [] self.symmetric_screen.append(nucz[0]) self.symmetric_screen.append(nucz[1]) if len(nucz) == 3: self.symmetric_screen.append(nucz[2]) else: self.symmetric_screen = self.ion_screen
[docs] def cname(self): """a C++-safe version of the rate name""" # replace the "__" separating reactants and products with "_to_" # and convert all other "__" to single "_" return self.fname.replace("__", "_to_", 1).replace("__", "_")
[docs] def get_rate_id(self): """ Get an identifying string for this rate.""" return f'{self.rid} <{self.label.strip()}>'
[docs] def heaviest(self): """ Return the heaviest nuclide in this Rate. If two nuclei are tied in mass number, return the one with the lowest atomic number. """ nuc = self.reactants[0] for n in self.reactants + self.products: if n.A > nuc.A or (n.A == nuc.A and n.Z < nuc.Z): nuc = n return nuc
[docs] def lightest(self): """ Return the lightest nuclide in this Rate. If two nuclei are tied in mass number, return the one with the highest atomic number. """ nuc = self.reactants[0] for n in self.reactants + self.products: if n.A < nuc.A or (n.A == nuc.A and n.Z > nuc.Z): nuc = n return nuc
[docs] def ydot_string_py(self): """ Return a string containing the term in a dY/dt equation in a reaction network corresponding to this rate. """ ydot_string_components = [] # prefactor if self.prefactor != 1.0: ydot_string_components.append(f"{self.prefactor:1.14e}") # density dependence if self.dens_exp == 1: ydot_string_components.append("rho") elif self.dens_exp != 0: ydot_string_components.append(f"rho**{self.dens_exp}") # electron fraction dependence if self.weak_type == 'electron_capture' and not self.tabular: ydot_string_components.append("ye(Y)") # composition dependence for r in sorted(set(self.reactants)): c = self.reactants.count(r) if c > 1: ydot_string_components.append(f"Y[j{r.raw}]**{c}") else: ydot_string_components.append(f"Y[j{r.raw}]") # rate_eval.{fname} ydot_string_components.append(f"rate_eval.{self.fname}") return "*".join(ydot_string_components)
[docs] def eval(self, T, rhoY=None): raise NotImplementedError("base Rate class does not know how to eval()")
[docs] def jacobian_string_py(self, y_i): """ Return a string containing the term in a jacobian matrix in a reaction network corresponding to this rate differentiated with respect to y_i y_i is an object of the class ``Nucleus``. """ if y_i not in self.reactants: return "" jac_string_components = [] # prefactor if self.prefactor != 1.0: jac_string_components.append(f"{self.prefactor:1.14e}") # density dependence if self.dens_exp == 1: jac_string_components.append("rho") elif self.dens_exp != 0: jac_string_components.append(f"rho**{self.dens_exp}") # electron fraction dependence if self.weak_type == 'electron_capture' and not self.tabular: jac_string_components.append("ye(Y)") # composition dependence for r in sorted(set(self.reactants)): c = self.reactants.count(r) if y_i == r: # take the derivative if c == 1: continue if c > 2: jac_string_components.append(f"{c}*Y[j{r.raw}]**{c-1}") elif c == 2: jac_string_components.append(f"2*Y[j{r.raw}]") else: # this nucleus is in the rate form, but we are not # differentiating with respect to it if c > 1: jac_string_components.append(f"Y[j{r.raw}]**{c}") else: jac_string_components.append(f"Y[j{r.raw}]") # rate_eval.{fname} jac_string_components.append(f"rate_eval.{self.fname}") return "*".join(jac_string_components)
[docs] def eval_jacobian_term(self, T, rho, comp, y_i): """Evaluate drate/d(y_i), y_i is a Nucleus object. This rate term has the full composition and density dependence, i.e.: rate = rho**n Y1**a Y2**b ... N_A <sigma v> The derivative is only non-zero if this term depends on nucleus y_i. """ if y_i not in self.reactants: return 0.0 ymolar = comp.get_molar() # composition dependence Y_term = 1.0 for r in sorted(set(self.reactants)): c = self.reactants.count(r) if y_i == r: # take the derivative if c == 1: continue Y_term *= c * ymolar[r]**(c-1) else: # this nucleus is in the rate form, but we are not # differentiating with respect to it Y_term *= ymolar[r]**c # density dependence dens_term = rho**self.dens_exp # electron fraction dependence if self.weak_type == 'electron_capture' and not self.tabular: y_e_term = comp.eval_ye() else: y_e_term = 1.0 # finally evaluate the rate -- for tabular rates, we need to set rhoY rate_eval = self.eval(T, rhoY=rho*comp.eval_ye()) return self.prefactor * dens_term * y_e_term * Y_term * rate_eval
[docs] class TableIndex(Enum): """a simple enum-like container for indexing the electron-capture tables""" RHOY = 0 T = 1 MU = 2 DQ = 3 VS = 4 RATE = 5 NU = 6 GAMMA = 7
[docs] class ReacLibRate(Rate): """A single reaction rate. Currently, this is a ReacLib rate, which can be composed of multiple sets, or a tabulated electron capture rate. :raises: :class:`.RateFileError`, :class:`.UnsupportedNucleus` """ def __init__(self, rfile=None, chapter=None, original_source=None, reactants=None, products=None, sets=None, labelprops=None, Q=None): """ rfile can be either a string specifying the path to a rate file or an io.StringIO object from which to read rate information. """ # pylint: disable=super-init-not-called self.rfile_path = None self.rfile = None if isinstance(rfile, str): self.rfile_path = _find_rate_file(rfile) self.rfile = os.path.basename(rfile) self.chapter = chapter # the Reaclib chapter for this reaction self.original_source = original_source # the contents of the original rate file self.fname = None if reactants: self.reactants = Nucleus.cast_list(reactants) else: self.reactants = [] if products: self.products = Nucleus.cast_list(products) else: self.products = [] if sets: self.sets = sets else: self.sets = [] # a modified rate is one where we manually changed some of its # properties self.modified = False self.labelprops = labelprops self.approx = False if self.labelprops == "approx": self.approx = True self.derived = False if self.labelprops == "derived": self.derived = True self.label = None self.resonant = None self.weak = None self.weak_type = None self.reverse = None self.removed = None self.Q = Q self.tabular = False if isinstance(rfile, str): # read in the file, parse the different sets and store them as # SingleSet objects in sets[] f = open(self.rfile_path) elif isinstance(rfile, io.StringIO): # Set f to the io.StringIO object f = rfile else: f = None if f: self._read_from_file(f) f.close() else: self._set_label_properties() self._set_rhs_properties() self._set_screening() self._set_print_representation() def _set_print_representation(self): """ compose the string representations of this Rate. """ super()._set_print_representation() # This is used to determine which rates to detect as the same reaction # from multiple sources in a Library file, so it should not be unique # to a given source, e.g. wc12, but only unique to the reaction. reactants_str = '_'.join([repr(nuc) for nuc in self.reactants]) products_str = '_'.join([repr(nuc) for nuc in self.products]) self.fname = f'{reactants_str}__{products_str}' if self.weak: self.fname += f'__weak__{self.weak_type}' if self.modified: self.fname += "__modified" if self.approx: self.fname += "__approx" if self.derived: self.fname += "__derived" if self.removed: self.fname += "__removed"
[docs] def modify_products(self, new_products): self.products = Nucleus.cast_list(new_products, allow_single=True) self.modified = True # we need to update the Q value and the print string for the rate self._set_q() self._set_screening() self.fname = None # reset so it will be updated self._set_print_representation()
def __hash__(self): return hash(self.__repr__()) def __eq__(self, other): """ Determine whether two Rate objects are equal. They are equal if they contain identical reactants and products and if they contain the same SingleSet sets and if their chapters are equal.""" if not isinstance(other, ReacLibRate): return False x = (self.chapter == other.chapter) and (self.products == other.products) and \ (self.reactants == other.reactants) if not x: return x x = len(self.sets) == len(other.sets) if not x: return x for si in self.sets: scomp = False for sj in other.sets: if si == sj: scomp = True break x = x and scomp return x def __add__(self, other): """Combine the sets of two Rate objects if they describe the same reaction. Must be Reaclib rates.""" assert self.reactants == other.reactants assert self.products == other.products assert self.chapter == other.chapter assert isinstance(self.chapter, int) assert self.label == other.label assert self.weak == other.weak assert self.weak_type == other.weak_type assert self.reverse == other.reverse if self.resonant != other.resonant: self._labelprops_combine_resonance() new_rate = ReacLibRate(chapter=self.chapter, original_source='\n'.join([self.original_source, other.original_source]), reactants=self.reactants, products=self.products, sets=self.sets + other.sets, labelprops=self.labelprops, Q=self.Q) return new_rate def _set_label_properties(self, labelprops=None): """ Calls _update_resonance_combined and then _update_label_properties. """ if labelprops: self.labelprops = labelprops # Update labelprops based on the Sets in this Rate # to set the resonance_combined flag properly self._update_resonance_combined() self._update_label_properties() def _update_resonance_combined(self): """ Checks the Sets in this Rate and updates the resonance_combined flag as well as self.labelprops[4] """ sres = [s.resonant for s in self.sets] if True in sres and False in sres: self._labelprops_combine_resonance() def _labelprops_combine_resonance(self): """ Update self.labelprops[4] = 'c'""" llp = list(self.labelprops) llp[4] = 'c' self.labelprops = ''.join(llp) def _update_label_properties(self): """ Set label and flags indicating Rate is resonant, weak, or reverse. """ assert isinstance(self.labelprops, str) if self.labelprops == "approx": self.label = "approx" self.resonant = False self.weak = False self.weak_type = None self.reverse = False elif self.labelprops == "derived": self.label = "derived" self.resonant = False # Derived may be resonant in some cases self.weak = False self.weak_type = None self.reverse = False else: assert len(self.labelprops) == 6 self.label = self.labelprops[0:4] self.resonant = self.labelprops[4] == 'r' self.weak = self.labelprops[4] == 'w' if self.weak: if self.label.strip() == 'ec' or self.label.strip() == 'bec': self.weak_type = 'electron_capture' else: self.weak_type = self.label.strip().replace('+', '_pos_').replace('-', '_neg_') else: self.weak_type = None self.reverse = self.labelprops[5] == 'v' def _read_from_file(self, f): """ given a file object, read rate data from the file. """ lines = f.readlines() f.close() self.original_source = "".join(lines) # first line is the chapter self.chapter = lines[0].strip() self.chapter = int(self.chapter) # remove any blank lines set_lines = [l for l in lines[1:] if not l.strip() == ""] # the rest is the sets first = 1 while len(set_lines) > 0: # check for a new chapter id in case of Reaclib v2 format check_chapter = set_lines[0].strip() try: # see if there is a chapter number preceding the set check_chapter = int(check_chapter) # check that the chapter number is the same as the first # set in this rate file if check_chapter != self.chapter: raise RateFileError(f'read chapter {check_chapter}, expected chapter {self.chapter} for this rate set.') # get rid of chapter number so we can read a rate set set_lines.pop(0) except (TypeError, ValueError): # there was no chapter number, proceed reading a set pass # sets are 3 lines long s1 = set_lines.pop(0) s2 = set_lines.pop(0) s3 = set_lines.pop(0) # first line of a set has up to 6 nuclei, then the label, # and finally the Q value # get rid of first 5 spaces s1 = s1[5:] # next follows 6 fields of 5 characters containing nuclei # the 6 fields are padded with spaces f = [] for i in range(6): ni = s1[:5] s1 = s1[5:] ni = ni.strip() if ni: f.append(ni) # next come 8 spaces, so get rid of them s1 = s1[8:] # next is a 4-character set label and 2 character flags labelprops = s1[:6] s1 = s1[6:] # next come 3 spaces s1 = s1[3:] # next comes a 12 character Q value followed by 10 spaces Q = float(s1.strip()) if first: self.Q = Q # what's left are the nuclei -- their interpretation # depends on the chapter if self.chapter == 1: # e1 -> e2 self.reactants.append(Nucleus.from_cache(f[0])) self.products.append(Nucleus.from_cache(f[1])) elif self.chapter == 2: # e1 -> e2 + e3 self.reactants.append(Nucleus.from_cache(f[0])) self.products += [Nucleus.from_cache(f[1]), Nucleus.from_cache(f[2])] elif self.chapter == 3: # e1 -> e2 + e3 + e4 self.reactants.append(Nucleus.from_cache(f[0])) self.products += [Nucleus.from_cache(f[1]), Nucleus.from_cache(f[2]), Nucleus.from_cache(f[3])] elif self.chapter == 4: # e1 + e2 -> e3 self.reactants += [Nucleus.from_cache(f[0]), Nucleus.from_cache(f[1])] self.products.append(Nucleus.from_cache(f[2])) elif self.chapter == 5: # e1 + e2 -> e3 + e4 self.reactants += [Nucleus.from_cache(f[0]), Nucleus.from_cache(f[1])] self.products += [Nucleus.from_cache(f[2]), Nucleus.from_cache(f[3])] elif self.chapter == 6: # e1 + e2 -> e3 + e4 + e5 self.reactants += [Nucleus.from_cache(f[0]), Nucleus.from_cache(f[1])] self.products += [Nucleus.from_cache(f[2]), Nucleus.from_cache(f[3]), Nucleus.from_cache(f[4])] elif self.chapter == 7: # e1 + e2 -> e3 + e4 + e5 + e6 self.reactants += [Nucleus.from_cache(f[0]), Nucleus.from_cache(f[1])] self.products += [Nucleus.from_cache(f[2]), Nucleus.from_cache(f[3]), Nucleus.from_cache(f[4]), Nucleus.from_cache(f[5])] elif self.chapter == 8: # e1 + e2 + e3 -> e4 self.reactants += [Nucleus.from_cache(f[0]), Nucleus.from_cache(f[1]), Nucleus.from_cache(f[2])] self.products.append(Nucleus.from_cache(f[3])) # support historical format, where chapter 8 also handles what are # now chapter 9 rates if len(f) == 5: self.products.append(Nucleus.from_cache(f[4])) elif self.chapter == 9: # e1 + e2 + e3 -> e4 + e5 self.reactants += [Nucleus.from_cache(f[0]), Nucleus.from_cache(f[1]), Nucleus.from_cache(f[2])] self.products += [Nucleus.from_cache(f[3]), Nucleus.from_cache(f[4])] elif self.chapter == 10: # e1 + e2 + e3 + e4 -> e5 + e6 self.reactants += [Nucleus.from_cache(f[0]), Nucleus.from_cache(f[1]), Nucleus.from_cache(f[2]), Nucleus.from_cache(f[3])] self.products += [Nucleus.from_cache(f[4]), Nucleus.from_cache(f[5])] elif self.chapter == 11: # e1 -> e2 + e3 + e4 + e5 self.reactants.append(Nucleus.from_cache(f[0])) self.products += [Nucleus.from_cache(f[1]), Nucleus.from_cache(f[2]), Nucleus.from_cache(f[3]), Nucleus.from_cache(f[4])] else: raise RateFileError(f'Chapter could not be identified in {self.original_source}') first = 0 # the second line contains the first 4 coefficients # the third lines contains the final 3 # we can't just use split() here, since the fields run into one another n = 13 # length of the field a = [s2[i:i+n] for i in range(0, len(s2), n)] a += [s3[i:i+n] for i in range(0, len(s3), n)] a = [float(e) for e in a if not e.strip() == ""] self.sets.append(SingleSet(a, labelprops=labelprops)) self._set_label_properties(labelprops)
[docs] def write_to_file(self, f): """ Given a file object, write rate data to the file. """ if self.original_source is None: raise NotImplementedError( f"Original source is not stored for this rate ({self})." " At present, we cannot reconstruct the rate representation without" " storing the original source." ) print(self.original_source, file=f)
[docs] def get_rate_id(self): """ Get an identifying string for this rate. Don't include resonance state since we combine resonant and non-resonant versions of reactions. """ srev = '' if self.reverse: srev = 'reverse' sweak = '' if self.weak: sweak = 'weak' ssrc = 'reaclib' return f'{self.rid} <{self.label.strip()}_{ssrc}_{sweak}_{srev}>'
[docs] def function_string_py(self): """ Return a string containing python function that computes the rate """ fstring = "" fstring += "@numba.njit()\n" fstring += f"def {self.fname}(rate_eval, tf):\n" fstring += f" # {self.rid}\n" fstring += " rate = 0.0\n\n" for s in self.sets: fstring += f" # {s.labelprops[0:5]}\n" set_string = s.set_string_py(prefix="rate", plus_equal=True) for t in set_string.split("\n"): fstring += " " + t + "\n" fstring += "\n" fstring += f" rate_eval.{self.fname} = rate\n\n" return fstring
[docs] def function_string_cxx(self, dtype="double", specifiers="inline", leave_open=False, extra_args=()): """ Return a string containing C++ function that computes the rate """ args = ["const tf_t& tfactors", f"{dtype}& rate", f"{dtype}& drate_dT", *extra_args] fstring = "" fstring += "template <int do_T_derivatives>\n" fstring += f"{specifiers}\n" fstring += f"void rate_{self.cname()}({', '.join(args)}) {{\n\n" fstring += f" // {self.rid}\n\n" fstring += " rate = 0.0;\n" fstring += " drate_dT = 0.0;\n\n" fstring += f" {dtype} ln_set_rate{{0.0}};\n" fstring += f" {dtype} dln_set_rate_dT9{{0.0}};\n" fstring += f" {dtype} set_rate{{0.0}};\n\n" for s in self.sets: fstring += f" // {s.labelprops[0:5]}\n" set_string = s.set_string_cxx(prefix="ln_set_rate", plus_equal=False, with_exp=False) for t in set_string.split("\n"): fstring += " " + t + "\n" fstring += "\n" fstring += " if constexpr (do_T_derivatives) {\n" dln_set_string_dT9 = s.dln_set_string_dT9_cxx(prefix="dln_set_rate_dT9", plus_equal=False) for t in dln_set_string_dT9.split("\n"): fstring += " " + t + "\n" fstring += " }\n" fstring += "\n" fstring += " // avoid underflows by zeroing rates in [0.0, 1.e-100]\n" fstring += " ln_set_rate = std::max(ln_set_rate, -230.0);\n" fstring += " set_rate = std::exp(ln_set_rate);\n" fstring += " rate += set_rate;\n" fstring += " if constexpr (do_T_derivatives) {\n" fstring += " drate_dT += set_rate * dln_set_rate_dT9 / 1.0e9;\n" fstring += " }\n\n" if not leave_open: fstring += "}\n\n" return fstring
[docs] def eval(self, T, rhoY=None): """ evauate the reaction rate for temperature T """ tf = Tfactors(T) r = 0.0 for s in self.sets: f = s.f() r += f(tf) return r
[docs] def eval_deriv(self, T, rhoY=None): """ evauate the derivative of reaction rate with respect to T """ _ = rhoY # unused by this subclass tf = Tfactors(T) drdT = 0.0 for s in self.sets: dfdT = s.dfdT() drdT += dfdT(tf) return drdT
[docs] def get_rate_exponent(self, T0): """ for a rate written as a power law, r = r_0 (T/T0)**nu, return nu corresponding to T0 """ # nu = dln r /dln T, so we need dr/dT r1 = self.eval(T0) dT = 1.e-8*T0 r2 = self.eval(T0 + dT) drdT = (r2 - r1)/dT return (T0/r1)*drdT
[docs] def plot(self, Tmin=1.e8, Tmax=1.6e9, rhoYmin=3.9e8, rhoYmax=2.e9, figsize=(10, 10)): """plot the rate's temperature sensitivity vs temperature :param float Tmin: minimum temperature for plot :param float Tmax: maximum temperature for plot :param float rhoYmin: minimum electron density to plot (e-capture rates only) :param float rhoYmax: maximum electron density to plot (e-capture rates only) :param tuple figsize: figure size specification for matplotlib :return: a matplotlib figure object :rtype: matplotlib.figure.Figure """ _ = (rhoYmin, rhoYmax) # unused by this subclass fig, ax = plt.subplots(figsize=figsize) temps = np.logspace(np.log10(Tmin), np.log10(Tmax), 100) r = np.zeros_like(temps) for n, T in enumerate(temps): r[n] = self.eval(T) ax.loglog(temps, r) ax.set_xlabel(r"$T$") if self.dens_exp == 0: ax.set_ylabel(r"$\tau$") elif self.dens_exp == 1: ax.set_ylabel(r"$N_A <\sigma v>$") elif self.dens_exp == 2: ax.set_ylabel(r"$N_A^2 <n_a n_b n_c v>$") ax.set_title(fr"{self.pretty_string}") return fig
if numba is not None: interpolator_spec = [ ('data', numba.float64[:, :]), ('table_rhoy_lines', numba.int32), ('table_temp_lines', numba.int32), ('rhoy', numba.float64[:]), ('temp', numba.float64[:]) ] else: interpolator_spec = []
[docs] @jitclass(interpolator_spec) class TableInterpolator: """A simple class that holds a pointer to the table data and methods that allow us to interpolate a variable""" def __init__(self, table_rhoy_lines, table_temp_lines, table_data): self.data = table_data self.table_rhoy_lines = table_rhoy_lines self.table_temp_lines = table_temp_lines # for easy indexing, store a 1-d array of T and rhoy self.rhoy = self.data[::self.table_temp_lines, TableIndex.RHOY.value] self.temp = self.data[0:self.table_temp_lines, TableIndex.T.value] def _get_logT_idx(self, logt0): """return the index into the temperatures such that T[i-1] < t0 <= T[i]. We return i-1 here, corresponding to the lower value. Note: we work in terms of log10() """ max_idx = len(self.temp) - 1 return max(0, min(max_idx, np.searchsorted(self.temp, logt0)) - 1) def _get_logrhoy_idx(self, logrhoy0): """return the index into rho*Y such that rhoY[i-1] < rhoy0 <= rhoY[i]. We return i-1 here, corresponding to the lower value. Note: we work in terms of log10() """ max_idx = len(self.rhoy) - 1 return max(0, min(max_idx, np.searchsorted(self.rhoy, logrhoy0)) - 1) def _rhoy_T_to_idx(self, irhoy, jtemp): """given a pair (irhoy, jtemp) into the table, return the 1-d index into the underlying data array assuming row-major ordering""" return irhoy * self.table_temp_lines + jtemp def interpolate(self, logrhoy, logT, component): """given logrhoy and logT, do bilinear interpolation to find the value of the data component in the table""" # We are going to do bilinear interpolation. We create a # polynomial of the form: # # f = A [log(rho) - log(rho_i)] [log(T) - log(T_j)] + # B [log(rho) - log(rho_i)] + # C [log(T) - log(T_j)] + # D # # we then find the i,j such that our point is in the # box with corners (i,j) to (i+1,j+1), and solve for # A, B, C, D # find the T and rhoY in the data table corresponding to the # lower left if logT < self.temp.min() or logT > self.temp.max(): raise ValueError("temperature out of table bounds") if logrhoy < self.rhoy.min() or logrhoy > self.rhoy.max(): raise ValueError("rhoy out of table bounds") irhoy = self._get_logrhoy_idx(logrhoy) jT = self._get_logT_idx(logT) # note: rhoy and T are already stored as log dlogrho = self.rhoy[irhoy+1] - self.rhoy[irhoy] dlogT = self.temp[jT+1] - self.temp[jT] # get the data at the 4 points idx = self._rhoy_T_to_idx(irhoy, jT) f_ij = self.data[idx, component] idx = self._rhoy_T_to_idx(irhoy+1, jT) f_ip1j = self.data[idx, component] idx = self._rhoy_T_to_idx(irhoy, jT+1) f_ijp1 = self.data[idx, component] idx = self._rhoy_T_to_idx(irhoy+1, jT+1) f_ip1jp1 = self.data[idx, component] D = f_ij C = (f_ijp1 - f_ij) / dlogT B = (f_ip1j - f_ij) / dlogrho A = (f_ip1jp1 - B * dlogrho - C * dlogT - D) / (dlogrho * dlogT) r = (A * (logrhoy - self.rhoy[irhoy]) * (logT - self.temp[jT]) + B * (logrhoy - self.rhoy[irhoy]) + C * (logT - self.temp[jT]) + D) return r
[docs] class TabularRate(Rate): """A tabular rate. :raises: :class:`.RateFileError`, :class:`.UnsupportedNucleus` """ def __init__(self, rfile=None): """ rfile can be either a string specifying the path to a rate file or an io.StringIO object from which to read rate information. """ super().__init__() self.rfile_path = None self.rfile = None if isinstance(rfile, str): self.rfile_path = _find_rate_file(rfile) self.rfile = os.path.basename(rfile) self.fname = None self.label = "tabular" self.tabular = True # we should initialize this somehow self.weak_type = "" if isinstance(rfile, str): # read in the file, parse the different sets and store them as # SingleSet objects in sets[] f = open(self.rfile_path) elif isinstance(rfile, io.StringIO): # Set f to the io.StringIO object f = rfile else: f = None if f: self._read_from_file(f) f.close() self._set_rhs_properties() self._set_screening() self._set_print_representation() self.get_tabular_rate() self.interpolator = TableInterpolator(self.table_rhoy_lines, self.table_temp_lines, self.tabular_data_table) def __hash__(self): return hash(self.__repr__()) def __eq__(self, other): """ Determine whether two Rate objects are equal. They are equal if they contain identical reactants and products.""" if not isinstance(other, TabularRate): return False return self.reactants == other.reactants and self.products == other.products def __add__(self, other): raise NotImplementedError("addition not defined for tabular rates") def _read_from_file(self, f): """ given a file object, read rate data from the file. """ lines = f.readlines() f.close() self.original_source = "".join(lines) # first line is the chapter self.chapter = lines[0].strip() if self.chapter != "t": raise RateFileError(f"Invalid chapter for TabularRate ({self.chapter})") # remove any blank lines set_lines = [l for l in lines[1:] if not l.strip() == ""] # e1 -> e2, Tabulated s1 = set_lines.pop(0) s2 = set_lines.pop(0) s3 = set_lines.pop(0) s4 = set_lines.pop(0) s5 = set_lines.pop(0) f = s1.split() try: self.reactants.append(Nucleus.from_cache(f[0])) self.products.append(Nucleus.from_cache(f[1])) except UnsupportedNucleus as ex: raise RateFileError(f'Nucleus objects could not be identified in {self.original_source}') from ex self.table_file = s2.strip() self.table_header_lines = int(s3.strip()) self.table_rhoy_lines = int(s4.strip()) self.table_temp_lines = int(s5.strip()) self.table_num_vars = 6 # Hard-coded number of variables in tables for now. self.table_index_name = f'j_{self.reactants[0]}_{self.products[0]}' self.labelprops = 'tabular' # set weak type if "electroncapture" in self.table_file: self.weak_type = "electron_capture" elif "betadecay" in self.table_file: self.weak_type = "beta_decay" # since the reactants and products were only now set, we need # to recompute Q -- this is used for finding rate pairs self._set_q() def _set_rhs_properties(self): """ compute statistical prefactor and density exponent from the reactants. """ self.prefactor = 1.0 # this is 1/2 for rates like a + a (double counting) self.inv_prefactor = 1 for r in set(self.reactants): self.inv_prefactor = self.inv_prefactor * math.factorial(self.reactants.count(r)) self.prefactor = self.prefactor/float(self.inv_prefactor) self.dens_exp = len(self.reactants)-1 def _set_screening(self): """ tabular rates are not currently screened (they are e-capture or beta-decay)""" self.ion_screen = [] self.symmetric_screen = [] if not self.fname: # This is used to determine which rates to detect as the same reaction # from multiple sources in a Library file, so it should not be unique # to a given source, e.g. wc12, but only unique to the reaction. reactants_str = '_'.join([repr(nuc) for nuc in self.reactants]) products_str = '_'.join([repr(nuc) for nuc in self.products]) self.fname = f'{reactants_str}__{products_str}'
[docs] def get_rate_id(self): """ Get an identifying string for this rate. Don't include resonance state since we combine resonant and non-resonant versions of reactions. """ ssrc = 'tabular' return f'{self.rid} <{self.label.strip()}_{ssrc}>'
[docs] def function_string_py(self): """ Return a string containing python function that computes the rate """ fstring = "" fstring += "@numba.njit()\n" fstring += f"def {self.fname}(rate_eval, T, rhoY):\n" fstring += f" # {self.rid}\n" fstring += f" {self.fname}_interpolator = TableInterpolator(*{self.fname}_info)\n" fstring += f" r = {self.fname}_interpolator.interpolate(np.log10(rhoY), np.log10(T), TableIndex.RATE.value)\n" fstring += f" rate_eval.{self.fname} = 10.0**r\n\n" return fstring
[docs] def get_tabular_rate(self): """read the rate data from .dat file """ # find .dat file and read it self.table_path = _find_rate_file(self.table_file) t_data2d = [] with open(self.table_path) as tabular_file: for i, line in enumerate(tabular_file): # skip header lines if i < self.table_header_lines: continue line = line.strip() # skip empty lines if not line: continue # split the column values on whitespace t_data2d.append(line.split()) # convert the nested list of string values into a numpy float array self.tabular_data_table = np.array(t_data2d, dtype=np.float64)
[docs] def eval(self, T, rhoY=None): """ evauate the reaction rate for temperature T """ r = self.interpolator.interpolate(np.log10(rhoY), np.log10(T), TableIndex.RATE.value) return 10.0**r
[docs] def get_nu_loss(self, T, rhoY): """ get the neutrino loss rate for the reaction if tabulated""" r = self.interpolator.interpolate(np.log10(rhoY), np.log10(T), TableIndex.NU.value) return 10**r
[docs] def plot(self, Tmin=1.e8, Tmax=1.6e9, rhoYmin=3.9e8, rhoYmax=2.e9, color_field='rate', figsize=(10, 10)): """plot the rate's temperature sensitivity vs temperature :param float Tmin: minimum temperature for plot :param float Tmax: maximum temperature for plot :param float rhoYmin: minimum electron density to plot (e-capture rates only) :param float rhoYmax: maximum electron density to plot (e-capture rates only) :param tuple figsize: figure size specification for matplotlib :return: a matplotlib figure object :rtype: matplotlib.figure.Figure """ fig, ax = plt.subplots(figsize=figsize) data = self.tabular_data_table inde1 = data[:, TableIndex.T.value] <= np.log10(Tmax) inde2 = data[:, TableIndex.T.value] >= np.log10(Tmin) inde3 = data[:, TableIndex.RHOY.value] <= np.log10(rhoYmax) inde4 = data[:, TableIndex.RHOY.value] >= np.log10(rhoYmin) data_heatmap = data[inde1 & inde2 & inde3 & inde4].copy() rows, row_pos = np.unique(data_heatmap[:, 0], return_inverse=True) cols, col_pos = np.unique(data_heatmap[:, 1], return_inverse=True) pivot_table = np.zeros((len(rows), len(cols)), dtype=data_heatmap.dtype) if color_field == 'rate': icol = TableIndex.RATE.value title = f"{self.weak_type} rate in log10(1/s)" cmap = 'magma' elif color_field == 'nu_loss': icol = TableIndex.NU.value title = "neutrino energy loss rate in log10(erg/s)" cmap = 'viridis' else: raise ValueError("color_field must be either 'rate' or 'nu_loss'.") try: pivot_table[row_pos, col_pos] = data_heatmap[:, icol] except ValueError: print("Divide by zero encountered in log10\nChange the scale of T or rhoY") im = ax.imshow(pivot_table, cmap=cmap) fig.colorbar(im, ax=ax) ax.set_xlabel(r"$\log(T)$ [K]") ax.set_ylabel(r"$\log(\rho Y_e)$ [g/cm$^3$]") ax.set_title(fr"{self.pretty_string}" + "\n" + title) ax.set_yticks(range(len(rows))) ylabels = [f"{q:4.2f}" for q in rows] ax.set_yticklabels(ylabels) ax.set_xticks(range(len(cols))) xlabels = [f"{q:4.2f}" for q in cols] ax.set_xticklabels(xlabels, rotation=90, ha="right", rotation_mode="anchor") ax.invert_yaxis() return fig
[docs] class DerivedRate(ReacLibRate): """ This class is a derived class from `Rate` with the purpose of computing the inverse rate by the application of detailed balance to the forward reactions. """ def __init__(self, rate, compute_Q=False, use_pf=False): self.use_pf = use_pf self.rate = rate self.compute_Q = compute_Q if not isinstance(rate, Rate): raise TypeError('rate must be a Rate subclass') if (isinstance(rate, TabularRate) or self.rate.weak or self.rate.reverse): raise ValueError('The rate is reverse or weak or tabular') for nuc in self.rate.reactants: if not nuc.spin_states: raise ValueError('One of the reactants spin ground state, is not defined') for nuc in self.rate.products: if not nuc.spin_states: raise ValueError('One of the products spin ground state, is not defined') derived_sets = [] for ssets in self.rate.sets: a = ssets.a prefactor = 0.0 Q = 0.0 prefactor += -np.log(constants.N_A) * (len(self.rate.reactants) - len(self.rate.products)) for nucr in self.rate.reactants: prefactor += 1.5*np.log(nucr.A) + np.log(nucr.spin_states) Q += nucr.A_nuc for nucp in self.rate.products: prefactor += -1.5*np.log(nucp.A) - np.log(nucp.spin_states) Q -= nucp.A_nuc if self.compute_Q: Q = Q * constants.m_u_MeV else: Q = self.rate.Q prefactor += np.log(self.counter_factors()[1]) - np.log(self.counter_factors()[0]) if len(self.rate.reactants) == len(self.rate.products): prefactor += 0.0 else: F = (constants.m_u * constants.k * 1.0e9 / (2.0*np.pi*constants.hbar**2))**(1.5*(len(self.rate.reactants) - len(self.rate.products))) prefactor += np.log(F) a_rev = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] a_rev[0] = prefactor + a[0] a_rev[1] = a[1] - Q / (1.0e9 * constants.k_MeV) a_rev[2] = a[2] a_rev[3] = a[3] a_rev[4] = a[4] a_rev[5] = a[5] a_rev[6] = a[6] + 1.5*(len(self.rate.reactants) - len(self.rate.products)) sset_d = SingleSet(a=a_rev, labelprops=rate.labelprops) derived_sets.append(sset_d) super().__init__(rfile=self.rate.rfile, chapter=self.rate.chapter, original_source=self.rate.original_source, reactants=self.rate.products, products=self.rate.reactants, sets=derived_sets, labelprops="derived", Q=-Q) def _warn_about_missing_pf_tables(self): skip_nuclei = {Nucleus("h1"), Nucleus("n"), Nucleus("he4")} for nuc in set(self.rate.reactants + self.rate.products) - skip_nuclei: if not nuc.partition_function: warnings.warn(UserWarning(f'{nuc} partition function is not supported by tables: set pf = 1.0 by default'))
[docs] def eval(self, T, rhoY=None): r = super().eval(T=T, rhoY=rhoY) z_r = 1.0 z_p = 1.0 if self.use_pf: self._warn_about_missing_pf_tables() for nucr in self.rate.reactants: if not nucr.partition_function: continue #nucr.partition_function = lambda T: 1.0 z_r *= nucr.partition_function.eval(T) for nucp in self.rate.products: if not nucp.partition_function: continue #nucp.partition_function = lambda T: 1.0 z_p *= nucp.partition_function.eval(T) return r*z_r/z_p return r
[docs] def function_string_py(self): """ Return a string containing python function that computes the rate """ self._warn_about_missing_pf_tables() fstring = super().function_string_py() if self.use_pf: fstring += "\n" for nuc in set(self.rate.reactants + self.rate.products): if nuc.partition_function: fstring += f" # interpolating {nuc} partition function\n" fstring += f" {nuc}_pf_exponent = np.interp(tf.T9, xp={nuc}_temp_array, fp=np.log10({nuc}_pf_array))\n" fstring += f" {nuc}_pf = 10.0**{nuc}_pf_exponent\n" else: fstring += f" # setting {nuc} partition function to 1.0 by default, independent of T\n" fstring += f" {nuc}_pf = 1.0\n" fstring += "\n" fstring += " " fstring += "z_r = " fstring += "*".join([f"{nucr}_pf" for nucr in self.rate.reactants]) fstring += "\n" fstring += " " fstring += "z_p = " fstring += "*".join([f"{nucp}_pf" for nucp in self.rate.products]) fstring += "\n" fstring += f" rate_eval.{self.fname} *= z_r/z_p\n" return fstring
[docs] def function_string_cxx(self, dtype="double", specifiers="inline", leave_open=False, extra_args=()): """ Return a string containing C++ function that computes the rate """ self._warn_about_missing_pf_tables() extra_args = ["[[maybe_unused]] part_fun::pf_cache_t& pf_cache", *extra_args] fstring = super().function_string_cxx(dtype=dtype, specifiers=specifiers, leave_open=True, extra_args=extra_args) # right now we have rate and drate_dT without the partition function # now the partition function corrections if self.use_pf: fstring += "\n" for nuc in set(self.rate.reactants + self.rate.products): fstring += f" {dtype} {nuc}_pf, d{nuc}_pf_dT;\n" if nuc.partition_function: fstring += f" // interpolating {nuc} partition function\n" fstring += f" get_partition_function_cached({nuc.cindex()}, tfactors, pf_cache, {nuc}_pf, d{nuc}_pf_dT);\n" else: fstring += f" // setting {nuc} partition function to 1.0 by default, independent of T\n" fstring += f" {nuc}_pf = 1.0_rt;\n" fstring += f" d{nuc}_pf_dT = 0.0_rt;\n" fstring += "\n" fstring += f" {dtype} z_r = " fstring += " * ".join([f"{nucr}_pf" for nucr in self.rate.reactants]) fstring += ";\n" fstring += f" {dtype} z_p = " fstring += " * ".join([f"{nucp}_pf" for nucp in self.rate.products]) fstring += ";\n\n" # now the derivatives, via chain rule chain_terms = [] for n in self.rate.reactants: chain_terms.append(" * ".join([f"{nucr}_pf" for nucr in self.rate.reactants if nucr != n] + [f"d{n}_pf_dT"])) fstring += f" {dtype} dz_r_dT = " fstring += " + ".join(chain_terms) fstring += ";\n" chain_terms = [] for n in self.rate.products: chain_terms.append(" * ".join([f"{nucp}_pf" for nucp in self.rate.products if nucp != n] + [f"d{n}_pf_dT"])) fstring += f" {dtype} dz_p_dT = " fstring += " + ".join(chain_terms) fstring += ";\n\n" fstring += f" {dtype} dzterm_dT = (z_p * dz_r_dT - z_r * dz_p_dT) / (z_p * z_p);\n\n" # final terms fstring += " drate_dT = dzterm_dT * rate + drate_dT * (z_r / z_p);\n" fstring += " rate *= z_r/z_p;\n\n" if not leave_open: fstring += "}\n\n" return fstring
[docs] def counter_factors(self): """This function returns the nucr! = nucr_1! * ... * nucr_r! for each repeated nucr reactant and nucp! = nucp_1! * ... * nucp_p! for each reactant nucp product in a ordered pair (nucr!, nucp!). The factors nucr! and nucp! avoid overcounting when more than one nuclei is involve in the reaction, otherwise it will return 1.0. """ react_counts = Counter(self.rate.reactants) prod_counts = Counter(self.rate.products) reactant_factor = 1.0 for nuc in set(self.rate.reactants): reactant_factor *= math.factorial(react_counts[nuc]) product_factor = 1.0 for nuc in set(self.rate.products): product_factor *= math.factorial(prod_counts[nuc]) return (reactant_factor, product_factor)
[docs] class RatePair: """the forward and reverse rates for a single reaction sequence. Forward rates are those with Q >= 0. :var forward: the forward reaction Rate object :var reverse: the reverse reaction Rate object """ def __init__(self, forward=None, reverse=None): self.forward = forward self.reverse = reverse def __repr__(self): return f"forward: {self.forward} ; reverse: {self.reverse}" def __lt__(self, other): if self.forward is not None and other.forward is not None: return self.forward < other.forward if self.forward is None: return False return True def __eq__(self, other): return self.forward == other.forward and self.reverse == other.reverse
[docs] class ApproximateRate(ReacLibRate): def __init__(self, primary_rate, secondary_rates, primary_reverse, secondary_reverse, is_reverse=False, approx_type="ap_pg"): """the primary rate has the same reactants and products and the final approximate rate would have. The secondary rates are ordered such that together they would give the same sequence""" self.primary_rate = primary_rate self.secondary_rates = secondary_rates self.primary_reverse = primary_reverse self.secondary_reverse = secondary_reverse self.is_reverse = is_reverse self.approx_type = approx_type if self.approx_type == "ap_pg": # an ap_pg approximate rate combines A(a,g)B and A(a,p)X(p,g)B into a # single effective rate by assuming proton equilibrium. assert len(secondary_rates) == 2 # make sure that the primary forward rate makes sense # this should be A(a,g)B assert Nucleus("he4") in self.primary_rate.reactants and len(self.primary_rate.products) == 1 # we are going to define the product A and reactant B from this reaction self.primary_reactant = max(self.primary_rate.reactants) self.primary_product = max(self.primary_rate.products) # the first secondary rate should be A(a,p)X, where X is the # intermediate nucleus assert (self.primary_reactant in self.secondary_rates[0].reactants and Nucleus("he4") in self.secondary_rates[0].reactants and Nucleus("p") in self.secondary_rates[0].products) # the intermediate nucleus is not in our network, so make it # dummy self.intermediate_nucleus = max(self.secondary_rates[0].products) #self.intermediate_nucleus.dummy = True # now the second secondary rate show be X(p,g)B assert (self.intermediate_nucleus in self.secondary_rates[1].reactants and Nucleus("p") in self.secondary_rates[1].reactants and self.primary_product in secondary_rates[1].products) # now ensure that the reverse rate makes sense # the primary reverse rate is B(g,a)A assert (self.primary_product in self.primary_reverse.reactants and self.primary_reactant in self.primary_reverse.products) # now the first secondary reverse rate should be B(g,p)X assert (self.primary_product in self.secondary_reverse[0].reactants and self.intermediate_nucleus in secondary_reverse[0].products and Nucleus("p") in secondary_reverse[0].products) # and the second secondary reverse rate should be X(p,a)A assert (self.intermediate_nucleus in self.secondary_reverse[1].reactants and Nucleus("p") in self.secondary_reverse[1].reactants and self.primary_reactant in self.secondary_reverse[1].products and Nucleus("he4") in self.secondary_reverse[1].products) # now initialize the super class with these reactants and products if not self.is_reverse: super().__init__(reactants=[self.primary_reactant, Nucleus("he4")], products=[self.primary_product], labelprops="approx", chapter=-1) else: super().__init__(reactants=[self.primary_product], products=[self.primary_reactant, Nucleus("he4")], labelprops="approx", chapter=-1) else: raise NotImplementedError(f"approximation type {self.approx_type} not supported") # update the Q value self._set_q()
[docs] def get_child_rates(self): """return a list of all of the rates that are used in this approximation""" tlist = [self.primary_rate] tlist += self.secondary_rates tlist += [self.primary_reverse] tlist += self.secondary_reverse return tlist
def _set_screening(self): # the individual rates are screened -- we don't screen the combination of them pass
[docs] def eval(self, T, rhoY=None): """evaluate the approximate rate""" if self.approx_type == "ap_pg": if not self.is_reverse: # pylint: disable=no-else-return # the approximate forward rate is r_ag + r_ap r_pg / (r_pg + r_pa) r_ag = self.primary_rate.eval(T) r_ap = self.secondary_rates[0].eval(T) r_pg = self.secondary_rates[1].eval(T) r_pa = self.secondary_reverse[1].eval(T) return r_ag + r_ap * r_pg / (r_pg + r_pa) else: # the approximate reverse rate is r_ga + r_pa r_gp / (r_pg + r_pa) r_ga = self.primary_reverse.eval(T) r_gp = self.secondary_reverse[0].eval(T) r_pa = self.secondary_reverse[1].eval(T) r_pg = self.secondary_rates[1].eval(T) return r_ga + r_pa * r_gp / (r_pg + r_pa) raise NotImplementedError(f"approximation type {self.approx_type} not supported")
[docs] def function_string_py(self): """ Return a string containing python function that computes the approximate rate """ if self.approx_type != "ap_pg": raise NotImplementedError("don't know how to work with this approximation") string = "" string += "@numba.njit()\n" string += f"def {self.fname}(rate_eval, tf):\n" if not self.is_reverse: # first we need to get all of the rates that make this up string += f" r_ag = rate_eval.{self.primary_rate.fname}\n" string += f" r_ap = rate_eval.{self.secondary_rates[0].fname}\n" string += f" r_pg = rate_eval.{self.secondary_rates[1].fname}\n" string += f" r_pa = rate_eval.{self.secondary_reverse[1].fname}\n" # now the approximation string += " rate = r_ag + r_ap * r_pg / (r_pg + r_pa)\n" else: # first we need to get all of the rates that make this up string += f" r_ga = rate_eval.{self.primary_reverse.fname}\n" string += f" r_pa = rate_eval.{self.secondary_reverse[1].fname}\n" string += f" r_gp = rate_eval.{self.secondary_reverse[0].fname}\n" string += f" r_pg = rate_eval.{self.secondary_rates[1].fname}\n" # now the approximation string += " rate = r_ga + r_pa * r_gp / (r_pg + r_pa)\n" string += f" rate_eval.{self.fname} = rate\n\n" return string
[docs] def function_string_cxx(self, dtype="double", specifiers="inline", leave_open=False, extra_args=()): """ Return a string containing C++ function that computes the approximate rate """ if self.approx_type != "ap_pg": raise NotImplementedError("don't know how to work with this approximation") args = ["const T& rate_eval", f"{dtype}& rate", f"{dtype}& drate_dT", *extra_args] fstring = "" fstring = "template <typename T>\n" fstring += f"{specifiers}\n" fstring += f"void rate_{self.cname()}({', '.join(args)}) {{\n\n" if not self.is_reverse: # first we need to get all of the rates that make this up fstring += f" {dtype} r_ag = rate_eval.screened_rates(k_{self.primary_rate.cname()});\n" fstring += f" {dtype} r_ap = rate_eval.screened_rates(k_{self.secondary_rates[0].cname()});\n" fstring += f" {dtype} r_pg = rate_eval.screened_rates(k_{self.secondary_rates[1].cname()});\n" fstring += f" {dtype} r_pa = rate_eval.screened_rates(k_{self.secondary_reverse[1].cname()});\n" # now the approximation fstring += f" {dtype} dd = 1.0_rt / (r_pg + r_pa);\n" fstring += " rate = r_ag + r_ap * r_pg * dd;\n" fstring += " if constexpr (std::is_same_v<T, rate_derivs_t>) {\n" fstring += f" {dtype} drdT_ag = rate_eval.dscreened_rates_dT(k_{self.primary_rate.cname()});\n" fstring += f" {dtype} drdT_ap = rate_eval.dscreened_rates_dT(k_{self.secondary_rates[0].cname()});\n" fstring += f" {dtype} drdT_pg = rate_eval.dscreened_rates_dT(k_{self.secondary_rates[1].cname()});\n" fstring += f" {dtype} drdT_pa = rate_eval.dscreened_rates_dT(k_{self.secondary_reverse[1].cname()});\n" fstring += " drate_dT = drdT_ag + drdT_ap * r_pg * dd + r_ap * drdT_pg * dd - r_ap * r_pg * dd * dd * (drdT_pg + drdT_pa);\n" fstring += " }\n" else: # first we need to get all of the rates that make this up fstring += f" {dtype} r_ga = rate_eval.screened_rates(k_{self.primary_reverse.cname()});\n" fstring += f" {dtype} r_pa = rate_eval.screened_rates(k_{self.secondary_reverse[1].cname()});\n" fstring += f" {dtype} r_gp = rate_eval.screened_rates(k_{self.secondary_reverse[0].cname()});\n" fstring += f" {dtype} r_pg = rate_eval.screened_rates(k_{self.secondary_rates[1].cname()});\n" # now the approximation fstring += f" {dtype} dd = 1.0_rt / (r_pg + r_pa);\n" fstring += " rate = r_ga + r_gp * r_pa * dd;\n" fstring += " if constexpr (std::is_same_v<T, rate_derivs_t>) {\n" fstring += f" {dtype} drdT_ga = rate_eval.dscreened_rates_dT(k_{self.primary_reverse.cname()});\n" fstring += f" {dtype} drdT_pa = rate_eval.dscreened_rates_dT(k_{self.secondary_reverse[1].cname()});\n" fstring += f" {dtype} drdT_gp = rate_eval.dscreened_rates_dT(k_{self.secondary_reverse[0].cname()});\n" fstring += f" {dtype} drdT_pg = rate_eval.dscreened_rates_dT(k_{self.secondary_rates[1].cname()});\n" fstring += " drate_dT = drdT_ga + drdT_gp * r_pa * dd + r_gp * drdT_pa * dd - r_gp * r_pa * dd * dd * (drdT_pg + drdT_pa);\n" fstring += " }\n" if not leave_open: fstring += "}\n\n" return fstring