import numpy as np
import copy
from pkg_resources import resource_filename
from numba import njit, float64
from . import utils
[docs]class PowerSpectrum:
"""Power Spectrum computation and handling.
# ! Slow operations should be kept in init as that is only called once
# ! Compute is called many times and should be fast
Extensions should have their separate method of the form
'compute_extension' that can be called from outside
"""
def __init__(self, config, fiducial, tracer1, tracer2, dataset_name=None):
"""
Parameters
----------
config : dict
pk config object
fiducial : dict
fiducial config
tracer1 : dict
Config of tracer 1
tracer2 : dict
Config of tracer 1
dataset_name : string
Name of dataset, by default None
"""
self._config = config
self.tracer1_name = copy.deepcopy(tracer1['name'])
self.tracer2_name = copy.deepcopy(tracer2['name'])
self.tracer1_type = copy.deepcopy(tracer1['type'])
self.tracer2_type = copy.deepcopy(tracer2['type'])
self._name = dataset_name
self.k_grid = fiducial['k']
self._bin_size_rp = config.getfloat('bin_size_rp')
self._bin_size_rt = config.getfloat('bin_size_rt')
self.use_Gk = self._config.getboolean('model binning', True)
# Get the HCD model and check for UV
self.hcd_model = self._config.get('model-hcd', None)
self._add_uv = self._config.getboolean('add uv', False)
# Check the HCD model
self._Fvoigt_data = None
if self.hcd_model is not None and 'fvoigt' in self.hcd_model:
assert 'fvoigt_model' in self._config.keys(), "No fvoigt_model specified in config"
fvoigt_model = self._config.get('fvoigt_model')
if '/' not in fvoigt_model:
path = f"{resource_filename('vega', 'models')}/fvoigt_models/"
path += f"Fvoigt_{fvoigt_model}.txt"
else:
path = fvoigt_model
self._Fvoigt_data = np.loadtxt(path)
# Initialize some stuff we need
self.pk_Gk = None
self._pk_fid = fiducial['pk_full'] * ((1 + fiducial['z_fiducial'])
/ (1. + fiducial['z_eff']))**2
# Initialize the mu_k grid
num_bins_muk = self._config.getint('num_bins_muk', 1000)
muk_grid = (np.arange(num_bins_muk) + 0.5) / num_bins_muk
self.muk_grid = muk_grid[:, None]
self.k_par_grid = self.k_grid * self.muk_grid
self.k_trans_grid = self.k_grid * np.sqrt(1 - self.muk_grid**2)
self._arinyo_pars = None
self._peak_nl_pars = None
self._L0_hcd_cache = None
self._F_hcd_cache = None
[docs] def compute(self, pk_lin, params, fast_metals=False):
"""Computes a power spectrum for the tracers using the input
linear P(k) and parameters.
Parameters
----------
pk_lin : 1D array
Linear Power Spectrum
params : dict
Computation parameters
Returns
-------
ND Array
Power spectrum
"""
# params = params
# Get the core biases and betas
bias_beta = utils.bias_beta(params, self.tracer1_name, self.tracer2_name)
bias1, beta1, bias2, beta2 = bias_beta
# Add UV model
if self._add_uv:
if self.tracer1_name == 'LYA':
bias1, beta1 = self.compute_bias_beta_uv(bias1, beta1, params)
if self.tracer2_name == 'LYA':
bias2, beta2 = self.compute_bias_beta_uv(bias2, beta2, params)
# Add HCD model
if self.hcd_model is not None:
if self.tracer1_name == 'LYA':
bias1, beta1 = self.compute_bias_beta_hcd(bias1, beta1, params)
if self.tracer2_name == 'LYA':
bias2, beta2 = self.compute_bias_beta_hcd(bias2, beta2, params)
# Compute kaiser model
pk_full = pk_lin * self.compute_kaiser(bias1, beta1, bias2, beta2, fast_metals)
# add non linear small scales
if 'small scale nl' in self._config.keys():
if 'arinyo' in self._config.get('small scale nl'):
pk_full *= self.compute_dnl_arinyo(params)
elif 'mcdonald' in self._config.get('small scale nl'):
pk_full *= self.compute_dnl_mcdonald()
else:
print('small scale nl: must be either mcdonald or arinyo')
raise ValueError('Incorrect \'small scale nl\' specified')
# model the effect of binning
if self.use_Gk:
if self.pk_Gk is None:
self.pk_Gk = self.compute_Gk(params)
pk_full *= self.pk_Gk
# add non linear large scales
if params['peak']:
pk_full *= self.compute_peak_nl(params)
# add full shape smoothing
if 'fullshape smoothing' in self._config:
smoothing_type = self._config.get('fullshape smoothing')
if 'gauss' in smoothing_type:
pk_full *= self.compute_fullshape_gauss_smoothing(params)
elif 'exp' in smoothing_type:
pk_full *= self.compute_fullshape_exp_smoothing(params)
else:
raise ValueError('"fullshape smoothing" must be of type'
' "gauss" or "exp".')
# add velocity dispersion
if 'velocity dispersion' in self._config:
smoothing_type = self._config.get('velocity dispersion')
if 'gauss' in smoothing_type:
pk_full *= self.compute_velocity_dispersion_gauss(params)
elif 'lorentz' in smoothing_type:
pk_full *= self.compute_velocity_dispersion_lorentz(params)
elif 'lorentz_gauss' in smoothing_type:
pk_full *= self.compute_velocity_dispersion_lorentz(params)
pk_full *= self.compute_velocity_dispersion_gauss(params)
else:
raise ValueError('"velocity dispersion" must be of type'
' "gauss" or "lorentz".')
return pk_full
[docs] def compute_kaiser(self, bias1, beta1, bias2, beta2, fast_metals=False):
"""Compute Kaiser model.
Parameters
----------
bias1 : float
Bias for tracer 1
beta1 : float
Beta for tracer 1
bias2 : float
Bias for tracer 2
beta2 : float
Beta for tracer 2
Returns
-------
ND Array
Kaiser term
"""
pk = (1 + beta1 * self.muk_grid**2)
pk = pk * (1 + beta2 * self.muk_grid**2)
if not fast_metals:
pk *= bias1 * bias2
return pk
[docs] def compute_bias_beta_uv(self, bias, beta, params):
""" Compute effective biases that include UV modeling.
Parameters
----------
bias : float
Bias for tracer
beta : float
Beta for tracer
params : dict
Computation parameters
Returns
-------
(float, float)
Effective bias and beta
"""
bias_gamma = params["bias_gamma"]
bias_prim = params["bias_prim"]
lambda_uv = params["lambda_uv"]
W = np.arctan(self.k_grid * lambda_uv) / (self.k_grid * lambda_uv)
beta_eff = beta / (1 + bias_gamma / bias * W / (1 + bias_prim * W))
bias_eff = bias + bias_gamma * W / (1 + bias_prim * W)
return bias_eff, beta_eff
[docs] def compute_bias_beta_hcd(self, bias, beta, params):
""" Compute effective biases that include HCD modeling.
Parameters
----------
bias : float
Bias for tracer
beta : float
Beta for tracer
params : dict
Computation parameters
Returns
-------
(float, float)
Effective bias and beta
"""
# Check if we have an HCD bias for each component
hcd_bias_name = "bias_hcd_{}".format(self._name)
bias_hcd = params.get(hcd_bias_name, None)
if bias_hcd is None:
bias_hcd = params['bias_hcd']
# Get the other parameters
beta_hcd = params["beta_hcd"]
# Check which model we need
if 'Rogers' in self.hcd_model:
L0 = params["L0_hcd"]
self._compute_hcd_cached(self._hcd_Rogers2018, L0, self.k_par_grid)
elif 'fvoigt' in self.hcd_model:
assert self._Fvoigt_data is not None
L0 = params.get("L0_fvoigt", 1)
self._compute_hcd_cached(self._hcd_fvoigt, L0)
elif 'sinc' in self.hcd_model:
L0 = params.get("L0_sinc", 1)
self._compute_hcd_cached(self._hcd_sinc, L0)
else:
raise ValueError(f"Unknown hcd model {self.hcd_model}. "
"Choose from ['Rogers', 'fvoigt', 'sinc']")
bias_eff = bias + bias_hcd * self._F_hcd
beta_eff = (bias * beta + bias_hcd * beta_hcd * self._F_hcd)
beta_eff /= (bias + bias_hcd * self._F_hcd)
return bias_eff, beta_eff
def _compute_hcd_cached(self, func, L0_hcd, *args):
if L0_hcd != self._L0_hcd_cache or self._F_hcd is None:
self._F_hcd = func(L0_hcd, *args)
self._L0_hcd_cache = L0_hcd
def _hcd_sinc(self, L0):
"""HCD sinc model.
Parameters
----------
L0 : float
Characteristic length scale of HCDs
Returns
-------
ND Array
F_hcd
"""
return utils.sinc(self.k_par_grid * L0)
@staticmethod
@njit(float64[:, :](float64, float64[:, :]))
def _hcd_Rogers2018(L0, k_par_grid):
"""Model the effect of HCD systems with the Fourier transform
of a Lorentzian profile. Motivated by Rogers et al. (2018).
Parameters
----------
L0 : float
Characteristic length scale of HCDs
Returns
-------
ND Array
F_hcd
"""
f_hcd = np.exp(-L0 * k_par_grid)
return f_hcd
def _hcd_fvoigt(self, L0):
"""Use Fvoigt function to fit the DLA in the autocorrelation Lyman-alpha
without masking them ! (L0 = 1)
(If you want to mask them use Fvoigt_exp.txt and L0 = 10 as eBOSS DR14)
Parameters
----------
L0 : float
Characteristic length scale of HCDs
Returns
-------
ND Array
F_hcd
"""
k_data = self._Fvoigt_data[:, 0]
F_data = self._Fvoigt_data[:, 1]
F_hcd = np.interp(L0 * self.k_par_grid, k_data, F_data, left=1, right=0)
return F_hcd
[docs] def compute_peak_nl(self, params):
"""Compute the non-linear gaussian correction for the peak component.
Parameters
----------
params : dict
Computation parameters
Returns
-------
ND Array
Smoothing factor for the peak
"""
sigma_par = params.get('sigmaNL_par', None)
sigma_trans = params.get('sigmaNL_per', None)
growth_rate = params.get('growth_rate')
if sigma_par is None and sigma_trans is not None:
sigma_par = sigma_trans * (1 + growth_rate)
elif sigma_trans is None and sigma_par is not None:
sigma_trans = sigma_par / (1 + growth_rate)
elif sigma_par is None and sigma_trans is None:
raise ValueError('No parameters for peak NL found.'
' Add sigmaNL_par and/or sigmaNL_par.')
if self._peak_nl_pars is None:
self._peak_nl_pars = np.array([sigma_par, sigma_trans]) + 1
if not np.allclose(np.array([sigma_par, sigma_trans]),
self._peak_nl_pars):
peak_nl = self.k_par_grid**2 * sigma_par**2
peak_nl += self.k_trans_grid**2 * sigma_trans**2
self._peak_nl_pars = np.array([sigma_par, sigma_trans])
self._peak_nl_cache = np.exp(-peak_nl / 2)
return self._peak_nl_cache
[docs] def compute_dnl_mcdonald(self):
"""Non linear term from McDonald 2003.
Returns
-------
ND Array
D_NL factor
"""
assert self.tracer1_name == "LYA"
assert self.tracer2_name == "LYA"
kvel = 1.22 * (1 + self.k_grid / 0.923)**0.451
dnl = (self.k_grid / 6.4)**0.569 - (self.k_grid / 15.3)**2.01
dnl = dnl - (self.k_grid * self.muk_grid / kvel)**1.5
return np.exp(dnl)
[docs] def compute_dnl_arinyo(self, params):
"""Non linear term from Arinyo et al 2015.
Parameters
----------
params : dict
Computation parameters
Returns
-------
ND Array
D_NL factor
"""
two_lya_flag = "LY" in self.tracer1_name and "LY" in self.tracer2_name
one_lya_flag = "LY" in self.tracer1_name or "LY" in self.tracer2_name
q1 = params["dnl_arinyo_q1"]
kv = params["dnl_arinyo_kv"]
av = params["dnl_arinyo_av"]
bv = params["dnl_arinyo_bv"]
kp = params["dnl_arinyo_kp"]
if self._arinyo_pars is None:
self._arinyo_pars = np.array([q1, kv, av, bv, kp]) + 1
if not np.allclose(np.array([q1, kv, av, bv, kp]), self._arinyo_pars):
growth = q1 * self.k_grid**3 * self._pk_fid / (2 * np.pi**2)
pec_velocity = (self.k_grid / kv)**av * np.abs(self.muk_grid)**bv
pressure = (self.k_grid / kp) * (self.k_grid / kp)
dnl = np.exp(growth * (1 - pec_velocity) - pressure)
self._arinyo_pars = np.array([q1, kv, av, bv, kp])
if two_lya_flag:
self._arinyo_dnl_cache = dnl
elif one_lya_flag:
self._arinyo_dnl_cache = np.sqrt(dnl)
else:
return np.ones(dnl.shape)
# raise ValueError("Arinyo NL term called for correlation with no Lyman-alpha")
return self._arinyo_dnl_cache
[docs] def compute_Gk(self, params):
"""Model the effect of binning of the cf.
Parameters
----------
params : dict
Computation parameters
Returns
-------
ND Array
G(k)
"""
bin_size_rp = params.get("par binsize {}".format(self._name), self._bin_size_rp)
bin_size_rt = params.get("per binsize {}".format(self._name), self._bin_size_rt)
Gk = 1.
if bin_size_rp != 0:
Gk = Gk * utils.sinc(self.k_par_grid * bin_size_rp / 2)
if bin_size_rt != 0:
Gk = Gk * utils.sinc(self.k_trans_grid * bin_size_rt / 2)
return Gk
[docs] def compute_fullshape_gauss_smoothing(self, params):
"""Compute a Gaussian smoothing for the full correlation function.
Parameters
----------
params : dict
Computation parameters
Returns
-------
ND Array
Smoothing factor
"""
sigma_par = params.get('par_sigma_smooth', None)
sigma_trans = params.get('per_sigma_smooth', None)
if sigma_par is None and sigma_trans is None:
raise ValueError('Asked for fullshape gaussian smoothing without setting the'
' smoothing parameters (par_sigma_smooth and/or per_sigma_smooth).')
elif sigma_par is None:
sigma_par = sigma_trans
elif sigma_trans is None:
sigma_trans = sigma_par
gauss_smoothing = self.k_par_grid**2 * sigma_par**2
gauss_smoothing += self.k_trans_grid**2 * sigma_trans**2
return np.exp(-gauss_smoothing / 2)**2
[docs] def compute_fullshape_exp_smoothing(self, params):
""" Compute a Gaussian and exp smoothing for the full
correlation function (usefull for london_mocks_v6.0).
Parameters
----------
params : dict
Computation parameters
Returns
-------
ND Array
Smoothing factor
"""
# Get the parameters
sigma_par_sq = params['par_sigma_smooth']**2
sigma_trans_sq = params['per_sigma_smooth']**2
exp_par_sq = params['par_exp_smooth']**2
exp_trans_sq = params['per_exp_smooth']**2
# Compute the smoothing components
gauss_smoothing = self.k_par_grid**2 * sigma_par_sq
gauss_smoothing += self.k_trans_grid**2 * sigma_trans_sq
exp_smoothing = np.abs(self.k_par_grid) * exp_par_sq
exp_smoothing += np.abs(self.k_trans_grid) * exp_trans_sq
return np.exp(-gauss_smoothing / 2) * np.exp(-exp_smoothing)
[docs] def compute_velocity_dispersion_gauss(self, params):
"""Compute a gaussian smoothing factor to model velocity dispersion.
Parameters
----------
params : dict
Computation parameters
Returns
-------
ND Array
Smoothing factor
"""
assert 'discrete' in [self.tracer1_type, self.tracer2_type]
smoothing = np.ones(self.k_par_grid.shape)
if self.tracer1_type == 'discrete':
sigma = params['sigma_velo_disp_gauss_' + self.tracer1_name]
smoothing *= np.exp(-0.25 * (self.k_par_grid * sigma)**2)
if self.tracer2_type == 'discrete':
sigma = params['sigma_velo_disp_gauss_' + self.tracer2_name]
smoothing *= np.exp(-0.25 * (self.k_par_grid * sigma)**2)
return smoothing
[docs] def compute_velocity_dispersion_lorentz(self, params):
"""Compute a lorentzian smoothing factor to model velocity dispersion.
Parameters
----------
params : dict
Computation parameters
Returns
-------
ND Array
Smoothing factor
"""
assert 'discrete' in [self.tracer1_type, self.tracer2_type]
smoothing = np.ones(self.k_par_grid.shape)
if self.tracer1_type == 'discrete':
sigma = params['sigma_velo_disp_lorentz_' + self.tracer1_name]
smoothing *= 1. / np.sqrt(1 + (self.k_par_grid * sigma)**2)
if self.tracer2_type == 'discrete':
sigma = params['sigma_velo_disp_lorentz_' + self.tracer2_name]
smoothing *= 1. / np.sqrt(1 + (self.k_par_grid * sigma)**2)
return smoothing