Source code for sdp_par_model.evaluate

"""This file contains methods for programmatically evaluating the SKA SDP
parametric model.

from __future__ import print_function  # Makes Python-3 style print() function available in Python 2.x

import warnings

import numpy as np
from scipy import optimize as opt
from sympy import simplify, lambdify, Max, Lambda, FiniteSet, Function, Expr, Symbol
import math
import traceback
import itertools

from .parameters.definitions import Telescopes, Pipelines, Bands, Constants as c
from .parameters.container import ParameterContainer, BLDep

[docs]def is_literal(expression): """ Returns true iff the expression is already a literal (e.g. float or integer) value that cannot be substituted or evaluated further. Used to halt attempts at further evaluating symbolic expressions """ return isinstance(expression, (str, float, int, np.ndarray, list))
[docs]def evaluate_expression(expression, tp): """ Evaluate an expression by substituting the telescopec parameters into them. Depending on the type of expression, the result might be a value, a list of values (in case it is baseline-dependent) or a string (if evaluation failed). :param expression: the expression, expressed as a function of the telescope parameters, Tsnap and Nfacet :param tp: the telescope parameters (ParameterContainer object containing all relevant parameters) :return: """ # Already a plain value? if is_literal(expression): # Only evaluate symbolically when required return expression # Dictionary? Recurse if isinstance(expression, dict): return { k: evaluate_expression(e, tp) for k, e in expression.items() } # Otherwise try to evaluate using sympy try: # Baseline dependent? if isinstance(expression, BLDep): return [ float(expression(**subs)) for subs in tp.baseline_bins ] else: # Otherwise just evaluate directly return float(expression) except Exception as e: traceback.print_exc() warnings.warn("Failed to evaluate (%s): %s" % (e, str(expression))) return None
[docs]def optimize_lambdified_expr(lam, bound_lower, bound_upper): # Lower bound cannot be higher than the uppper bound. if bound_lower < bound_upper: result = opt.minimize_scalar(lam, bounds=(float(bound_lower), float(bound_upper)), method='bounded') if not result.success: warnings.warn('WARNING! : Was unable to optimize free variable. Using a value of: %f' % result.x) # else: # print ('Optimized free variable = %f' % result.x) # pass return result.x elif bound_lower > bound_upper: warnings.warn('Unable to optimize free variable as upper bound %g is lower than lower bound %g.' 'Adhering to lower bound.' % (bound_upper, bound_lower)) return bound_lower elif bound_lower == bound_upper: return bound_lower else: raise Exception("Computer says no.") # This should be impossible
[docs]def cheap_lambdify_curry(free_vars, expression): """ Translate sympy expression to an actual Python expression that can be evaluated quickly. This is roughly the same as sympy's lambdify, with a number of differences: 1. We only support a subset of functions. Note that sympy's list is incomplete as well, and actually has a wrong translation rule for "Max". 2. The return is curried, so for multiple "free_vars" (x, y) you will have to call the result as "f(x)(y)" instead of "f(x,y)". This means we can easily obtain a function that is specialised for a certain value of the outer variable. """ # Do "quick & dirty" translation. This map might need updating # when new functions get used in module = { 'Max': 'max', 'Min': 'min', 'ln': 'math.log', 'log': 'math.log', 'sqrt': 'math.sqrt', 'sign': 'np.sign', # No sign in math, apparently 'Abs': 'abs', 'sin': 'math.sin', 'cos': 'math.cos', 'acos': 'math.acos', 'acosh': 'math.acosh', 'arg': 'np.angle', 'asin': 'math.asin', 'asinh': 'math.asinh', 'atan': 'math.atan', 'atan2': 'math.atan2', 'atanh': 'math.atanh', 'ceiling': 'math.ceil', 'floor': 'math.floor' } expr_body = str(expression) for (sympy_name, numpy_name) in module.items(): expr_body = expr_body.replace(sympy_name + '(', numpy_name + '(') # Create head of lambda expression expr_head = '' for free_var in free_vars: expr_head += 'lambda ' + str(free_var) + ':' # Evaluate in order to build lambda return eval(expr_head + expr_body)
[docs]def minimise_parameters(telescope_parameters, expression_string = 'Rflop', expression = None, lower_bound = {}, upper_bound = {}, only_one_minimum = ['Nfacet'], verbose = False): """Computes the optimal value for free variables in telescope parameters such that it minimizes the value of an expression (typically Rflop). Returns result as a dictionary. :param telescope_parameters: Contains the definition of the expression that needs to be minimzed. This should be a symbolic expression that involves Deltaw_Stack and/or Nfacet. :param expression_string: The expression that should be minimized. This is typically assumed to be the computational load, but may also be, for example, buffer size. :param expression: TODO define :param lower_bound: Lower bound to use for symbols :param upper_bound: Upper bound to use for symbols :param only_one_minimum: Assume that the given (integer) symbol only has one minimum, so we can assume that any local optimum we find is the global optimum. :param verbose: Be more chatty about what's going on. """ assert isinstance(telescope_parameters, ParameterContainer) # Find symbols to optimise if expression is None: expression = telescope_parameters.get(expression_string) assert expression is not None, "Telescope parameters do not define %s!" % expression_string else: expression_string = str(expression) free_symbols = expression.free_symbols free_symbol_names = [ str(sym) for sym in free_symbols ] # Default bounds lower_bound = dict(lower_bound) upper_bound = dict(upper_bound) if 'Nfacet' in free_symbol_names: if 'Nfacet' not in lower_bound: lower_bound['Nfacet'] = 1 if 'Nfacet' not in upper_bound: upper_bound['Nfacet'] = 20 if 'Tsnap' in free_symbol_names: if 'Tsnap' not in lower_bound: lower_bound['Tsnap'] = telescope_parameters.get('Tsnap_min', 0.1, warn=False) if 'Tsnap' not in upper_bound: upper_bound['Tsnap'] = max(telescope_parameters.get('Tsnap_min', 0.1, warn=False), 0.5 * telescope_parameters.get('Tobs', 21600, warn=False)) if 'DeltaW_stack' in free_symbol_names: if 'DeltaW_stack' not in lower_bound: lower_bound['DeltaW_stack'] = 0.1 if 'DeltaW_stack' not in upper_bound: upper_bound['DeltaW_stack'] = telescope_parameters.DeltaW_max(telescope_parameters.Bmax) # We can only optimise one floating-point variable, and every symbol must have bounds float_symbol = None int_symbols = [] int_ranges = [] for sym in free_symbols: assert str(sym) in lower_bound, "Symbol %s must have a lower bound!" % sym assert str(sym) in upper_bound, "Symbol %s must have an upper bound!" % sym if not sym.is_integer: assert float_symbol is None, "Can only optimise for one float symbol at a time!" float_symbol = sym float_lower_bound = lower_bound[str(float_symbol)] float_upper_bound = upper_bound[str(float_symbol)] else: rnge = range(lower_bound[str(sym)], upper_bound[str(sym)]+1) # Sort symbols with only one minimum to the back to the list if str(sym) in only_one_minimum: int_symbols.append(sym) int_ranges.append(rnge) else: int_symbols.insert(0, sym) int_ranges.insert(0, rnge) # Construct lambda from integer parameters, then float parameter params = list(int_symbols) if float_symbol is not None: params.append(float_symbol) expression_lam = cheap_lambdify_curry(params, expression) # Loop over the different integer values results = [] skip_until = tuple() last_result = None result_got_worse = 0 for int_vals in itertools.product(*int_ranges): if int_vals < skip_until: continue if verbose: print('Evaluating', ', '.join(["%s=%d" % vi for vi in zip(int_symbols, int_vals)]), end='') # Set integer symbols expr = expression_lam #TODO Peter: what does the forloop below accomplish? for i in int_vals: expr = expr(i) # Optimise float symbol if float_symbol is None: opt_val = 0 result = float(expr) if verbose: print(' -> %s = %g' % (expression_string, result)) else: opt_val = optimize_lambdified_expr(expr, float_lower_bound, float_upper_bound) result = float(expr(opt_val)) if verbose: print(' -> %s=%g, %s=%g' % (float_symbol, opt_val, expression_string, result)) results.append((result,) + int_vals + (opt_val,)) # Check whether we can start skipping values (naive, can likely do better) if last_result is None: pass elif last_result >= result: result_got_worse = 0 elif str(int_symbols[-1]) in only_one_minimum: # Allow this a few times before we actually stop result_got_worse += 1 if result_got_worse >= 2: if len(int_vals) == 1: break # I.e. if we are at (1,2,3), skip until (1,3) skip_until = int_vals[:-1] skip_until[-1] += 1 last_result = result # Return parameters with lowest value result, *vals = results[np.argmin(np.array(results)[:,0])] if verbose: print(', '.join(["%s=%g" % vi for vi in zip(params, vals)]), ' yielded the lowest value of %s=%g' % (expression_string, result)) return dict(zip([str(p) for p in params],vals))
[docs]def evaluate_expressions(expressions, tp): """ Evaluate a sequence of expressions by substituting the telescope_parameters into them. :param expressions: An array of expressions to be evaluated :param tp: The set of telescope parameters that should be used to evaluate each expression """ return [ evaluate_expression(expr, tp) for expr in expressions ]
[docs]def collect_free_symbols(formulas): """ Returns the names of all free symbol in the given formulas. We always count all functions as free. :param formulas: Formulas to search for free symbols. """ def free_f(expr): if isinstance(expr, BLDep): expr = expr.term if not isinstance(expr, Expr): return set() functions = set(map(lambda f: str(f.func), expr.atoms(Function))) frees = set(map(lambda s: str(s), expr.free_symbols)) return set(frees).union(functions) return set().union(*list(map(free_f, formulas)))