Source code for pynucastro.rates.library

import collections
import io
import re
from os import walk
from pathlib import Path

from pynucastro.nucdata import Nucleus, UnsupportedNucleus
from pynucastro.rates.derived_rate import DerivedRate
from pynucastro.rates.files import (RateFileError, _find_rate_file,
                                    get_rates_dir)
from pynucastro.rates.known_duplicates import (find_duplicate_rates,
                                               is_allowed_dupe)
from pynucastro.rates.rate import Rate
from pynucastro.rates.reaclib_rate import ReacLibRate
from pynucastro.rates.tabular_rate import TabularRate


[docs] def list_known_rates(): """Print a list of all of the rates found in the library """ lib_path = Path(__file__).parents[1]/"library" for _, _, filenames in walk(lib_path): for f in filenames: # skip over files that are not rate files if Path(f).suffix in (".md", ".dat", ".py", "ipynb"): continue try: lib = Library(f) except (RateFileError, UnsupportedNucleus): continue print(f"{f:32} : ") for r in lib.get_rates(): print(f" : {r}")
def _rate_name_to_nuc(name): # first try to interpret name as A(x,y)B rate_str = re.compile(r"([A-Za-z0-9]+)\(([A-Za-z0-9_]*),([A-Za-z0-9_]*)\)([A-Za-z0-9]+)", re.IGNORECASE) nucs = rate_str.search(name) try: _r = [nucs.group(1), nucs.group(2)] _p = [nucs.group(3), nucs.group(4)] except AttributeError: return None # now try to make nuclei objects. reactants = [] for nuc in _r: if nuc == "": continue try: n = Nucleus(nuc) reactants.append(n) except (ValueError, AssertionError): # we need to interpret some things specially if nuc.lower() in ["e", "nu", "_", "g", "gamma"]: # first electrons and neutrins, and nothing continue if nuc.lower() == "pp": reactants += [Nucleus("p"), Nucleus("p")] continue if nuc.lower() == "aa": reactants += [Nucleus("he4"), Nucleus("he4")] continue print(f"couldn't deal with {nuc}") raise products = [] for nuc in _p: if nuc == "": continue try: n = Nucleus(nuc) products.append(n) except (ValueError, AssertionError): # we need to interpret some things specially if nuc.lower() in ["e", "nu", "_", "g", "gamma"]: # first electrons and neutrinos, gammas, and nothing continue if nuc.lower() == "pp": products += [Nucleus("p"), Nucleus("p")] continue if nuc.lower() == "aa": products += [Nucleus("he4"), Nucleus("he4")] continue print(f"couldn't deal with {nuc}") raise return reactants, products
[docs] def capitalize_rid(rid, delimiter): """Capitalize a ``Rate`` ``rid`` or ``fname`` given the delimiter. The delimiter is usually either "_" or " " Parameters ---------- rid : str the rate's id delimiter : str the delimiter used to split the rid into substrings Returns ------- str """ rid_nucs = rid.split(delimiter) rid_mod = [] do_capitalization = True for n in rid_nucs: if n in ("weak", "approx", "derived"): do_capitalization = False if do_capitalization: if n not in ("n", "p"): n = n.capitalize() rid_mod.append(n) rid_mod = delimiter.join(rid_mod) return rid_mod
[docs] class Library: """A Library is a container storing multiple rates that allows for filtering rates based on rules, managing duplicate rates, and selecting subsets of rates based on properties. At its heart is a ``dict`` of rates keyed by the rate id. A library may contain rates from a single source, or be created by adding or subtracting existing Library objects. Parameters ---------- libfile : str a file containing a sequence of rates in a format that we understand (for example a ReacLib database) rates : list, dict, Rate a single :py:class:`Rate <pynucastro.rates.rate.Rate>` or an iterable of `Rate` objects. If it is a dictionary, then it should be keyed by the rate id. """ def __init__(self, libfile=None, rates=None): self._rates = {} if rates: if isinstance(rates, Rate): rates = [rates] if isinstance(rates, dict): self._rates = rates elif isinstance(rates, (list, set)): self.add_rates(rates) else: raise TypeError("rates in Library constructor must be a Rate object, list of Rate objects, or dictionary of Rate objects keyed by Rate.id") if libfile: library_file = _find_rate_file(libfile) self._read_library_file(library_file)
[docs] def get_rates(self): """Return a list of the rates in this library. Returns ------- list """ return list(self._rates.values())
[docs] def get_rate(self, rid): """Return a rate matching the id provided. Returns ------- Rate """ try: rid_mod = capitalize_rid(rid, " ") return self._rates[rid_mod] except KeyError: pass # fallback to the rate fname try: rid_mod = capitalize_rid(rid, "_") return [q for q in self.get_rates() if q.fname == rid_mod][0] except IndexError: raise LookupError(f"rate identifier {rid!r} does not match a rate in this library.") from None
@property def num_rates(self): """Get the total number of rates in the Library Returns ------- int """ return len(self.get_rates())
[docs] def add_rate(self, rate): """Manually add a rate to the library. Parameters ---------- rate : Rate The rate to add """ if not isinstance(rate, Rate): raise TypeError(f"invalid Rate object {rate}") rid = rate.id if rid in self._rates: raise ValueError(f"supplied a Rate object already in the Library: {rid}") self._rates[rid] = rate
[docs] def add_rates(self, ratelist): """Add multiple rates to the library Parameters ---------- ratelist : list of Rate the list of rates to add to the library. """ for rate in ratelist: self.add_rate(rate)
[docs] def get_rate_by_name(self, name): """Given a string representing a rate in the form 'A(x,y)B' (or a list of strings for multiple rates) return the Rate objects that match from the Library. If there are multiple inputs, then a list of Rate objects is returned. Returns ------- rates : list, Rate A single rate or a list of rates """ rate_name_list = name if isinstance(name, str): rate_name_list = [name] rates_out = [] for rname in rate_name_list: reactants, products = _rate_name_to_nuc(rname) rf = RateFilter(reactants=reactants, products=products) _lib = self.filter(rf) if _lib is None: print(f"rate {rname} not found") continue rates_out += _lib.get_rates() if len(rates_out) == 0: return None if len(rates_out) == 1: return rates_out[0] return rates_out
[docs] def remove_rate(self, rate): """Manually remove a rate from the library by supplying the short name "A(x,y)B", a Rate object, or the rate id Parameters ---------- rate : str, Rate The rate to remove from the library. """ if isinstance(rate, Rate): rid = rate.id self._rates.pop(rid) elif isinstance(rate, str): rid = self.get_rate_by_name(rate).id self._rates.pop(rid) else: # we assume that a rate id as provided self._rates.pop(rate)
[docs] def get_nuclei(self): """Get the list of unique nuclei in the library Returns ------- set """ return {nuc for r in self.get_rates() for nuc in r.reactants + r.products}
[docs] def heaviest(self): """Return the heaviest nuclide in this library. Returns ------- Nucleus """ nuc = None for r in self.get_rates(): rnuc = r.heaviest() if nuc: if rnuc.A > nuc.A or (rnuc.A == nuc.A and rnuc.Z < nuc.Z): nuc = rnuc else: nuc = rnuc return nuc
[docs] def lightest(self): """Return the lightest nuclide in this library. Returns ------- Nucleus """ nuc = None for r in self.get_rates(): rnuc = r.lightest() if nuc: if rnuc.A < nuc.A or (rnuc.A == nuc.A and rnuc.Z > nuc.Z): nuc = rnuc else: nuc = rnuc return nuc
def _read_library_file(self, library_file): # loop through library file, read lines library_source_lines = collections.deque() with library_file.open("r") as flib: for line in flib: ls = line.rstrip('\n') if ls.strip(): library_source_lines.append(ls) # identify distinct rates from library lines current_chapter = None while True: if len(library_source_lines) == 0: break # Check to see if there is a chapter ID, if not then use current_chapter # (for Reaclib v1 formatted library files) line = library_source_lines[0].strip() chapter = None if line in ('t', 'T'): chapter = 't' library_source_lines.popleft() else: try: chapter = int(line) except (TypeError, ValueError): # we can't interpret line as a chapter so use current_chapter assert current_chapter, f'malformed library file {library_file}, cannot identify chapter.' chapter = current_chapter else: library_source_lines.popleft() current_chapter = chapter rlines = None rate_type = None if chapter == 't': rlines = [library_source_lines.popleft() for i in range(5)] rate_type = "tabular" elif isinstance(chapter, int): rlines = [library_source_lines.popleft() for i in range(3)] rate_type = "reaclib" if rlines: sio = io.StringIO('\n'.join([f'{chapter}'] + rlines)) #print(sio.getvalue()) try: if rate_type == "reaclib": r = ReacLibRate(rfile=sio) elif rate_type == "tabular": r = TabularRate(rfile=sio) else: raise NotImplementedError("rate not implemented") except UnsupportedNucleus: pass else: rid = r.id if rid in self._rates: self._rates[rid] = self._rates[rid] + r else: self._rates[rid] = r
[docs] def write_to_file(self, filename, *, prepend_rates_dir=False): """Write the library out to a file of the given name in Reaclib format. Parameters ---------- filename : str The filename to use for the library prepend_rates_dir : bool If ``True``, then output to the pynucastro rate file directory. """ if prepend_rates_dir: filename = get_rates_dir()/filename with filename.open("w") as f: for rate in self.get_rates(): rate.write_to_file(f)
def __repr__(self): """Return a string containing the rates IDs in this library.""" rstrings = [] tmp_rates = [v for k, v in self._rates.items()] for r in sorted(tmp_rates): if r.Q is not None and r.Q >= 0: rstrings.append(f'{r.__repr__():30} [Q = {float(r.Q):6.2f} MeV] ({r.id})') for r in sorted(tmp_rates): if r.Q is not None and r.Q < 0: rstrings.append(f'{r.__repr__():30} [Q = {float(r.Q):6.2f} MeV] ({r.id})') for r in sorted(tmp_rates): if r.Q is None: rstrings.append(f'{r.__repr__():30} ({r.id})') return '\n'.join(rstrings) def __add__(self, other): """Add two libraries to get a library containing rates from both. """ new_rates = self._rates for rid, r in other._rates.items(): if rid in new_rates: if r != new_rates[rid]: raise ValueError(f'rate {r} defined differently in libraries') else: new_rates[rid] = r new_library = Library(rates=new_rates) return new_library def __sub__(self, other): """Return a Library containing the rates in this library that are not contained in other_library """ diff_rates = set(self.get_rates()) - set(other.get_rates()) new_library = Library(rates=diff_rates) return new_library
[docs] def get_rate_by_nuclei(self, reactants, products): """Given a list of reactants and products, return any matching rates Parameters ---------- reactants : list of Nucleus or str the list of nuclei that serve as reactants. products : list of Nucleus or str the list of nuclei that serve as products. Returns ------- Rate, list(Rate) a list of Rate object or a single Rate (if there is only one) """ reactants = sorted(Nucleus.cast_list(reactants)) products = sorted(Nucleus.cast_list(products)) _tmp = [r for r in self.get_rates() if sorted(r.reactants) == reactants and sorted(r.products) == products] if not _tmp: return None if len(_tmp) == 1: return _tmp[0] return _tmp
[docs] def linking_nuclei(self, nuclist, *, with_reverse=True, print_warning=True): """Return a library containing the rates linking the list of nuclei passed in. Parameters ---------- nuclist : list of str or Nucleus the nuclei to link (either the string names or the Nucleus objects) with_reverse : bool do we include reverse rates? print_warning : bool if ``True``, then print a warning if one of the input nuclei is not linked. Returns ------- Library """ nucleus_set = set(Nucleus.cast_list(nuclist)) # Discard rates with nuclei that are not in nucleus_set filtered_rates = [] for r in self.get_rates(): include = True for nuc in r.reactants: if nuc not in nucleus_set: include = False break for nuc in r.products: if nuc not in nucleus_set: include = False break if not with_reverse and r.reverse: include = False if include: filtered_rates.append(r) # create a new library containing the filtered rates new_lib = Library(rates=filtered_rates) # print out a warning if one of the input nuclei is not linked if print_warning: lib_nuclei = new_lib.get_nuclei() for nuc in nucleus_set: if nuc not in lib_nuclei: print(f"warning: {nuc} was not able to be linked") return new_lib
[docs] def filter(self, filter_spec): """Filter the rates in the library based on a set of rules. Parameters ---------- filter_spec : RateFilter, list a filter (or list of filters) to apply to the library to define a subset of rates. Returns ------- Library """ if isinstance(filter_spec, RateFilter): filter_specifications = [filter_spec] else: filter_specifications = list(filter_spec) matching_rates = {} for rid, r in self._rates.items(): for f in filter_specifications: if f.matches(r): matching_rates[rid] = r break if matching_rates: return Library(rates=matching_rates) return None
[docs] def forward(self): """Select only the forward rates, discarding the inverse rates obtained by detailed balance. Note: "forward" here means that it is not a reverse rate derived from detailed balance, and does not necessarily mean Q > 0. Returns ------- Library """ only_fwd_filter = RateFilter(reverse=False) only_fwd = self.filter(only_fwd_filter) return only_fwd
[docs] def backward(self): """Select only the reverse rates, obtained by detailed balance. Note: "reverse" here means that it was derived by detailed balance, and not that Q < 0. Returns ------- Library """ only_bwd_filter = RateFilter(reverse=True) only_bwd = self.filter(only_bwd_filter) return only_bwd
[docs] def forward_for_detailed_balance(self): """Loop over the forward rates (as filtered by :py:meth:`.forward`) and return those that can be used to derive reverse rates via detailed balance. This means that they cannot be tabular or weak rates. Returns ------- Library """ collect_rates = [] onlyfwd = self.forward() for r in onlyfwd.get_rates(): try: DerivedRate(rate=r, compute_Q=False, use_pf=False) except ValueError: continue else: collect_rates.append(r) list1 = Library(rates=collect_rates) return list1
[docs] def derived_backward(self, compute_Q=False, use_pf=False): """Loop over all of the forward rates that can be used to derive inverse rates (as returned by :py:meth:`.forward_for_detailed_balance`) and derive the inverses, potentially taking into account the partition function and recomputing Q. Parameters ---------- compute_Q : bool do we recompute the Q value based on the masses? use_pf : bool do we use the temperature-dependent partition function? Returns ------- Library """ derived_rates = [] onlyfwd = self.forward_for_detailed_balance() for r in onlyfwd.get_rates(): try: i = DerivedRate(rate=r, compute_Q=compute_Q, use_pf=use_pf) except ValueError: continue else: derived_rates.append(i) onlybwd = Library(rates=derived_rates) return onlybwd
[docs] class RateFilter: """A RateFilter filters out a specified rate or set of rates A RateFilter stores selection rules specifying a rate or group of rates to assist in searching for rates stored in a Library. Parameters ---------- reactants : str, Nucleus, Iterable(Nucleus, str) the allowed reactant nuclei of the rates products : str, Nucleus, Iterable(Nucleus, str) the allowed products of the rates exact : bool if ``True``, products or reactants must match exactly. if ``False``, then all products or reactants must be found in a comparison rate, but the comparison may contain additional products or reactants reverse : bool if ``True``, only match reverse-derived rates if ``False``, only match directly-derived rates if None, you don't care, match both min_reactants : int match Rates that have at least this many reactants min_products : int match Rates that have at least this many products max_reactants : int match Rates that have no more than this many reactants max_products : int match Rates that have no more than this many products filter_function : Callable a function (``Rate`` -> ``bool``) that can take a single rate as an argument may be used to specify additional criteria, returning ``True`` if the rate meets all of them, ``False`` otherwise Examples -------- Create a filter that finds all proton capture and proton-burning reactions in a ``Library`` instance ``my_library``: >>> pcap_filter = RateFilter(reactants='p', exact=False) >>> pcap_library = my_library.filter(pcap_filter) or you can use ``Nucleus``: >>> pcap_filter = RateFilter(reactants=Nucleus('p'), exact=False) >>> pcap_library = my_library.filter(pcap_filter) Create a filter that finds O16(g,a)C12. Note: * photons/gammas are not treated as nuclides, so they cannot be a reactant or product * this rate is in the ReacLib library used here as O16 --> He4 + C12 --- you need to know how your library treats rates >>> cago_filter = RateFilter(reactants='o16', products=['c12', 'a']) >>> cago_library = my_library.filter(cago_filter) """ def __init__(self, reactants=None, products=None, exact=True, reverse=None, min_reactants=None, max_reactants=None, min_products=None, max_products=None, filter_function=None): self.reactants = [] self.products = [] self.exact = exact self.reverse = reverse self.min_reactants = min_reactants self.min_products = min_products self.max_reactants = max_reactants self.max_products = max_products self.filter_function = filter_function if reactants: self.reactants = Nucleus.cast_list(reactants, allow_single=True) if products: self.products = Nucleus.cast_list(products, allow_single=True) @staticmethod def _contents_equal(a, b): """ Return True if the contents of a and b exactly match, ignoring ordering. If either a or b is None, return True only if both a and b are None. """ if a and b: return len(a) == len(b) and sorted(a) == sorted(b) return (not a) and (not b) @staticmethod def _compare_nuclides(test, reference, exact=True): """ test and reference should be iterables of Nucleus objects. If an exact match is desired, test and reference should exactly match, ignoring ordering. Otherwise, return True only if every element of test appears at least one time in reference. """ matches = True if exact: matches = RateFilter._contents_equal(test, reference) else: for nuc in test: if nuc not in reference: matches = False break return matches
[docs] def matches(self, r): """Given a Rate r, see if it matches this RateFilter. Parameters ---------- r : Rate the rate we are looking for Returns ------- bool """ # do cheaper checks first matches_reverse = True matches_min_reactants = True matches_min_products = True matches_max_reactants = True matches_max_products = True if isinstance(self.reverse, bool): matches_reverse = self.reverse == r.reverse if isinstance(self.min_reactants, int): matches_min_reactants = len(r.reactants) >= self.min_reactants if isinstance(self.min_products, int): matches_min_products = len(r.products) >= self.min_products if isinstance(self.max_reactants, int): matches_max_reactants = len(r.reactants) <= self.max_reactants if isinstance(self.max_products, int): matches_max_products = len(r.products) <= self.max_products # exit early if any of these checks failed if not (matches_reverse and matches_min_reactants and matches_min_products and matches_max_reactants and matches_max_products): return False # now do more expensive checks, and exit immediately if any fail if self.reactants: if not self._compare_nuclides(self.reactants, r.reactants, self.exact): return False if self.products: if not self._compare_nuclides(self.products, r.products, self.exact): return False if self.filter_function is not None: if not self.filter_function(r): return False return True
[docs] def invert(self): """Return a RateFilter matching the inverse rate. Returns ------- RateFilter """ newfilter = RateFilter(reactants=self.products, products=self.reactants, exact=self.exact, reverse=self.reverse, min_reactants=self.min_products, max_reactants=self.max_products, min_products=self.min_reactants, max_products=self.max_reactants) return newfilter
[docs] class ReacLibLibrary(Library): """Create a :py:class:`Library` containing all of the rates in the latest stored version of the ReacLib library. """ def __init__(self): libfile = 'reaclib_default2_20250330' Library.__init__(self, libfile=libfile)
[docs] class TabularLibrary(Library): """Create a :py:class:`Library` containing all of the tabular rates we know (excluding duplications) across multiple sources. Parameters ---------- ordering : list of str The list of sources of the rates from lowest to highest precedence. We will read from the first source, and then for any later sources, for any duplicate rates, we will replace the existing rate with the version from the higher-priority library. The default ordering is ``["ffn", "oda", "langanke", "suzuki"]`` """ lib_path = Path(__file__).parents[1]/"library/tabular" def __init__(self, ordering=None): # find all of the tabular rates that pynucastro knows about # we'll assume that these are of the form *betadecay.dat or # *electroncapture.dat if ordering is None: ordering = ["ffn", "oda", "langanke", "suzuki"] trates = [] for source in ordering: source_dir = self.lib_path / Path(source) for _, _, filenames in sorted(walk(source_dir)): for f in sorted(filenames): if f.endswith("electroncapture.dat") or f.endswith("betadecay.dat"): r = TabularRate(rfile=source_dir / f) if r in trates: # we are looping over the various libraries in order # from lowest precedence to highest. So if the rate # exists, then delete it and add this one. Since # matching only looks at reactants and products, we # can use the new r for both deleting a adding trates.remove(r) trates.append(r) Library.__init__(self, rates=trates)
[docs] class SuzukiLibrary(TabularLibrary): """Create a :py:class:`Library` containing all of the tabular rates inside the "suzuki" subdirectory. """ def __init__(self): super().__init__(ordering=["suzuki"])
[docs] class LangankeLibrary(TabularLibrary): """Create a :py:class:`Library` containing all of the tabular rates inside the "langanke" subdirectory. """ def __init__(self): super().__init__(ordering=["langanke"])
[docs] class FFNLibrary(TabularLibrary): """Create a :py:class:`Library` containing all of the tabular rates inside the "ffn" subdirectory. """ def __init__(self): super().__init__(ordering=["ffn"])
[docs] class OdaLibrary(TabularLibrary): """Create a :py:class:`Library` containing all of the tabular rates inside the "oda" subdirectory. """ def __init__(self): super().__init__(ordering=["oda"])