from pathlib import Path
import os.path
from astropy.io import fits
import numpy as np
import h5py
[docs]class Output:
"""Class for handling the Vega output,
and reading/writing output files.
"""
def __init__(self, config, data, corr_items, analysis=None):
"""
Parameters
----------
config : ConfigParser
Output section of main config file
data : dict
Vega Data objects
corr_items : dict
Vega correlation_item objects
analysis : Analysis, optional
Analysis object, by default None
"""
self.data = data
self.analysis = analysis
self.corr_items = corr_items
self.type = config.get('type', 'fits')
self.overwrite = config.get('overwrite', False)
self.outfile = os.path.expandvars(config['filename'])
self.output_cf = config.getboolean('write_cf', False)
self.output_pk = config.getboolean('write_pk', False)
[docs] def write_results(self, corr_funcs, params, minimizer=None, scan_results=None, models=None):
"""Write results in the fits or hdf format
Parameters
----------
corr_funcs : dict
Model correlation functions to write to file.
This should be the output of vega.compute_model()
params : dict
Parameters to write to file. These should be the
parameters vega.compute_model() was called with.
minimizer : Minimizer, optional
Minimizer object after minimization was done, by default None
scan_results : list, optional
List of scan results, by default None
models : dict, optional
Dictionary with the Vega Model objects, by default None
"""
if self.type == 'fits':
self.write_results_fits(corr_funcs, params, minimizer, scan_results, models)
elif self.type == 'hdf' or self.type == 'h5':
self.write_results_hdf(minimizer, scan_results)
else:
raise ValueError('Unknown output type. Set type = fits'
' or type = hdf')
[docs] def write_results_fits(self, corr_funcs, params, minimizer=None, scan_results=None,
models=None):
"""Write output in the fits format
Parameters
----------
corr_funcs : dict
Model correlation functions to write to file.
This should be the output of vega.compute_model()
params : dict
Parameters to write to file. These should be the
parameters vega.compute_model() was called with.
minimizer : Minimizer, optional
Minimizer object after minimization was done, by default None
scan_results : list, optional
List of scan results, by default None
models : dict, optional
Dictionary with the Vega Model objects, by default None
"""
if self.data is None:
raise ValueError('Output object was initialized with an invalid data object.'
' Reinitialize with a valid vega.data object.')
primary_hdu = fits.PrimaryHDU()
model_hdu = self._model_hdu(corr_funcs, params)
hdu_list = [primary_hdu, model_hdu]
if minimizer is not None:
bestfit_hdu = self._bestfit_hdu(minimizer)
hdu_list.append(bestfit_hdu)
if self.output_pk:
assert models is not None
for key, model in models.items():
pk_hdu = self._pk_hdu(key, model)
hdu_list.append(pk_hdu)
if self.output_cf:
assert models is not None
for key, model in models.items():
cf_hdu = self._cf_hdu(key, model)
hdu_list.append(cf_hdu)
if scan_results is not None:
assert minimizer is not None
scan_hdu = self._scan_hdu(scan_results, minimizer)
hdu_list.append(scan_hdu)
hdul = fits.HDUList(hdu_list)
if self.outfile[-5:] != '.fits':
self.outfile += '.fits'
hdul.writeto(Path(self.outfile), overwrite=self.overwrite)
@staticmethod
def pad_array(array, size_to_match, pad_value=0.):
return np.pad(array, (0, size_to_match - len(array)), constant_values=pad_value)
def _model_hdu(self, corr_funcs, params):
"""Create HDU with the computed model correlations,
and the parameters used to compute them
Parameters
----------
corr_funcs : dict
Output correlations given compute_model
params : dict
Computation parameters
Returns
-------
astropy.io.fits.hdu.table.BinTableHDU
HDU with the model correlation
"""
sizes = {name: len(cf) for name, cf in corr_funcs.items()}
num_rows = np.max(list(sizes.values()))
columns = []
for name, cf in corr_funcs.items():
if len(self.data[name].data_vec) > num_rows:
raise ValueError('Data coordinate grid is larger than the model grid.')
columns.append(fits.Column(
name=name+'_MODEL', format='D', array=self.pad_array(cf, num_rows)))
columns.append(fits.Column(
name=name+'_MODEL_MASK', format='L',
array=self.pad_array(self.data[name].model_mask, num_rows, False)
))
columns.append(fits.Column(
name=name+'_MASK', format='L',
array=self.pad_array(self.data[name].data_mask, num_rows, False)
))
columns.append(fits.Column(
name=name+'_DATA', format='D',
array=self.pad_array(self.data[name].data_vec, num_rows)
))
columns.append(fits.Column(
name=name+'_VAR', format='D',
array=self.pad_array(self.data[name].cov_mat.diagonal(), num_rows)
))
if num_rows < self.corr_items[name].rp_rt_grid.shape[1]:
columns.append(fits.Column(
name=name+'_RP', format='D',
array=self.pad_array(self.data[name].rp_rt_custom_grid[0], num_rows)
))
columns.append(fits.Column(
name=name+'_RT', format='D',
array=self.pad_array(self.data[name].rp_rt_custom_grid[1], num_rows)
))
else:
columns.append(fits.Column(
name=name+'_RP', format='D',
array=self.pad_array(self.corr_items[name].rp_rt_grid[0], num_rows)
))
columns.append(fits.Column(
name=name+'_RT', format='D',
array=self.pad_array(self.corr_items[name].rp_rt_grid[1], num_rows)
))
if num_rows < self.corr_items[name].z_grid.shape[0]:
columns.append(fits.Column(name=name+'_Z', format='D', array=np.zeros(num_rows)))
else:
columns.append(fits.Column(
name=name+'_Z', format='D',
array=self.pad_array(self.corr_items[name].z_grid, num_rows)
))
if self.data[name].nb is not None:
columns.append(fits.Column(name=name+'_NB', format='K',
array=self.pad_array(self.data[name].nb, num_rows)))
model_hdu = fits.BinTableHDU.from_columns(columns)
model_hdu.name = 'Model'
for name, size in sizes.items():
card_name = 'hierarch ' + name + '_size'
model_hdu.header[card_name] = size
for par, val in params.items():
card_name = 'hierarch ' + par
model_hdu.header[card_name] = val
return model_hdu
def _bestfit_hdu(self, minimizer):
"""Create HDU with the bestfit info
Parameters
----------
minimizer : Minimizer
Minimizer object after minimization was done
Returns
-------
astropy.io.fits.hdu.table.BinTableHDU
HDU with the bestfit data
"""
# Get parameter names
names = np.array(list(minimizer.values.keys()))
# Check if any parameter name is too long
max_length = np.max([len(name) for name in names])
name_format = str(max_length) + 'A'
# Get parameter values and errors
values = np.array([minimizer.values[name] for name in names])
errors = np.array([minimizer.errors[name] for name in names])
num_pars = len(names)
# Build the covariance matrix
cov_mat = np.array(minimizer.covariance)
cov_format = str(num_pars) + 'D'
# Create the columns with the bestfit data
col1 = fits.Column(name='names', format=name_format, array=names)
col2 = fits.Column(name='values', format='D', array=values)
col3 = fits.Column(name='errors', format='D', array=errors)
col4 = fits.Column(name='covariance', format=cov_format, array=cov_mat)
# Create the Table HDU from the columns
bestfit_hdu = fits.BinTableHDU.from_columns([col1, col2, col3, col4])
bestfit_hdu.name = 'Bestfit'
# Add all the attributes of the minimum to the header
# for item, value in minimizer.fmin.items():
# name = item
# if len(item) > 8:
# name = 'hierarch ' + item
bestfit_hdu.header['FVAL'] = minimizer.fmin.fval
bestfit_hdu.header['VALID'] = minimizer.minuit.valid
bestfit_hdu.header['ACCURATE'] = minimizer.minuit.accurate
bestfit_hdu.header.comments['TTYPE1'] = 'Names of sampled parameters'
bestfit_hdu.header.comments['TTYPE2'] = 'Bestfit values of sampled parameters'
bestfit_hdu.header.comments['TTYPE3'] = 'Errors around the bestfit'
bestfit_hdu.header.comments['TTYPE4'] = 'Covariance matrix around the bestfit'
bestfit_hdu.header.comments['FVAL'] = 'Bestfit chi^2 value'
bestfit_hdu.header.comments['VALID'] = 'Flag for valid fit'
bestfit_hdu.header.comments['ACCURATE'] = 'Flag for accurate fit'
return bestfit_hdu
def _scan_hdu(self, scan_results, minimizer):
"""Create HDU with the scan info
Parameters
----------
scan_results : list, optional
List of scan results, by default None
names : list
Parameter names
name_format : string
Format for writing parameter names to a fits file
Returns
-------
astropy.io.fits.hdu.table.BinTableHDU
HDU with the scan data
"""
# Get parameter names
names = np.array(list(scan_results[0].keys()))
# Check if any parameter name is too long
max_length = np.max([len(name) for name in names])
name_format = str(max_length) + 'A'
# Get list of parameters and results
results = []
for res in scan_results:
results.append([res[par] for par in names])
results = np.array(results)
# Create the columns
name_col = fits.Column(name='names', format=name_format, array=names)
columns = [name_col]
comms = []
for col, name in zip(results.T, names):
columns.append(fits.Column(name=name, format='D', array=col))
comms.append('Bestfit grid values for ' + name)
# Create the Table HDU from the columns
scan_hdu = fits.BinTableHDU.from_columns(columns)
scan_hdu.name = 'SCAN'
# Add extra info to the header if we have the analysis object
if self.analysis is not None:
params = self.analysis.grids.keys()
for par in params:
grid = self.analysis.grids[par]
scan_hdu.header[par + '_min'] = grid[0]
scan_hdu.header[par + '_max'] = grid[-1]
scan_hdu.header[par + '_num_bins'] = len(grid)
scan_hdu.header.comments[par + '_min'] = 'Grid start for ' + par
scan_hdu.header.comments[par + '_max'] = 'Grid end for ' + par
scan_hdu.header.comments[par + '_num_bins'] = 'Grid size for ' + par
scan_hdu.header.comments['TTYPE1'] = 'Names of sampled parameters'
for i, comm in enumerate(comms):
scan_hdu.header.comments['TTYPE' + str(i+2)] = comm
return scan_hdu
def _pk_hdu(self, component, model):
"""Create HDU with Pk data for a component
Parameters
----------
component : string
Name of component
model : vega.Model
Model object for the component
Returns
-------
astropy.io.fits.hdu.table.BinTableHDU
HDU with the Pk data for the component
"""
# Get the Pk components and create the columns
columns = self._get_components(model.pk)
# Create the Table HDU from the columns
pk_hdu = fits.BinTableHDU.from_columns(columns)
pk_hdu.name = 'PK_' + component
return pk_hdu
def _cf_hdu(self, component, model):
"""Create HDU with correlation function data for a component
Parameters
----------
component : string
Name of component
model : vega.Model
Model object for the component
Returns
-------
astropy.io.fits.hdu.table.BinTableHDU
HDU with the Xi data for the component
"""
# Get the Xi components, before and after distortion
columns = self._get_components(model.xi, name_prefix='raw_')
columns += self._get_components(model.xi_distorted,
name_prefix='distorted_')
# Create the Table HDU from the columns
cf_hdu = fits.BinTableHDU.from_columns(columns)
cf_hdu.name = 'Xi_' + component
return cf_hdu
@staticmethod
def _get_components(model_components, name_prefix=''):
"""Get the saved model components and create astropy Columns
Parameters
----------
model_components : dict
Dictionary with saved Xi/Pk data
name_prefix : str, optional
Prefix for column names, by default ''
Returns
-------
list
List of astropy Columns for HDU creation
"""
columns = []
# Parts are smooth and/or peak
for part, data in model_components.items():
if not data:
continue
shape = np.shape(data['core'])
if len(shape) == 1:
form = 'D'
else:
size = shape[1]
form = str(size) + 'D'
for key, item in data.items():
if key == 'core':
name = name_prefix + part + '_core'
columns.append(fits.Column(name=name, format=form,
array=item))
else:
name = name_prefix + part + '_' + key[0] + '_' + key[1]
columns.append(fits.Column(name=name, format=form,
array=item))
return columns
[docs] def write_results_hdf(self, minimizer, scan_results=None):
"""Write Vega analysis results, including the bestfit
and chi2 scan results if they exist.
Parameters
----------
minimizer : Minimizer
Minimizer object after minimization was done
scan_results : list, optional
List of scan results, by default None
"""
if minimizer is None:
raise ValueError("The hdf output format is outdated and"
" does not work without minimization")
h5_file = h5py.File(Path(self.outfile), 'w')
# Write bestfit
bf_group = h5_file.create_group("best fit")
bf_group = self._write_bestfit_hdf(bf_group, minimizer)
# Write scan results
if scan_results is not None:
scan_group = h5_file.create_group("chi2 scan")
scan_group = self._write_scan_hdf(scan_group, scan_results)
h5_file.close()
@staticmethod
def _write_bestfit_hdf(bf_group, minimizer):
# Write the parameters values
for param, value in minimizer.values.items():
error = minimizer.errors[param]
bf_group.attrs[param] = (value, error)
# Write the covariance
for (par1, par2), cov in minimizer.covariance.items():
bf_group.attrs["cov[{}, {}]".format(par1, par2)] = cov
# Write down all attributes of the minimum
for item, value in minimizer.fmin.items():
bf_group.attrs[item] = value
return bf_group
@staticmethod
def _write_scan_hdf(scan_group, scan_results):
# Get list of parameters and results
params = list(scan_results[0].keys())
results = []
for res in scan_results:
results.append([res[par] for par in params])
results = np.array(results)
# Write parameter indeces
for i, par in enumerate(params):
scan_group.attrs[par] = i
# Write results
values = scan_group.create_dataset("values", np.shape(results),
dtype="f")
values[...] = results
return scan_group