Source code for gadfly.psd

import numpy as np
import astropy.units as u
from astropy.units import cds  # noqa
from astropy.timeseries import LombScargle

from .interp import interpolate_missing_data

# aliased here for convenience
ppm = u.cds.ppm

__all__ = ['PowerSpectrum', 'plot_power_spectrum']


def to_psd_units(power, assumed_flux_unit=ppm, assumed_freq_unit=u.uHz):
    """
    Return ``power`` with units of ppm^2/uHz.
    If ``power`` has no units, assume it has units of
    ``[ppm^2 / uHz]``.
    """
    if not hasattr(power, 'unit'):
        power = power * assumed_flux_unit**2 / assumed_freq_unit
    return power.to(ppm**2 / u.uHz)


def to_freq_units(freq, assumed_freq_unit=u.uHz):
    """
    Return ``freq`` with units of uHz.
    If ``freq`` has no units, assume it has units of
    ``[uHz]``.
    """
    if not hasattr(freq, 'unit'):
        freq = freq * assumed_freq_unit
    return freq.to(assumed_freq_unit)


[docs]def plot_power_spectrum( ax=None, kernel=None, obs=None, freq=None, figsize=(8, 4), n_samples=1000, p_mode_inset=False, legend=True, scaling_low_freq='loglog', scaling_p_mode='semilogy', inset_xlim=[1800, 4500], inset_ylim=[0.005, 1.3], inset_bounds=[0.5, 0.5, 0.47, 0.47], title=None, label_kernel=None, label_obs=None, label_inset='p-modes', kernel_kwargs=dict(), obs_kwargs=dict(), inset_kwargs=dict(), create_new_figure=False ): """ Plot a power spectrum. Requires ``matplotlib``. Parameters ---------- ax : :py:class:`~matplotlib.axes.Axes` kernel : None or subclass of :py:class:`~celerite2.terms.Term` obs : ~gadfly.psd.PowerSpectrum freq : ~astropy.units.Quantity figsize : list of floats n_samples : int p_mode_inset : bool legend : bool scaling_low_freq : str scaling_p_mode : str inset_xlim : list of floats inset_ylim : list of floats title : str label_inset : str label_obs : str label_kernel : str kernel_kwargs : dict obs_kwargs : dict inset_kwargs : dict create_new_figure : bool inset_bounds : list Returns ------- fig : :py:class:`~matplotlib.figure.Figure` ax : :py:class:`~matplotlib.axes.Axes` """ if obs is None and kernel is None: raise ValueError("Requires an observed power " "spectrum, a kernel, or both.") import matplotlib.pyplot as plt if freq is None: frequencies_all = np.logspace( -1, 3.5, int(n_samples) // 2 ) * u.uHz frequencies_p_mode = np.linspace( 2000, 4500, int(n_samples) // 2 ) * u.uHz freq = np.sort( np.concatenate([frequencies_all, frequencies_p_mode]) ) if ax is None and create_new_figure: fig, ax = plt.subplots(figsize=figsize) elif ax is None: ax = plt.gca() fig = plt.gcf() if p_mode_inset: ax_inset = ax.inset_axes(inset_bounds) else: ax_inset = None # Set default values, then overwrite them if supplied by user: obs_kw = dict(marker='o', lw=0, mfc='none') for k, v in obs_kwargs.items(): obs_kw[k] = v kernel_kw = dict(lw=0.8) for k, v in kernel_kwargs.items(): kernel_kw[k] = v inset_kw = dict(marker='.', lw=0) for k, v in inset_kwargs.items(): inset_kw[k] = v for i, (axis, plot_method, obs_plot_kwargs) in enumerate(zip( [ax, ax_inset], [scaling_low_freq, scaling_p_mode], [obs_kw, inset_kw] )): if axis is not None: if kernel is not None: if label_kernel is None: if kernel.name is None: label_kernel = 'Model' else: label_kernel = kernel.name # call the plot method (i.e.: plt.semilogy, plt.loglog) getattr(axis, plot_method)( to_freq_units(freq), to_psd_units(kernel.get_psd(2 * np.pi * freq.to(u.uHz).value)), label=label_kernel, **kernel_kw ) if obs is not None: if obs.name is not None and label_obs is None: label_obs = obs.name elif label_obs is None: label_obs = 'Observations' # call the plot method (i.e.: plt.semilogy, plt.loglog) getattr(axis, plot_method)( to_freq_units(obs.frequency), to_psd_units(obs.power), label=label_obs, **obs_plot_kwargs ) axis.set( xlabel='Frequency [$\\mu$Hz]', ylabel='Power [ppm$^2$ / $\\mu$Hz]', ) if p_mode_inset: ax_inset.set_xlim(inset_xlim) ax_inset.set_ylim(inset_ylim) ax_inset.annotate( label_inset, (0.97 * inset_xlim[1], 0.5 * inset_ylim[1]), ha='right' ) ax.indicate_inset_zoom(ax_inset, edgecolor="silver") if legend: ax.legend() ax.set_title(title) fig.tight_layout() return fig, ax
def spectral_binning(y, all_x, all_y): """ Spectral binning via trapezoidal approximation. """ if y[0] in all_y and y[-1] in all_y: min_ind = np.argwhere(all_y == y[0])[0, 0] max_ind = np.argwhere(all_y == y[-1])[0, 0] if max_ind > min_ind and all_x[max_ind] - all_x[min_ind] > 0: return ( np.trapz(y, all_x[min_ind:max_ind + 1]) / (all_x[max_ind] - all_x[min_ind]) ) return y[0] return np.nan def spectral_binning_err(y, all_x, all_y, constant=1): """ Approximate uncertainties for spectral bins estimated from a solar/stellar power spectrum. """ if y[0] in all_y and y[-1] in all_y: min_ind = np.argwhere(all_y == y[0])[0, 0] max_ind = np.argwhere(all_y == y[-1])[0, 0] mean_x = np.nanmean(all_x[min_ind:max_ind + 1]) # This term scales down the stddev (uncertainty) by the root of the # number of points in the bin == Gaussian uncertainty if max_ind > min_ind and all_x[max_ind] - all_x[min_ind] > 0: gaussian_term = np.nanstd(y) / len(y) ** 0.5 # This term scales the uncertainty with the spectral resolution of the bin non_gaussian_term = ( mean_x / (all_x[max_ind] - all_x[min_ind]) / constant ) return gaussian_term * non_gaussian_term return y[0] return np.nan def bin_power_spectrum(power_spectrum, bins=None, log=True, **kwargs): """ Bin a power spectrum, with log-spaced frequency bins. Parameters ---------- power_spectrum : ~gadfly.PowerSpectrum log : bool If true, compute bin edges based on the log base 10 of the frequency. bins : int or ~numpy.ndarray Number of bins, or the bin edges Returns ------- new_ps : ~gadfly.PowerSpectrum """ from scipy.stats import binned_statistic freq = power_spectrum.frequency power = power_spectrum.power if log: freq_axis = np.log10(freq.to(u.uHz).value) else: freq_axis = freq.to(u.uHz).value # Set the number of log-spaced frequency bins if bins is None: bins = len(freq_axis) // 10000 # Bin the power spectrum: bs = binned_statistic( freq_axis, power.value, statistic=lambda y: spectral_binning( y, all_x=freq_axis, all_y=power.value ), bins=bins ) # Compute the error in the power spectrum bins bs_err = binned_statistic( freq_axis, power.value, statistic=lambda y: spectral_binning_err( y, all_x=freq_axis, all_y=power.value, **kwargs ), bins=bins ) if log: freq_bins = 10 ** ( 0.5 * (bs.bin_edges[1:] + bs.bin_edges[:-1]) ) * freq.unit else: freq_bins = ( 0.5 * (bs.bin_edges[1:] + bs.bin_edges[:-1]) ) * freq.unit power_bins = to_psd_units(bs.statistic * power.unit) power_bins_err = to_psd_units(bs_err.statistic * power.unit) name = ( power_spectrum.name if power_spectrum.name is not None else 'Power spectrum' ) + ' (binned)' return PowerSpectrum( freq_bins, power_bins, power_bins_err, name=name ) def linear_space_to_jax_parameterization(all_S0s, all_omegas): """ Convert from (linear-space) S0 and w0 coordinates to the "differential" coordinates used within the jax minimizer. """ omega_diffs = [all_omegas[0]] for i in range(1, len(all_omegas)): omega_diffs.append(np.log10(all_omegas[i] - all_omegas[i - 1])) S0_diffs = [all_S0s[-1]] for i in np.arange(1, len(all_S0s))[::-1]: S0_diffs.append(np.log10(all_S0s[i - 1] - all_S0s[i])) S0_diffs = S0_diffs[::-1] initp = np.array(list(map( lambda x: np.array(x, dtype=np.float64), [S0_diffs, omega_diffs] ))).T.flatten() return initp def jax_parameterization_to_linear_space(p): """ Convert the best-fit jax parameters into the (linear-space) S0 and w0 parameters that celerite2 uses. """ delta_S0_0, w0_0 = p[0:2] delta_S0_1, delta_w0_1 = p[2:4] delta_S0_2, delta_w0_2 = p[4:6] delta_S0_3, delta_w0_3 = p[6:8] S0_4, delta_w0_4 = p[8:10] S0_3 = 10 ** delta_S0_3 + S0_4 S0_2 = 10 ** delta_S0_2 + S0_3 S0_1 = 10 ** delta_S0_1 + S0_2 S0_0 = 10 ** delta_S0_0 + S0_1 w0_1 = w0_0 + 10 ** delta_w0_1 w0_2 = w0_1 + 10 ** delta_w0_2 w0_3 = w0_2 + 10 ** delta_w0_3 w0_4 = w0_3 + 10 ** delta_w0_4 all_S0s = np.array([S0_0, S0_1, S0_2, S0_3, S0_4]) all_omegas = np.array([w0_0, w0_1, w0_2, w0_3, w0_4]) return all_S0s, all_omegas def linear_space_to_dicts(S0s, omegas, fixed_Q): """ Create a list of dictionaries for SHOTerm kwargs out of linear-space S0 and w0 coordinates """ result = [] for S0, w0 in zip(S0s, omegas): result.append( dict( S0=S0, w0=w0, Q=fixed_Q ) ) return result
[docs]class PowerSpectrum: """ An observed power spectrum. """ @u.quantity_input(frequency=u.uHz, power=ppm**2/u.uHz) def __init__( self, frequency, power, error=None, name=None, norm=None, detrended_lc=None ): """ Parameters ---------- frequency : ~astropy.units.Quantity Frequency of each power spectral density measurement power : ~astropy.units.Quantity Power spectral density at each frequency error : ~astropy.units.Quantity Uncertainty on PSD measurements name : str Name of the target/instrument norm : ~astropy.units.Quantity Normalization factor detrended_lc : ~lightkurve.LightCurve Detrended light curve to cache along with the PSD. """ self.frequency = frequency self.power = power self.error = error self.name = name self.norm = norm self.detrended_lc = detrended_lc @property def omega(self): """ Angular frequency. Returns a numpy array of :math:`\\omega = 2 \\pi f` with f in units of [uHz]. Returns ------- omegas : ~numpy.ndarray """ return 2 * np.pi * self.frequency.to(u.uHz).value @property def light_curve_rms(self): """ Convert from power spectral density to the approximate root mean square of the original light curve. Returns ------- rms : ~astropy.units.Quantity """ return (self.power * self.norm).to(ppm**2) ** 0.5
[docs] def bin(self, bins=None, **kwargs): """ Bin the power spectrum. Requires scipy. Other keyword arguments passed to :py:func:`~gadfly.psd.bin_power_spectrum`. Parameters ---------- bins : int or ~numpy.ndarray Number of bins or an array of bin edges. Returns ------- new_ps : ~gadfly.PowerSpectrum A new, binned power spectrum """ return bin_power_spectrum(self, bins, **kwargs)
[docs] @classmethod def from_light_curve( cls, light_curve, method='fft', include_zero_freq=False, name=None, detrend=True, detrend_poly_order=3, save_detrended_lc=True ): """ Compute the power spectrum from a light curve. Parameters ---------- light_curve : ~lightkurve.lightcurve.LightCurve, ~lightkurve.lightkurve.LightCurveCollection # noqa Light curve(s) method : str, options are "fft" or "lomb-scargle" Method used to compute the power spectrum. include_zero_freq : bool Include ``frequency=0`` in the first entry of the results. name : str Name for the power spectrum detrend : bool If ``True``, detrend the light curve. detrend_poly_order : int Polynomial order used for detrending the light curve save_detrended_lc : bool If ``True``, save the detrended light curve in an attribute on the PSD object. """ from lightkurve import LightCurve, LightCurveCollection available_methods = ['fft', 'lomb-scargle'] if not method.lower() in available_methods: raise ValueError(f'PowerSpectrum.from_lightcurve was given ' f'method="{method}", but it must be ' f'one of: {available_methods}.') # Interpolation is required if the PSD computation method is FFT method_is_fft = method.lower() == 'fft' if not isinstance(light_curve, LightCurveCollection): light_curve = LightCurveCollection([light_curve]) if detrend: # Interpolate over missing data points in each quarter, normalize by a # nth order polynomial to remove systematic trends lcs_interped = [] for lc in light_curve: lc_flux_unit = lc.flux.unit # we take the unmasked fluxes to avoid an astropy # MaskedQuantity bug raised when calling np.median later # to normalize the light curve: if hasattr(lc.flux, 'unmasked'): lc.flux = lc.flux.unmasked lc = lc.remove_nans().remove_outliers() if method_is_fft: t, f = interpolate_missing_data(lc.time.jd, lc.flux.value) else: t, f = lc.time.jd, lc.flux.value # if the light curve is not given in units combatible with ppm, # normalize it with a polynomial, center it at zero if not lc_flux_unit.is_equivalent(ppm): fit = np.polyval( np.polyfit(t - t.mean(), f, detrend_poly_order), t - t.mean() ) normed_flux = f / fit if hasattr(normed_flux, 'value'): normed_flux = normed_flux.value median_flux = np.median(normed_flux) flux_ppm = 1e6 * np.array(normed_flux / median_flux - 1) * ppm else: flux_ppm = f.copy() lc_int = LightCurve( time=t, # convert detrended flux to ppm with zero-mean: flux=flux_ppm, ) lcs_interped.append(lc_int) if len(lcs_interped) > 1: slc = LightCurveCollection(lcs_interped).stitch(lambda x: x) else: slc = lcs_interped[0] if method_is_fft: # After stitching together all quarters, interpolate again to fill gaps # between the quarters: t, f = interpolate_missing_data(slc.time.jd, slc.flux.value) else: t, f = slc.time.jd, slc.flux.value d = np.median(np.diff(t)) * u.day flux = f.copy() * ppm time = t.copy() * u.day name = light_curve[0].meta.get('name', name) else: if isinstance(light_curve, LightCurveCollection): light_curve = light_curve.stitch(lambda x: x) d = np.median(np.diff(light_curve.time.jd)) * u.day time = light_curve.time.jd * u.day flux = light_curve.flux name = light_curve.meta.get('name', name) if not hasattr(flux, 'unit'): # if flux has no units, assume ppm: flux = flux * ppm if method_is_fft: freq, power, norm = cls._fft(flux, d) else: freq, power, norm = cls._lomb_scargle(time, flux, d) # skip `frequency==0` if necessary: if not include_zero_freq: freq = freq[1:] power = power[1:] detrended_lc = LightCurve(time=time, flux=flux) if (detrend and save_detrended_lc) else None return cls(freq, power, name=name, norm=norm, detrended_lc=detrended_lc)
@staticmethod def _fft(flux, d): """ Compute the power spectrum using the FFT. Inputs and outputs are ~astropy.units.Quantity objects. """ # Strip units from flux: flux_ppm = flux.to(ppm).value # Measure the observed power spectrum via FFT: freq = np.fft.rfftfreq(len(flux_ppm), d).to(u.uHz) fft = np.fft.rfft(flux_ppm) # The FFT must be scaled by this factor, in addition # to the normalization by the length of the flux vector: norm = d / (2 * np.pi) ** 0.5 / len(flux_ppm) # The power spectrum in the usual asteroseismic units: power = to_psd_units( np.real(fft * np.conj(fft)) * ppm ** 2 * norm ) return freq, power, norm @staticmethod def _lomb_scargle(time, flux, d): """ Compute the power spectrum using the Lomb-Scargle method. Inputs and outputs are ~astropy.units.Quantity objects. """ # use the rfftfreq method to get frequencies for consistency with fft method: freq = np.fft.rfftfreq(len(flux), d).to(u.uHz) lomb_scargle = LombScargle(time, flux, normalization='psd') norm = d / (2 * np.pi) ** 0.5 power = to_psd_units(norm * lomb_scargle.power(freq)) return freq, power, norm
[docs] def plot(self, **kwargs): return plot_power_spectrum( obs=self, **kwargs )
# Add docstring to function that this method wraps: plot.__doc__ = plot_power_spectrum.__doc__
[docs] def cutout(self, frequency_min=None, frequency_max=None): """ Cut out a section of the power spectrum. Parameters ---------- frequency_min : ~astropy.units.Quantity Cut out any measurements below this frequency frequency_max : ~astropy.units.Quantity Cut out any measurements above this frequency Returns ------- new_ps : ~gadfly.PowerSpectrum A new, cropped power spectrum """ if frequency_min is None: frequency_min = 0 * u.Hz if frequency_max is None: frequency_max = np.inf * u.Hz bounds = ( (self.frequency <= frequency_max) & (self.frequency >= frequency_min) ) name = ( self.name if self.name is not None else 'Power spectrum' ) + ' (cutout)' args = [] if self.error is not None: args.append(self.error[bounds]) return PowerSpectrum( self.frequency[bounds], self.power[bounds], *args, name=name, norm=self.norm )