Source code for simqso.sqbase

#!/usr/bin/env python

import os
import time
import numpy as np
from astropy.convolution import convolve,Gaussian1DKernel
from scipy.interpolate import interp1d,interp2d
from astropy import units as u

from pkg_resources import resource_filename
datadir = resource_filename('simqso', 'data')

[docs]def fixed_R_dispersion(lam1,lam2,R): '''Generate a wavelength grid at a fixed resolution d(log(lambda))^-1. Parameters ---------- lam1,lam2 : float Wavelengths endpoints in Angstroms. R : float Spectral resolution. Returns ------- wave : `~numpy.ndarray` Wavelength array between lam1 and lam2 with resolution R. Examples -------- >>> from simqso.sqbase import fixed_R_dispersion >>> import numpy as np; np.set_printoptions(precision=2) >>> wave = fixed_R_dispersion(3000,3010,1000) >>> print wave [ 3000. 3003. 3006.01 3009.01 3012.02] ''' loglam1 = np.log(lam1) loglam2 = np.log(lam2) dloglam = R**-1 loglam = np.arange(loglam1,loglam2+dloglam,dloglam) return np.exp(loglam)
[docs]def deres(f,Rin,Rout,fout=None): ''' Downgrade the resolution of a spectrum from Rin to Rout. Parameters ---------- f : ndarray Input spectrum. Rin : float Resolution of input spectrum (as in R=lambda/dlambda). Rout : float Resolution of output spectrum, Rout<Rin. fout : ndarray Optional array for output spectrum. Updates f in-place if fout=f. ''' assert Rout < Rin if fout is None: fout = np.zeros_like(f) kern = Gaussian1DKernel(Rin/Rout) fout[:] = convolve(f,kern,boundary='extend') return fout
[docs]def resample(x1,y1,x2): ''' Resample function onto new grid using simple interpolation. ''' resampfun = interp1d(x1,y1) return resampfun(x2)
[docs]def continuum_kcorr(obsBand,restBand,z,alpha_nu=-0.5): ''' A simple power-law k-correction. Parameters ---------- obsBand : str or float Observed band. Can be one of "SDSS-[ugriz]", "CFHT-[gri]", or a wavelength in Angstroms. restBand : str or float Rest-frame band. Can be one of "SDSS-[ugriz]", "CFHT-[gri]", or a wavelength in Angstroms. z : float or ndarray Emission redshift(s). alpha_nu : float Spectral index used to get k-correction, as f_nu ~ nu^alpha_nu. Returns ------- k : ndarray K(z) is spectral k-correction for a simple power-law continuum. ''' z = np.asarray(z) # CFHT: http://www.cfht.hawaii.edu/Science/mswg/filters.html effWave = {'SDSS-g':4670.,'SDSS-r':6165.,'SDSS-i':7471.,'SDSS-z':8918, 'CFHT-g':4770.,'CFHT-r':6230.,'CFHT-i':7630.} try: obsWave = float(obsBand) except: obsWave = effWave[obsBand] try: restWave = float(restBand) except: restWave = effWave[restBand] # Following continuum K-corrections given in # Richards et al. 2006, AJ 131, 2766 kcorr = ( -2.5*(1+alpha_nu)*np.log10(1+z) - 2.5*alpha_nu*np.log10(restWave/obsWave) ) return kcorr
[docs]class SimKCorr(object): def __init__(self,obsBand,restBand): self.obsBand = obsBand self.restBand = restBand def __call__(self,m,z,inverse=False): raise NotImplementedError
[docs]class ContinuumKCorr(SimKCorr): def __init__(self,obsBand,restBand,alpha_nu=-0.5,effWaveBand=None): super(ContinuumKCorr,self).__init__(obsBand,restBand) self.alpha_nu = alpha_nu if effWaveBand is None: self._obsBand = self.obsBand else: self._obsBand = effWaveBand def __call__(self,m,z,inverse=False): return continuum_kcorr(self._obsBand,self.restBand,z, alpha_nu=self.alpha_nu)
[docs]class EmissionLineKCorr(SimKCorr): def __init__(self,obsBand,restBand,datFile=None): super(EmissionLineKCorr,self).__init__(obsBand,restBand) if not obsBand.startswith('SDSS'): raise ValueError if datFile is None: datFile = os.path.join(datadir,'bossdr9kcorr_ugriz.npz') self.data = np.load(datFile) b_j = 'ugriz'.find(obsBand[-1]) self.kcorr = self.data['kcorr'][...,b_j] self.mbins = self.data['mbins'][...,b_j] self.Mbins = self.data['Mbins'] self.zbins = self.data['zbins'] self.zz = np.tile(self.zbins,(len(self.Mbins),1)) self.K_M = interp2d(self.Mbins,self.zbins,self.kcorr.transpose()) self.K_m = interp2d(self.mbins,self.zz,self.kcorr) def __call__(self,m,z,inverse=False): Kfun = self.K_M if inverse else self.K_m if np.isscalar(m) or np.isscalar(z): return Kfun(m,z) return np.array([ Kfun(_m,_z)[0] for _m,_z in zip(m,z) ])
# if inverse: # return griddata([self.Mbins,self.zbins],self.kcorr,(m,z)) # else:
[docs]class AppToAbsMag(object): def __init__(self,cosmo,kcorr): self.cosmo = cosmo self.kcorr = kcorr def __call__(self,m,z,inverse=False): return self.cosmo.distmod(z).value + self.kcorr(m,z,inverse=inverse)
[docs]class Spectrum(object): ''' Base class for one-dimensional spectra. Parameters ---------- wave : `~numpy.ndarray` Input wavelength grid. f_lambda : `~numpy.ndarray` Input flux density. Default is None, which sets f_lambda[:] = 0. z : float Input redshift. Default is 0.0. ''' def __init__(self,wave,f_lambda=None,z=0.0): self.wave = wave.astype(float) # XXX if f_lambda is None: self.f_lambda = np.zeros_like(self.wave) else: self.f_lambda = f_lambda self.z = z # def _getotherspec(self,other): if np.isscalar(other): a2 = other else: if ((self.wave.size == other.wave.size) and (np.max(np.abs(self.wave-other.wave)/self.wave) < 1e-3)): a2 = other.f_lambda else: warnings.warn('interpolated spectrum!') ofunc = interp1d(other.wave,other.f_lambda, bounds_error=False,fill_value=0.0) a2 = ofunc(self.wave) return a2 def _op(self,other,op): '''generic math operations on spectrum, interpolating in wavelength if necessary.''' a1 = self.f_lambda a2 = self._getotherspec(other) return Spectrum(self.wave,op(a1,a2),self.z) # implement +-*/ def __add__(self,other): return self._op(other,np.add) def __sub__(self,other): return self._op(other,np.subtract) def __mul__(self,other): return self._op(other,np.multiply) def __div__(self,other): return self._op(other,np.divide) #
[docs] def setRedshift(self,z): ''' Set the redshift of the spectrum. *Does not modify the spectrum itself*. ''' self.z = z
[docs] def clear(self): ''' Clear the spectrum by zeroing the flux density vector and setting z=-1. ''' self.z = -1.0 self.f_lambda[:] = 0
#
[docs] def waveslice(self,w1,w2): ''' Return a slice of the spectrum between two wavelengths. ''' ii = np.where((self.wave > w1) & (self.wave < w2))[0] return Spectrum(self.wave[ii],self.f_lambda[ii],self.z)
[docs] def resample(self,newWave): ''' Resample the spectrum onto a new wavelength grid. ''' newFlux = interp1d(self.wave,self.f_lambda, bounds_error=False,fill_value=0.0) self.wave = newWave self.f_lambda = newFlux(newWave)
[docs] def convolve_restframe(self,g,*args): r''' Convolves the spectrum with the input function as f_lambda' = g(wave/(1+z),f_lambda). Optional \*args are passed to the convolution function. ''' self.f_lambda = g(self.wave/(1+self.z),self.f_lambda,*args)
[docs]class TimerLog(): ''' Simple utility for tracking wall time execution for various steps of the simulation. ''' def __init__(self): self.stages = ['StartSimulation'] self.times = [time.time()] def __call__(self,stage): self.stages.append(stage) self.times.append(time.time())
[docs] def dump(self): self.__call__('Finish') stages = self.stages[1:] times = np.array(self.times[1:]) - self.times[0] #itimes = np.concatenate([[0,],np.diff(times)]) itimes = np.diff(self.times) ftimes = itimes / times[-1] print('%20s %8s %8s %8s' % ('stage','time','elapsed','frac')) for t in zip(stages,itimes,times,ftimes): print('%20s %8.3f %8.3f %8.3f' % t) print()