Source code for vega.vega_interface

"""Main module."""
import os.path
import numpy as np
import scipy.stats
from astropy.io import fits
import configparser
import copy

from . import correlation_item, data, utils
from vega.scale_parameters import ScaleParameters
from vega.model import Model
from vega.minimizer import Minimizer
from vega.analysis import Analysis
from vega.output import Output
from vega.parameters.param_utils import get_default_values
from vega.plots.plot import VegaPlots

BLIND_FIXED_PARS = ['ap_full', 'at_full', 'aiso_full', 'epsilon_full',
                    'phi_smooth', 'alpha_smooth', 'phi_full', 'alpha_full',
                    'growth_rate']


[docs]class VegaInterface: """Main Vega class. Parse the main config and initialize a correlation item for each component. If there is data, initialize data and model objects for each component. Handle the parameter config and call the analysis class. """ _blind = None def __init__(self, main_path): """ Parameters ---------- main_path : string Path to main.ini config file """ # Read the main config file self.main_config = configparser.ConfigParser() self.main_config.optionxform = lambda option: option self.main_config.read(utils.find_file(main_path)) # Read the fiducial pk file self.fiducial = self._read_fiducial(self.main_config['fiducial']) # Read the effective redshift and the data config paths self.fiducial['z_eff'] = self.main_config['data sets'].getfloat('zeff') write_cf = self.main_config['output'].getboolean('write_cf', False) write_pk = self.main_config['output'].getboolean('write_pk', False) self.fiducial['save-components'] = write_cf or write_pk ini_files = self.main_config['data sets'].get('ini files').split() self.model_pk = self.main_config['control'].getboolean('model_pk', False) # Initialize the individual components self.corr_items = {} for path in ini_files: config = configparser.ConfigParser() config.optionxform = lambda option: option config.read(utils.find_file(os.path.expandvars(path))) name = config['data'].get('name') self.corr_items[name] = correlation_item.CorrelationItem(config, self.model_pk) # Check if all correlations have data files self.data = {} self._has_data = True for name, corr_item in self.corr_items.items(): if not corr_item.has_data: self._has_data = False # Initialize the data for name, corr_item in self.corr_items.items(): if self._has_data: self.data[name] = data.Data(corr_item) else: self.data[name] = None # Initialize scale parameters self.scale_params = ScaleParameters(self.main_config['cosmo-fit type']) # initialize the models self.models = {} if self._has_data: for name, corr_item in self.corr_items.items(): self.models[name] = Model(corr_item, self.fiducial, self.scale_params, self.data[name]) # Read parameters self.params = self._read_parameters(self.corr_items, self.main_config['parameters']) self.sample_params = self._read_sample(self.main_config['sample']) # Check blinding self._blind = False if self._has_data: for data_obj in self.data.values(): if data_obj.blind: self._blind = True # Apply blinding if self._blind: for par in self.sample_params['limits'].keys(): if par in BLIND_FIXED_PARS: raise ValueError(f'Running on blind data, parameter {par} must be fixed.') if ('bias_QSO' in self.sample_params['limits']) and ( 'beta_QSO' in self.sample_params['limits']): print('WARNING! Running on blind data and sampling bias_QSO and beta_QSO.') # Get priors self.priors = {} if 'priors' in self.main_config: self.priors = self._init_priors(self.main_config['priors']) for param in self.priors.keys(): if param not in self.sample_params['limits'].keys(): print('Warning: Prior specified for a parameter that is' ' not sampled!') # Read the monte carlo parameters self.mc_config = None if 'monte carlo' in self.main_config: self.mc_config = {} config = self.main_config['monte carlo'] self.mc_config['params'] = copy.deepcopy(self.params) mc_params = self.main_config['mc parameters'] for param, value in mc_params.items(): self.mc_config['params'][param] = float(value) self.mc_config['sample'] = self._read_sample(config) # Initialize the minimizer and the analysis objects if not self.sample_params['limits']: self.minimizer = None else: self.minimizer = Minimizer(self.chi2, self.sample_params) self.analysis = Analysis(Minimizer(self.chi2, self.sample_params), self.main_config, self.mc_config) # Check for sampler self.has_sampler = False if 'control' in self.main_config: self.has_sampler = self.main_config['control'].getboolean('sampler', False) if self.has_sampler: if 'Polychord' not in self.main_config: raise RuntimeError('run_sampler called, but no sampler initialized') self.output = Output(self.main_config['output'], self.data, self.corr_items, self.analysis) self.monte_carlo = False self.plots = None if self._has_data: self.plots = VegaPlots(vega_data=self.data)
[docs] def compute_model(self, params=None, run_init=True, direct_pk=None): """Compute correlation function model using input parameters. Parameters ---------- params : dict, optional Computation parameters, by default None run_init: boolean, optional Whether to run model.init() before computing the model, by default True direct_pk: 1D array or None, optional If not None, the full Pk (e.g. from CLASS/CAMB) to be used directly, by default None Returns ------- dict Dictionary of cf models for each component """ # Overwrite computation parameters local_params = copy.deepcopy(self.params) if params is not None: for par, val in params.items(): local_params[par] = val # Go through each component and compute the model cf model_cf = {} if run_init: self.models = {} for name, corr_item in self.corr_items.items(): if run_init: self.models[name] = Model(corr_item, self.fiducial, self.scale_params, self.data[name]) if direct_pk is None: model_cf[name] = self.models[name].compute(local_params, self.fiducial['pk_full'], self.fiducial['pk_smooth']) else: model_cf[name] = self.models[name].compute_direct(local_params, direct_pk) return model_cf
[docs] def compute_sensitivity(self, nominal=None, frac=0.1, verbose=True): """Compute the model sensitivity to each floating parameter. Calculate numerical partial derivatives of the model with respect to each floating pararameter, evaluated at a specified point in parameter space. Calculate Fisher information distributed over bins of (rt,rp). Results are stored in a dictionary attribute named `sensitivity` with keys `nominal`, `partials`, and `fisher`. Parameters ---------- nominal : dict or None Dictionary of (value,error) tuples for each floating parameter. Uses the results of the last call to minimize when None, or raises a RuntimeError when minimize has not yet been called. frac : float Estimate partial derivatives of the likelihood using central finite differences at value +/- frac * error for each floating parameter. verbose : bool Print progress of the computation when True. """ # Copy the baseline parameters to use. if nominal is None: if self.bestfit.params is None: raise RuntimeError('No nominal parameter values provided or saved by minimize()') nominal = {p.name: ( p.value, p.error ) for p in self.bestfit.params} nfloating = len(nominal) ninfo = (nfloating * (nfloating + 1)) // 2 params = copy.deepcopy(self.params) for pname, (pvalue, perror) in nominal.items(): params[pname] = pvalue # Initialize the sensitivity results. self.sensitivity = dict(nominal = copy.deepcopy(nominal), partials={ }, fisher={ }) for n in self.corr_items: rp, rt = self.corr_items[n].rp_rt_grid self.sensitivity['partials'][n] = np.zeros((nfloating, 2, 2, len(rp))) self.sensitivity['fisher'][n] = np.zeros((ninfo, 2, len(rp))) # Loop over fit parameters self.fiducial['save-components'] = True bao_amp = self.params['bao_amp'] for pindex, (pname, (pvalue, perror)) in enumerate(nominal.items()): if verbose: print(f'Calculating sensitivity for [{pindex}] {pname} at {pvalue:.4f} ± {perror:.4f}') # Compute partial derivatives wrt to p for each multipole delta = frac * perror for sign in (+1, -1): params[pname] = pvalue + sign * delta # Compute the model for all datasets. cfs = self.compute_model(params, run_init=True) # Loop over datasets to update the partial derivative calculations. for n, cf in cfs.items(): model = self.models[n] # Distorted peak self.sensitivity['partials'][n][pindex,0,0] += sign * bao_amp * model.xi_distorted['peak']['core'] # Distorted smooth self.sensitivity['partials'][n][pindex,0,1] += sign * model.xi_distorted['smooth']['core'] # Undistorted peak self.sensitivity['partials'][n][pindex,1,0] += sign * bao_amp * model.xi['peak']['core'] # Distorted smooth self.sensitivity['partials'][n][pindex,1,1] += sign * model.xi['smooth']['core'] # Normalize the partial derivatives. for n in self.corr_items: self.sensitivity['partials'][n][pindex] /= 2 * delta # Restore the fitted parameter value. params[pname] = pvalue # Loop over pairs of fit parameters. if verbose: print('Computing Fisher information for each pair of parameters...') idx = 0 for pindex1, (pname1, (pvalue1, perror1)) in enumerate(nominal.items()): for pindex2, (pname2, (pvalue2, perror2)) in enumerate(nominal.items()): if pindex1 > pindex2: continue # Loop over datasets. for n in self.corr_items: fisher = self.sensitivity['fisher'][n][idx] # Lookup the data vector mask for this dataset mask = self.data[n].data_mask # Loop over distorted / non-distorted. for idistort in range(2): # Combine peak + smooth partials. partial1 = self.sensitivity['partials'][n][pindex1,idistort].sum(axis=0) partial2 = self.sensitivity['partials'][n][pindex2,idistort].sum(axis=0) # Calculate the Fisher info for all unmasked correlation bins. masked_info = partial1[mask] * self.data[n].inv_masked_cov.dot(partial2[mask]) fisher[idistort, self.data[n].data_mask] = masked_info # Calculate the predicted inverse covariance for this parameter pair. #ivar[idistort] = np.sum(fisher[idistort]) #ferror = ivar ** -0.5 if ivar > 0 else np.nan # Set unused bins to NaN for plotting fisher[idistort, ~mask] = np.nan idx += 1
[docs] def chi2(self, params=None, direct_pk=None): """Compute full chi2 for all components. Parameters ---------- params : dict, optional Computation parameters, by default None direct_pk: 1D array or None, optional If not None, the full Pk (e.g. from CLASS/CAMB) to be used directly, by default None Returns ------- float chi^2 """ assert self._has_data # Check if blinding is initialized if self._blind is None: self._blind = False for data_obj in self.data.values(): if data_obj.blind: self._blind = True # Overwrite computation parameters local_params = copy.deepcopy(self.params) if params is not None: for par, val in params.items(): local_params[par] = val # Enforce blinding if self._blind: for par, val in local_params.items(): if par in BLIND_FIXED_PARS: local_params[par] = 1. # Go trough each component and compute the chi^2 chi2 = 0 for name in self.corr_items: try: if direct_pk is None: model_cf = self.models[name].compute(local_params, self.fiducial['pk_full'], self.fiducial['pk_smooth']) else: model_cf = self.models[name].compute_direct(local_params, direct_pk) except utils.VegaBoundsError: self.models[name].PktoXi.cache_pars = None return 1e100 if self.monte_carlo: diff = self.data[name].masked_mc_mock - model_cf[self.data[name].model_mask] chi2 += diff.T.dot(self.data[name].scaled_inv_masked_cov.dot(diff)) else: diff = self.data[name].masked_data_vec - model_cf[self.data[name].model_mask] chi2 += diff.T.dot(self.data[name].inv_masked_cov.dot(diff)) # Add priors for param, prior in self.priors.items(): if param not in local_params: err_msg = ("You have specified a prior for a parameter not in " f"the model. Offending parameter: {param}") assert param in local_params, err_msg chi2 += self._gaussian_chi2_prior(local_params[param], prior[0], prior[1]) assert isinstance(chi2, float) return chi2
[docs] def log_lik(self, params=None, direct_pk=None): """Compute full log likelihood for all components. Parameters ---------- params : dict, optional Computation parameters, by default None direct_pk: 1D array or None, optional If not None, the full Pk (e.g. from CLASS/CAMB) to be used directly, by default None Returns ------- float log Likelihood """ assert self._has_data # Get the full chi2 chi2 = self.chi2(params, direct_pk) # Compute the normalization for each component log_norm = 0 for name in self.corr_items: log_norm -= 0.5 * self.data[name].data_size * np.log(2 * np.pi) if self.monte_carlo: log_norm -= 0.5 * self.data[name].scaled_log_cov_det else: log_norm -= 0.5 * self.data[name].log_cov_det # Compute log lik log_lik = log_norm - 0.5 * chi2 # Add priors normalization for param, prior in self.priors.items(): log_lik += self._gaussian_lik_prior(prior[1]) return log_lik
[docs] def monte_carlo_sim(self, params=None, scale=None, seed=0, forecast=False): """Compute Monte Carlo simulations for each Correlation item. Parameters ---------- params : dict, optional Computation parameters, by default None scale : float/dict, optional Scaling for the covariance, by default 1. seed : int, optional Seed for the random number generator, by default 0 forecast : boolean, optional Forecast option. If true, we don't add noise to the mock, by default False Returns ------- dict Dictionary with MC mocks for each item """ assert self._has_data # Overwrite computation parameters local_params = copy.deepcopy(self.params) if params is not None: for par, val in params.items(): local_params[par] = val mocks = {} for name in self.corr_items: # Compute fiducial model fiducial_model = self.models[name].compute( local_params, self.fiducial['pk_full'], self.fiducial['pk_smooth']) # Get scale if scale is None: item_scale = self.corr_items[name].cov_rescale elif type(scale) is float or type(scale) is int: item_scale = scale elif name in scale: item_scale = scale[name] else: item_scale = 1. # Create the mock mocks[name] = self.data[name].create_monte_carlo(fiducial_model, item_scale, seed, forecast) self.monte_carlo = True return mocks
[docs] def minimize(self): """Minimize the chi2 over the sampled parameters. """ if self.minimizer is None: print("No sampled parameters. Skipping minimization.") return # if not self.fiducial['save-components']: # self.set_fast_metals() self.minimizer.minimize() self.bestfit_model = self.compute_model(self.minimizer.values, run_init=False) self.total_data_size = 0 self.bestfit_corr_stats = {} num_pars = len(self.sample_params['limits']) print('\n----------------------------------------------------') for name in self.corr_items: data_size = self.data[name].data_size self.total_data_size += data_size if self.monte_carlo: diff = self.data[name].masked_mc_mock \ - self.bestfit_model[name][self.data[name].model_mask] chisq = diff.T.dot(self.data[name].scaled_inv_masked_cov.dot(diff)) else: diff = self.data[name].masked_data_vec \ - self.bestfit_model[name][self.data[name].model_mask] chisq = diff.T.dot(self.data[name].inv_masked_cov.dot(diff)) reduced_chisq = chisq / (data_size - num_pars) p_value = 1 - scipy.stats.chi2.cdf(chisq, data_size - num_pars) print(f'{name} chi^2/(ndata-nparam): {chisq:.1f}/({data_size}-{num_pars}) ' f'= {reduced_chisq:.3f}, PTE={p_value:.2f}') print('----------------------------------------------------') self.bestfit_corr_stats[name] = {'size': data_size, 'chisq': chisq, 'reduced_chisq': reduced_chisq, 'p_value': p_value} self.chisq = self.minimizer.fmin.fval self.reduced_chisq = self.chisq / (self.total_data_size - num_pars) self.p_value = 1 - scipy.stats.chi2.cdf(self.chisq, self.total_data_size - num_pars) print(f'Total chi^2/(ndata-nparam): {self.chisq:.1f}/({self.total_data_size}-{num_pars}) ' f'= {self.reduced_chisq:.3f}, PTE={self.p_value:.2f}') print('----------------------------------------------------\n') if not self.minimizer.fmin.is_valid: print('Invalid fit!!! Check data, covariance, model and priors.')
@property def bestfit(self): """Access the bestfit results from iminuit. Returns ------- Minimizer Returns the Minimizer class which stores the bestfit values """ return self.minimizer
[docs] def set_fast_metals(self): """Activate fast metals. This is automatically called when running the minimizer or the sampler. """ print('Warning! Activating fast metals for minimizing/sampling.') for name in self.corr_items: if self.models[name].metals is not None: self.models[name].metals.fast_metals = True
@staticmethod def _read_fiducial(fiducial_config): """Read the fiducial pk file and get the configs. Parameters ---------- fiducial_config : ConfigParser fiducial section from the main config file Returns ------- dict dictionary with the fiducial data and config """ # First check the path and replace with the right model if necessary path = fiducial_config.get('filename') path = utils.find_file(os.path.expandvars(path)) # if not os.path.isfile(path): # path = resource_filename('vega', 'models') + '/{}'.format(path) print('INFO: reading input Pk {}'.format(path)) fiducial = {} # Open the fits file and get what we need hdul = fits.open(path) fiducial['z_fiducial'] = hdul[1].header['ZREF'] fiducial['Omega_m'] = hdul[1].header['OM'] fiducial['Omega_de'] = hdul[1].header['OL'] fiducial['k'] = hdul[1].data['K'] fiducial['pk_full'] = hdul[1].data['PK'] fiducial['pk_smooth'] = hdul[1].data['PKSB'] hdul.close() return fiducial @staticmethod def _read_parameters(corr_items, parameters_config): """Read computation parameters. If a parameter is specified multiple times, the parameters in the main config file have priority. Parameters ---------- corr_items : dict Dictionary of correlation items parameters_config : ConfigParser parameters section from main config Returns ------- dict Computation parameters """ params = {} # First get the parameters from each component config for name, corr_item in corr_items.items(): if 'parameters' in corr_item.config: for param, value in corr_item.config.items('parameters'): params[param] = float(value) # Next get the parameters in the main config for param, value in parameters_config.items(): params[param] = float(value) return params def _read_sample(self, sample_config): """Read sample parameters. These must be of the form: param = min max / for sampler only or param = min max val err / for both sampler and fitter. Fitter accepts None for min/max, but the sampler does not. Parameters ---------- sample_config : ConfigParser sample section from main config Returns ------- dict Config for the sampled parameters """ # Initialize the dictionaries we need sample_params = {} sample_params['limits'] = {} sample_params['values'] = {} sample_params['errors'] = {} sample_params['fix'] = {} default_values = get_default_values() def check_param(param): if param not in default_values: raise ValueError('Default values not found for: %s. Please add' ' them to default_values.txt, or provide the' ' full sampling specification.' % param) for param, values in sample_config.items(): if param not in self.params: print('Warning: You tried sampling the parameter: %s.' ' As this parameter was not specified under' ' [parameters], it will be skipped.' % param) continue values_list = values.split() # Get the prior limits # ! Sampler needs actual values (no None) if len(values_list) > 1: lower_limit = None upper_limit = None if values_list[0] != 'None': lower_limit = float(values_list[0]) if values_list[1] != 'None': upper_limit = float(values_list[1]) sample_params['limits'][param] = (lower_limit, upper_limit) else: if values_list[0] not in ['True', 'true', 't', 'y', 'yes']: continue check_param(param) sample_params['limits'][param] = default_values[param]['limits'] # Get the values and errors for the fitter if len(values_list) > 2: sample_params['values'][param] = float(values_list[2]) else: check_param(param) sample_params['values'][param] = self.params[param] if len(values_list) > 3: assert len(values_list) == 4 sample_params['errors'][param] = float(values_list[3]) else: check_param(param) sample_params['errors'][param] = default_values[param]['error'] # Populate the fix values sample_params['fix'][param] = False return sample_params @staticmethod def _gaussian_chi2_prior(value, mean, sigma): return (value - mean)**2 / sigma**2 @staticmethod def _gaussian_lik_prior(sigma): return -0.5 * np.log(2 * np.pi) - np.log(sigma) @staticmethod def _init_priors(prior_config): """Initialize the priors. Only gaussian priors are currently supported Parameters ---------- prior_config : ConfigParser priors section from main config Returns ------- dict Dictionary of priors (mean, sigma) with the keys as parameter names """ prior_dict = {} for param, prior in prior_config.items(): prior_list = prior.split() if len(prior_list) != 3: raise ValueError('Prior configuration must have the format:' ' "<param> = gaussian <mean> <sigma>"') if prior_list[0] not in ['gaussian', 'Gaussian']: raise ValueError('Only gaussian priors are supported.') prior_dict[param] = np.array(prior_list[1:]).astype(float) return prior_dict