import os
import git
import numpy as np
from pathlib import Path
from astropy.io import fits
from datetime import datetime
from configparser import ConfigParser
import vega
from vega.utils import find_file
[docs]class BuildConfig:
"""Build and manage config files based on available templates
"""
_params_template = None
recognised_correlations = ['lyaxlya', 'lyaxlyb', 'lyaxqso', 'lybxqso',
'lyaxdla', 'lybxdla', 'qsoxqso', 'qsoxdla', 'dlaxdla']
def __init__(self, options={}, overwrite=False):
"""Initialize the model options that are not tracer or correlation specific.
Parameters
----------
options : dict, optional
Dictionary with model options, by default {}
Here is a list of options:
scale_params: string from ['ap_at', 'phi_alpha', 'aiso_epsilon'], default 'ap_at'
template: string with custom_link, default 'PlanckDR16/PlanckDR16.fits'
full_shape: Bool, default False
full_shape_alpha: Bool, default False
smooth_scaling: Bool, default False
small_scale_nl: Bool, default False
small_scale_nl_cross: Bool, default False
bao_broadening: Bool, default False
uv_background: Bool, default False
velocity_dispersion: string from [None, 'lorentz', 'gauss'], default None
radiation_effects: Bool, default False
hcd_model: string from [None, 'Rogers', 'fvoigt', 'sinc'], default None
fvoigt_model: string, default exp
fullshape_smoothing: string from [None, 'gauss', 'gauss_iso', 'exp'], default None
metals: List can include ['all', 'SiII(1190)', 'SiII(1193)', 'SiIII(1207)',
'SiII(1260)', 'CIV(eff)'], default None
"""
self.overwrite = overwrite
self.options = {}
self.options['scale_params'] = options.get('scale_params', 'ap_at')
self.options['template'] = options.get('template', 'PlanckDR16/PlanckDR16.fits')
self.options['full_shape'] = options.get('full_shape', False)
self.options['full_shape_alpha'] = options.get('full_shape_alpha', False)
self.options['smooth_scaling'] = options.get('smooth_scaling', False)
self.options['small_scale_nl'] = options.get('small_scale_nl', False)
self.options['small_scale_nl_cross'] = options.get('small_scale_nl_cross', False)
self.options['bao_broadening'] = options.get('bao_broadening', False)
self.options['uv_background'] = options.get('uv_background', False)
self.options['velocity_dispersion'] = options.get('velocity_dispersion', None)
self.options['radiation_effects'] = options.get('radiation_effects', False)
self.options['hcd_model'] = options.get('hcd_model', None)
self.options['fvoigt_model'] = options.get('fvoigt_model', 'exp')
self.options['fullshape_smoothing'] = options.get('fullshape_smoothing', None)
self.options['fullshape_smoothing_metals'] = options.get(
'fullshape_smoothing_metals', False)
self.options['desi-instrumental-systematics'] = options.get(
'desi-instrumental-systematics', False)
self.options['test'] = options.get('test', False)
metals = options.get('metals', None)
if metals is not None:
if 'all' in metals:
metals = ['SiII(1190)', 'SiII(1193)', 'SiIII(1207)', 'SiII(1260)', 'CIV(eff)']
self.options['metals'] = metals
[docs] def build(self, correlations, fit_type, fit_info, out_path, parameters={}, name_extension=None):
"""Build Vega config files and write them to an output directory
Parameters
----------
correlations : dict
Information for each correlation. It must contain the path to the measured correlation,
and the path to metal files if metals were requested. Optionally specify scale cuts.
List of options:
corr_path: string
metal_path: string
r-min: float, default 10
r-max: float, default 180
rt-min: float, default 0
fast_metals: bool, default False
binsize: int, default 4 (deprecated)
broadband: broadband string configuration
fit_type : string
Name of the fit. Includes the name of the correlations with the two
tracers separated by an "x" (e.g. lyaxqso), and different correlations
separated by an underscore (e.g. lyaxlya_lyaxqso). If unsure check
the templates folder to see all possibilities.
fit_info : dict
Fit information. Must contain a list of sampled parameters and the effective redshift.
List of options:
fitter: bool, default True
sampler: bool, default False
bias_beta_config: dict with 'tracer': 'bias_beta', 'bias_eta_beta', 'bias_bias_eta'
zeff: float, default None
zeff_rmin: float, default 0
zeff_rmax: float, default 300
sample_params: list or dict with vega setup (par_name: min max start step)
priors: dict with vega setup (par_name: 'gaussian mean sigma')
Polychord: dict with Polychord setup
out_path : string
Path to directory where to write the config files
parameters : dict, optional
Parameter values to write to the main config, by default {}
name_extension : string, optional
Optional string to add to the config file names, by default None
Returns
-------
string
Path to the main config file
"""
# Save some of the info
self.fit_info = fit_info
self.name_extension = name_extension
# Check if we need sampler or fitter or both
self.fitter = fit_info.get('fitter', True)
self.sampler = fit_info.get('sampler', False)
# get the relevant paths
self.config_path = Path(os.path.expandvars(out_path))
assert self.config_path.is_dir()
if self.fitter:
self.fitter_out_path = self.config_path / 'output_fitter'
if not self.fitter_out_path.exists():
os.mkdir(self.fitter_out_path)
if self.sampler:
self.sampler_out_path = self.config_path / 'output_sampler'
if not self.sampler_out_path.exists():
os.mkdir(self.sampler_out_path)
# Check if we know the correlation types
components = fit_type.split('_')
for corr in components:
if corr not in self.recognised_correlations:
raise ValueError(f'Unknown correlation {corr}, part of fit type {fit_type}.')
if len(components) != len(set(components)):
print(f'Warning! fit type {fit_type} has duplicates')
# Get git hash
vega_path = Path(os.path.dirname(vega.__file__))
try:
git_hash = git.Repo(vega_path.parents[0]).head.object.hexsha
except git.InvalidGitRepositoryError:
git_hash = "None"
# Build config files for each correlation
self.corr_paths = []
self.corr_names = []
self.data_paths = []
for name in components:
# Check if we have info on the correlation
if name not in correlations:
raise ValueError(f'You asked for correlation {name} but did not provide'
' its configuration in the "correlations" dictionary.')
# Build the config file for the correlation and save the path
corr_path, data_path, tracer1, tracer2 = self._build_corr_config(name,
correlations[name],
git_hash)
self.corr_paths.append(corr_path)
self.data_paths.append(data_path)
if tracer1 not in self.corr_names:
self.corr_names.append(tracer1)
if tracer2 not in self.corr_names:
self.corr_names.append(tracer2)
main_path = self._build_main_config(fit_type, fit_info, parameters, git_hash)
return main_path
def _build_corr_config(self, name, corr_info, git_hash):
"""Build config file for a correlation based on a template
Parameters
----------
name : string
Name of the correlation. Must be the same as corresponding template file name
corr_info : dict
Correlation information. The paths to the data and metal files are required.
Returns
-------
string
Path to the config file for the correlation.
"""
# Read template
config = ConfigParser()
config.optionxform = lambda option: option
template_path = find_file(f'vega/templates/{name}.ini')
config.read(template_path)
# get tracer info
tracer1 = config['data']['tracer1']
tracer2 = config['data']['tracer2']
type1 = config['data']['tracer1-type']
type2 = config['data']['tracer2-type']
# Write the basic info
config['data']['filename'] = corr_info.get('corr_path')
if 'distortion-file' in corr_info:
config['data']['distortion-file'] = corr_info.get('distortion-file')
config['cuts']['r-min'] = str(corr_info.get('r-min', 10))
config['cuts']['r-max'] = str(corr_info.get('r-max', 180))
config['cuts']['rt-min'] = str(corr_info.get('rt-min', 0))
if self.options['test']:
config['data']['test'] = 'True'
if 'binsize' in corr_info:
config['parameters'] = {}
config['parameters']['par binsize {}'.format(name)] = str(corr_info.get('binsize', 4))
config['parameters']['per binsize {}'.format(name)] = str(corr_info.get('binsize', 4))
# Write the model options
# Things that require LYA
if tracer1 == 'LYA' and tracer2 == 'LYA':
if self.options['small_scale_nl']:
config['model']['small scale nl'] = 'dnl_arinyo'
elif tracer1 == 'LYA' or tracer2 == 'LYA':
if self.options['small_scale_nl_cross']:
config['model']['small scale nl'] = 'dnl_arinyo'
# Things that require both tracers to be continuous
if type1 == 'continuous' and type2 == 'continuous':
if self.options['desi-instrumental-systematics']:
config['model']['desi-instrumental-systematics'] = 'True'
# Things that require at least one tracer to be continuous
if type1 == 'continuous' or type2 == 'continuous':
if self.options['uv_background']:
config['model']['add uv'] = 'True'
if self.options['hcd_model'] is not None:
assert self.options['hcd_model'] in ['fvoigt', 'Rogers2018', 'sinc']
config['model']['model-hcd'] = self.options['hcd_model']
if self.options['hcd_model'] == 'fvoigt':
config['model']['fvoigt_model'] = self.options['fvoigt_model']
if self.options['metals'] is not None:
config['metals'] = {}
config['metals']['filename'] = corr_info.get('metal_path')
config['metals']['z evol'] = 'bias_vs_z_std'
if type1 == 'continuous':
config['metals']['in tracer1'] = ' '.join(self.options['metals'])
if type2 == 'continuous':
config['metals']['in tracer2'] = ' '.join(self.options['metals'])
if 'fast_metals' in corr_info:
config['model']['fast_metals'] = corr_info.get('fast_metals', 'False')
# Things that require at least one discrete tracer
if type1 == 'discrete' or type2 == 'discrete':
if self.options['velocity_dispersion'] is not None:
assert self.options['velocity_dispersion'] in ['lorentz', 'gauss']
config['model']['velocity dispersion'] = self.options['velocity_dispersion']
if self.options['metals'] is not None and type1 != type2:
config['metals']['velocity dispersion'] = self.options['velocity_dispersion']
# Only for the LYA - QSO cross
if 'LYA' in [tracer1, tracer2] and 'QSO' in [tracer1, tracer2]:
if self.options['radiation_effects']:
config['model']['radiation effects'] = 'True'
# General things
if 'broadband' in corr_info:
config['broadband'] = {}
for key, item in corr_info['broadband'].items():
config['broadband'][key] = item
if self.options['fullshape_smoothing'] is not None:
assert self.options['fullshape_smoothing'] in ['gauss', 'gauss_iso', 'exp']
config['model']['fullshape smoothing'] = self.options['fullshape_smoothing']
condition = (type1 == 'continuous' or type2 == 'continuous')
condition &= self.options['metals'] is not None
condition &= self.options['fullshape_smoothing_metals']
if condition:
config['metals']['fullshape smoothing'] = self.options['fullshape_smoothing']
config['model']['fast_metals'] = "False"
if self.name_extension is None:
corr_path = self.config_path / '{}.ini'.format(name)
else:
corr_path = self.config_path / '{}-{}.ini'.format(name, self.name_extension)
if corr_path.is_file() and not self.overwrite:
raise ValueError(f'File {corr_path} already exists. Please change the name extension.')
with open(corr_path, 'w') as configfile:
configfile.write(f'# File written on {datetime.now()} \n')
configfile.write(f'# Vega git hash: {git_hash} \n\n')
config.write(configfile)
return corr_path, config['data']['filename'], tracer1, tracer2
[docs] @staticmethod
def get_zeff(data_paths, rmin=0., rmax=300.):
"""Compute effective redshift of all correlations
Parameters
----------
data_paths : List[string]
List of paths to exported picca correlations
rmin : float, optional
Minimum separation, by default 0.
rmax : float, optional
Maximum separation, by default 300.
Returns
-------
float
Effective redshift
"""
zeff_list = []
weights = []
for path in data_paths:
hdul = fits.open(path)
r_arr = np.sqrt(hdul[1].data['RP']**2 + hdul[1].data['RT']**2)
cells = (r_arr > rmin) * (r_arr < rmax)
zeff = np.average(hdul[1].data['Z'][cells], weights=hdul[1].data['NB'][cells])
weight = np.sum(hdul[1].data['NB'][cells])
hdul.close()
zeff_list.append(zeff)
weights.append(weight)
zeff = np.average(zeff_list, weights=weights)
return zeff
def _build_main_config(self, fit_type, fit_info, parameters, git_hash):
"""Build the main vega configuration file
Parameters
----------
fit_type : string
Name of the fit. Includes the name of the correlations with the two
tracers separated by a single underscore (e.g. lyalya_qso),
and different correlations separated by a double underscore
(e.g. lyalya_lyalya__lyalya_qso). If unsure check the templates
folder to see all possibilities.
fit_info : dict
Fit information. Must contain a list of sampled parameters and the effective redshift.
parameters : dict, optional
Parameter values to write to the main config
Returns
-------
Path
Path to written 'main.ini' file to be used with vega
"""
# Initialize the config
config = ConfigParser()
config.optionxform = lambda option: option
# Check the effective redshift
self.zeff_in = fit_info.get('zeff', None)
zeff_rmin = fit_info.get('zeff_rmin', 0.)
zeff_rmax = fit_info.get('zeff_rmax', 300.)
zeff_comp = self.get_zeff(self.data_paths, zeff_rmin, zeff_rmax)
if self.zeff_in is None:
self.zeff_in = zeff_comp
elif self.zeff_in != zeff_comp:
print('Warning: Input zeff and computed zeff are different. Will write input zeff.')
# Write the paths to the correlation configs
config['data sets'] = {}
config['data sets']['zeff'] = str(self.zeff_in)
corr_paths = [str(path) for path in self.corr_paths]
config['data sets']['ini files'] = ' '.join(corr_paths)
# Write the scale parameters functions
config['cosmo-fit type'] = {}
config['cosmo-fit type']['cosmo fit func'] = self.options['scale_params']
config['cosmo-fit type']['full-shape'] = str(self.options['full_shape'])
config['cosmo-fit type']['full-shape-alpha'] = str(self.options['full_shape_alpha'])
config['cosmo-fit type']['smooth-scaling'] = str(self.options['smooth_scaling'])
# Write the template info
config['fiducial'] = {}
config['fiducial']['filename'] = self.options['template']
# Write the output path
run_name = fit_type
if self.name_extension is not None:
run_name += '-{}'.format(self.name_extension)
config['output'] = {}
config['output']['filename'] = str(self.fitter_out_path / run_name)
# Write the sampled parameters
sample_params = fit_info['sample_params']
config['sample'] = {}
if type(sample_params) is list:
for param in sample_params:
config['sample'][param] = 'True'
elif type(sample_params) is dict:
for param, setup in sample_params.items():
config['sample'][param] = setup
else:
raise TypeError('The sample_params object has to be either a list or a dict.')
# Write the priors
if 'priors' in fit_info:
config['priors'] = {}
for par, prior in fit_info['priors'].items():
assert par in config['sample'], 'Cannot add prior for parameter that is not sampled'
config['priors'][par] = prior
# Write the parameters
self.parameters = parameters
config['parameters'] = {}
for name, value in self.parameters.items():
config['parameters'][name] = str(value)
# Check all sampled parameters are defined
for param in sample_params:
if param not in config['parameters']:
raise ValueError(f'Asked for unknown parameter "{param}". This does not exist in '
'the current configuration. Please check the vega configuration '
'you requested is correct. If this is a new parameter that does '
'not have a default value yet, please add it to the parameters '
'dictionary when calling BuildConfig.')
# Check if we need the sampler
if self.sampler:
config['control'] = {}
config['control']['sampler'] = 'True'
config['Polychord'] = {}
config['Polychord']['path'] = str(self.sampler_out_path)
config['Polychord']['name'] = run_name
config['Polychord']['num_live'] = fit_info['Polychord'].get('num_live',
str(25*len(sample_params)))
config['Polychord']['num_repeats'] = fit_info['Polychord'].get('num_repeats',
str(len(sample_params)))
config['Polychord']['do_clustering'] = 'True'
config['Polychord']['boost_posterior'] = str(3)
# Write main config
if self.name_extension is None:
main_path = self.config_path / 'main.ini'
else:
main_path = self.config_path / 'main-{}.ini'.format(self.name_extension)
if main_path.is_file() and not self.overwrite:
raise ValueError(f'File {main_path} already exists. Please change the name extension.')
with open(main_path, 'w') as configfile:
configfile.write(f'# File written on {datetime.now()} \n')
configfile.write(f'# Vega git hash: {git_hash} \n\n')
config.write(configfile)
return main_path
@property
def parameters(self):
"""Parameters property
Returns
-------
dict
Parameters
"""
return self._parameters
@parameters.setter
def parameters(self, parameters):
"""Setter for parameters property
Parameters
----------
parameters : dict
Parameters dictionary
"""
if self._params_template is None:
# Read template
config = ConfigParser()
config.optionxform = lambda option: option
template_path = find_file('vega/templates/parameters.ini')
config.read(template_path)
self._params_template = config['parameters']
def get_par(name):
if name not in parameters and name not in self._params_template:
raise ValueError('Unknown parameter: {}, please pass a default value.'.format(name))
return parameters.get(name, self._params_template[name])
new_params = {}
# Scale parameters
if self.options['scale_params'] == 'ap_at':
new_params['ap'] = get_par('ap')
new_params['at'] = get_par('at')
elif self.options['scale_params'] == 'phi_alpha':
new_params['phi'] = get_par('phi')
new_params['alpha'] = get_par('alpha')
if self.options['full_shape']:
new_params['phi_full'] = get_par('phi_full')
if self.options['full_shape_alpha']:
new_params['alpha_full'] = get_par('alpha_full')
if self.options['smooth_scaling']:
new_params['phi_smooth'] = get_par('phi_smooth')
new_params['alpha_smooth'] = get_par('alpha_smooth')
elif self.options['scale_params'] == 'aiso_epsilon':
new_params['aiso'] = get_par('aiso')
new_params['epsilon'] = get_par('epsilon')
else:
raise ValueError('Unknown scale parameters: {}'.format(self.options['scale_params']))
# Peak parameters
if self.options['bao_broadening']:
new_params['sigmaNL_per'] = get_par('sigmaNL_per')
new_params['sigmaNL_par'] = get_par('sigmaNL_par')
else:
new_params['sigmaNL_per'] = 0.
new_params['sigmaNL_par'] = 0.
new_params['bao_amp'] = get_par('bao_amp')
def add_bias_beta(new_params, tracer, bias_beta_config, bias, bias_eta, beta, growth_rate):
if bias_beta_config == 'bias_beta':
new_params[f'bias_{tracer}'] = bias
new_params[f'beta_{tracer}'] = beta
elif bias_beta_config == 'bias_bias_eta':
new_params[f'bias_{tracer}'] = bias
new_params[f'bias_eta_{tracer}'] = bias_eta
new_params['growth_rate'] = growth_rate
elif bias_beta_config == 'bias_eta_beta':
new_params[f'beta_{tracer}'] = beta
new_params[f'bias_eta_{tracer}'] = bias_eta
new_params['growth_rate'] = growth_rate
else:
raise ValueError(f'Option {bias_beta_config} not a valid bias_beta_config. '
'Choose from ["bias_beta", "bias_eta_beta", "bias_bias_eta"].')
# bias beta model
for name in self.corr_names:
bias_beta_config = self.fit_info['bias_beta_config'].get(name, 'bias_beta')
growth_rate = parameters.get('growth_rate', None)
if growth_rate is None:
growth_rate = self.get_growth_rate(self.zeff_in)
if (name == 'LYA') or (name == 'LYB'):
bias = parameters.get(f'bias_{name}', self.get_lya_bias(self.zeff_in))
bias_eta = parameters.get(f'bias_eta_{name}', None)
beta = float(get_par(f'beta_{name}'))
if bias_eta is None:
bias_eta = bias * beta / growth_rate
elif name == 'QSO':
bias = parameters.get('bias_QSO', self.get_qso_bias(self.zeff_in))
beta = parameters.get('beta_QSO', None)
bias_eta = 1
if beta is None:
beta = growth_rate / bias
else:
raise ValueError(f'Tracer {name} not supported yet. Please open an issue')
add_bias_beta(new_params, name, bias_beta_config, bias, bias_eta, beta, growth_rate)
new_params['alpha_{}'.format(name)] = get_par('alpha_{}'.format(name))
# Small scale non-linear model
if self.options['small_scale_nl']:
new_params['dnl_arinyo_q1'] = get_par('dnl_arinyo_q1')
new_params['dnl_arinyo_kv'] = get_par('dnl_arinyo_kv')
new_params['dnl_arinyo_av'] = get_par('dnl_arinyo_av')
new_params['dnl_arinyo_bv'] = get_par('dnl_arinyo_bv')
new_params['dnl_arinyo_kp'] = get_par('dnl_arinyo_kp')
# HCDs
if self.options['hcd_model'] is not None:
new_params['bias_hcd'] = get_par('bias_hcd')
new_params['beta_hcd'] = get_par('beta_hcd')
new_params['L0_hcd'] = get_par('L0_hcd')
# Delta_rp
if 'QSO' in self.corr_names:
new_params['drp_QSO'] = get_par('drp_QSO')
# Velocity dispersion parameters
if self.options['velocity_dispersion'] is not None:
if self.options['velocity_dispersion'] == 'lorentz':
new_params['sigma_velo_disp_lorentz_QSO'] = get_par('sigma_velo_disp_lorentz_QSO')
else:
new_params['sigma_velo_disp_gauss_QSO'] = get_par('sigma_velo_disp_gauss_QSO')
# QSO radiation effects
if self.options['radiation_effects']:
new_params['qso_rad_strength'] = get_par('qso_rad_strength')
new_params['qso_rad_asymmetry'] = get_par('qso_rad_asymmetry')
new_params['qso_rad_lifetime'] = get_par('qso_rad_lifetime')
new_params['qso_rad_decrease'] = get_par('qso_rad_decrease')
# UV background parameters
if self.options['uv_background']:
new_params['bias_gamma'] = get_par('bias_gamma')
new_params['bias_prim'] = get_par('bias_prim')
new_params['lambda_uv'] = get_par('lambda_uv')
# Metals
if self.options['metals'] is not None:
for name in self.options['metals']:
new_params['bias_eta_{}'.format(name)] = get_par('bias_eta_{}'.format(name))
new_params['beta_{}'.format(name)] = get_par('beta_{}'.format(name))
new_params['alpha_{}'.format(name)] = get_par('alpha_{}'.format(name))
# Full-shape smoothing
if self.options['fullshape_smoothing'] is not None:
new_params['par_sigma_smooth'] = get_par('par_sigma_smooth')
if self.options['fullshape_smoothing'] in ['gauss', 'exp']:
new_params['per_sigma_smooth'] = get_par('per_sigma_smooth')
if self.options['fullshape_smoothing'] == 'exp':
new_params['par_exp_smooth'] = get_par('par_exp_smooth')
new_params['per_exp_smooth'] = get_par('per_exp_smooth')
# DESI instrumental systematics amplitude
if self.options['desi-instrumental-systematics']:
new_params['desi_inst_sys_amp'] = get_par('desi_inst_sys_amp')
# Check for broadband parameters
for name, value in parameters.items():
if 'BB' in name and name not in new_params:
new_params[name] = value
self._parameters = new_params
[docs] @staticmethod
def get_lya_bias(z):
"""Compute default Lya bias at redshift z
Parameters
----------
z : float
Redshift
Returns
-------
float
Default Lya bias
"""
return -0.1167 * ((1 + z) / (1 + 2.334))**2.9
[docs] @staticmethod
def get_qso_bias(z):
"""Compute default QSO bias at redshift z
Parameters
----------
z : float
Redshift
Returns
-------
float
Default QSO bias
"""
return 3.91 * ((1 + z) / (1 + 2.39))**1.7133
[docs] @staticmethod
def get_growth_rate(z, Omega_m=0.3153):
"""Compute default growth rate at redshift z
Parameters
----------
z : float
Redshift
Omega_m : float, optional
Matter fraction, by default 0.3153
Returns
-------
float
Default growth rate
"""
Omega_m_z = Omega_m * ((1 + z)**3) / (Omega_m * ((1 + z)**3) + 1 - Omega_m)
Omega_lambda_z = 1 - Omega_m_z
growth_rate = (Omega_m_z**0.6) + (Omega_lambda_z / 70.) * (1 + Omega_m_z / 2.)
return growth_rate