Source code for sdp_par_model.config

from __future__ import print_function

# from builtins import int
import warnings

import numpy as np
import sympy
import pylru  # Install using pip (conda doesn't resolve this in my test)
import copy

from .parameters.container import ParameterContainer, BLDep
from .parameters import definitions as p
from .parameters.definitions import (Telescopes, Pipelines, Bands)
from .parameters import equations as f
# from .parameters.definitions import Constants as c
from . import evaluate

TEL_PARAM_CACHE = pylru.lrucache(1000)

[docs]class PipelineConfig: """ A full SDP pipeline configuration. This collects all data required to parameterise a pipeline. """ def __init__(self, telescope=None, pipeline=None, band=None, hpso=None, hpso_pipe=None, adjusts={}, **kwargs): """ :param telescope: Telescope to use (can be omitted if HPSO specified) :param pipeline: Pipeline mode (can be omitted if HPSO specified) :param band: Frequency band (can be omitted if HPSO specified) :param hpso: High Priority Science Objective ID (can be omitted if telescope, pipeline and band specified) :param hpso_pipe: The specific pipeline of the HPSO for which results are to be calculated :param adjusts: Values that should be adjusted in the telescope parameters. Keyword arguments get added to the adjustments automatically. Can be a string of the the form "name=val name2=val flag". """ params = ParameterContainer() # Alias for now if hpso_pipe is not None: pipeline = hpso_pipe # Load HPSO parameters if pipeline is None: raise ValueError("pipeline must be set!") if hpso is not None: assert hpso in p.HPSOs.hpso_telescopes assert pipeline is not None if telescope is not None or band is not None: raise Exception("(telescope + band) *XOR* hpso need to be set (i.e. not both)") self.hpso = hpso self.hpso_pipe = pipeline self.pipeline = pipeline self.telescope = p.HPSOs.hpso_telescopes[hpso] p.apply_telescope_parameters(params, self.telescope) p.apply_hpso_parameters(params, hpso, pipeline) if hasattr(params, 'pipeline'): pipeline = params.pipeline else: # This may be a bit of an outdated case; we mainly work in terms of HPSOs if (telescope is None) or (band is None): raise Exception("(telescope + band) *XOR* hpso need to be set (i.e. not both)") = band self.telescope = telescope self.pipeline = pipeline p.apply_telescope_parameters(params, self.telescope) p.apply_band_parameters(params, # Adjustments from keyword arguments if type(adjusts) == str: def mk_adjust(adjust): # Setting a field? fields = adjust.split('=') if len(fields) == 2: return (fields[0], eval(fields[1])) # Otherwise assume that it's a flag return (adjust, True) adjusts = dict(map(mk_adjust, adjusts.split(' '))) self.adjusts = dict(adjusts) self.adjusts.update(**kwargs) assert 'max_baseline' not in self.adjusts, 'Please use Bmax for consistency!' # Store max allowed baseline length, load default parameters self.max_allowed_baseline = params.baseline_bins[-1] if 'Bmax' not in self.adjusts: self.adjusts['Bmax'] = params.Bmax self.default_frequencies = params.Nf_max if 'Nf_max' not in self.adjusts: self.adjusts['Nf_max'] = params.Nf_max
[docs] def describe(self): """ Returns a name that identifies this configuration. """ # Identify by either (HPSO + pipeline), or (pipeline + band) if hasattr(self, "hpso"): name = self.hpso + ' (' + self.pipeline + ')' else: name = self.pipeline + ' (' + + ')' # Add modifiers for n, val in self.adjusts.items(): if n == 'Nf_max' and self.adjusts[n] == self.default_frequencies: continue if n == 'Bmax' and self.adjusts[n] == self.max_allowed_baseline: continue if n == 'on_the_fly': n = 'otf' if n == 'scale_predict_by_facet': n = 'spbf' if val == True: name += ' [%s]' % n elif val == False: name += ' [!%s]' % n else: name += ' [%s=%s]' % (n, val) return name
[docs] def telescope_and_band_are_compatible(self): """ Checks whether the supplied telescope and band are compatible with each other. """ is_compatible = False telescope = self.telescope band = if telescope in Telescopes.available_teles: is_compatible = (band in Bands.telescope_bands[telescope]) else: raise ValueError("Unknown telescope %s" % telescope) return is_compatible
[docs] def is_valid(self, pure_pipelines=True): """Checks integrity of the pipeline configuration. :return: (okay?, list of errors/warnings) """ messages = [] okay = True # Maximum baseline if self.adjusts['Bmax'] > self.max_allowed_baseline: messages.append('WARNING: Bmax (%g m) exceeds the maximum ' \ 'allowed baseline of %g m for telescope \'%s\'.' \ % (self.adjusts['Bmax'], self.max_allowed_baseline, self.telescope)) # Only pure pipelines supported? if pure_pipelines: if self.pipeline not in Pipelines.pure_pipelines: messages.append("ERROR: The '%s' imaging pipeline is currently not supported" % str(self.pipeline)) okay = False # Band compatibility. Can skip for HPSOs, as they override the # band manually. if (hasattr(self, "hpso") and hasattr(self, "band")) or (not (hasattr(self, "hpso") or hasattr(self, "band"))): messages.append("ERROR: Either the Imaging Band or an HPSO needs to be defined (and not both).") okay = False if not (hasattr(self, "hpso") or self.telescope_and_band_are_compatible()): messages.append("ERROR: Telescope '%s' and band '%s' are not compatible" % (str(self.telescope), str( okay = False if not hasattr(self, "pipeline"): messages.append("ERROR: Configuration has no defined pipeline") okay = False return (okay, messages)
[docs] def calc_tel_params(cfg, verbose=False, adjusts={}, symbolify='', optimize_expression='Rflop', clear_symbolised=None): """ Calculates telescope parameters for this configuration. Some values may (optionally) be overwritten, e.g. the maximum baseline or number of frequency channels. :param cfg: Valid pipeline configuration :param verbose: How chatty we are supposed to be :param adjusts: Dictionary of telescope parameters to adjust :param symbolify: Generate symbolified telescope parameters :param optimize_expression: Set free symbols in a way that minimises given telescope parameter (only if symbolify is not set) :param clear_symbolised: Whether to clear parameters with free symbols after optimisation. (only if symbolify is not set. Default on if optimize_expression is not None.) """ assert cfg.is_valid()[0], "calc_tel_params must be called for a valid pipeline configuration!" if clear_symbolised is None: clear_symbolised = (optimize_expression is not None) # Check whether we have a cached copy tel_param_desc = (cfg.describe(), str(adjusts.items()), symbolify, optimize_expression, clear_symbolised) if not verbose: if tel_param_desc in TEL_PARAM_CACHE: return copy.deepcopy(TEL_PARAM_CACHE[tel_param_desc]) telescope_params = ParameterContainer() p.apply_global_parameters(telescope_params) p.define_symbolic_variables(telescope_params) # Note the order in which these settings are applied. # Each one (possibly) overwrites previous definitions if they should they overlap # (as happens with e.g. frequency bands) # First: The telescope's parameters (Primarily the number of dishes, bands, beams and baselines) p.apply_telescope_parameters(telescope_params, cfg.telescope) # Then define pipeline and frequency-band # Includes frequency range, Observation time, number of cycles, quality factor, number of channels, etc. if hasattr(cfg, "hpso"): # Note the ordering; HPSO parameters get applied last, and therefore have the final say p.apply_pipeline_parameters(telescope_params, cfg.pipeline) p.apply_hpso_parameters(telescope_params, cfg.hpso, cfg.pipeline) elif hasattr(cfg, "band"): p.apply_band_parameters(telescope_params, p.apply_pipeline_parameters(telescope_params, cfg.pipeline) # Apply parameter adjustments. Needs to be done before bin # calculation in case Bmax gets changed. Note that an # overwrite is required, i.e. the parameter must exist. for par, value in cfg.adjusts.items(): telescope_params.__dict__[par] = value for par, value in adjusts.items(): telescope_params.__dict__[par] = value if telescope_params.blcoal: # Limit bins to those shorter than Bmax bins = telescope_params.baseline_bins nbins_used = min(bins.searchsorted(telescope_params.Bmax) + 1, len(bins)) bins = bins[:nbins_used] # Same for baseline sizes. Note that we normalise /before/ # reducing the list. binfracs = telescope_params.baseline_bin_distribution binfracs /= sum(binfracs) binfracs = binfracs[:nbins_used] # Calculate old and new bin sizes binsize = bins[nbins_used-1] binsizeNew = telescope_params.Bmax if nbins_used > 1: binsize -= bins[nbins_used-2] binsizeNew -= bins[nbins_used-2] # Scale last bin bins[nbins_used-1] = telescope_params.Bmax binfracs[nbins_used-1] *= float(binsizeNew) / float(binsize) if verbose: print("Baseline coalescing on") else: if verbose: print("Baseline coalescing off") telescope_params.baseline_bins = np.array((telescope_params.Bmax,)) # m telescope_params.baseline_bin_distribution = np.array((1.0,)) bins = [telescope_params.Bmax] binfracs=[1.0] # Apply imaging equations f.apply_imaging_equations(telescope_params, cfg.pipeline, bins, binfracs, verbose, symbolify) # Free symbols to minimise? if symbolify == '' and optimize_expression is not None and \ telescope_params.get(optimize_expression) is not None and \ isinstance(telescope_params.get(optimize_expression), sympy.Expr) and \ len(telescope_params.get(optimize_expression).free_symbols) > 0: # Minimise substs = evaluate.minimise_parameters(telescope_params, optimize_expression, verbose=verbose) telescope_params = telescope_params.subs(substs) # Clear unoptimised values? if symbolify == '' and clear_symbolised: telescope_params.clear_symbolised() # Cache, return TEL_PARAM_CACHE[tel_param_desc] = copy.deepcopy(telescope_params) return telescope_params
[docs] def eval_expression_products(pipelineConfig, expression='Rflop', verbose=False): """ Evaluate a parameter sum for each product :param pipelineConfig: Pipeline configuration to use :param expression: Procuct parameter to evaluate :param verbose: Verbosity to use for `calc_tel_params` """ values={} tp = pipelineConfig.calc_tel_params(verbose) # Loop through defined products, add to result for name, product in tp.products.items(): if expression in product: values[name] = values.get(name, 0) + \ evaluate.evaluate_expression(product[expression], tp) return values
[docs] def eval_param_sweep_1d(pipelineConfig, expression_string='Rflop', parameter_string='Rccf', param_val_min=10, param_val_max=10, number_steps=1, verbose=False): """Evaluates an expression for a range of different parameter values, by varying the parameter linearly in a specified range in a number of steps :param pipelineConfig: :param expression_string: The expression that needs to be evaluated, as string (e.g. "Rflop") :param parameter_string: the parameter that will be swept - written as text (e.g. "Bmax") :param param_val_min: minimum value for the parameter's value sweep :param param_val_max: maximum value for the parameter's value sweep :param number_steps: the number of *intervals* that will be used to sweep the parameter from min to max :param verbose: :return: Pair of parameter values and results :raise AssertionError: """ assert param_val_max > param_val_min print("Starting sweep of parameter %s, evaluating expression %s over range (%s, %s) in %d steps " "(i.e. %d data points)" % (parameter_string, expression_string, str(param_val_min), str(param_val_max), number_steps, number_steps + 1)) param_values = np.linspace(param_val_min, param_val_max, num=number_steps + 1) results = [] for i in range(len(param_values)): # Calculate telescope parameter with adjusted parameter adjusts = {parameter_string: param_values[i]} tp = pipelineConfig.calc_tel_params(verbose, adjusts=adjusts) percentage_done = i * 100.0 / len(param_values) print("> %.1f%% done: Evaluating %s for %s = %g" % (percentage_done, expression_string, parameter_string, param_values[i])) # Perform a check to see that the value of the assigned parameter wasn't changed by the imaging equations, # otherwise the assigned value would have been lost (i.e. not a free parameter) parameter_final_value = tp.get(parameter_string) eta = 1e-10 if abs((parameter_final_value - param_values[i])/param_values[i]) > eta: raise AssertionError('Value assigned to %s seems to be overwritten after assignment ' 'by the method compute_derived_parameters(). (%g -> %g). ' 'Cannot peform parameter sweep.' % (parameter_string, param_values[i], parameter_final_value)) if expression_string.find(".") >= 0: product, expr = expression_string.split(".") result_expression = tp.products[product].get(expr, 0) else: result_expression = tp.get(expression_string) results.append(evaluate.evaluate_expression(result_expression, tp)) print('done with parameter sweep!') return (param_values, results)
[docs] def eval_param_sweep_2d(pipelineConfig, expression_string='Rflop', parameters=None, params_ranges=None, number_steps=2, verbose=False): """ Evaluates an expression for a 2D grid of different values for two parameters, by varying each parameter linearly in a specified range in a number of steps. Similar to :meth:`eval_param_sweep_1d`, except that it sweeps a 2D parameter space, returning a matrix of values. :param pipelineConfig: :param expression_string: The expression that needs to be evalued, as string (e.g. "Rflop") :param parameters: The two parameters to sweep :param params_ranges: Ranges to use for parameters :param number_steps: The number of *intervals* that will be used to sweep the parameters from min to max :param verbose: :returns: Triple of parameter values (both) and results """ assert (parameters is not None) and (len(parameters) == 2) assert (params_ranges is not None) and (len(params_ranges) == 2) for prange in params_ranges: assert len(prange) == 2 assert prange[1] > prange[0] n_param_x_values = number_steps + 1 n_param_y_values = number_steps + 1 nr_evaluations = n_param_x_values * n_param_y_values # The number of function evaluations that will be required print("Evaluating expression %s while\nsweeping parameters %s and %s over 2D domain [%s, %s] x [%s, %s] in %d " "steps each,\nfor a total of %d data evaluation points" % (expression_string, parameters[0], parameters[1], str(params_ranges[0][0]), str(params_ranges[0][1]), str(params_ranges[1][0]), str(params_ranges[1][1]), number_steps, nr_evaluations)) param_x_values = np.linspace(params_ranges[0][0], params_ranges[0][1], num=n_param_x_values) param_y_values = np.linspace(params_ranges[1][0], params_ranges[1][1], num=n_param_y_values) results = np.zeros((n_param_x_values, n_param_y_values)) # Create an empty numpy matrix to hold results # Nested 2D loop over all values for param1 and param2. Indexes iterate over y (inner loop), then x (outer loop) for ix in range(n_param_x_values): param_x_value = param_x_values[ix] for iy in range(n_param_y_values): param_y_value = param_y_values[iy] # Overwrite the corresponding fields of tp with the to-be-evaluated values adjusts = { parameters[0]: param_x_value, parameters[1]: param_y_value, } tp = pipelineConfig.calc_tel_params(verbose, adjusts=adjusts) percentage_done = (ix * n_param_y_values + iy) * 100.0 / nr_evaluations print("> %.1f%% done: Evaluating %s for (%s, %s) = (%s, %s)" % (percentage_done, expression_string, parameters[0], parameters[1], str(param_x_value), str(param_y_value))) # Perform a check to see that the value of the assigned parameters weren't changed by the imaging # equations, otherwise the assigned values would have been lost (i.e. not free parameters) parameter1_final_value = tp.get(parameters[0]) parameter2_final_value = tp.get(parameters[1]) eta = 1e-10 if abs((parameter1_final_value - param_x_value) / param_x_value) > eta: raise AssertionError('Value assigned to %s seems to be overwritten after assignment ' 'by the method compute_derived_parameters(). Cannot peform parameter sweep.' % parameters[0]) if abs((parameter2_final_value - param_y_value) / param_y_value) > eta: print(parameter2_final_value) print(param_y_value) raise AssertionError('Value assigned to %s seems to be overwritten after assignment ' 'by the method compute_derived_parameters(). Cannot peform parameter sweep.' % parameters[1]) result_expression = tp.get(expression_string) results[iy, ix] = evaluate.evaluate_expression(result_expression, tp) print('done with parameter sweep!') return (param_x_values, param_y_values, results)
[docs] def eval_products_symbolic(pipelineConfig, expression='Rflop', symbolify='product'): """ Returns formulas for the given product property. :param pipelineConfig: Pipeline configuration to use. :param expression: Product property to query. FLOP rate by default. :param symbolify: How aggressively sub-formulas should be replaced by symbols. """ # Create symbol-ified telescope model tp = pipelineConfig.calc_tel_params(symbolify=symbolify) # Collect equations and free variables eqs = {} for product in tp.products: eqs[product] = tp.products[product].get(expression, 0) return eqs
[docs] def eval_symbols(pipelineConfig, symbols, recursive=False, symbolify='', optimize_expression=None): """Returns formulas for the given symbol names. This can be used to look up the definitions behind sympy Symbols returned by eval_products_symbolic or this function. The returned dictionary will contain an entry for all symbols that we could look up sucessfully - this excludes symbols that are not defined or have only a tautological definition ("sym = sym"). :param pipelineConfig: Pipeline configuration to use. :param symbols: Symbols to query :param recursive: Look up free symbols in symbol definitions? :param symbolify: How aggressively sub-formulas should be replaced by symbols. """ # Create possibly symbol-ified telescope model tp = pipelineConfig.calc_tel_params(symbolify=symbolify) # Optimise to settle Tsnap and Nfacet if not optimize_expression is None: assert(symbolify == '') # Will likely fail otherwise tp = pipelineConfig.calc_tel_params(optimize_expression=optimize_expression) # Create lookup map for symbols symMap = {} for name, v in tp.__dict__.items(): symMap[tp.make_symbol_name(name)] = v # Start collecting equations eqs = {} while len(symbols) > 0: new_symbols = set() for sym in symbols: if sym in eqs: continue # Look up if not sym in symMap: continue v = symMap[str(sym)] # If the equation is "name = name", it is not defined at this level. Push back to next level if isinstance(v, sympy.Symbol) and str(v) == sym: continue eqs[str(sym)] = v if isinstance(v, sympy.Expr) or isinstance(v, BLDep): new_symbols = new_symbols.union(evaluate.collect_free_symbols([v])) symbols = new_symbols return eqs