Source code for pynucastro.reduction.drgep

import numpy as np

from pynucastro.nucdata import Nucleus
from pynucastro.reduction.reduction_utils import (mpi_importer,
                                                  mpi_numpy_decomp, to_list)

MPI = mpi_importer()


[docs] def calc_interaction_matrix(net, rvals): """Calculate direct interaction coefficients.""" N_species = len(net.unique_nuclei) # create index mapping j_map = {} for idx, n in enumerate(net.unique_nuclei): j_map[n] = idx # Calculate coefficients for A p_A = np.zeros(N_species, dtype=np.float64) c_A = np.zeros(N_species, dtype=np.float64) # A along rows, B along columns p_AB = np.zeros((N_species, N_species)) c_AB = np.zeros((N_species, N_species)) for i in range(N_species): n = net.unique_nuclei[i] for r in net.nuclei_produced[n]: rval = r.products.count(n) * rvals[r] p_A[i] += rval bs = set(r.products) | set(r.reactants) for b in bs: p_AB[i, j_map[b]] += rval for r in net.nuclei_consumed[n]: rval = r.reactants.count(n) * rvals[r] c_A[i] += rval bs = set(r.products) | set(r.reactants) for b in bs: c_AB[i, j_map[b]] += rval denom = np.maximum(p_A, c_A)[:, np.newaxis] # Calculate direct interaction coefficients r_AB = np.abs(p_AB - c_AB) / denom return r_AB
[docs] def calc_interaction_matrix_numpy(net, rvals_arr): """Calculate direct interaction coefficients using NumPy.""" # Evaluate terms on RHS of ODE system prod_terms = net.nuc_prod_count * rvals_arr cons_terms = net.nuc_cons_count * rvals_arr # Calculate total production and consumption of each nucleus A p_A = prod_terms.sum(axis=1) c_A = cons_terms.sum(axis=1) # Calculate production / consumption of A in reactions involving B p_AB = prod_terms @ net.nuc_used c_AB = cons_terms @ net.nuc_used # We will normalize by maximum of production and consumption fluxes denom = np.maximum(p_A, c_A)[:, np.newaxis] # Calculate direct interaction coefficients r_AB = np.abs(p_AB - c_AB) / denom return r_AB
[docs] def get_adj_nuc(net): """ Get set of adjacent nuclei for each nucleus in the net. Returns dictionary keyed by nuclei. """ adj_nuc = {} for n in net.unique_nuclei: bs = set() for r in net.nuclei_produced[n]: bs |= set(r.products) | set(r.reactants) for r in net.nuclei_consumed[n]: bs |= set(r.products) | set(r.reactants) adj_nuc[n] = bs return adj_nuc
[docs] def drgep_dijkstras(net, r_AB, target, adj_nuc): """ Modified Dijkstra's algorithm to find paths that maximize the overall interaction coefficient between the target and each other nucleus in the net. """ # Number of species nspec = len(net.unique_nuclei) # Create data structures j_map = {} inf = float('inf') dist = np.zeros(nspec, dtype=np.float64) R_TB = np.zeros(nspec, dtype=np.float64) for idx, n in enumerate(net.unique_nuclei): j_map[n] = idx dist[idx] = -inf R_TB[idx] = -inf dist[j_map[target]] = 1.0 R_TB[j_map[target]] = inf imax = np.argmax(dist) # Main loop while dist[imax] > -inf: r_TA = dist[imax] R_TB[imax] = r_TA n = net.unique_nuclei[imax] bs = adj_nuc[n] for b in bs: jb = j_map[b] if R_TB[jb] > -inf: continue dist[jb] = max(dist[jb], r_TA * r_AB[imax, jb]) dist[imax] = -inf imax = np.argmax(dist) return R_TB
def _drgep_kernel(net, R_TB, rvals, targets, tols, adj_nuc): r_AB = calc_interaction_matrix(net, rvals) for target, tol in zip(targets, tols): R_TB_i = drgep_dijkstras(net, r_AB, target, adj_nuc) np.maximum(R_TB, R_TB_i, out=R_TB, where=R_TB_i >= tol) def _drgep_kernel_numpy(net, R_TB, rvals, targets, tols, adj_nuc): r_AB = calc_interaction_matrix_numpy(net, rvals) for target, tol in zip(targets, tols): R_TB_i = drgep_dijkstras(net, r_AB, target, adj_nuc) np.maximum(R_TB, R_TB_i, out=R_TB, where=R_TB_i >= tol) def _drgep(net, conds, targets, tols): #----------------------------------- # Calculate interaction coefficients #----------------------------------- R_TB = np.zeros(len(net.unique_nuclei), dtype=np.float64) adj_nuc = get_adj_nuc(net) for comp, rho, T in conds: rvals = net.evaluate_rates(rho=rho, T=T, composition=comp) _drgep_kernel(net, R_TB, rvals, targets, tols, adj_nuc) return R_TB def _drgep_mpi(net, conds, targets, tols): #---------- # Init. MPI #---------- comm = MPI.COMM_WORLD MPI_N = comm.Get_size() MPI_rank = comm.Get_rank() #----------------------------------------------- # Calculate interaction coefficients in parallel #----------------------------------------------- R_TB_loc = np.zeros(len(net.unique_nuclei), dtype=np.float64) adj_nuc = get_adj_nuc(net) for i in range(MPI_rank, len(conds), MPI_N): comp, rho, T = conds[i] rvals = net.evaluate_rates(rho=rho, T=T, composition=comp) _drgep_kernel(net, R_TB_loc, rvals, targets, tols, adj_nuc) R_TB = np.zeros_like(R_TB_loc) comm.Allreduce([R_TB_loc, MPI.DOUBLE], [R_TB, MPI.DOUBLE], op=MPI.MAX) return R_TB def _drgep_numpy(net, conds, targets, tols): #---------------------------------------- # Unpack conditions; make precalculations #---------------------------------------- comp_L, rho_L, T_L = conds adj_nuc = get_adj_nuc(net) #------------------------------------------------ # Calculate interaction coefficients (vectorized) #------------------------------------------------ R_TB = np.zeros(len(net.unique_nuclei), dtype=np.float64) for comp in comp_L: net.update_yfac_arr(comp) for rho in rho_L: net.update_prefac_arr(rho, comp) for T in T_L: rvals_arr = net.evaluate_rates_arr(T) _drgep_kernel_numpy(net, R_TB, rvals_arr, targets, tols, adj_nuc) net.clear_arrays() return R_TB def _drgep_mpi_numpy(net, conds, targets, tols): #---------------------------------------- # Unpack conditions; make precalculations #---------------------------------------- n = tuple(map(len, conds)) comp_L, rho_L, T_L = conds adj_nuc = get_adj_nuc(net) #----------------------------------- # Init. MPI and divide up conditions #----------------------------------- comm = MPI.COMM_WORLD MPI_N = comm.Get_size() MPI_rank = comm.Get_rank() comp_idx, comp_step, rho_idx, rho_step, T_idx, T_step = mpi_numpy_decomp(MPI_N, MPI_rank, n) #-------------------------------------------------------------- # Calculate interaction coefficients (vectorized and using MPI) #-------------------------------------------------------------- R_TB_loc = np.zeros(len(net.unique_nuclei), dtype=np.float64) for i in range(comp_idx, n[0], comp_step): comp = comp_L[i] net.update_yfac_arr(comp) for j in range(rho_idx, n[1], rho_step): rho = rho_L[j] net.update_prefac_arr(rho, comp) for k in range(T_idx, n[2], T_step): T = T_L[k] rvals_arr = net.evaluate_rates_arr(T) _drgep_kernel_numpy(net, R_TB_loc, rvals_arr, targets, tols, adj_nuc) R_TB = np.zeros_like(R_TB_loc) comm.Allreduce([R_TB_loc, MPI.DOUBLE], [R_TB, MPI.DOUBLE], op=MPI.MAX) net.clear_arrays() return R_TB
[docs] def drgep(net, conds, targets, tols, returnobj='net', use_mpi=False, use_numpy=False): """ Implementation of Directed Relation Graph with Error Propagation (DRGEP) reduction method described in :cite:t:`pepiot-desjardins:2008` and :cite:t:`niemeyer:2011`. :param net: The network (RateCollection) to reduce. :param conds: A set of conditions to reduce over. Should either be a sequence of (composition, density, temperature) sequences/tuples if running in standard mode, or a sequence of 3 sequences ((composition, density, temperature) ordering) if running in NumPy mode. In the latter case, the sequences will be permuted to create the dataset. The compositions should be pynucastro Composition objects. :param targets: A collection of target nuclei (or a single target nucleus) to run the graph search algorithm from. Should be supplied as pynucastro Nucleus objects or strings. :param tols: Tolerance(s) or cutoff threshold(s) to use for paths from each of the target nuclei. Nuclei whose interaction coefficients do not meet the specified tolerance will have their interaction coefficients set to 0.0. Can be a single number (will be the same for all targets) or a separate value for each target nucleus. :param returnobj: The type of object to return. Options are 'net' (a reduced network, the default setting), 'nuclei' (unique nuclei with nonzero interaction coefficients, ordered so the interaction coefficients are descending), and 'coeff' (the interaction coefficients as a NumPy array, with entries corresponding to nuclei in *net.unique_nuclei*). :param use_mpi: Whether to divide up the set of conditions across MPI processes or not. Default setting is *False*. :param use_numpy: Whether to use NumPy to vectorize the interaction coefficient calculations or not. This is more memory intensive and may actually hinder performance for some setups. Conditions should be supplied as 3 lists that will be permuted to form the dataset (see *conds* parameter). Default setting is *False*. """ #------------------ # Process arguments #------------------ if returnobj not in {'net', 'nuclei', 'coeff'}: raise ValueError(f"Invalid 'returnobj' argument: '{returnobj}'.") targets = Nucleus.cast_list(targets) tols = to_list(tols, len(targets)) #------------------------------------------------------- # Determine operation mode and launch appropriate helper #------------------------------------------------------- if use_mpi: if use_numpy: R_TB = _drgep_mpi_numpy(net, conds, targets, tols) else: R_TB = _drgep_mpi(net, conds, targets, tols) elif use_numpy: R_TB = _drgep_numpy(net, conds, targets, tols) else: R_TB = _drgep(net, conds, targets, tols) #------------------------ # Return requested object #------------------------ if returnobj == 'net': nuclei = [net.unique_nuclei[i] for i in range(len(net.unique_nuclei)) if R_TB[i] > 0.0] return net.linking_nuclei(nuclei) if returnobj == 'nuclei': idx = sorted(range(len(R_TB)), key=lambda i: R_TB[i], reverse=True) return [net.unique_nuclei[i] for i in idx if R_TB[i] > 0.0] if returnobj == 'coeff': return R_TB # returnobj was checked at the start of the function assert False