import numpy as np
from astropy.io import fits
from scipy import linalg
from scipy import sparse
from scipy.sparse import csr_matrix
from vega.utils import find_file
BLINDING_STRATEGIES = ['desi_m2', 'desi_y1']
[docs]class Data:
"""Class for handling lya forest correlation function data.
An instance of this is required for each cf component
"""
_data_vec = None
_masked_data_vec = None
_cov_mat = None
_distortion_mat = None
_inv_masked_cov = None
_log_cov_det = None
_blind = None
rp_rt_custom_grid = None
def __init__(self, corr_item):
"""
Parameters
----------
corr_item : CorrelationItem
Item object with the component config
"""
# First save the tracer info
self.corr_item = corr_item
self.tracer1 = corr_item.tracer1
self.tracer2 = corr_item.tracer2
# Read the data file and init the corrdinate grids
data_path = corr_item.config['data'].get('filename')
dmat_path = corr_item.config['data'].get('distortion-file', None)
rp_rt_grid, z_grid = self._read_data(data_path, corr_item.config['cuts'], dmat_path)
self.corr_item.rp_rt_grid = rp_rt_grid
self.corr_item.z_grid = z_grid
self.corr_item.bin_size_rp_model = self.bin_size_rp_model
self.corr_item.bin_size_rt_model = self.bin_size_rt_model
self.corr_item.bin_size_rp_data = self.bin_size_rp_data
self.corr_item.bin_size_rt_data = self.bin_size_rt_data
# Read the metal file and init metals in the corr item
if 'metals' in corr_item.config:
tracer_catalog, metal_correlations = self._init_metals(
corr_item.config['metals'])
self.corr_item.init_metals(tracer_catalog, metal_correlations)
# Check if we have broadband
if 'broadband' in corr_item.config:
self.corr_item.init_broadband(self.coeff_binning_model)
if not self.has_distortion:
self._distortion_mat = np.eye(self.full_data_size)
if not self.has_cov_mat:
self._cov_mat = np.eye(self.full_data_size)
self._cholesky = None
self._scale = 1.
self.scaled_inv_masked_cov = None
self.scaled_log_cov_det = None
@property
def blind(self):
"""Blinding flag property
Returns
-------
bool
Blinding flag
"""
return self._blind
@property
def data_vec(self):
"""Full data vector property
Returns
-------
1D array
Full data vector (xi)
"""
return self._data_vec
@property
def masked_data_vec(self):
"""Masked data vector property
Returns
-------
1D array
Masked data vector (xi[mask])
"""
if self._masked_data_vec is None:
self._masked_data_vec = np.zeros(self.data_mask.sum())
self._masked_data_vec[:] = self.data_vec[self.data_mask]
return self._masked_data_vec
@property
def cov_mat(self):
"""Covariance matrix property
Returns
-------
2D array
Covariance matrix
"""
if self._cov_mat is None:
raise AttributeError(
'No covariance matrix found. Check for it in the data file: ',
self.corr_item.config['data'].get('filename'))
return self._cov_mat
@property
def distortion_mat(self):
"""Distortion matrix property
Returns
-------
2D array
Distortion matrix
"""
if self._distortion_mat is None:
raise AttributeError(
'No distortion matrix found. Check for it in the data file: ',
self.corr_item.config['data'].get('filename'))
return self._distortion_mat
@property
def inv_masked_cov(self):
"""Inverse masked covariance matrix property
Returns
-------
2D array
Inverse masked covariance matrix
"""
if self._inv_masked_cov is None:
# Compute inverse of the covariance matrix
masked_cov = self.cov_mat[:, self.data_mask]
masked_cov = masked_cov[self.data_mask, :]
try:
linalg.cholesky(self.cov_mat)
print('LOG: Full matrix is positive definite')
except linalg.LinAlgError:
print('WARNING: Full matrix is not positive definite')
try:
linalg.cholesky(masked_cov)
print('LOG: Reduced matrix is positive definite')
except linalg.LinAlgError:
print('WARNING: Reduced matrix is not positive definite')
self._inv_masked_cov = linalg.inv(masked_cov)
return self._inv_masked_cov
@property
def log_cov_det(self):
"""Logarithm of the determinant of the covariance matrix property
Returns
-------
float
Logarithm of the determinant of the covariance matrix
"""
if self._log_cov_det is None:
# Compute the log determinant using and LDL^T decomposition
# |C| = Product of Diagonal components of D
masked_cov = self.cov_mat[:, self.data_mask]
masked_cov = masked_cov[self.data_mask, :]
_, d, __ = linalg.ldl(masked_cov)
self._log_cov_det = np.log(d.diagonal()).sum()
assert isinstance(self.log_cov_det, float)
return self._log_cov_det
@property
def has_cov_mat(self):
"""Covariance matrix flag
Returns
-------
bool
Covariance matrix flag
"""
return self._cov_mat is not None
@property
def has_distortion(self):
"""Distortion matrix flag
Returns
-------
bool
Distortion matrix flag
"""
return self._distortion_mat is not None
def _read_data(self, data_path, cuts_config, dmat_path=None):
"""Read the data, mask it and prepare the environment.
Parameters
----------
data_path : string
Path to fits data file
cuts_config : ConfigParser
cuts section from the config file
"""
print(f'Reading data file {data_path}\n')
hdul = fits.open(find_file(data_path))
header = hdul[1].header
self._blinding_strat = None
if 'BLINDING' in header:
self._blinding_strat = header['BLINDING']
if self._blinding_strat == 'none' or self._blinding_strat == 'None':
self._blinding_strat = None
dmat_column_name = 'DM'
if self._blinding_strat in BLINDING_STRATEGIES:
print(f'Warning! Running on blinded data {data_path}')
print(f'Strategy: {self._blinding_strat}. BAO can be sampled')
self._blind = True
self._data_vec = hdul[1].data['DA_BLIND']
dmat_column_name += '_BLIND'
if dmat_column_name in hdul[1].columns.names and dmat_path is None:
self._distortion_mat = csr_matrix(hdul[1].data[dmat_column_name])
elif self._blinding_strat == 'desi_y3':
raise ValueError('Fits are forbidden on Y3 data as we do not have'
' a coherent blinding strategy yet.')
elif self._blinding_strat is None:
self._blind = False
self._data_vec = hdul[1].data['DA']
if dmat_column_name in hdul[1].columns.names:
self._distortion_mat = csr_matrix(hdul[1].data[dmat_column_name])
else:
self._blind = True
raise ValueError(f"Unknown blinding strategy {self._blinding_strat}.")
if 'CO' in hdul[1].columns.names:
self._cov_mat = hdul[1].data['CO']
rp_grid = hdul[1].data['RP']
rt_grid = hdul[1].data['RT']
z_grid = hdul[1].data['Z']
self.rp_min_data = header['RPMIN']
self.rp_max_data = header['RPMAX']
self.rt_max_data = header['RTMAX']
self.num_bins_rp_data = header['NP']
self.num_bins_rt_data = header['NT']
# Get the data bin size
# TODO If RTMIN is ever added to the cf data files this needs modifying
self.bin_size_rp_data = (self.rp_max_data - self.rp_min_data) / self.num_bins_rp_data
self.bin_size_rt_data = self.rt_max_data / self.num_bins_rt_data
if 'NB' in hdul[1].columns.names:
self.nb = hdul[1].data['NB']
else:
self.nb = None
if len(hdul) > 2:
rp_grid_model = hdul[2].data['DMRP']
rt_grid_model = hdul[2].data['DMRT']
z_grid_model = hdul[2].data['DMZ']
else:
rp_grid_model = rp_grid
rt_grid_model = rt_grid
z_grid_model = z_grid
hdul.close()
# Compute the data mask
self.data_mask = self._build_mask(rp_grid, rt_grid, cuts_config, self.rp_min_data,
self.bin_size_rp_data, self.bin_size_rt_data)
# Read distortion matrix and initialize coordinate grids for the model
if dmat_path is not None:
rp_grid_model, rt_grid_model, z_grid_model = self._read_dmat(dmat_path, cuts_config)
else:
self.rp_min_model = self.rp_min_data
self.rp_max_model = self.rp_max_data
self.rt_max_model = self.rt_max_data
self.num_bins_rp_model = self.num_bins_rp_data
self.num_bins_rt_model = self.num_bins_rt_data
self.bin_size_rp_model = self.bin_size_rp_data
self.bin_size_rt_model = self.bin_size_rt_data
self.coeff_binning_model = 1
self.model_mask = self.data_mask
self.data_size = len(self.masked_data_vec)
self.full_data_size = len(self.data_vec)
# TODO this was needed for post distortion BB polynomials. Fix at some point!
# self.r_square_grid = np.sqrt(rp_grid**2 + rt_grid**2)
# self.mu_square_grid = np.zeros(self.r_square_grid.size)
# w = self.r_square_grid > 0.
# self.mu_square_grid[w] = rp_grid[w] / self.r_square_grid[w]
# return the model coordinate grids
rp_rt_grid = np.array([rp_grid_model, rt_grid_model])
return rp_rt_grid, z_grid_model
def _check_if_blinding_matches(self, blinding_flag, dmat_path):
if self._blinding_strat is None:
if not (blinding_flag == 'none' or blinding_flag == 'None'):
print(f'Warning: Data has no blinding, but distortion matrix at {dmat_path} '
f'has a blinding flag {blinding_flag}')
else:
if self._blinding_strat != blinding_flag:
print(f'Warning: Data has a blinding flag {blinding_flag} that does not match '
f'the flag of the distortion matrix at {dmat_path}')
def _read_dmat(self, dmat_path, cuts_config):
print(f'Reading distortion matrix file {dmat_path}\n')
hdul = fits.open(find_file(dmat_path))
header = hdul[1].header
if 'BLINDING' in header:
self._check_if_blinding_matches(header['BLINDING'], dmat_path)
dmat_column_name = 'DM'
if 'BLINDING' in header:
if header['BLINDING'] != 'none':
dmat_column_name = 'DM_BLIND'
self._distortion_mat = csr_matrix(hdul[1].data[dmat_column_name])
rp_grid = hdul[2].data['RP']
rt_grid = hdul[2].data['RT']
z_grid = hdul[2].data['Z']
hdul.close()
self.rp_min_model = header['RPMIN']
self.rp_max_model = header['RPMAX']
self.rt_max_model = header['RTMAX']
self.coeff_binning_model = header['COEFMOD']
self.num_bins_rp_model = header['NP'] * self.coeff_binning_model
self.num_bins_rt_model = header['NT'] * self.coeff_binning_model
# Get the model bin size
# TODO If RTMIN is ever added to the cf data files this needs modifying
self.bin_size_rp_model = (self.rp_max_model - self.rp_min_model) / self.num_bins_rp_model
self.bin_size_rt_model = self.rt_max_model / self.num_bins_rt_model
if ((self.bin_size_rp_model != self.bin_size_rp_data)
or (self.bin_size_rt_model != self.bin_size_rt_data)):
rp_custom_grid = np.arange(self.rp_min_model + self.bin_size_rp_data / 2,
self.rp_max_model, self.bin_size_rp_data)
rt_custom_grid = np.arange(self.bin_size_rt_data / 2,
self.rt_max_model, self.bin_size_rt_data)
rt_custom_grid, rp_custom_grid = np.meshgrid(rt_custom_grid, rp_custom_grid)
self.model_mask = self._build_mask(
rp_custom_grid.flatten(), rt_custom_grid.flatten(), cuts_config, self.rp_min_model,
self.bin_size_rp_data, self.bin_size_rt_data
)
self.rp_rt_custom_grid = np.r_[rp_custom_grid, rt_custom_grid]
else:
self.model_mask = self._build_mask(
rp_grid, rt_grid, cuts_config, self.rp_min_model,
self.bin_size_rp_data, self.bin_size_rt_data
)
return rp_grid, rt_grid, z_grid
def _build_mask(self, rp_grid, rt_grid, cuts_config, rp_min, bin_size_rp, bin_size_rt):
"""Build the mask for the data by comparing
the cuts from config with the data limits.
Parameters
----------
rp_grid : 1D Array
Vector of data rp coordinates
rt_grid : 1D Array
Vector of data rt coordinates
cuts_config : ConfigParser
cuts section from config
header : fits header
Data file header
Returns
-------
(ND Array, float, float)
Mask, Bin size in rp, Bin size in rt
"""
# Read the cuts
rp_min_cut = cuts_config.getfloat('rp-min', 0.)
rp_max_cut = cuts_config.getfloat('rp-max', 200.)
rt_min_cut = cuts_config.getfloat('rt-min', 0.)
rt_max_cut = cuts_config.getfloat('rt-max', 200.)
self.r_min_cut = cuts_config.getfloat('r-min', 10.)
self.r_max_cut = cuts_config.getfloat('r-max', 180.)
self.mu_min_cut = cuts_config.getfloat('mu-min', -1.)
self.mu_max_cut = cuts_config.getfloat('mu-max', +1.)
# Compute bin centers
bin_index_rp = np.floor((rp_grid - rp_min) / bin_size_rp)
bin_center_rp = rp_min + (bin_index_rp + 0.5) * bin_size_rp
bin_center_rt = (np.floor(rt_grid / bin_size_rt) + 0.5) * bin_size_rt
bin_center_r = np.sqrt(bin_center_rp**2 + bin_center_rt**2)
bin_center_mu = bin_center_rp / bin_center_r
# Build the mask by comparing the data bins to the cuts
mask = (bin_center_rp > rp_min_cut) & (bin_center_rp < rp_max_cut)
mask &= (bin_center_rt > rt_min_cut) & (bin_center_rt < rt_max_cut)
mask &= (bin_center_r > self.r_min_cut) & (bin_center_r < self.r_max_cut)
mask &= (bin_center_mu > self.mu_min_cut) & (bin_center_mu < self.mu_max_cut)
return mask
def _init_metals(self, metal_config):
"""Read the metal file and initialize all the metal data.
Parameters
----------
metal_config : ConfigParser
metals section from the config file
Returns
-------
dict
Dictionary containing all tracer objects (metals and the core ones)
list
list of all metal correlations we need to compute
"""
assert ('in tracer1' in metal_config) or ('in tracer2' in metal_config)
# Read metal tracers
metals_in_tracer1 = None
metals_in_tracer2 = None
if 'in tracer1' in metal_config:
metals_in_tracer1 = metal_config.get('in tracer1').split()
if 'in tracer2' in metal_config:
metals_in_tracer2 = metal_config.get('in tracer2').split()
self.metal_mats = {}
self.metal_rp_grids = {}
self.metal_rt_grids = {}
self.metal_z_grids = {}
# Build tracer Catalog
tracer_catalog = {}
tracer_catalog[self.tracer1['name']] = self.tracer1
tracer_catalog[self.tracer2['name']] = self.tracer2
if metals_in_tracer1 is not None:
for metal in metals_in_tracer1:
tracer_catalog[metal] = {'name': metal, 'type': 'continuous'}
if metals_in_tracer2 is not None:
for metal in metals_in_tracer2:
tracer_catalog[metal] = {'name': metal, 'type': 'continuous'}
# Read the metal file
metal_hdul = fits.open(find_file(metal_config.get('filename')))
dm_prefix = 'DM_'
if 'BLINDING' in metal_hdul[1].header:
if metal_hdul[1].header['BLINDING'] != 'none':
dm_prefix = 'DM_BLIND_'
metal_correlations = []
# First look for correlations between tracer1 and metals
if 'in tracer2' in metal_config:
for metal in metals_in_tracer2:
if not self._use_correlation(self.tracer1['name'], metal):
continue
tracers = (self.tracer1['name'], metal)
name = self.tracer1['name'] + '_' + metal
if 'RP_' + name not in metal_hdul[2].columns.names:
name = metal + '_' + self.tracer1['name']
self._read_metal_correlation(metal_hdul, tracers, name, dm_prefix)
metal_correlations.append(tracers)
# Then look for correlations between metals and tracer2
# If we have an auto-cf the files are saved in the format tracer-metal
if 'in tracer1' in metal_config:
for metal in metals_in_tracer1:
if not self._use_correlation(metal, self.tracer2['name']):
continue
tracers = (metal, self.tracer2['name'])
name = metal + '_' + self.tracer2['name']
if 'RP_' + name not in metal_hdul[2].columns.names:
name = self.tracer2['name'] + '_' + metal
self._read_metal_correlation(metal_hdul, tracers, name, dm_prefix)
metal_correlations.append(tracers)
# Finally look for metal-metal correlations
# Some files are reversed order, so reverse order if we don't find it
if ('in tracer1' in metal_config) and ('in tracer2' in metal_config):
for i, metal1 in enumerate(metals_in_tracer1):
j0 = i if self.tracer1 == self.tracer2 else 0
for metal2 in metals_in_tracer2[j0:]:
if not self._use_correlation(metal1, metal2):
continue
tracers = (metal1, metal2)
name = metal1 + '_' + metal2
if 'RP_' + name not in metal_hdul[2].columns.names:
name = metal2 + '_' + metal1
self._read_metal_correlation(metal_hdul, tracers, name, dm_prefix)
metal_correlations.append(tracers)
metal_hdul.close()
return tracer_catalog, metal_correlations
@staticmethod
def _use_correlation(name1, name2):
"""Check if a correlation should be used or not
Parameters
----------
name1 : string
Name of tracer 1
name2 : string
Name of tracer 2
Returns
-------
Bool
Flag for using the correlation between tracer 1 and 2
"""
# For CIV we only want it's autocorrelation
if name1 == 'CIV(eff)' or name2 == 'CIV(eff)':
return name1 == name2
else:
return True
def _read_metal_correlation(self, metal_hdul, tracers, name, dm_prefix):
"""Read a metal correlation from the metal file and add
the data to the existing member dictionaries.
Parameters
----------
metal_hdul : hduList
hduList object for the metal file
tracers : tuple
Tuple with the names of the two tracers
name : string
The name of the specific correlation to be read from file
"""
self.metal_rp_grids[tracers] = metal_hdul[2].data['RP_' + name]
self.metal_rt_grids[tracers] = metal_hdul[2].data['RT_' + name]
self.metal_z_grids[tracers] = metal_hdul[2].data['Z_' + name]
metal_mat_size = len(self.metal_rp_grids[tracers])
dm_name = dm_prefix + name
if dm_name in metal_hdul[2].columns.names:
self.metal_mats[tracers] = csr_matrix(metal_hdul[2].data[dm_name])
elif len(metal_hdul) > 3 and dm_name in metal_hdul[3].columns.names:
self.metal_mats[tracers] = csr_matrix(metal_hdul[3].data[dm_name])
elif self.corr_item.test_flag:
self.metal_mats[tracers] = sparse.eye(metal_mat_size)
else:
raise ValueError("Cannot find correct metal matrices."
" Check that blinding is consistent between cf and metal files.")
[docs] def create_monte_carlo(self, fiducial_model, scale=1., seed=0,
forecast=False):
"""Create monte carlo mock of data using a fiducial model.
Parameters
----------
fiducial_model : 1D Array
Fiducial model of the data
scale : float, optional
Scaling for the covariance, by default 1.
seed : int, optional
Seed for the random number generator, by default 0
forecast : boolean, optional
Forecast option. If true, we don't add noise to the mock,
by default False
Returns
-------
1D Array
Monte Carlo mock of the data
"""
# Check if scale has changed and we need to recompute
if np.isclose(scale, self._scale):
self._recompute = False
else:
self._scale = scale
self._recompute = True
self.scaled_inv_masked_cov = self.inv_masked_cov / self._scale
self.scaled_log_cov_det = np.log(self._scale) + self.log_cov_det
if self.scaled_inv_masked_cov is None:
self.scaled_inv_masked_cov = self.inv_masked_cov
if self.scaled_log_cov_det is None:
self.scaled_log_cov_det = self.log_cov_det
# Compute cholesky decomposition
if (self._cholesky is None or self._recompute) and not forecast:
self._cholesky = linalg.cholesky(self._scale * self.cov_mat)
# Create the mock
np.random.seed(seed)
if forecast:
self.mc_mock = fiducial_model
else:
ran_vec = np.random.randn(self.full_data_size)
self.mc_mock = self._cholesky.dot(ran_vec) + fiducial_model
self.masked_mc_mock = self.mc_mock[self.model_mask]
return self.masked_mc_mock