Source code for simqso.sqphoto

#!/usr/bin/env python

import os
import numpy as np
from collections import OrderedDict
from scipy.interpolate import interp1d
from scipy.integrate import simps
from scipy.constants import c
c_Angs = c*1e10

from astropy.io import fits
from astropy.table import Table

from .sqbase import datadir

softening_parameter = np.array([1.4,0.9,1.2,1.8,7.4])*1e-10

[docs]def nmgy2abmag(_b,f,df=None): ii = np.where(f>0) mag = 99.99 + np.zeros_like(f) mag[ii] = 22.5 - 2.5*np.log10(f[ii]) if df is None: return mag else: err = np.zeros_like(mag) err[ii] = 1.0857 * df[ii]/f[ii] return mag,err
[docs]def abmag2nmgy(_b,m): return 10**(-0.4*(m - 22.5))
[docs]def nmgy2asinhmag(_b,f,df=None): b = softening_parameter['ugriz'.find(_b)] mag = -1.0857*(np.arcsinh(1e-9*f/(2*b)) + np.log(b)) if df is None: return mag else: err = 1.0857 * 1e-9*df/(2*b) / np.sqrt(1+((1e-9*f)/(2*b))**2) return mag,err
[docs]def asinhmag2nmgy(_b,m): b = softening_parameter['ugriz'.find(_b)] return 2*b*np.sinh(m/(-1.0857) - np.log(b)) / 1e-9
# Vega... # # SDSS photometry model # _sdss_phot_pars = { 'gain':[ 1.6, 3.925, 4.7225, 4.86, 4.76, ], 'darkVariance':[ 9.45625, 1.63125, 1.16125, 6.25, 1.105, ], 'sky':[ 1.33136, 1.70364, 4.35521, 8.10383, 25.3321, ], 'skyErr':[ 0.00657128, 0.00254991, 0.00409365, 0.00670879, 0.0255355, ], 'nEffPsf':[ 36.4706, 32.1649, 27.2765, 24.854, 25.9643, ], 'nMgyPerCount':[ 0.00981, 0.00378, 0.00507, 0.00662, 0.0337, ], }
[docs]class sdssPhotoUnc(object): ''' In a given SDSS band "b", provide the uncertainty for a given flux in nanomaggies (f_nmgy) based on the distribution of observing conditions. --> Currently underestimates true scatter by using gaussians for the scatter, whereas the true distributions generally have long tails to higher values for sky, nEff, etc. see http://classic.sdss.org/dr7/algorithms/fluxcal.html for details ''' def __init__(self,b): i = 'ugriz'.find(b) self.pixArea = 0.396**2 # pix -> arcsec^2 self.gain = _sdss_phot_pars['gain'][i] self.darkVar = _sdss_phot_pars['darkVariance'][i] self.skyMean = _sdss_phot_pars['sky'][i] self.skyStd = {'u':0.4,'g':0.4,'r':1.2,'i':2.0,'z':5.0}[b] self.skyMin = {'u':0.6,'g':1.0,'r':2.2,'i':3.2,'z':8.3}[b] #skyErr_nmgy = _sdss_phot_pars['skyErr'][i] # not used... # nEffPsf distribution is roughly similar in all bands self.npixMean = _sdss_phot_pars['nEffPsf'][i] self.npixStd = 5.0 self.npixMin = 10.0 self.c2fMean = _sdss_phot_pars['nMgyPerCount'][i] self.c2fStd = {'u':2.3e-3,'g':3.9e-4,'r':2.7e-4, 'i':3.7e-4,'z':5.6e-3}[b] self.c2fMin = {'u':5.7e-3,'g':2.3e-3,'r':3.5e-3, 'i':4.7e-3,'z':1.4e-2}[b] # add in a global photometric calibration error term self.calibrationError = 0.015 def __call__(self,f_nmgy): shape = f_nmgy.shape gain = self.gain pixArea = self.pixArea darkVar = self.darkVar sky_nmgy_asec2 = np.clip(np.random.normal(self.skyMean, self.skyStd,shape), self.skyMin,np.inf) npix = np.clip(np.random.normal(self.npixMean,self.npixStd,shape), self.npixMin,np.inf) c2f = np.clip(np.random.normal(self.c2fMean,self.c2fStd,shape), self.c2fMin,np.inf) df = np.sqrt( f_nmgy*(c2f/gain) + sky_nmgy_asec2*pixArea*npix*(c2f/gain) + darkVar*npix*(c2f/gain)**2 + (self.calibrationError*f_nmgy)**2 ) return df
[docs]class empiricalPhotoUnc(object): '''approximation only valid in sky-dominated regime''' def __call__(self,f_nmgy): shape = f_nmgy.shape # set the flux for non-detections to be at d(mag) = 1.0 magLim = self.b / self.a magAB = np.clip(nmgy2abmag(self.b,f_nmgy),0,magLim) scatter = np.clip(self.scatter_a*magAB + self.scatter_b, 0.01, np.inf) b = self.b + scatter*np.random.normal(size=shape) log_dm = 2.5*(self.a*magAB - b) dm = np.clip(10**log_dm, self.err_floor, np.inf) f = np.clip(f_nmgy,abmag2nmgy(self.band,magLim),np.inf) return f * dm / 1.0857
[docs]class ukidsslasPhotoUnc(empiricalPhotoUnc): def __init__(self,b): UKIDSS_LAS_terms = np.array([[0.13616,3.1360,0.029], [0.14665,3.3081,0.043], [0.14429,3.2105,0.040], [0.15013,3.3053,0.028]]) self.band = b i = 'YJHK'.find(b) self.a,self.b,self.scatter_b = UKIDSS_LAS_terms[i] # ignoring magnitude-dependent scatter since all useful fluxes are # in the sky-dominated regime self.scatter_a = 0.0 # scatter seems to be slightly overestimated self.scatter_b *= 0.9 # calibration uncertainty floor self.err_floor = 0.015
[docs]class ukidssdxsPhotoUnc(empiricalPhotoUnc): '''as with Stripe82, not valid at bright magnitudes (m<~20)''' def __init__(self,b): UKIDSS_DXS_terms = np.array([[0.13408,3.3978,0.016], [0.14336,3.5461,0.023]]) self.band = b i = 'JK'.find(b) self.a,self.b,self.scatter_b = UKIDSS_DXS_terms[i] # ignoring magnitude-dependent scatter since all useful fluxes are # in the sky-dominated regime self.scatter_a = 0.0 # scatter seems to be slightly overestimated again (?) self.scatter_b *= 0.8 # calibration uncertainty floor self.err_floor = 0.015
[docs]class sdssStripe82PhotoUnc(empiricalPhotoUnc): '''this fails at m<~18 when SDSS detections are no longer sky-dominated, but not really interested in bright objects on the Stripe... also, dominated by calibration uncertainty for bright objects anyway ''' def __init__(self,b): stripe82terms = np.array([[0.15127,3.8529,0.00727,-0.1308], [0.15180,4.0233,0.00486,-0.0737], [0.14878,3.8970,0.00664,-0.1077], [0.14780,3.8024,0.00545,-0.0678], [0.14497,3.5437,0.00715,-0.1121]]) self.band = b i = 'ugriz'.find(b) self.a,self.b,self.scatter_a,self.scatter_b = stripe82terms[i] # calibration uncertainty floor self.err_floor = 0.015
[docs]class cfhtlsWidePhotoUnc(empiricalPhotoUnc): '''as with Stripe82, not valid at bright magnitudes (m<~19)''' def __init__(self,b): cfhtlswideterms = np.array([[0.16191,4.4005,0.037], [0.15508,4.3392,0.034], [0.15902,4.3399,0.015], [0.15721,4.2786,0.028], [0.16092,4.1967,0.034]]) self.band = b i = 'ugriz'.find(b) self.a,self.b,self.scatter_b = cfhtlswideterms[i] # ignoring magnitude-dependent scatter since all useful fluxes are # in the sky-dominated regime self.scatter_a = 0.0 # calibration uncertainty floor self.err_floor = 0.015
# WISE photometric model # need to find original reference, values updated to AllWISE by Feige _wise_phot_pars = { 'n':{'W1':1.10688849e-08, 'W2':4.80175292e-08, 'W3':4.4392921685e-06, 'W4':0.000104547518424 }, 'n_lo':{'W1':8.54987472e-09, 'W2':3.69296068e-08, }, 'n_hi':{'W1':1.35878952e-08, 'W2':5.91054515e-08, }, 'a':{'W1':2.22977172e-02, 'W2':1.92250161e-02, 'W3':0.01, 'W4':0.01}, 'a_lo':{'W1':2.18542185e-02, 'W2':1.89920736e-02, }, 'a_hi':{'W1':2.27412159e-02, 'W2':1.94579587e-02, }, # http://wise2.ipac.caltech.edu/docs/release/prelim/expsup/sec4_3g.html#WISEZMA 'ABtoVega':{'W1':2.699,'W2':3.339,'W3':5.174,'W4':6.620}, }
[docs]class allwisePhotoUnc(object): def __init__(self,b): self.band = b self.n = _wise_phot_pars['n'][b] self.n_lo = _wise_phot_pars['n_lo'][b] self.n_hi = _wise_phot_pars['n_hi'][b] self.a = _wise_phot_pars['a'][b] self.a_lo = _wise_phot_pars['a_lo'][b] self.a_hi = _wise_phot_pars['a_hi'][b] self.vegaConv = _wise_phot_pars['ABtoVega'][b] def __call__(self,f_nmgy): vegaMag = nmgy2abmag(self.band,f_nmgy) - self.vegaConv sig_m = self.a + 1.0857*self.n/(10**(-0.4*vegaMag)) s = f_nmgy.shape try: # this is legacy code for approximating the depth variations # over the sky, not really sure how valid it is sig_m_lo = self.a_lo + 1.0857*self.n_lo/(10**(-0.4*vegaMag)) sig_m_hi = self.a_hi + 1.0857*self.n_hi/(10**(-0.4*vegaMag)) lo = np.abs(np.random.normal(scale=1.0*(sig_m-sig_m_lo),size=s)) hi = np.abs(np.random.normal(scale=1.0*(sig_m_hi-sig_m),size=s)) x = np.random.random(size=s) sig_m += np.choose(x<0.5,[hi,-lo]) except: pass sig_m = np.clip(sig_m,0.02,np.inf) return sig_m * f_nmgy / 1.0857
_tmass_phot_pars = { 'x0':{'J':14.47706168, 'H':13.74112897, 'K':13.10023926 }, 'y0':{'J':-3.56813635, 'H':-3.51598265, 'K':-3.60406916 }, 'k':{'J':0.69856564, 'H':0.75697424, 'K':0.79939991 }, 'c':{'J':0.21028606, 'H':0.30856437, 'K':0.28210028 }, 'ABtoVega':{'J':0.894,'H':1.374,'K':1.84} }
[docs]def piecewise_linear(x, x0, y0, k): return np.piecewise(x, [x < x0], [lambda x: y0, lambda x:k*x + y0-k*x0])
[docs]class tmassPhotoUnc(object): def __init__(self,b): self.band = b self.x0 = _tmass_phot_pars['x0'][b] self.y0 = _tmass_phot_pars['y0'][b] self.k = _tmass_phot_pars['k'][b] self.c = _tmass_phot_pars['c'][b] self.vegaConv = _tmass_phot_pars['ABtoVega'][b] def __call__(self,f_nmgy): abMag = nmgy2abmag(self.band,f_nmgy) vegaMag = abMag - self.vegaConv sig_m = piecewise_linear(vegaMag,self.x0,self.y0,self.k) s = f_nmgy.shape sig_m_scatter = np.random.normal(scale=0.5*self.c,size=s) sig_m +=sig_m_scatter # sig_flux = np.exp(sig_m) # sig_flux = (-0.4*np.log(10) * sig_m * np.power(10,-0.4*abMag) * 3631) sig_m = np.exp(sig_m) return sig_m * f_nmgy /1.0857
supported_photo_systems = { 'SDSS':{ 'Legacy':{'bands':'ugriz','magSys':'asinh','uncMap':sdssPhotoUnc}, 'Stripe82':{'bands':'ugriz','magSys':'AB','uncMap':sdssStripe82PhotoUnc}, }, 'CFHT':{ 'CFHTLS_Wide':{'bands':'ugriz','magSys':'AB','uncMap':cfhtlsWidePhotoUnc}, }, 'UKIRT':{ 'UKIDSS_LAS':{'bands':'YJHK','magSys':'AB','uncMap':ukidsslasPhotoUnc}, 'UKIDSS_DXS':{'bands':'JHK','magSys':'AB','uncMap':ukidssdxsPhotoUnc}, }, 'WISE':{ 'AllWISE':{'bands':['W1','W2'],'magSys':'AB','uncMap':allwisePhotoUnc}, }, 'TMASS':{ 'Allsky':{'bands':['J','H','K'],'magSys':'AB','uncMap':tmassPhotoUnc}, }, 'DECam':{ 'DECaLS':{'bands':'grz','magSys':'AB','uncMap':None}, 'DES':{'bands':'grizy','magSys':'AB','uncMap':None}, }, 'BASS-MzLS':{ 'BASS-MzLS':{'bands':'grz','magSys':'AB','uncMap':None}, }, 'HSC':{ 'Wide':{'bands':'grizy','magSys':'AB','uncMap':None}, }, 'LSST':{ 'Wide':{'bands':'ugrizy','magSys':'AB','uncMap':None}, }, } # should find a better container / organization for this
[docs]def load_photo_map(photSystems): bandpasses = OrderedDict() filterdata = fits.open(os.path.join(datadir,'filtercurves.fits')) mapObserved = {} magSys = {} filtName = {} # ugh for photDesc in photSystems: try: photSysName,survey,bands = photDesc except ValueError: photSysName,survey = photDesc bands = None try: photSys = supported_photo_systems[photSysName][survey] except: raise ValueError('%s-%s not a valid photo system' % (photSysName,survey)) if bands is None: bands = photSys['bands'] for band in bands: bpName = '-'.join([photSysName,survey,band]) # a workaround for the naming of the extension in the filter file _photSysName = {'UKIRT':'UKIDSS'}.get(photSysName,photSysName) bpExt = '-'.join([_photSysName,band]) fdat = filterdata[bpExt].data fcurv = interp1d(fdat.lam.astype(np.float64), fdat.Rlam.astype(np.float64), bounds_error=False,fill_value=0.0,kind='slinear') # precompute the bandpass normalization norm = simps(fdat.Rlam/fdat.lam, fdat.lam) bandpasses[bpName] = dict(Rlam=fcurv,norm=norm,data=fdat) if photSys['uncMap'] is not None: mapObserved[bpName] = photSys['uncMap'](band) magSys[bpName] = photSys['magSys'] filtName[bpName] = bpExt filterdata.close() return dict(bandpasses=bandpasses,mapObserved=mapObserved, magSys=magSys,filtName=filtName)
[docs]def getPhotoCache(wave,photoMap): photoCache = OrderedDict() for b,bp in photoMap['bandpasses'].items(): bpdata = bp['data'] i1,i2 = np.searchsorted(wave,bpdata['lam'][[0,-1]],side='right') if i1==i2: lam,Rlam,dlam = 0.0,0.0,0.0 else: lam = wave[i1:i2] dlam = np.diff(wave[i1:i2]) dlam = np.concatenate([dlam,[dlam[-1],]]) Rlam = bp['Rlam'](wave[i1:i2]) photoCache[b] = {'ii':(i1,i2),'lam_Rlam_dlam':lam*Rlam*dlam, 'norm':bp['norm']} return photoCache
conv_Slam_to_Snu = 1/(c_Angs * 3631e-23)
[docs]def calcSynPhot(spec,photoMap=None,photoCache=None,mags=None,fluxes=None): if photoCache is None: photoCache = getPhotoCache(spec.wave,photoMap) if mags is None: mags = np.zeros(len(photoCache)) if fluxes is None: fluxes = np.zeros(len(photoCache)) for j,b in enumerate(photoCache): fnorm = photoCache[b]['norm'] i1,i2 = photoCache[b]['ii'] lamRlamdlam = photoCache[b]['lam_Rlam_dlam'] flam = spec.f_lambda[i1:i2] flux = np.sum(flam*lamRlamdlam) / fnorm fluxes[j] = flux * conv_Slam_to_Snu if fluxes[j] == 0: mags[j] = 99.99 else: mags[j] = min(-2.5*np.log10(fluxes[j]),99.99) # AB mag fluxes *= 1e9 # nanomaggies return mags,fluxes
[docs]def calcObsPhot(synFlux,photoMap,seed=None): obsFlux = np.empty_like(synFlux) obsFluxErr = np.empty_like(synFlux) obsMag = np.empty_like(synFlux) obsMagErr = np.empty_like(synFlux) gridShape = synFlux.shape[:-1] if seed: np.random.seed(seed) for j,b in enumerate(photoMap['bandpasses']): _b = b.split("-")[-1] # the short filter name obsFluxErr[...,j] = photoMap['mapObserved'][b](synFlux[...,j]) obsFlux[...,j] = synFlux[...,j] + \ obsFluxErr[...,j]*np.random.randn(*gridShape) if photoMap['magSys'][b]=='AB': obsMag[...,j],obsMagErr[...,j] = nmgy2abmag(_b,obsFlux[...,j], obsFluxErr[...,j]) elif photoMap['magSys'][b]=='asinh': obsMag[...,j],obsMagErr[...,j] = nmgy2asinhmag(_b,obsFlux[...,j], obsFluxErr[...,j]) else: raise ValueError return Table({'obsFlux':obsFlux,'obsFluxErr':obsFluxErr, 'obsMag':obsMag,'obsMagErr':obsMagErr})
[docs]class LazyPhotoMap(object): '''Needed for using photoMap in multiprocessing calls''' def __init__(self,photSystems): self.photSystems = photSystems self.photoMap = None self.photoCache = None def __loadup(self,spec=None): if self.photoMap is None: self.photoMap = load_photo_map(self.photSystems) if self.photoCache is None and spec is not None: self.photoCache = getPhotoCache(spec.wave,self.photoMap)
[docs] def calcSynPhot(self,spec): self.__loadup(spec) return calcSynPhot(spec,photoCache=self.photoCache)
[docs] def getBandpasses(self): self.__loadup() return self.photoMap['bandpasses']