import numpy as np
from scipy.integrate import quad
from scipy.interpolate import interp1d
from . import utils
[docs]class CorrelationFunction:
"""Correlation function computation and handling.
# ! Slow operations should be kept in init as that is only called once
# ! Compute is called many times and should be fast
Extensions should have their separate method of the form
'compute_extension' that can be called from outside
"""
def __init__(self, config, fiducial, coords_grid, scale_params,
tracer1, tracer2, bb_config=None, metal_corr=False):
"""
Parameters
----------
config : ConfigParser
model section of config file
fiducial : dict
fiducial config
coords_grid : dict
Dictionary with coordinate grid - r, mu, z
scale_params : ScaleParameters
ScaleParameters object
tracer1 : dict
Config of tracer 1
tracer2 : dict
Config of tracer 2
bb_config : list, optional
list with configs of broadband terms, by default None
metal_corr : bool, optional
Whether this is a metal correlation, by default False
"""
self._config = config
self._r = coords_grid['r']
self._mu = coords_grid['mu']
self._z = coords_grid['z']
self._multipole = config.getint('single_multipole', -1)
self._tracer1 = tracer1
self._tracer2 = tracer2
self._z_eff = fiducial['z_eff']
self._rel_z_evol = (1. + self._z) / (1 + self._z_eff)
self._scale_params = scale_params
self._metal_corr = metal_corr
# Check if we need delta rp (Only for the cross)
self._delta_rp_name = None
if tracer1['type'] == 'discrete' and tracer2['type'] != 'discrete':
self._delta_rp_name = 'drp_' + tracer1['name']
elif tracer2['type'] == 'discrete' and tracer1['type'] != 'discrete':
self._delta_rp_name = 'drp_' + tracer2['name']
# Precompute growth
self._z_fid = fiducial['z_fiducial']
self._Omega_m = fiducial.get('Omega_m', None)
self._Omega_de = fiducial.get('Omega_de', None)
if not config.getboolean('old_growth_func', False):
self.xi_growth = self.compute_growth(self._z, self._z_fid, self._Omega_m,
self._Omega_de)
else:
self.xi_growth = self.compute_growth_old(self._z, self._z_fid, self._Omega_m,
self._Omega_de)
# Initialize the broadband
self.has_bb = False
if bb_config is not None:
self._init_broadband(bb_config)
self.has_bb = True
# Check for QSO radiation modeling and check if it is QSOxLYA
# Does this work for the QSO auto as well?
self.radiation_flag = False
if 'radiation effects' in self._config:
self.radiation_flag = self._config.getboolean('radiation effects')
if self.radiation_flag:
names = [self._tracer1['name'], self._tracer2['name']]
if not ('QSO' in names and 'LYA' in names):
raise ValueError('You asked for QSO radiation effects, but it'
' can only be applied to the cross (QSOxLya)')
# Check for relativistic effects and standard asymmetry
self.relativistic_flag = False
if 'relativistic correction' in self._config:
self.relativistic_flag = self._config.getboolean('relativistic correction')
self.asymmetry_flag = False
if 'standard asymmetry' in self._config:
self.asymmetry_flag = self._config.getboolean('standard asymmetry')
if self.relativistic_flag or self.asymmetry_flag:
types = [self._tracer1['type'], self._tracer2['type']]
if ('continuous' not in types) or (types[0] == types[1]):
raise ValueError('You asked for relativistic effects or standard asymmetry,'
' but they only work for the cross')
[docs] def compute(self, pk, pk_lin, PktoXi_obj, params):
"""Compute correlation function for input P(k).
Parameters
----------
pk : ND Array
Input power spectrum
pk_lin : 1D Array
Linear isotropic power spectrum
PktoXi_obj : vega.PktoXi
An instance of the transform object used to turn Pk into Xi
params : dict
Computation parameters
Returns
-------
1D Array
Output correlation function
"""
# Compute the core
xi = self.compute_core(pk, PktoXi_obj, params)
# Add bias evolution
xi *= self.compute_bias_evol(params)
# Add growth
xi *= self.xi_growth
# Add QSO radiation modeling for cross
if self.radiation_flag and not params['peak']:
xi += self.compute_qso_radiation(params)
# Add relativistic effects
if self.relativistic_flag:
xi += self.compute_xi_relativistic(pk_lin, PktoXi_obj, params)
# Add standard asymmetry
if self.asymmetry_flag:
xi += self.compute_xi_asymmetry(pk_lin, PktoXi_obj, params)
return xi
[docs] def compute_core(self, pk, PktoXi_obj, params):
"""Compute the core of the correlation function.
This does the Hankel transform of the input P(k),
sums the necessary multipoles and rescales the coordinates
Parameters
----------
pk : ND Array
Input power spectrum
PktoXi_obj : vega.PktoXi
An instance of the transform object used to turn Pk into Xi
params : dict
Computation parameters
Returns
-------
1D Array
Output correlation function
"""
# Check for delta rp
delta_rp = 0.
if self._delta_rp_name is not None:
delta_rp = params.get(self._delta_rp_name, 0.)
# Get rescaled Xi coordinates
ap, at = self._scale_params.get_ap_at(params, metal_corr=self._metal_corr)
rescaled_r, rescaled_mu = self._rescale_coords(self._r, self._mu, ap, at, delta_rp)
# Compute correlation function
xi = PktoXi_obj.compute(rescaled_r, rescaled_mu, pk, self._multipole)
return xi
@staticmethod
def _rescale_coords(r, mu, ap, at, delta_rp=0.):
"""Rescale Xi coordinates using ap/at.
Parameters
----------
r : ND array
Array of radius coords of Xi
mu : ND array
Array of mu = rp/r coords of Xi
ap : float
Alpha parallel
at : float
Alpha transverse
delta_rp : float, optional
Delta radius_parallel - nuisance correction for wrong redshift,
used for discrete tracers, by default 0.
Returns
-------
ND Array
Rescaled radii
ND Array
Rescaled mu
"""
mask = r != 0
rp = r[mask] * mu[mask] + delta_rp
rt = r[mask] * np.sqrt(1 - mu[mask]**2)
rescaled_rp = ap * rp
rescaled_rt = at * rt
rescaled_r = np.zeros(len(r))
rescaled_mu = np.zeros(len(mu))
rescaled_r[mask] = np.sqrt(rescaled_rp**2 + rescaled_rt**2)
rescaled_mu[mask] = rescaled_rp / rescaled_r[mask]
return rescaled_r, rescaled_mu
[docs] def compute_bias_evol(self, params):
"""Compute bias evolution for the correlation function.
Parameters
----------
params : dict
Computation parameters
Returns
-------
ND Array
Bias evolution for tracer
"""
# Compute the bias evolution
bias_evol = self._get_tracer_evol(params, self._tracer1['name'])
bias_evol *= self._get_tracer_evol(params, self._tracer2['name'])
return bias_evol
def _get_tracer_evol(self, params, tracer_name):
"""Compute tracer bias evolution.
Parameters
----------
params : dict
Computation parameters
tracer_name : string
Name of tracer
Returns
-------
ND Array
Bias evolution for tracer
"""
handle_name = 'z evol {}'.format(tracer_name)
if handle_name in self._config:
evol_model = self._config.get(handle_name, 'standard')
else:
evol_model = self._config.get('z evol', 'standard')
# Compute the bias evolution using the right model
if 'croom' in evol_model:
bias_evol = self._bias_evol_croom(params, tracer_name)
else:
bias_evol = self._bias_evol_std(params, tracer_name)
return bias_evol
def _bias_evol_std(self, params, tracer_name):
"""Bias evolution standard model.
Parameters
----------
params : dict
Computation parameters
tracer_name : string
Tracer name
Returns
-------
ND Array
Bias evolution for tracer
"""
p0 = params['alpha_{}'.format(tracer_name)]
bias_z = self._rel_z_evol**p0
return bias_z
def _bias_evol_croom(self, params, tracer_name):
"""Bias evolution Croom model for QSO, see Croom et al. 2005.
Parameters
----------
params : dict
Computation parameters
tracer_name : string
Tracer name
Returns
-------
ND Array
Bias evolution for tracer
"""
assert tracer_name == "QSO"
p0 = params["croom_par0"]
p1 = params["croom_par1"]
bias_z = (p0 + p1*(1. + self._z)**2) / (p0 + p1 * (1 + self._z_eff)**2)
return bias_z
[docs] def compute_growth(self, z_grid=None, z_fid=None,
Omega_m=None, Omega_de=None):
"""Compute growth factor.
Implements eq. 7.77 from S. Dodelson's Modern Cosmology book.
Returns
-------
ND Array
Growth factor
"""
# Check the defaults
if z_grid is None:
z_grid = self._z
if z_fid is None:
z_fid = self._z_fid
if Omega_m is None:
Omega_m = self._Omega_m
if Omega_de is None:
Omega_de = self._Omega_de
# Check if we have dark energy
if Omega_de is None:
growth = (1 + z_fid) / (1. + z_grid)
return growth**2
# Compute the growth at each redshift on the grid
growth = utils.growth_function(z_grid, Omega_m, Omega_de)
# Scale to the fiducial redshift
growth /= utils.growth_function(z_fid, Omega_m, Omega_de)
return growth**2
def compute_growth_old(self, z_grid=None, z_fid=None, Omega_m=None, Omega_de=None):
def hubble(z, Omega_m, Omega_de):
return np.sqrt(Omega_m*(1+z)**3 + Omega_de + (1-Omega_m-Omega_de)*(1+z)**2)
def dD1(a, Omega_m, Omega_de):
z = 1/a-1
return 1./(a*hubble(z, Omega_m, Omega_de))**3
# Calculate D1 in 100 values of z between 0 and zmax, then interpolate
nbins = 100
zmax = 5.
z = zmax * np.arange(nbins, dtype=float) / (nbins-1)
D1 = np.zeros(nbins, dtype=float)
pars = (Omega_m, Omega_de)
for i in range(nbins):
a = 1/(1+z[i])
D1[i] = 5/2.*Omega_m*hubble(z[i], *pars)*quad(dD1, 0, a, args=pars)[0]
D1 = interp1d(z, D1)
growth = D1(z_grid) / D1(z_fid)
return growth**2
def _init_broadband(self, bb_config):
"""Initialize the broadband terms.
Parameters
----------
bb_config : list
list with configs of broadband terms
"""
self.bb_terms = {}
self.bb_terms['pre-add'] = []
self.bb_terms['post-add'] = []
self.bb_terms['pre-mul'] = []
self.bb_terms['post-mul'] = []
# First pick up the normal broadband terms
normal_broadbands = [el for el in bb_config
if el['func'] != 'broadband_sky']
for index, config in enumerate(normal_broadbands):
# Create the name for the parameters of this term
name = 'BB-{}-{} {} {} {}'.format(config['cf_name'], index,
config['type'], config['pre'],
config['rp_rt'])
# Create the broadband term dictionary
bb = {}
bb['name'] = name
bb['func'] = config['func']
bb['rp_rt'] = config['rp_rt']
bb['r_config'] = config['r_config']
bb['mu_config'] = config['mu_config']
self.bb_terms[config['pre'] + "-" + config['type']].append(bb)
# Next pick up the sky broadban terms
sky_broadbands = [el for el in bb_config
if el['func'] == 'broadband_sky']
for index, config in enumerate(sky_broadbands):
assert config['rp_rt'] == 'rp,rt'
# Create the name for the parameters of this term
name = 'BB-{}-{}-{}'.format(config['cf_name'],
index + len(normal_broadbands),
config['func'])
# Create the broadband term dictionary
bb = {}
bb['name'] = name
bb['func'] = config['func']
bb['bin_size_rp'] = config['bin_size_rp']
self.bb_terms[config['pre'] + "-" + config['type']].append(bb)
[docs] def compute_broadband(self, params, pos_type):
"""Compute the broadband terms for
one position (pre-distortion/post-distortion)
and one type (multiplicative/additive).
Parameters
----------
params : dict
Computation parameters
pos_type : string
String with position and type, must be one of:
'pre-mul' or 'pre-add' or 'post-mul' or 'post-add'
Returns
-------
1d Array
Output broadband
"""
assert pos_type in ['pre-mul', 'pre-add', 'post-mul', 'post-add']
corr = None
# Loop over the right pos/type configuration
for bb_term in self.bb_terms[pos_type]:
# Check if it's sky or normal broadband
if bb_term['func'] != 'broadband_sky':
# Initialize the broadband and check
# if we need to add or multiply
if corr is None:
corr = self.broadband(bb_term, params)
if 'mul' in pos_type:
corr = 1 + corr
elif 'mul' in pos_type:
corr *= 1 + self.broadband(bb_term, params)
else:
corr += self.broadband(bb_term, params)
else:
# Initialize the broadband and check
# if we need to add or multiply
if corr is None:
corr = self.broadband_sky(bb_term, params)
if 'mul' in pos_type:
corr = 1 + corr
elif 'mul' in pos_type:
corr *= 1 + self.broadband_sky(bb_term, params)
else:
corr += self.broadband_sky(bb_term, params)
# Give defaults if corr is still None
if corr is None:
if 'mul' in pos_type:
corr = 1.
else:
corr = 0.
return corr
[docs] def broadband_sky(self, bb_term, params):
"""Compute sky broadband term.
Calculates a Gaussian broadband in rp,rt for the sky residuals.
Parameters
----------
bb_term : dict
broadband term config
params : dict
Computation parameters
Returns
-------
1d Array
Output broadband
"""
rp = self._r * self._mu
rt = self._r * np.sqrt(1 - self._mu**2)
scale = params[bb_term['name'] + '-scale-sky']
sigma = params[bb_term['name'] + '-sigma-sky']
corr = scale / (sigma * np.sqrt(2. * np.pi))
corr *= np.exp(-0.5 * (rt / sigma)**2)
w = (rp >= 0.) & (rp < bb_term['bin_size_rp'])
corr[~w] = 0.
return corr
[docs] def broadband(self, bb_term, params):
"""Compute broadband term.
Calculates a power-law broadband in r and mu or rp,rt.
Parameters
----------
bb_term : dict
broadband term config
params : dict
Computation parameters
Returns
-------
1d Array
Output broadband
"""
r1 = self._r / 100.
r2 = self._mu
if bb_term['rp_rt'] == 'rp,rt':
r1 = self._r / 100. * self._mu
r2 = self._r / 100. * np.sqrt(1 - self._mu**2)
r_min, r_max, dr = bb_term['r_config']
mu_min, mu_max, dmu = bb_term['mu_config']
r1_powers = np.arange(r_min, r_max + 1, dr)
r2_powers = np.arange(mu_min, mu_max + 1, dmu)
bb_params = []
for i in r1_powers:
for j in r2_powers:
bb_params.append(params['{} ({},{})'.format(
bb_term['name'], i, j)])
bb_params = np.array(bb_params).reshape(-1, r_max - r_min + 1)
corr = (bb_params[:, :, None, None] * r1**r1_powers[:, None, None] *
r2**r2_powers[None, :, None]).sum(axis=(0, 1, 2))
return corr
[docs] def compute_qso_radiation(self, params):
"""Model the contribution of QSO radiation to the cross
(the transverse proximity effect)
Parameters
----------
params : dict
Computation parameters
Returns
-------
1D
Xi QSO radiation model
"""
assert 'QSO' in [self._tracer1['name'], self._tracer2['name']]
assert self._tracer1['name'] != self._tracer2['name']
# Compute the shifted r and mu grids
delta_rp = params.get(self._delta_rp_name, 0.)
rp = self._r * self._mu + delta_rp
rt = self._r * np.sqrt(1 - self._mu**2)
r_shift = np.sqrt(rp**2 + rt**2)
mu_shift = rp / r_shift
# Get the QSO radiation model parameters
strength = params['qso_rad_strength']
asymmetry = params['qso_rad_asymmetry']
lifetime = params['qso_rad_lifetime']
decrease = params['qso_rad_decrease']
# Compute the QSO radiation model
xi_rad = strength / (r_shift**2) * (1 - asymmetry * (1 - mu_shift**2))
xi_rad *= np.exp(-r_shift * ((1 + mu_shift) / lifetime + 1 / decrease))
return xi_rad
[docs] def compute_xi_relativistic(self, pk, PktoXi_obj, params):
"""Calculate the cross-correlation contribution from
relativistic effects (Bonvin et al. 2014).
Parameters
----------
pk : ND Array
Input power spectrum
PktoXi_obj : vega.PktoXi
An instance of the transform object used to turn Pk into Xi
params : dict
Computation parameters
Returns
-------
1D Array
Output xi relativistic
"""
assert 'continuous' in [self._tracer1['type'], self._tracer2['type']]
assert self._tracer1['type'] != self._tracer2['type']
# Get rescaled Xi coordinates
delta_rp = params.get(self._delta_rp_name, 0.)
ap, at = self._scale_params.get_ap_at(params, metal_corr=self._metal_corr)
rescaled_r, rescaled_mu = self._rescale_coords(self._r, self._mu, ap, at, delta_rp)
# Compute the correlation function
xi_rel = PktoXi_obj.pk_to_xi_relativistic(rescaled_r, rescaled_mu, pk, params)
return xi_rel
[docs] def compute_xi_asymmetry(self, pk, PktoXi_obj, params):
"""Calculate the cross-correlation contribution from
standard asymmetry (Bonvin et al. 2014).
Parameters
----------
pk : ND Array
Input power spectrum
PktoXi_obj : vega.PktoXi
An instance of the transform object used to turn Pk into Xi
params : dict
Computation parameters
Returns
-------
1D Array
Output xi asymmetry
"""
assert 'continuous' in [self._tracer1['type'], self._tracer2['type']]
assert self._tracer1['type'] != self._tracer2['type']
# Get rescaled Xi coordinates
delta_rp = params.get(self._delta_rp_name, 0.)
ap, at = self._scale_params.get_ap_at(params, metal_corr=self._metal_corr)
rescaled_r, rescaled_mu = self._rescale_coords(self._r, self._mu, ap, at, delta_rp)
# Compute the correlation function
xi_asy = PktoXi_obj.pk_to_xi_asymmetry(rescaled_r, rescaled_mu, pk, params)
return xi_asy
[docs] def compute_desi_instrumental_systematics(self, params, bin_size_rp):
"""Compute DESI instrumental systematics model
TODO add link to Satya's paper describing this
Parameters
----------
params : dict
Computation parameters
bin_size_rp : float
Bin size along the line-of-sight
Returns
-------
1D Array
Output correction
"""
if self._tracer1['type'] != self._tracer2['type']:
raise ValueError('DESI instrumental systematics model only applies '
'to auto-correlation functions.')
rp = self._r * self._mu
rt = self._r * np.sqrt(1 - self._mu**2)
# b = 0.0003189935987295203
b = params.get('desi_inst_sys_amp', 0.0003189935987295203)
w = (rp > 0) & (rp < bin_size_rp) & (rt < 80)
correction = np.zeros(rt.shape)
correction[w] = b * (rt[w] / 80 - 1)**2
return correction