import os
import json
import warnings
import numpy as np
from celerite2 import terms as celerite2_terms
import astropy.units as u
from astropy.units import cds # noqa
from astropy.utils.exceptions import AstropyUserWarning
import tynt
from . import scale
from .psd import plot_power_spectrum
from .sun import _p_mode_fit_to_sho_hyperparams
__all__ = [
'Hyperparameters',
'StellarOscillatorKernel',
'SolarOscillatorKernel',
'ShotNoiseKernel',
'Filter'
]
dirname = os.path.dirname(os.path.abspath(__file__))
default_hyperparameter_path = os.path.join(
dirname, 'data', 'hyperparameters.json'
)
def _sho_psd(omega, S0, w0, Q):
"""
Stochastically driven, damped harmonic oscillator.
"""
# This is the celerite2 SHO PSD:
return (
np.sqrt(2/np.pi) * S0 * w0**4 /
((omega**2 - w0**2)**2 + (omega**2 * w0**2 / Q**2))
)
[docs]
class Hyperparameters(list):
"""
Gaussian process hyperparameters for approximating
the total stellar irradiance power spectrum.
"""
def __init__(self, hyperparameters, name=None, magnitude=None):
"""
Parameters
----------
hyperparameters : list of dict
List of dictionaries containing hyperparameters. Each dict has two entries.
``"hyperparameters"`` contains a dictionary with the keyword arguments that
must be passed to the celerite2 SHOTerm constructor. ``"metadata"`` is a
dictionary describing which hyperparameters were fixed in the fit.
name : str
Label or name for the hyperparameter set.
magnitude : float
Magnitude of the target in the observational band.
"""
super().__init__(hyperparameters)
self.name = name
self.magnitude = magnitude
def __repr__(self):
first = json.dumps(self[0], indent=4)
return (
f'<{self.__class__.__name__} ' +
(f'"{self.name}" ' if self.name is not None else '') +
f'(showing 1 of {len(self)}):\n[{first}...]>'
)
@staticmethod
def _load_from_json(path):
with open(path, 'r') as param_file:
hyperparameters = json.load(param_file)
return hyperparameters
[docs]
@classmethod
def from_soho_virgo(cls, path=None, name='SOHO VIRGO/PMO6'):
"""
Load the SOHO VIRGO/PMO6 total solar irradiance hyperparameters.
See [1]_ for more information on SOHO VIRGO/PMO6.
Parameters
----------
path : str or None
Path to the solar hyperparameter JSON file. If None, loads
the default gadfly solar hyperparameter list.
name : str
Name of the hyperparameter set
References
----------
.. [1] `VIRGO Data Products Archive
<https://www.pmodwrc.ch/en/research-development/solar-physics/virgo-data-products-archived_webpage/>`_
"""
if path is None:
path = default_hyperparameter_path
hyperparameters = cls._load_from_json(path)
return cls(hyperparameters, name=name)
[docs]
@classmethod
@u.quantity_input(mass=u.g, temperature=u.K, radius=u.m, luminosity=u.L_sun)
def for_star(
cls, mass, radius, temperature, luminosity,
bandpass=None, name=None, quiet=False, magnitude=None
):
"""
Applying scaling relations to the SOHO VIRGO/PMO6 total solar
irradiance hyperparameters for given stellar properties.
Parameters
----------
mass : ~astropy.units.Quantity
Stellar mass
radius : ~astropy.units.Quantity
Stellar radius
temperature : ~astropy.units.Quantity
Effective temperature
luminosity : ~astropy.units.Quantity
Stellar luminosity
bandpass : str
Name of the observing bandpass.
name : str
Name for the star or set of hyperparameters
quiet : bool
Turn off warning for invalid input parameters to scaling
relations when ``quiet=True``.
magnitude : float
Magnitude of the target star in the observing band
"""
hyperparameters = cls._load_from_json(default_hyperparameter_path)
# extract the granulation hyperparameters
granulation_hyperparams = [
item for item in hyperparameters
if item['metadata']['source'] == 'granulation'
]
# extract the p-mode hyperparameters
p_mode_hyperparams = [
item for item in
sorted(
hyperparameters,
key=lambda x: x['metadata'].get('degree', -1)
)
if item['metadata']['source'] == 'oscillation'
]
# put the p-mode hyperparams in a vector format, which is needed
# shortly for the call to _p_mode_fit_to_sho_hyperparams
p_mode_hyperparams = np.transpose(
[[param_set['hyperparameters'].get(param)
for param in ['S0', 'Q']]
for param_set in p_mode_hyperparams]
).ravel()
# get the hyperparameter sets and the corresponding labels to the
# spherical degree "ell" for each set:
(S0_fit, solar_w0, Q_fit), ell_labels = (
_p_mode_fit_to_sho_hyperparams(p_mode_hyperparams)
)
solar_gran_S0, solar_gran_w0, solar_gran_Q = np.transpose(
[[param_set['hyperparameters'].get(param)
for param in ['S0', 'w0', 'Q']]
for param_set in granulation_hyperparams]
)
# basic asteroseismic parameters:
solar_nu_max = scale._solar_nu_max
scaled_nu_max = solar_nu_max * scale.nu_max(mass, temperature, radius)
# get access to the astronomical filter bandpass via tynt:
filt = Filter(bandpass)
# scale the amplitudes for the observing bandpass:
amp_with_wavelength = scale.amplitude_with_wavelength(
filt, temperature
)
# scale the amplitudes by a term for granulation
granulation_amp = scale.granulation_amplitude(
mass, temperature, luminosity
)
granulation_timescale = scale.tau_gran(
mass, temperature, luminosity
)
# scale the hyperparameters for each of the granulation components
scaled_hyperparameters = []
for item in granulation_hyperparams:
params = item['hyperparameters']
scale_S0 = (
params['S0'] *
# scale the amplitudes by a term for granulation:
granulation_amp *
# also scale the amplitudes for the observing bandpass:
amp_with_wavelength
)
# scale the timescales:
scaled_w0 = params['w0'] / granulation_timescale
if scaled_w0 > 0:
scaled_hyperparameters.append(
dict(
hyperparameters=dict(
S0=scale_S0,
w0=scaled_w0,
Q=params['Q']),
metadata=item['metadata']
)
)
else:
if not quiet:
msg = (
"The scaled solar hyperparameter with frequency "
f"w0(old)={params['w0']:.0f} is being scaled to "
f"w0(new)={scaled_w0:.0f}, which is not positive. "
f"This kernel term will be omitted."
)
warnings.warn(msg, AstropyUserWarning)
# prepare to scale the hyperparameters for each of the
# oscillation components, which also depend on the
# amplitude of granulation at the frequency where the
# oscillations occur:
solar_nu = solar_w0 / (2 * np.pi) * u.uHz
granulation_background_solar = _sho_psd(
2 * np.pi * solar_nu[:, None],
solar_gran_S0[None, :] * u.cds.ppm**2 / u.uHz,
solar_gran_w0[None, :] * u.uHz,
solar_gran_Q[None, :]
) * amp_with_wavelength
scale_delta_nu = scale.delta_nu(mass, radius)
solar_delta_nu = solar_nu - solar_nu_max
scaled_delta_nu = solar_delta_nu * scale_delta_nu
scaled_nu = scaled_nu_max + scaled_delta_nu
scaled_w0 = 2 * np.pi * scaled_nu.to(u.uHz).value
# limit to positive scaled frequencies:
only_positive_omega = scaled_w0 > 0
solar_nu = solar_nu[only_positive_omega]
S0_fit = S0_fit[only_positive_omega]
Q_fit = Q_fit[only_positive_omega]
scaled_nu = scaled_nu[only_positive_omega]
scaled_w0 = scaled_w0[only_positive_omega]
# if bandpass isn't SOHO, use mean wavelength:
wavelength = (
filt.mean_wavelength
if filt.mean_wavelength is not None
else 550 * u.nm
)
p_mode_scale_factor = (
# scale p-mode "heights" like Kiefer et al. (2018)
# as a function of frequency (scaled_nu)
scale.p_mode_intensity(
temperature, scaled_nu, scaled_nu_max,
scale._solar_delta_nu * scale_delta_nu,
wavelength
) *
# scale the p-mode amplitudes according to
# stellar spectroscopic parameters:
scale.p_mode_amplitudes(
mass, temperature, luminosity
)
)
# scale the quality factors:
scaled_Gamma = 1.02 * np.exp(
(temperature - scale._solar_temperature) / (436 * u.K)
)
solar_Gamma = solar_nu.to_value(u.uHz) / Q_fit / 2 # [uHz]
scaled_Q = Q_fit * scaled_Gamma / solar_Gamma
solar_psd_at_p_mode_peaks = _sho_psd(
2 * np.pi * solar_nu,
S0_fit * u.cds.ppm**2 / u.uHz,
solar_w0[only_positive_omega] * u.uHz,
Q_fit
)
# Following Chaplin 2008 Eqn 3:
A = 2 * np.sqrt(4 * np.pi * solar_nu * solar_psd_at_p_mode_peaks)
unscaled_height = 2 * A ** 2 / (np.pi * solar_Gamma)
scaled_height = (
unscaled_height * p_mode_scale_factor
)
scaled_A = np.sqrt(np.pi * scaled_Gamma * scaled_height / 2)
scaled_psd_at_p_mode_peaks = (
(scaled_A / 2)**2 / (4 * np.pi * scaled_nu)
).to_value(u.cds.ppm**2/u.uHz)
scaled_S0 = (
0.5 *
(np.pi / 2)**0.5 *
scaled_psd_at_p_mode_peaks /
scaled_Q ** 2
) * granulation_background_solar.sum(1)[only_positive_omega].value
scaled_w0 = np.ravel(
np.repeat(scaled_w0[None, :], len(S0_fit), 0)
)
scaled_Q = np.ravel(scaled_Q)
for S0, w0, Q, degree in zip(scaled_S0, scaled_w0, scaled_Q, ell_labels):
if np.all(np.array([S0, w0]) > 0):
scaled_hyperparameters.append(
dict(
hyperparameters=dict(
S0=S0,
w0=w0,
Q=Q),
metadata=dict(
source='oscillation',
scaled=True,
degree=degree
)
)
)
return cls(scaled_hyperparameters, name, magnitude)
[docs]
class StellarOscillatorKernel(celerite2_terms.TermConvolution):
"""
A sum of :py:class:`~celerite2.terms.SHOTerm` simple harmonic oscillator
kernels generated by gadfly to approximate the total solar irradiance
power spectrum.
:py:class:`~gadfly.core.StellarOscillatorKernel` inherits from
:py:class:`~celerite2.terms.TermConvolution`.
"""
@u.quantity_input(texp=u.s)
def __init__(self, hyperparameters=None, texp=None, delta=None, name=None, terms=None):
"""
Parameters
----------
hyperparameters : ~gadfly.Hyperparameters
Iterable of hyperparameters, each passed to the
:py:class:`~celerite2.terms.SHOTerm` constructor.
name : str
Name for this set of hyperparameters
texp : ~astropy.units.Quantity
Exposure time, convertible to inverse microhertz.
delta : float (optional)
Exposure time, in units of inverse microhertz.
terms : list
Kernel terms to add together. This argument
is intended only for internal use.
"""
kernel_components = []
if hyperparameters is not None:
self.hyperparameters = hyperparameters
if name is None and hyperparameters.name is not None:
name = hyperparameters.name
kernel_components += [
celerite2_terms.SHOTerm(**p['hyperparameters']) for p in self.hyperparameters
]
if terms is not None:
kernel_components += terms
self.name = name
term_sum = celerite2_terms.TermSum(*kernel_components)
if delta is None:
if texp is None:
default_exp = 1 * u.min
msg = (
"An exposure time is required to construct the kernel. gadfly will assume "
f"a default exposure time of {default_exp}. To prevent this warning, supply "
f"the kernel with the `texp` keyword argument."
)
warnings.warn(msg, AstropyUserWarning)
texp = default_exp
delta = texp.to(1/u.uHz).value
super().__init__(term_sum, delta)
[docs]
def plot(self, **kwargs):
return plot_power_spectrum(kernel=self, **kwargs)
plot.__doc__ = plot_power_spectrum.__doc__
@classmethod
def _from_terms(cls, terms, delta=None, name=None):
return cls(terms=terms, delta=delta, name=name)
def __add__(self, other):
"""
Assumes ``other`` is a SHOTerm or subclass.
"""
if not isinstance(other, list):
other_names = [other.name]
other = [other]
else:
other_names = [t.name for t in other.terms]
name = ""
if self.name is not None:
name += self.name
for other_name in other_names:
if other_name is not None:
if len(name):
name += " + " + other_name
else:
name += other_name
return StellarOscillatorKernel._from_terms(
list(self.term.terms) + other, delta=self.delta, name=name
)
[docs]
class SolarOscillatorKernel(StellarOscillatorKernel):
"""
Like a :py:class:`~gadfly.core.StellarOscillatorKernel`, but initialized
with the default solar SOHO VIRGO/PMO6 kernel hyperparameters. The hyperparameters
are initialized with the :py:class:`~gadfly.core.Hyperparameters` class method
:py:meth:`~gadfly.core.Hyperparameters.for_star` assuming exactly
solar mass, radius, temperature, and luminosity.
The primary difference with :py:class:`~gadfly.core.StellarOscillatorKernel` is
that the user need not provide :py:meth:`~gadfly.core.Hyperparameters`.
:py:class:`~gadfly.core.SolarOscillatorKernel` inherits from
:py:class:`~celerite2.terms.TermConvolution` and
:py:class:`~gadfly.core.StellarOscillatorKernel`.
"""
@u.quantity_input(texp=u.s)
def __init__(self, texp=None, delta=None, bandpass=None, name=None):
"""
Parameters
----------
texp : ~astropy.units.Quantity
Exposure time, convertible to inverse microhertz.
"""
hp = Hyperparameters.for_star(
mass=1*u.M_sun, radius=1*u.R_sun,
temperature=5777*u.K, luminosity=1*u.L_sun,
bandpass=bandpass
)
super().__init__(
hp, texp=texp, delta=delta, name=name
)
[docs]
class ShotNoiseKernel(celerite2_terms.SHOTerm):
"""
A subclass of :py:class:`~celerite2.terms.SHOTerm` which approximates
shot noise in Kepler observations, for example.
"""
# This is intentionally really large. In [uHz].
w0 = 1e7
# value does not matter much if w0 >>> 1
Q = 0.5
def __init__(self, *args, name=None, **kwargs):
"""
Parameters
----------
args : dict
Hyperparameters for the SHO kernel, :math:`S_0, \\omega_0, Q`.
name : str
Name to store for the target/instrument.
kwargs : dict
Extra keyword arguments to pass to the
:py:class:`~celerite2.terms.SHOTerm` constructor.
"""
if name is None:
name = "Shot noise"
self.name = name
super().__init__(*args, **kwargs)
[docs]
@classmethod
def from_kepler_light_curve(cls, light_curve):
"""
Estimate Kepler shot noise from the light curve.
The light curve must have a Kepler magnitude in the metadata.
Assumes the noise relation in Jenkins et al. (2010) [1]_.
Parameters
----------
light_curve : ~lightkurve.lightcurve.LightCurve
Kepler light curve.
References
----------
.. [1] `Jenkins et al. (2010)
<https://ui.adsabs.harvard.edu/abs/2010ApJ...713L.120J/abstract>`_
"""
from lightkurve import LightCurveCollection
if isinstance(light_curve, LightCurveCollection):
light_curve = light_curve.stitch(lambda x: x)
kepler_mag = light_curve.meta['KEPMAG']
norm = 2 * np.pi / len(light_curve.time) ** 0.5
_, unscaled_S0 = cls.kepler_mag_to_noise_amplitude(kepler_mag)
S0 = (unscaled_S0 * norm) ** 0.5
return cls(S0=S0.to(u.cds.ppm).value, w0=cls.w0, Q=cls.Q)
[docs]
@staticmethod
def kepler_mag_to_noise_amplitude(kepler_mag):
"""
Kepler noise in 6 hour bins, from Jenkins et al. (2010) [1]_.
Parameters
----------
kepler_mag : float
Kepler magnitude (:math:`K_p`) of the star.
References
----------
.. [1] `Jenkins et al. (2010)
<https://ui.adsabs.harvard.edu/abs/2010ApJ...713L.120J/abstract>`_
"""
c = 3.46 * 10 ** (0.4 * (12 - kepler_mag) + 8)
sigma_lower = np.sqrt(
c + 7e6 * np.max([np.ones_like(kepler_mag), kepler_mag / 14], axis=0) ** 4
) / c # units of relative flux
sigma_upper = np.sqrt(c + 7e7) / c
# convert to ppm:
return 1e6 * np.array([sigma_lower, sigma_upper]) * u.cds.ppm ** 2
[docs]
class Filter(tynt.Filter):
"""
Convenience subclass for the ``tynt`` API for photometric
filter transmittance curves.
``tynt`` is a parameterized filter bandpass package [1]_.
References
----------
.. [1] `tynt source code on GitHub <https://github.com/bmorris3/tynt>`_.
"""
generator = tynt.FilterGenerator()
available_filters = generator.available_filters()
default_filter = 'Kepler/Kepler.K'
def __init__(self, identifier_or_filter, download=False):
"""
Parameters
----------
identifier_or_filter : str, ~gadfly.Filter, ~tynt.Filter
Input name of the filter, or a ``gadfly`` or ``tynt``
implementation of the filter object.
download : bool
If True, download the true transmittance curve. Default
is False.
"""
# Use Kepler bandpass if None specified, and give a warning.
if identifier_or_filter is None:
msg = (
"An observing bandpass is required to construct the kernel. gadfly "
f'will assume the default filter "{self.default_filter}". To prevent '
f"this warning, supply the Hyperparameters with the `bandpass` "
"keyword argument."
)
warnings.warn(msg, AstropyUserWarning)
identifier_or_filter = self.default_filter
# Define a "bandpass" for SOHO, which is bolometric
if isinstance(identifier_or_filter, str) and identifier_or_filter.upper() == 'SOHO VIRGO':
wavelength = np.logspace(-1.5, 1.5, 1000) * u.um
super().__init__(wavelength, np.ones_like(wavelength.value))
else:
if isinstance(identifier_or_filter, (tynt.Filter, Filter)):
# this happens if the user supplies a filter, we just
# return the same filter:
super().__init__(
identifier_or_filter.wavelength,
identifier_or_filter.transmittance
)
else:
# otherwise try to reconstruct the bandpass transmittance from
# the tynt FFT parameterization:
if identifier_or_filter in self.available_filters and not download:
filt = self.generator.reconstruct(identifier_or_filter)
elif not download:
msg = (
f'The observing bandpass "{identifier_or_filter}" is not recognized '
f'in the pre-loaded bandpasses in tynt, and the `download` keyword is '
f'"{download}". To trigger a remote filter bandpass download from the '
f'SVO FPS, set `download=True`.'
)
raise ValueError(msg)
else:
filt = self.generator.download_true_transmittance(identifier_or_filter)
super().__init__(filt.wavelength, filt.transmittance)
@property
def mean_wavelength(self):
"""
Transmittance-weighted mean wavelength of the filter bandpass.
"""
if np.all(self.transmittance == 1):
return None
return np.average(self.wavelength, weights=self.transmittance)