"""
Spectrum utilities
----
.. include license and copyright
.. include:: ../include/copy.rst
----
.. include common links, assuming primary doc root is up one directory
.. include:: ../include/links.rst
"""
import os
import warnings
from IPython import embed
import numpy
from scipy import interpolate
from astropy.io import fits
from astropy.wcs import WCS
import astropy.constants
import astropy.units
from matplotlib import pyplot
from pydl.goddard.astro import airtovac
from ..util.lineprofiles import IntegratedGaussianLSF
from .sampling import Resample
from .resolution import match_spectral_resolution
[docs]def spectral_coordinate_step(wave, log=False, base=10.0):
"""
Return the sampling step for the input wavelength vector.
If the sampling is logarithmic, return the change in the logarithm
of the wavelength; otherwise, return the linear step in angstroms.
Args:
wave (array-like):
Wavelength coordinates of each spectral channel in
angstroms.
log (:obj:`bool`, optional):
Input spectrum has been sampled geometrically.
base (:obj:`float`, optional):
If sampled geometrically, the sampling is done using a
logarithm with this base. For natural logarithm, use
numpy.exp(1).
Returns:
:obj:`float`: Spectral sampling step in either angstroms
(`log=False`) or the step in log(angstroms).
Raises:
ValueError:
Raised if the wavelength vector is not uniformly (either
linearly or log-linearly) sampled to numerical accuracy.
"""
dw = numpy.diff(numpy.log(wave))/numpy.log(base) if log else numpy.diff(wave)
if numpy.any(numpy.absolute(numpy.diff(dw)) > 1e-11): #00*numpy.finfo(dw.dtype).eps):
raise ValueError('Wavelength vector is not uniformly sampled to numerical accuracy.')
return numpy.mean(dw)
[docs]def angstroms_per_pixel(wave, log=False, base=10.0, regular=True):
"""
Return a vector with the angstroms per pixel at each channel.
When `regular=True`, the function assumes that the wavelengths are
either sampled linearly or geometrically. Otherwise, it calculates
the size of each pixel as the difference between the wavelength
coordinates. The first and last pixels are assumed to have a width
as determined by assuming the coordinate is at its center.
.. note::
If the regular is False and log is True, the code does *not*
assume the wavelength coordinates are at the geometric center of
the pixel.
Args:
wave (`numpy.ndarray`_):
(Geometric) centers of the spectrum pixels in angstroms.
log (`numpy.ndarray`_, optional):
The vector is geometrically sampled.
base (:obj:`float`, optional):
Base of the logarithm used in the geometric sampling.
regular (:obj:`bool`, optional):
The vector is regularly sampled.
Returns:
numpy.ndarray: The angstroms per pixel.
"""
if regular:
ang_per_pix = spectral_coordinate_step(wave, log=log, base=base)
return ang_per_pix*wave*numpy.log(base) if log else numpy.repeat(ang_per_pix, len(wave))
return numpy.diff([(3*wave[0]-wave[1])/2] + ((wave[1:] + wave[:-1])/2).tolist()
+ [(3*wave[-1]-wave[-2])/2])
[docs]def convert_flux_density(wave, flux, error=None, density='ang'):
r"""
Convert a spectrum with flux per unit wavelength to per unit
frequency or vice versa.
For converting from per unit wavelength, this function returns
.. math::
F_{\nu} = F_{\lambda} \frac{d\lambda}{d\nu} = F_{\lambda}
\frac{\lambda^2}{c}.
The spectrum independent variable (`wave`) is always expected to
be the wavelength in angstroms. The input/output units always
expect :math:`F_{\lambda}` in :math:`10^{-17}\ {\rm erg\ s}^{-1}\
{\rm cm}^{-2}\ {\rm A}^{-1}` and :math:`F_{\nu}` in microjanskys
(:math:`10^{-29} {\rm erg\ s}^{-1}\ {\rm cm}^{-2}\ {\rm
Hz}^{-1}`). Beyond this, the function is ignorant of the
input/output units. For example, if you provide the function with
an input spectrum with :math:`F_{\lambda}` in :math:`10^{-11}\
{\rm erg\ s}^{-1}\ {\rm cm}^{-2}\ {\rm A}^{-1}`, the output will
be :math:`F_{\nu}` in Janskys.
Args:
wave (:obj:`float`, array-like):
The vector with the wavelengths in angstroms.
flux (:obj:`float`, array-like):
The vector with the flux density; cf. `density`.
error (:obj:`float`, array-like, optional):
The error in the flux measurements. If None, no errors
are returned.
density (:obj:`str`, optional):
The density unit of the *input* spectrum. Must be either
'ang' or 'Hz'. If the input spectrum is :math:`F_{\lambda}`
(`density='ang'`), the returned spectrum is :math:`F_{\nu}`.
Returns:
:obj:`float`, `numpy.ndarray`_, :obj:`tuple`: The flux with
the converted units. If the spectrum errors are not provided,
only the flux value or array is returned.
Raises:
ValueError:
Raised if the `wave` and `flux` arguments do not have the
same shape.
"""
# Set to be at least vectors
_wave = numpy.atleast_1d(wave)
_flux = numpy.atleast_1d(flux)
if _wave.shape != _flux.shape:
raise ValueError('Wavelength and flux arrays must have the same shape.')
if error is not None:
_error = numpy.atleast_1d(error)
if _error.shape != _flux.shape:
raise ValueError('Error and flux arrays must have the same shape.')
if density == 'ang':
# Convert Flambda to Fnu
factor = numpy.square(_wave)*1e12/astropy.constants.c.to('angstrom/s').value
fnu = _flux*factor
if error is not None:
fnu_err = _error*factor
return (fnu[0], fnu_err[0]) if isinstance(flux, float) else (fnu, fnu_err)
return fnu[0] if isinstance(flux, float) else fnu
if density == 'Hz':
# Convert Fnu to Flambda
factor = astropy.constants.c.to('angstrom/s').value/numpy.square(_wave)/1e12
flambda = _flux*factor
if error is not None:
flambda_err = _error*factor
return (flambda[0], flambda_err[0]) if isinstance(flux, float) \
else (flambda, flambda_err)
return flambda[0] if isinstance(flux, float) else flambda
raise ValueError('Density units must be either \'ang\' or \'Hz\'.')
[docs]def convert_flux_units(wave, flux, current_units, error=None):
r"""
Convert flux density units to ``'1e-17 erg / (cm2 s angstrom)'``.
The flux density can be per unit frequency (:math:`F_{\nu}`) or
per unit wavelength (:math:`F_{\lambda}`). If a basic unit
conversion from the current units to ``'1e-17 erg / (cm2 s
angstrom)'`` fails, it's assumed that the user provided
:math:`F_{\nu}`. If the conversion from the current units to
``'1e-29 erg / (cm2 Hz s)'`` also fails, the method faults.
Args:
wave (`numpy.ndarray`_):
Wavelength in angstroms.
flux (`numpy.ndarray`_):
Flux density measurements.
current_units (:obj:`str`):
Input units of the flux density. Must be interpretable by
`astropy.units.Unit`_.
error (`numpy.ndarray`_):
Error vector. Ignored if None.
Returns:
`numpy.ndarray`_, :obj:`tuple`: The converted flux vector. If
an error vector is provided, it is also returned as the
second element.
"""
try:
# Get unit-conversion factor needed to convert to 1e-17
# erg/s/cm^2/angstrom. First assume the fluxes are in
# F_lambda
conversion = astropy.units.Unit(current_units).to('1e-17 erg / (cm2 s angstrom)')
except astropy.units.core.UnitConversionError as e:
# Assume this unit conversion failure is because the flux is
# F_nu, not F_lambda.
try:
# Convert F_nu units to those expected by convert_flux_density
flux *= astropy.units.Unit(current_units).to('1e-29 erg / (cm2 Hz s)')
except:
# If that failed, we're really hosed.
raise ValueError('Unable to handle flux units') from e
return convert_flux_density(wave, flux, error=error, density='Hz')
return flux*conversion if error is None else (flux*conversion, error*conversion)
[docs]class Spectrum:
r"""
Define a spectrum.
Units are expected to be wavelength in angstroms and flux in 1e-17
erg/s/cm^2/angstrom.
If wavelengths are provided in air (instantiated with
``airwave=True``), wavelengths are converted to vacuum and the
sampling is set to be irregular.
.. todo::
- allow for different units?
Args:
wave (array-like):
1D wavelength data in angstroms. Can be sampled
irregularly.
flux (array-like):
1D flux data in 1e-17 erg/s/cm^2/angstrom. Use
:func:`convert_flux_units`, if necessary.
error (array-like, optional):
1-sigma error in the flux data
mask (array-like, optional):
Boolean mask for flux data (True=bad)
resolution (:obj:`float`, array-like, optional):
1D spectral resolution (:math:`$R=\lambda/\Delta\lambda$`)
regular (:obj:`bool`, optional):
Spectrum is regularly sampled, either linearly or log-linearly
log (:obj:`bool`, optional):
Spectrum is sampled in steps of log base 10. If regular
is False, this is ignored.
airwave (:obj:`bool`, optional):
Spectrum wavelengths are provided in air. If True, the
wavelengths will be converted to vacuum, and the
wavelength step will be set to be irregular (if it isn't
already).
check (:obj:`bool`, optional):
Check sampling to confirm input.
use_sampling_assessments (:obj:`bool`, optional):
Override any values provided for ``regular`` and ``log``
and assess the spectral sampling directly from the
provided wavelength array.
"""
def __init__(self, wave, flux, error=None, mask=None, resolution=None, regular=True,
log=False, airwave=False, check=True, use_sampling_assessments=False):
# Check the input
_wave = numpy.atleast_1d(wave)
if airwave:
_wave = airtovac(_wave)
_flux = numpy.atleast_1d(flux)
if _wave.ndim != 1:
raise ValueError('Spectrum can only accommodate single vectors for now.')
if _wave.shape != _flux.shape:
raise ValueError('Wavelength and flux vectors do not match.')
# Sampling
if use_sampling_assessments:
self.regular, self.log = Spectrum.assess_sampling(_wave)
else:
self.regular = False if airwave else regular
self.log = log
if check:
found_regular, found_log = Spectrum.assess_sampling(_wave)
if self.regular != found_regular:
raise ValueError('Expected {0} sampling, found {1} sampling.'.format(
'regular' if self.regular else 'irregular',
'regular' if found_regular else 'irregular'))
if self.log != found_log:
raise ValueError('Geometric sampling {0} found, but {1} expected.'.format(
'was' if found_log else 'was not',
'was' if self.log else 'was not'))
self.sres = None if resolution is None else numpy.atleast_1d(resolution)
if self.sres is not None:
# Allow resolution to be a single value
if self.sres.size == 1:
self.sres = numpy.repeat(self.sres, _flux.size)
if self.sres.shape != _flux.shape:
raise ValueError('Resolution vector must be a single number or a vector that '
'matches the length of flux vector.')
self.error = None if error is None else numpy.atleast_1d(error)
if self.error is not None and self.error.shape != _flux.shape:
raise ValueError('Error vector must match length of flux vector.')
self.mask = None if mask is None else numpy.atleast_1d(mask)
if self.mask is not None and self.mask.shape != _flux.shape:
raise ValueError('Mask vector must match length of flux vector.')
# TODO: Need interpolators for the error and mask?
self.interpolator = interpolate.interp1d(_wave, _flux, assume_sorted=True)
self.size = _wave.size
# TODO: Check log against the input wavelength vector
self.nu = self._frequency()
self.fnu = None
# Do this brute force Vega magnitude calculations for the time
# being
self._vega = None
[docs] @staticmethod
def assess_sampling(wave):
"""
Assess the wavelength sampling directly from the wavelength vector.
Args:
wave (array-like):
Array with the wavelengths.
Returns:
:obj:`tuple`: Two booleans are returned. The first
indicates if the wavelength vector is regularly sampled
(either linearly or log-linearly), the second indicates
if the sampling is log-linear.
"""
# Use absolute to allow for a monotonically decreasing wavelength vector
ddw = numpy.absolute(numpy.diff(numpy.diff(wave)))
regular = numpy.all(ddw < 1e-11) #100*numpy.finfo(ddw.dtype).eps)
if regular:
return regular, False
ddw = numpy.absolute(numpy.diff(numpy.diff(numpy.log(wave))))
regular = numpy.all(ddw < 1e-11) #100*numpy.finfo(ddw.dtype).eps)
if regular:
return regular, True
return False, False
@property
def wave(self):
"""
The wavelength data vector.
"""
return self.interpolator.x
@property
def flux(self):
"""
The flux data vector.
"""
return self.interpolator.y
[docs] def copy(self):
"""
Return a deep copy of the spectrum.
"""
return Spectrum(self.wave.copy(), self.flux.copy(),
error=None if self.error is None else self.error.copy(),
mask=None if self.mask is None else self.mask.copy(),
resolution=None if self.sres is None else self.sres.copy(),
regular=self.regular, log=self.log, check=False)
[docs] def _arith(self, other, func):
if isinstance(other, Spectrum):
return func(other)
_other = numpy.atleast_1d(other)
if _other.size == 1:
_other = numpy.repeat(_other, self.size)
if _other.size != self.size:
raise ValueError('Vector to add has incorrect length.')
return func(Spectrum(self.interpolator.x, _other))
def __add__(self, rhs):
return self._arith(rhs, self._add_spectrum)
def __radd__(self, lhs):
return self + lhs
def __sub__(self, rhs):
return self + rhs*-1
def __rsub__(self, lhs):
return self*-1 + lhs
def __mul__(self, rhs):
return self._arith(rhs, self._mul_spectrum)
def __rmul__(self, lhs):
return self * lhs
def __truediv__(self, rhs):
if isinstance(rhs, Spectrum):
return self * rhs.inverse()
# NOTE: Parentheses are important below because they force the
# call to __mul__ for the correct quantity and avoid an
# infinite loop!
return self * (1/float(rhs))
def __rtruediv__(self, lhs):
return self.inverse() * lhs
[docs] def inverse(self):
error = None if self.error is None else self.error/numpy.square(self.interpolator.y)
return Spectrum(self.wave, 1./self.interpolator.y, error=error,
mask=None if self.mask is None else self.mask.copy(), resolution=self.sres,
regular=self.regular, log=self.log)
[docs] def _mul_spectrum(self, rhs):
"""
rhs must have type Spectrum
"""
if not numpy.array_equal(rhs.wave, self.wave):
raise NotImplementedError('To perform arithmetic on spectra, their wavelength arrays '
'must be identical.')
flux = self.flux * rhs.flux
if self.error is None and rhs.error is None:
error = None
else:
error = numpy.zeros(self.size, dtype=float)
if self.error is not None:
error += numpy.square(self.error/self.flux)
if rhs.error is not None:
error += numpy.square(rhs.error/rhs.flux)
error = numpy.sqrt(error) * numpy.absolute(flux)
if self.mask is None and rhs.mask is None:
mask = None
else:
mask = numpy.zeros(self.size, dtype=bool)
if self.mask is not None:
mask |= self.mask
if rhs.mask is not None:
mask |= rhs.mask
if self.sres is not None and rhs.sres is not None \
and not numpy.array_equal(self.sres, rhs.sres):
warnings.warn('Spectral resolution is not correctly propagated.')
sres = self.sres if self.sres is not None else rhs.sres
return Spectrum(self.wave, flux, error=error, mask=mask, resolution=sres, regular=regular,
log=self.log)
[docs] def _add_spectrum(self, rhs):
"""
rhs must have type Spectrum
"""
if not numpy.array_equal(rhs.wave, self.wave):
raise NotImplementedError('To perform arithmetic on spectra, their wavelength arrays '
'must be identical.')
flux = self.flux + rhs.flux
if self.error is None and rhs.error is None:
error = None
else:
error = numpy.zeros(self.size, dtype=float)
if self.error is not None:
error += numpy.square(self.error)
if rhs.error is not None:
error += numpy.square(rhs.error)
error = numpy.sqrt(error)
if self.mask is None and rhs.mask is None:
mask = None
else:
mask = numpy.zeros(self.size, dtype=bool)
if self.mask is not None:
mask |= self.mask
if rhs.mask is not None:
mask |= rhs.mask
if self.sres is not None and rhs.sres is not None \
and not numpy.array_equal(self.sres, rhs.sres):
warnings.warn('Spectral resolution is not correctly propagated.')
sres = self.sres if self.sres is not None else rhs.sres
return Spectrum(self.wave, flux, error=error, mask=mask, resolution=sres,
regular=self.regular, log=self.log)
def __len__(self):
return self.interpolator.x.size
def __getitem__(self, s):
"""
Access the flux data directly via slicing.
"""
return self.interpolator.y[s]
[docs] def _frequency(self):
"""Calculate the frequency in terahertz."""
return 10*astropy.constants.c.to('km/s').value/self.wave
[docs] def interp(self, w):
"""
Linearly interpolate the flux at a provided wavelength.
Args:
w (:obj:`float`, `numpy.ndarray`_):
Wavelength in angstroms. Can be a single wavelength
or a wavelength array.
"""
if not isinstance(w, numpy.ndarray) and w > self.interpolator.x[0] \
and w < self.interpolator.x[-1]:
return self.interpolator(w)
indx = (w > self.interpolator.x[0]) & (w < self.interpolator.x[-1])
sampled = numpy.zeros_like(w, dtype=float)
sampled[indx] = self.interpolator(w[indx])
return sampled
[docs] @staticmethod
def wcs_wavelength_vector(wcs, nwave, naxis=1, axis=0):
"""
Generate the wavelength vector using the provided
`astropy.wcs.WCS`_ object.
Returns:
`numpy.ndarray`_: Vector with wavelengths defined by
:attr:`wcs`.
"""
coo = numpy.ones((nwave,naxis), dtype=float)
coo[...,axis] = numpy.arange(nwave)+1
wave = wcs.all_pix2world(coo, 1)[...,axis]
dimensionless = wcs.wcs.cunit[axis] is astropy.units.dimensionless_unscaled
if dimensionless:
# NOTE: If the axis is dimensionless, assume that the units
# are angstroms!
warnings.warn('No units defined for wavelength axis. Assuming angstroms.')
return wave if dimensionless else wave*wcs.wcs.cunit[axis].to('angstrom')
[docs] @classmethod
def from_fits(cls, ifile, waveext='WAVE', waveunits='angstrom', fluxext='FLUX',
fluxunits=None, errext=None, ivarext=None, maskext=None, resext=None,
tblext=None, **kwargs):
"""
Construct a spectrum using data from a fits file.
Fluxes must be in units of flux density (i.e., flux per unit
angstrom or Hz).
.. todo::
- Allow for a "frequency" extension instead of
wavelengths?
Args:
ifile (:obj:`str`):
Name of the fits file.
waveext (:obj:`str`, :obj:`int`, optional):
Extension with the wavelength data. If set to 'WCS',
the wavelengths are constructed using the WCS in the
flux extension header. If the data is instead one
column in a binary table extension, see ``tblext``.
waveunits (:obj:`str`, optional):
Units of the wavelengths in the fits file. WARNING:
If you try to build the wavelength array using the
WCS in the header, the wavelength array will be
converted to angstroms based on the WCS units. If the
WCS units are not defined, the values read from the
WCS are not altered, but can be fixed using this
keyword.
fluxext (:obj:`str`, :obj:`int`, optional):
Extension with the flux data. If the data is instead
one column in a binary table extension, see
``tblext``. All fluxes are expected to be in units of
flux density.
fluxunits (:obj:`str`, optional):
Units of the flux density in the fits file. Must be
interpretable by `astropy.units.Unit`_. If None, the
code will check the header of the flux extension (or
the binary table extension; see ``tblext``) for the
``BUNIT`` keyword. If not None, this overrides any
units provided by the fits file. If None and no
``BUNIT`` keyword is found, the units are assumed to
be ``'1e-17 erg / (cm2 s angstrom)'``.
errext (:obj:`str`, :obj:`int`, optional):
Extension with the flux error. If None, no errors
will be read. Keyword is mutually exclusive with
``ivarext``. If the data is instead one column in a
binary table extension, see ``tblext``.
ivarext (:obj:`str`, :obj:`int`, optional):
Extension with the flux inverse variance. If None, no
errors will be read. Otherwise, the data are read and
then converted to 1-sigma errors. Keyword is mutually
exclusive with ``errext``. If the data is instead one
column in a binary table extension, see ``tblext``.
maskext (:obj:`str`, :obj:`int`, optional):
Extension with the mask data. If not None, the data
in this extension must be castable to a boolean
array. If None, all spectral data is assumed to be
unmasked. If the data is instead one column in a
binary table extension, see ``tblext``.
resext (:obj:`str`, :obj:`int`, optional):
Extension with the spectral resolution. If None, the
spectral resolution will be undefined. If the data is
instead one column in a binary table extension, see
``tblext``.
tblext (:obj:`str`, :obj:`int`, optional):
Instead of being organized in a series of ImageHDU
extensions, the spectral data are in a BinTableHDU
with this extension name/index. If provided (not
None), the "extension" names/indices provided by the
other keywords are instead interpreted as the columns
in this binary table.
**kwargs:
Keyword arguments passed directly to the class
instantiation (see :class:`Spectrum`). NOTE: If
``resolution`` is provided as one of the kwargs, it
is given priority over the ``resext`` keyword in this
method.
Returns:
:class:`Spectrum`: Object read from the fits file.
Raises:
ValueError:
Raised if both ``errext`` and ``ivarext`` are set, or
if the wavelength vector is supposed to be built from
a WCS, but the data is organized in a binary table.
"""
# Check the input
if waveext.lower() == 'wcs' and tblext is not None:
raise ValueError('WCS cannot be used to define the wavelength coordinate system '
'using a binary table data model.')
if ivarext is not None and errext is not None:
raise ValueError('An extensions/column can be provided for the inverse variance or '
'the error, not both.')
_resext = resext
if 'resolution' in kwargs and kwargs['resolution'] is not None and resext is not None:
warnings.warn('Resolution and extension provided. Preference given to the former.')
_resext = None
hdu = fits.open(ifile)
# Get the flux
flux = hdu[fluxext].data if tblext is None else hdu[tblext].data[fluxext]
# Check the dimensionality
if flux.ndim != 1:
raise ValueError('Flux data must be one-dimensional.')
# Get the mask. This needs to be instantiated before the error
# vector, in case the mask incorporates ivar-to-error
# conversion issues.
mask = None if maskext is None \
else (hdu[maskext].data.astype(bool) if tblext is None
else hdu[tblext].data[errext].astype(bool))
# Get the error
error = None if errext is None \
else (hdu[errext].data if tblext is None else hdu[tblext].data[errext])
if ivarext is not None:
error = numpy.ma.power(hdu[ivarext].data if tblext is None
else hdu[tblext].data[ivarext], -0.5)
if mask is not None:
mask |= error.mask
error = error.data
# Get the wavelength vector
if waveext.lower() == 'wcs':
wave = Spectrum.wcs_wavelength_vector(WCS(header=hdu[fluxext].header, fix=True),
flux.size)
else:
wave = hdu[waveext].data if tblext is None else hdu[tblext].data[waveext]
# Impose the wavelength units
wave *= astropy.units.Unit(waveunits).to('angstrom')
# Fix the flux units
if fluxunits is None:
if tblext is not None and 'BUNIT' in hdu[tblext].header:
fluxunits = hdu[tblext].header['BUNIT']
elif 'BUNIT' in hdu[fluxext].header:
fluxunits = hdu[fluxext].header['BUNIT']
if fluxunits is not None:
print('Converting flux units from {0} to 1e-17 erg/s/cm2/angstrom'.format(fluxunits))
if error is None:
flux = convert_flux_units(wave, flux, fluxunits)
else:
flux, error = convert_flux_units(wave, flux, fluxunits, error=error)
# Get the spectral resolution
sres = None if _resext is None \
else (hdu[_resext].data if tblext is None else hdu[tblext].data[_resext])
return cls(wave, flux, error=error, mask=mask, **kwargs) if sres is None \
else cls(wave, flux, error=error, mask=mask, resolution=sres, **kwargs)
[docs] @classmethod
def from_ascii(cls, ifile, wavecol=0, waveunits='angstrom', fluxcol=1,
fluxunits='1e-17 erg / (cm2 s angstrom)', errcol=None, ivarcol=None,
maskcol=None, rescol=None, **kwargs):
"""
Construct a spectrum using data from an ascii file.
.. note::
- All the columns are 0-indexed. I.e., the first column
is column 0.
- The core function used to read the data is
`numpy.genfromtxt`_; i.e., anything that can be parsed
by that function (e.g., gzipped files), can be parsed
by this method. Note that all columns are read as
floats. This means that any string columns are
converted to nan, and any mask column must have the
typical cast of 0 for False and anything else for True.
Args:
ifile (:obj:`str`):
Name of the fits file.
wavecol (:obj:`int`, optional):
Column index with the wavelength data.
fluxcol (:obj:`int`, optional):
Column index with the flux data.
fluxunits (:obj:`str`, optional):
Units of the flux density read from the file. Must be
interpretable by `astropy.units.Unit`_. If None,
units are assumed to be ``'1e-17 erg / (cm2 s
angstrom)'``.
errcol (:obj:`int`, optional):
Column index with the flux error. If None, no errors
will be read. Keyword is mutually exclusive with
``ivarcol``.
ivarcol (:obj:`int`, optional):
Column index with the flux inverse variance. If None,
no errors will be read. Otherwise, the data are read
and then converted to 1-sigma errors. Keyword is
mutually exclusive with ``errcol``.
maskcol (:obj:`int`, optional):
Column index with the mask data. If not None, the
data in this extension must be castable to a boolean
array. If None, all spectral data is assumed to be
unmasked.
rescol (:obj:`str`, :obj:`int`, optional):
Column index with the spectral resolution. If None,
the spectral resolution will be undefined.
**kwargs:
Keyword arguments passed directly to the class
instantiation (see :class:`Spectrum`).
Returns:
:class:`Spectrum`: Object read from the ascii file.
Raises:
ValueError:
Raised if both ``errcol`` and ``ivarcol`` are set.
"""
# Check the input
if ivarcol is not None and errcol is not None:
raise ValueError('A column can be provided for the inverse variance or '
'the error, not both.')
_rescol = rescol
if 'resolution' in kwargs and kwargs['resolution'] is not None and rescol is not None:
warnings.warn('Resolution and column provided. Preference given to the former.')
_rescol = None
db = numpy.genfromtxt(ifile)
flux = db[:,fluxcol]
# Check the dimensionality
if flux.ndim != 1:
raise ValueError('Flux data must be one-dimensional.')
wave = db[:,wavecol] * astropy.units.Unit(waveunits).to('angstrom')
mask = None if maskcol is None else db[:,maskcol].astype(bool)
error = None if errcol is None else db[:,errcol]
if ivarcol is not None:
error = numpy.ma.power(db[:,ivarcol], -0.5)
if mask is not None:
mask |= error.mask
error = error.data
if fluxunits is not None:
print('Converting flux units from {0} to 1e-17 erg/s/cm2/angstrom'.format(fluxunits))
if error is None:
flux = convert_flux_units(wave, flux, fluxunits)
else:
flux, error = convert_flux_units(wave, flux, fluxunits, error=error)
sres = None if _rescol is None else db[:,_rescol]
return cls(wave, flux, error=error, mask=mask, **kwargs) if sres is None \
else cls(wave, flux, error=error, mask=mask, resolution=sres, **kwargs)
[docs] def wavelength_step(self):
"""
Return the wavelength step per pixel.
Returns:
`numpy.ndarray`_: Change in angstroms per pixel.
"""
return angstroms_per_pixel(self.wave, log=self.log, regular=self.regular)
[docs] def frequency_step(self):
"""
Return the frequency step per pixel in THz.
"""
return 10*astropy.constants.c.to('km/s').value*self.wavelength_step()/self.wave/self.wave
[docs] def magnitude(self, wavelength=None, band=None, system='AB'):
"""
Calculate the magnitude of the object in the specified band.
If no arguments are provided, the magnitude is calculated for
the entire :attr:`flux` vector. Otherwise, the magnitude is
calculated at a single wavelength (see ``wavelength``) or
over a filter (see ``band``).
Args:
wavelength (:obj:`float`, optional):
The wavelength at which to calculate the magnitude.
band (:class:`~enyo.etc.efficiency.FilterResponse`, optional):
Object with the filter response function
system (:obj:`str`, optional):
Photometric system. Currently must be ``AB`` or
``Vega``.
Returns:
:obj:`float`, `numpy.ndarray`_: The one or more magnitude
measurements, depending on the input.
Raises:
NotImplementedError:
Raised if the photometric system is not known.
"""
if wavelength is not None and band is not None:
warnings.warn('Provided both wavelength and band; wavelength takes precedence.')
# TODO: Check input.
if system == 'AB':
if self.nu is None:
self._frequency()
if self.fnu is None:
# Flux in microJanskys
self.fnu = convert_flux_density(self.wave, self.flux)
if wavelength is not None:
fnu = self.fnu[numpy.argmin(numpy.absolute(self.wave - wavelength))]
elif band is not None:
dnu = self.frequency_step()
fnu = numpy.sum(band(self.wave)*self.fnu*dnu) / numpy.sum(band(self.wave)*dnu)
else:
fnu = self.fnu
# NOTE: This is identically -2.5*numpy.log10(fnu) - 48.6,
# just accounting for the units of fnu
return -2.5*numpy.log10(fnu) + 23.9
if system == 'Vega':
# NOTE:
# - Vega mags are done brute force, not using a
# precomputed set of zero-points.
# - This doesn't account for any resolution differences
# between this spectrum and the Vega spectrum.
if self._vega is None:
# Lazy load the Vega spectrum
self._vega = VegaSpectrum()
if wavelength is not None:
# At one wavelength
vega_flux = self._vega.flux[numpy.argmin(numpy.absolute(self._vega.wave
- wavelength))]
flux = self.flux[numpy.argmin(numpy.absolute(self.wave - wavelength))]
elif band is not None:
# Over a band
vega_flux = numpy.sum(band(self.wave) * self.wave * self._vega.interp(self.wave)
* self.wavelength_step())
flux = numpy.sum(band(self.wave) * self.wave * self.flux * self.wavelength_step())
else:
# Full spectrum
flux = self.flux
vega_flux = self._vega.interp(self.wave)
return -2.5 * numpy.log10(flux / vega_flux) + 0.03
raise NotImplementedError('Photometric system {0} not implemented.'.format(system))
[docs] def rescale(self, factor):
"""
Rescale the spectrum by the provided factor.
The spectral data are modified *in-place*; nothing is
returned.
Args:
factor (scalar-like or array-like):
Factor to multiply the fluxes. If it's a vector, it
must have the same length as the existing spectrum.
"""
self.interpolator.y *= factor
if self.error is not None:
# Adjust the error
self.error *= numpy.absolute(factor)
if self.fnu is not None:
# Adjust the flux density per Hz
self.fnu *= factor
[docs] def rescale_flux(self, wave, flux):
"""
Rescale the spectrum to be specifically the flux value at the
provided wavelength. The input flux should be in units of
1e-17 erg/s/cm^2/angstrom.
Args:
wave (:obj:`float`):
The wavelength at which to base the rescaling.
flux (:obj:`float`):
The target value of the flux at the provided
wavelength.
"""
self.rescale(flux/self.interp(wave))
[docs] def rescale_magnitude(self, new_mag, wavelength=None, band=None, system='AB'):
"""
Rescale the spectrum to an input magnitude.
Must provide either ``wavelength`` or ``band``. The object is
edited in-place.
Args:
new_mag (:obj:`float`):
The target magnitude
wavelength (:obj:`float`, optional):
The wavelength at which to calculate the magnitude.
band (:class:`~enyo.etc.efficiency.FilterResponse`, optional):
Object with the filter response function
system (:obj:`str`, optional):
Photometric system. Currently must be ``AB`` or
``Vega``.
Raises:
ValueError:
Raised if neither ``wavelength`` nor ``band`` are provided.
NotImplementedError:
Raised if the photometric system is not known.
"""
if wavelength is None and band is None:
raise ValueError('Must provide either wavelength or the bandpass filter.')
if system not in ['AB', 'Vega']:
raise NotImplementedError('Photometric system {0} not implemented.'.format(system))
dmag = new_mag - self.magnitude(wavelength=wavelength, band=band, system=system)
self.rescale(numpy.power(10., -dmag/2.5))
[docs] def photon_flux(self, inplace=True):
r"""
Convert the spectrum from 1e-17 erg/s/cm^2/angstrom to
photons/s/cm^2/angstrom.
Args:
inplace (:obj:`bool`, optional):
If ``inplace is True``, the spectrum is modified in
place and None is returned; otherwise, the converted
flux vector is returned.
Returns:
`numpy.ndarray`_: Flux vector in photons/s/cm^2/angstrom
if ``inplace is False``; otherwise, nothing is returned.
"""
ergs_per_photon = astropy.constants.h.to('erg s') * astropy.constants.c.to('angstrom/s') \
/ (self.wave * astropy.units.angstrom)
return self.rescale(1e-17 / ergs_per_photon.value) if inplace \
else self.interpolator.y * 1e-17 / ergs_per_photon.value
[docs] def plot(self, ax=None, show=False, **kwargs):
"""
Construct a plot of the spectrum.
Args:
ax (`matplotlib.axes.Axes`_):
Axes for the plot. If None, a new instance is
generated using ``pyplot.subplot``.
show (:obj:`bool`, optional):
Show the plot instead of just returning the modified
`matplotlib.axes.Axes`_ instance.
**kwargs:
Other keywords (like ``color``) passed directly to
``pyplot.plot``.
Returns:
`matplotlib.axes.Axes`_: Modified or new instance;
returned only if ``show is False``.
"""
_ax = pyplot.subplot() if ax is None else ax
_ax.plot(self.wave, self.flux, **kwargs)
if show:
pyplot.show()
return _ax
[docs] def resample(self, wave=None, dwave=None, log=False, step=True):
"""
Resample the spectrum to a uniform wavelength grid.
Args:
wave (`numpy.ndarray`_, optional):
New wavelength array. Must be linearly or
log-linearly sampled. If None, the new wavelength
grid covers the full extent of the current grid with
a sampling based in the minimum pixel separation
(geometrically or linearly) in the current spectrum.
dwave (:obj:`float`, optional):
Step for each pixel. If ``log`` is True, this must be
the geometric step; i.e., the difference in the
base-10 log of the wavelength of adjacent pixels.
log (:obj:`bool`, optional):
Flag that the wavelength array is log-linearly
sampled.
step (:obj:`bool`, optional):
Treat the input function as a step function during
the resampling integration. If False, use a linear
interpolation between pixel samples.
Returns:
:class:`Spectrum`: Returns a resampled version of itself.
"""
if wave is None:
wave = self.wave
rng = numpy.log10(wave[[0,-1]]) if log else wave[[0,-1]]
if dwave is None:
dwave = numpy.amin(numpy.diff(numpy.log10(wave))) \
if log else numpy.amin(numpy.diff(wave))
npix = int(numpy.diff(rng)/dwave)+1
r = Resample(self.flux, e=self.error, mask=self.mask, x=self.wave,
inLog=self.regular and self.log, newRange=wave[[0,-1]], newpix=npix,
newLog=log, step=step)
sres = None if self.sres is None \
else interpolate.interp1d(self.wave, self.sres, assume_sorted=True,
bounds_error=False,
fill_value=(self.sres[0],self.sres[-1]))(r.outx)
return Spectrum(r.outx, r.outy, error=r.oute, mask=r.outf < 0.8, resolution=sres,
regular=True, log=log)
[docs] def match_resolution(self, resolution, wave=None):
r"""
Match the spectral resolution of the spectrum to the provided
value.
The object must be a regularly sampled, either linearly or
log-linearly (see :attr:`log`). If it isn't, use
:func:`resample`.
Args:
resolution (:obj:`float`, array-like):
The single value for the spectral resolution
(:math:`R=\lambda/\Delta\lambda`) or a vector for a
wavelength-dependent spectral resolution. If a
vector, the length of the vector must exactly match
the wavelength vector of the object or the wavelength
vector must also be provided (see ``wave``).
wave (`numpy.ndarray`_, optional):
Wavelength vector for the provided spectral
resolution. If None, ``resolution`` must either be a
single number or a vector with the same length as
:attr:`wave`.
Returns:
:class:`Spectrum`: Returns an object with the spectral
resolution matched (as best as can be done). If
necessary, the spectrum is masked where the spectral
resolution could not be matched in detail.
"""
if not self.regular:
raise ValueError('The spectrum must be regularly binned; try running resample first.')
if wave is None:
wave = self.wave
new_sres = numpy.atleast_1d(resolution)
if new_sres.size == 1:
new_sres = numpy.full(self.wave.size, resolution, dtype=float)
if new_sres.size != wave.size:
raise ValueError('Spectral resolution does not match length of wavelength vector.')
ivar = None if self.error is None else numpy.ma.power(self.error, -2).data
new_flux, new_sres, _, new_mask, new_ivar \
= match_spectral_resolution(self.wave, self.flux, self.sres, wave, new_sres,
ivar=ivar, mask=self.mask, log10=self.log,
new_log10=self.log)
new_mask |= numpy.invert(new_flux > 0)
if new_ivar is None:
new_error = None
else:
new_error = numpy.ma.power(new_ivar, -0.5)
new_mask |= new_error.mask
new_error = new_error.data
return Spectrum(self.wave, new_flux, error=new_error, mask=new_mask, resolution=new_sres,
use_sampling_assessments=True)
[docs] def redshift(self, z):
"""
Redshift the spectrum.
Spectrum is in 1e-17 erg/s/cm^2/angstrom, so this shifts the
wavelength vector by 1+z and rescales the flux by 1+z to keep
the flux per *observed* wavelength constant; this is not the
same as surface-brightness dimming, **which is not accounted
for**. S/N is kept fixed.
Args:
z (:obj:`float`):
Target redshift
"""
self.interpolator.x *= (1+z)
self.rescale(1/(1+z))
[docs] def write(self, ofile, overwrite=False):
"""
Write the spectrum to a fits file.
"""
if os.path.isfile(ofile) and not overwrite:
raise FileExistsError('File exists. Use a different filename or set overwrite=True.')
hdu = fits.HDUList([fits.PrimaryHDU(),
fits.ImageHDU(name='WAVE', data=self.wave),
fits.ImageHDU(name='FLUX', data=self.flux)])
if self.error is not None:
hdu += [fits.ImageHDU(name='ERROR', data=self.error)]
if self.mask is not None:
hdu += [fits.ImageHDU(name='MASK', data=self.mask.astype(int))]
if self.sres is not None:
hdu += [fits.ImageHDU(name='SPECRES', data=self.sres)]
hdu.writeto(ofile, overwrite=True)
[docs]class EmissionLineSpectrum(Spectrum):
r"""
Define an emission-line spectrum.
.. todo::
- use MaNGA DAP functions
Flux units are 1e-17 erg/s/cm^2/angstrom.
Args:
wave (array-like):
1D wavelength data in angstroms. Expected to be sampled
linearly or geometrically. These are the *observed*
wavelengths.
flux (array-like):
The total fluxes of one or more emission lines in 1e-17 erg/s/cm^2.
restwave (array-like):
The central rest wavelengths of one or emission lines in angstroms.
fwhm (array-like):
The FWHM of the Gaussian line profiles. If the resolution
vector is provided, these are assumed to be the *intrinsic*
widths, such that the line included in the spectrum has an
observed width determined by the quadrature sum of the
intrinsic and instrumental widths. If the resolution vector
is not provided, the line simply has the provided FWHM.
units (:obj:`str`, array-like, optional):
The units of the provided FWHM data. Must be either
'km/s' or 'ang'. Can be a single string or an array with
the units for each provided value.
redshift (scalar-like, optional):
If provided, the emission-line wavelengths are redshifted.
continuum (array-like, optional):
If provided, this is the 1D continuum placed below the line,
which must have the same length as the input wavelength
vector. The continuum is 0 if not provided.
resolution (array-like, optional):
1D spectral resolution
(:math:`$R=\lambda/\Delta\lambda$`). If None, the width
of the lines is set by ``fwhm`` only.
log (:obj:`bool`, optional):
Spectrum is sampled in steps of log base 10.
"""
def __init__(self, wave, flux, restwave, fwhm, units='ang', redshift=0.0, continuum=None,
resolution=None, log=False):
_flux = numpy.atleast_1d(flux).ravel()
nlines = _flux.size
_restwave = numpy.atleast_1d(restwave).ravel()
if _restwave.size != nlines:
raise ValueError('Number of rest wavelengths does not match the number of fluxes.')
_fwhm = numpy.atleast_1d(fwhm).ravel()
if _fwhm.size != nlines:
raise ValueError('Number of FWHM values does not match the number of fluxes.')
_units = numpy.atleast_1d(units).ravel()
# Check the input
if not numpy.all(numpy.isin(_units, ['km/s', 'ang'])):
raise ValueError('FWHM units must be \'km/s\' or \'ang\'.')
if _units.size == 1:
_units = numpy.repeat(_units, _flux.size)
if _units.size != nlines:
raise ValueError('Number of unit values does not match the number of fluxes.')
if resolution is not None and hasattr(resolution, '__len__') \
and len(wave) != len(resolution):
raise ValueError('Resolution vector must match length of wavelength vector.')
if continuum is not None and len(wave) != len(continuum):
raise ValueError('Continuum vector must match length of wavelength vector.')
# Set the line parameters
_linewave = _restwave*(1+redshift)
indx = (_linewave > wave[0]) & (_linewave < wave[-1])
if not numpy.any(indx):
raise ValueError('Redshifted lines are all outside of the provided wavelength range.')
sig2fwhm = numpy.sqrt(8.0 * numpy.log(2.0))
sigma = _fwhm/sig2fwhm
# Convert the FWHM as needed based on the sampling and units
in_ang = _units == 'ang'
in_kms = _units == 'km/s'
if log and numpy.any(in_ang):
# Convert to km/s
sigma[in_ang] = astropy.constants.c.to('km/s').value*sigma[in_ang]/_linewave[in_ang]
elif not log and numpy.any(in_kms):
# Convert to angstroms
sigma[in_kms] = _linewave[in_kms]*sigma[in_kms]/astropy.constants.c.to('km/s').value
# Add the instrumental resolution in quadrature to the
# intrinsic width, if the resolution is provided
if resolution is None:
_resolution = None
else:
_resolution = resolution if hasattr(resolution, '__len__') \
else numpy.full_like(wave, resolution, dtype=float)
sigma_inst = astropy.constants.c.to('km/s').value/_resolution/sig2fwhm if log else \
wave/_resolution/sig2fwhm
interp = interpolate.interp1d(wave, sigma_inst, assume_sorted=True, bounds_error=False,
fill_value=0.)
sigma = numpy.sqrt(numpy.square(sigma) + numpy.square(interp(_linewave)))
# Convert parameters to pixel units
_dw = spectral_coordinate_step(wave, log=log)
_linepix = (numpy.log10(_linewave) - numpy.log10(wave[0]) \
if log else _linewave - wave[0])/_dw
# Flux to pixel units so that the spectrum has units of flux
# density (flux per angstrom); less accurate when spectrum is
# logarithmically binned
dl = _linewave*(numpy.power(10.,_dw/2)-numpy.power(10.,-_dw/2)) if log else _dw
_flux /= dl
# Convert sigma to pixels
sigma /= (astropy.constants.c.to('km/s').value*_dw*numpy.log(10.) if log else dl)
# Construct the emission-line spectrum
pix = numpy.arange(wave.size)
spectrum = numpy.zeros(wave.size, dtype=float) if continuum is None else continuum.copy()
profile = IntegratedGaussianLSF()
for i in range(nlines):
if not indx[i]:
continue
p = profile.parameters_from_moments(_flux[i], _linepix[i], sigma[i])
spectrum += profile(pix, p)
# Instantiate
super(EmissionLineSpectrum, self).__init__(wave, spectrum, resolution=_resolution, log=log)
# 8329-6104
[docs]class BlueGalaxySpectrum(Spectrum):
"""
An example blue galaxy spectrum pulled from the MaNGA survey.
"""
def __init__(self, redshift=0.0):
fitsfile = os.path.join(os.environ['ENYO_DIR'], 'data/galaxy/blue_galaxy_8329-6104.fits')
hdu = fits.open(fitsfile)
wave = hdu['WAVE'].data * (1+redshift)
flux = hdu['FLUX'].data
super(BlueGalaxySpectrum, self).__init__(wave, flux, log=True)
[docs] @classmethod
def from_file(cls):
raise NotImplementedError('Spectrum for blue galaxy is fixed.')
# 8131-6102
[docs]class RedGalaxySpectrum(Spectrum):
"""
An example red galaxy spectrum pulled from the MaNGA survey.
"""
def __init__(self, redshift=0.0):
fitsfile = os.path.join(os.environ['ENYO_DIR'], 'data/galaxy/red_galaxy_8131-6102.fits')
hdu = fits.open(fitsfile)
wave = hdu['WAVE'].data * (1+redshift)
flux = hdu['FLUX'].data
super(RedGalaxySpectrum, self).__init__(wave, flux, log=True)
[docs] @classmethod
def from_file(cls):
raise NotImplementedError('Spectrum for red galaxy is fixed.')
#class MaunakeaSkySpectrumOLD(Spectrum):
# def __init__(self):
# fitsfile = os.path.join(os.environ['ENYO_DIR'], 'data/sky/manga/apo2maunakeasky.fits')
# hdu = fits.open(fitsfile)
# wave = hdu['WAVE'].data
# flux = hdu['FLUX'].data
# super(MaunakeaSkySpectrumOLD, self).__init__(wave, flux, log=True)
#
# @classmethod
# def from_file(cls):
# raise NotImplementedError('Maunakea sky spectrum is fixed.')
[docs]class MaunakeaSkySpectrum(Spectrum):
"""
The default, empirical dark night-sky spectrum at Maunakea
provided by Chuck Steidel.
"""
def __init__(self):
fitsfile = os.path.join(os.environ['ENYO_DIR'], 'data/sky/lris_esi_skyspec_fnu.fits')
init = Spectrum.from_fits(fitsfile, waveext='WCS', fluxext=0, airwave=True,
use_sampling_assessments=True)
init.regular = False
init.log = False
wave = numpy.arange(3100., 10500., 2.)
init = init.resample(wave=wave, dwave=2., log=False)
wave = init.wave
flux = init.flux
indx = numpy.invert(flux > 0) # & (wave < 3210)
flux[indx] = numpy.median(flux[(wave > 3210) & (wave < 4000)])
super(MaunakeaSkySpectrum, self).__init__(wave, flux)
[docs] @classmethod
def from_file(cls):
raise NotImplementedError('Maunakea sky spectrum is fixed.')
[docs]class ABReferenceSpectrum(Spectrum):
"""
Construct a spectrum with a constant flux of 3631 Jy.
Inherits from :class:`Spectrum`, which we take to mean that the flux
is always in units of 1e-17 erg/s/cm^2/angstrom.
Args:
wave (`numpy.ndarray`_):
Wavelength vector for the spectrum.
resolution (:obj:`float`, array-like, optional):
1D spectral resolution (:math:`$R=\lambda/\Delta\lambda$`)
regular (:obj:`bool`, optional):
Spectrum is regularly sampled, either linearly or log-linearly
log (:obj:`bool`, optional):
Spectrum is sampled in steps of log base 10. If regular
is False, this is ignored.
"""
def __init__(self, wave, resolution=None, regular=True, log=False):
norm = numpy.power(10., 29 - 48.6/2.5) # Reference flux in microJanskys
fnu = numpy.full_like(wave, norm, dtype=float)
flambda = convert_flux_density(wave, fnu, density='Hz')
super(ABReferenceSpectrum, self).__init__(wave, flambda, resolution=resolution, log=log,
regular=regular)
[docs]class VegaSpectrum(Spectrum):
"""
Return the spectrum of Vega constructed using data from:
https://ssb.stsci.edu/cdbs/calspec/alpha_lyr_stis_009.fits
Downloaded on 3 Apr 2020. Wavelengths are in vacuum.
Args:
waverange (array-like, optional):
Used to limit the wavelength range of the returned
spectrum. The maximum wavelength range of the current
spectrum goes from 900 Angstrom to 300 microns.
"""
def __init__(self, waverange=None):
if waverange is not None and numpy.asarray(waverange).size != 2:
raise ValueError('Wavelength range must be a two-element list, tuple, etc.')
fitsfile = os.path.join(os.environ['ENYO_DIR'], 'data/spectra/alpha_lyr_stis_009.fits')
hdu = fits.open(fitsfile)
indx = numpy.ones(hdu[1].data['WAVELENGTH'].size, dtype=bool) if waverange is None \
else (hdu[1].data['WAVELENGTH'] > waverange[0]) \
& (hdu[1].data['WAVELENGTH'] < waverange[1])
resolution=hdu[1].data['WAVELENGTH']/hdu[1].data['FWHM']
super(VegaSpectrum, self).__init__(hdu[1].data['WAVELENGTH'][indx],
hdu[1].data['FLUX'][indx]*1e17,
resolution=resolution[indx], regular=False)