from __future__ import annotations
import glob as glob
import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
from numpy import float64, int64, ndarray
from pandas.core.frame import DataFrame
from .. import io
if TYPE_CHECKING:
from ..my_rassine_tools import spec_time_series
# =============================================================================
# IMPORT ALL RASSINE DICTIONNARY
# =============================================================================
[docs]def import_rassine_output(self: spec_time_series, return_name: bool=False, kw1: None=None, kw2: None=None) -> Any:
"""
Import all the RASSINE dictionnaries in a list
Parameters
----------
return_name : True/False to also return the filenames
Returns
-------
Return the list containing all thedictionnary
"""
directory = self.directory
files = glob.glob(directory + "RASSI*.p")
if len(files) <= 1: # 1 when merged directory
print("No RASSINE file found in the directory : %s" % (directory))
if return_name:
return [], []
else:
return []
else:
files = np.sort(files)
file = []
for i, j in enumerate(files):
self.debug = j
file.append(pd.read_pickle(j))
if kw1 is not None:
file[-1] = file[-1][kw1]
if kw2 is not None:
file[-1] = file[-1][kw2]
if return_name:
return file, files
else:
return file
# =============================================================================
# IMPORT SUMMARY TABLE
# =============================================================================
[docs]def import_star_info(self: spec_time_series) -> None:
self.star_info = pd.read_pickle(
self.dir_root + "STAR_INFO/Stellar_info_" + self.starname + ".p"
)
[docs]def import_table(self: spec_time_series) -> None:
self.table = pd.read_pickle(self.directory + "Analyse_summary.p")
[docs]def import_material(self: spec_time_series) -> None:
self.material = pd.read_pickle(self.directory + "Analyse_material.p")
# =============================================================================
# IMPORT THE FULL DICO CHAIN
# =============================================================================
[docs]def import_dico_tree(self: spec_time_series) -> None:
file_test = self.import_spectrum()
kw = list(file_test.keys())
kw_kept = []
kw_chain = []
for k in kw:
if len(k.split("matching_")) == 2:
kw_kept.append(k)
kw_kept = np.array(kw_kept)
info = []
for n in kw_kept:
try:
s = file_test[n]["parameters"]["step"]
dico = file_test[n]["parameters"]["sub_dico_used"]
info.append([n, s, dico])
except:
pass
info = pd.DataFrame(info, columns=["dico", "step", "dico_used"])
self.dico_tree = info.sort_values(by="step")
# =============================================================================
# IMPORT a RANDOM SPECTRUM
# =============================================================================
[docs]def import_spectrum(self: spec_time_series, num: Optional[int64]=None) -> Dict[str, Any]:
"""
Import a pickle file of a spectrum to get fast common information shared by all spectra
Parameters
----------
num : index of the spectrum to extract (if None random selection)
Returns
-------
Return the open pickle file
"""
directory = self.directory
files = glob.glob(directory + "RASSI*.p")
files = np.sort(files)
if not len(files):
files = glob.glob(directory.replace("WORKSPACE", "TEMP") + "RASSI*.p")
if num is None:
try:
num = np.random.choice(np.arange(len(files)), 1)
file = files[num][0]
except:
file = files[0]
else:
try:
file = files[num]
except:
file = files[0]
return pd.read_pickle(file)
[docs]def yarara_star_info(
self: spec_time_series,
Rv_sys: None=None,
simbad_name: None=None,
magB: None=None,
magV: None=None,
magR: None=None,
BV: None=None,
VR: None=None,
sp_type: None=None,
Mstar: None=None,
Rstar: None=None,
Vsini: None=None,
Vmicro: None=None,
Teff: None=None,
log_g: None=None,
FeH: None=None,
Prot: None=None,
Fwhm: Optional[List[Union[str, float64]]]=None,
Contrast: Optional[List[Union[str, float64]]]=None,
CCF_delta: None=None,
Pmag: None=None,
stellar_template: None=None,
) -> None:
kw = [
"Rv_sys",
"Simbad_name",
"Sp_type",
"magB",
"magV",
"magR",
"BV",
"VR",
"Mstar",
"Rstar",
"Vsini",
"Vmicro",
"Teff",
"Log_g",
"FeH",
"Prot",
"FWHM",
"Contrast",
"Pmag",
"stellar_template",
"CCF_delta",
]
val = [
Rv_sys,
simbad_name,
sp_type,
magB,
magV,
magR,
BV,
VR,
Mstar,
Rstar,
Vsini,
Vmicro,
Teff,
log_g,
FeH,
Prot,
Fwhm,
Contrast,
Pmag,
stellar_template,
CCF_delta,
]
self.import_star_info()
self.import_table()
self.import_material()
table = self.table
snr = np.array(table["snr"]).argmax()
file_test = self.import_spectrum(num=snr)
for i, j in zip(kw, val):
if j is not None:
if type(j) != list:
j = ["fixed", j]
if i in self.star_info.keys():
self.star_info[i][j[0]] = j[1]
else:
self.star_info[i] = {j[0]: j[1]}
# TODO:
# Here, we removed the Gray temperature and the MARCS atlas atmospheric model
# initialization
io.pickle_dump(
self.star_info,
open(self.dir_root + "STAR_INFO/Stellar_info_" + self.starname + ".p", "wb"),
)
# =============================================================================
# MAKE SUMMARY
# =============================================================================
# io
[docs]def yarara_analyse_summary(self: spec_time_series, rm_old: bool=False) -> None:
"""
Produce a summary table with the RASSINE files of the specified directory
"""
directory = self.directory
if rm_old:
os.system("rm " + directory + "Analyse_summary.p")
os.system("rm " + directory + "Analyse_summary.csv")
test, names = self.import_rassine_output(return_name=True)
# info obs
ins = np.array([test[j]["parameters"].get("instrument") for j in range(len(test))])
snr = np.array([test[j]["parameters"].get("SNR_5500") for j in range(len(test))])
snr2 = np.array([test[j]["parameters"].get("SNR_computed") for j in range(len(test))])
jdb = np.array([test[j]["parameters"].get("jdb") for j in range(len(test))])
mjd = np.array([test[j]["parameters"].get("mjd") for j in range(len(test))])
berv = np.array([test[j]["parameters"].get("berv") for j in range(len(test))])
berv_planet = np.array([test[j]["parameters"].get("berv_planet") for j in range(len(test))])
lamp_offset = np.array([test[j]["parameters"].get("lamp_offset") for j in range(len(test))])
rv = np.array([test[j]["parameters"].get("rv") for j in range(len(test))])
rv_shift = np.array([test[j]["parameters"].get("RV_shift") for j in range(len(test))])
rv_sec = np.array([test[j]["parameters"].get("RV_sec") for j in range(len(test))])
rv_moon = np.array([test[j]["parameters"].get("RV_moon") for j in range(len(test))])
airmass = np.array([test[j]["parameters"].get("airmass") for j in range(len(test))])
texp = np.array([test[j]["parameters"].get("texp") for j in range(len(test))])
seeing = np.array([test[j]["parameters"].get("seeing") for j in range(len(test))])
humidity = np.array([test[j]["parameters"].get("humidity") for j in range(len(test))])
rv_planet = np.array([test[j]["parameters"].get("rv_planet") for j in range(len(test))])
rv_dace = np.array([test[j]["parameters"].get("rv_dace") for j in range(len(test))])
rv_dace_std = np.array([test[j]["parameters"].get("rv_dace_std") for j in range(len(test))])
flux_balance = np.array([test[j]["parameters"].get("flux_balance") for j in range(len(test))])
transit = np.array([test[j]["parameters"].get("transit_in") for j in range(len(test))])
night_drift = np.array([test[j]["parameters"].get("drift_used") for j in range(len(test))])
night_drift_std = np.array(
[test[j]["parameters"].get("drift_used_std") for j in range(len(test))]
)
# info activity
rhk = np.array([test[j]["parameters"].get("RHK") for j in range(len(test))])
rhk_std = np.array([test[j]["parameters"].get("RHK_std") for j in range(len(test))])
kernel_rhk = np.array([test[j]["parameters"].get("Kernel_CaII") for j in range(len(test))])
kernel_rhk_std = np.array(
[test[j]["parameters"].get("Kernel_CaII_std") for j in range(len(test))]
)
ca2k = np.array([test[j]["parameters"].get("CaIIK") for j in range(len(test))])
ca2k_std = np.array([test[j]["parameters"].get("CaIIK_std") for j in range(len(test))])
ca2h = np.array([test[j]["parameters"].get("CaIIH") for j in range(len(test))])
ca2h_std = np.array([test[j]["parameters"].get("CaIIH_std") for j in range(len(test))])
ca2 = np.array([test[j]["parameters"].get("CaII") for j in range(len(test))])
ca2_std = np.array([test[j]["parameters"].get("CaII_std") for j in range(len(test))])
ca1 = np.array([test[j]["parameters"].get("CaI") for j in range(len(test))])
ca1_std = np.array([test[j]["parameters"].get("CaI_std") for j in range(len(test))])
mg1 = np.array([test[j]["parameters"].get("MgI") for j in range(len(test))])
mg1_std = np.array([test[j]["parameters"].get("MgI_std") for j in range(len(test))])
mg1a = np.array([test[j]["parameters"].get("MgIa") for j in range(len(test))])
mg1a_std = np.array([test[j]["parameters"].get("MgIa_std") for j in range(len(test))])
mg1b = np.array([test[j]["parameters"].get("MgIb") for j in range(len(test))])
mg1b_std = np.array([test[j]["parameters"].get("MgIb_std") for j in range(len(test))])
mg1c = np.array([test[j]["parameters"].get("MgIc") for j in range(len(test))])
mg1c_std = np.array([test[j]["parameters"].get("MgIc_std") for j in range(len(test))])
d3 = np.array([test[j]["parameters"].get("HeID3") for j in range(len(test))])
d3_std = np.array([test[j]["parameters"].get("HeID3_std") for j in range(len(test))])
nad = np.array([test[j]["parameters"].get("NaD") for j in range(len(test))])
nad_std = np.array([test[j]["parameters"].get("NaD_std") for j in range(len(test))])
nad1 = np.array([test[j]["parameters"].get("NaD1") for j in range(len(test))])
nad1_std = np.array([test[j]["parameters"].get("NaD1_std") for j in range(len(test))])
nad2 = np.array([test[j]["parameters"].get("NaD2") for j in range(len(test))])
nad2_std = np.array([test[j]["parameters"].get("NaD2_std") for j in range(len(test))])
ha = np.array([test[j]["parameters"].get("Ha") for j in range(len(test))])
ha_std = np.array([test[j]["parameters"].get("Ha_std") for j in range(len(test))])
hb = np.array([test[j]["parameters"].get("Hb") for j in range(len(test))])
hb_std = np.array([test[j]["parameters"].get("Hb_std") for j in range(len(test))])
hc = np.array([test[j]["parameters"].get("Hc") for j in range(len(test))])
hc_std = np.array([test[j]["parameters"].get("Hc_std") for j in range(len(test))])
hd = np.array([test[j]["parameters"].get("Hd") for j in range(len(test))])
hd_std = np.array([test[j]["parameters"].get("Hd_std") for j in range(len(test))])
heps = np.array([test[j]["parameters"].get("Heps") for j in range(len(test))])
heps_std = np.array([test[j]["parameters"].get("Heps_std") for j in range(len(test))])
wbk = np.array([test[j]["parameters"].get("WB_K") for j in range(len(test))])
wbk_std = np.array([test[j]["parameters"].get("WB_K_std") for j in range(len(test))])
wbh = np.array([test[j]["parameters"].get("WB_H") for j in range(len(test))])
wbh_std = np.array([test[j]["parameters"].get("WB_H_std") for j in range(len(test))])
wb = np.array([test[j]["parameters"].get("WB") for j in range(len(test))])
wb_std = np.array([test[j]["parameters"].get("WB_std") for j in range(len(test))])
wb_h1 = np.array([test[j]["parameters"].get("WB_H1") for j in range(len(test))])
wb_h1_std = np.array([test[j]["parameters"].get("WB_H1_std") for j in range(len(test))])
kernel_wb = np.array([test[j]["parameters"].get("Kernel_WB") for j in range(len(test))])
kernel_wb_std = np.array(
[test[j]["parameters"].get("Kernel_WB_std") for j in range(len(test))]
)
cb = np.array([test[j]["parameters"].get("CB") for j in range(len(test))])
cb_std = np.array([test[j]["parameters"].get("CB_std") for j in range(len(test))])
cb2 = np.array([test[j]["parameters"].get("CB2") for j in range(len(test))])
cb2_std = np.array([test[j]["parameters"].get("CB2_std") for j in range(len(test))])
# pca shells coefficient
shell_compo = np.array([test[j]["parameters"].get("shell_fitted") for j in range(len(test))])
shell1 = np.array([test[j]["parameters"].get("shell_v1") for j in range(len(test))])
shell2 = np.array([test[j]["parameters"].get("shell_v2") for j in range(len(test))])
shell3 = np.array([test[j]["parameters"].get("shell_v3") for j in range(len(test))])
shell4 = np.array([test[j]["parameters"].get("shell_v4") for j in range(len(test))])
shell5 = np.array([test[j]["parameters"].get("shell_v5") for j in range(len(test))])
# pca rv vectors
pca_compo = np.array([test[j]["parameters"].get("pca_fitted") for j in range(len(test))])
pca1 = np.array([test[j]["parameters"].get("pca_v1") for j in range(len(test))])
pca2 = np.array([test[j]["parameters"].get("pca_v2") for j in range(len(test))])
pca3 = np.array([test[j]["parameters"].get("pca_v3") for j in range(len(test))])
pca4 = np.array([test[j]["parameters"].get("pca_v4") for j in range(len(test))])
pca5 = np.array([test[j]["parameters"].get("pca_v5") for j in range(len(test))])
# telluric ccf
vec_background = np.array(
[test[j]["parameters"].get("proxy_background") for j in range(len(test))]
)
# telluric ccf
tell_ew = np.array([test[j]["parameters"].get("telluric_ew") for j in range(len(test))])
tell_contrast = np.array(
[test[j]["parameters"].get("telluric_contrast") for j in range(len(test))]
)
tell_rv = np.array([test[j]["parameters"].get("telluric_rv") for j in range(len(test))])
tell_fwhm = np.array([test[j]["parameters"].get("telluric_fwhm") for j in range(len(test))])
tell_center = np.array(
[test[j]["parameters"].get("telluric_center") for j in range(len(test))]
)
tell_depth = np.array([test[j]["parameters"].get("telluric_depth") for j in range(len(test))])
# telluric ccf h2o
h2o_ew = np.array([test[j]["parameters"].get("h2o_ew") for j in range(len(test))])
h2o_contrast = np.array([test[j]["parameters"].get("h2o_contrast") for j in range(len(test))])
h2o_rv = np.array([test[j]["parameters"].get("h2o_rv") for j in range(len(test))])
h2o_fwhm = np.array([test[j]["parameters"].get("h2o_fwhm") for j in range(len(test))])
h2o_center = np.array([test[j]["parameters"].get("h2o_center") for j in range(len(test))])
h2o_depth = np.array([test[j]["parameters"].get("h2o_depth") for j in range(len(test))])
# telluric ccf o2
o2_ew = np.array([test[j]["parameters"].get("o2_ew") for j in range(len(test))])
o2_contrast = np.array([test[j]["parameters"].get("o2_contrast") for j in range(len(test))])
o2_rv = np.array([test[j]["parameters"].get("o2_rv") for j in range(len(test))])
o2_fwhm = np.array([test[j]["parameters"].get("o2_fwhm") for j in range(len(test))])
o2_center = np.array([test[j]["parameters"].get("o2_center") for j in range(len(test))])
o2_depth = np.array([test[j]["parameters"].get("o2_depth") for j in range(len(test))])
# teff
t_eff = np.array([test[j]["parameters"].get("Teff") for j in range(len(test))])
t_eff_std = np.array([test[j]["parameters"].get("Teff_std") for j in range(len(test))])
# ghost
ghost = np.array([test[j]["parameters"].get("ghost") for j in range(len(test))])
# sas
sas = np.array([test[j]["parameters"].get("SAS") for j in range(len(test))])
sas_std = np.array([test[j]["parameters"].get("SAS_std") for j in range(len(test))])
sas1y = np.array([test[j]["parameters"].get("SAS1Y") for j in range(len(test))])
sas1y_std = np.array([test[j]["parameters"].get("SAS1Y_std") for j in range(len(test))])
bis = np.array([test[j]["parameters"].get("BIS") for j in range(len(test))])
bis_std = np.array([test[j]["parameters"].get("BIS_std") for j in range(len(test))])
bis2 = np.array([test[j]["parameters"].get("BIS2") for j in range(len(test))])
bis2_std = np.array([test[j]["parameters"].get("BIS2_std") for j in range(len(test))])
# parabola ccf
try:
para_rv = np.array([test[j]["ccf_parabola"].get("para_rv") for j in range(len(test))])
para_depth = np.array(
[test[j]["ccf_parabola"].get("para_depth") for j in range(len(test))]
)
except KeyError:
para_rv = np.zeros(len(snr))
para_depth = np.zeros(len(snr))
# gaussian ccf
try:
ccf_ew = np.array([test[j]["ccf"].get("ew") for j in range(len(test))])
ccf_contrast = np.array(
[test[j]["ccf_gaussian"].get("contrast") for j in range(len(test))]
)
ccf_contrast_std = np.array(
[test[j]["ccf_gaussian"].get("contrast_std") for j in range(len(test))]
)
ccf_rv = np.array([test[j]["ccf_gaussian"].get("rv") for j in range(len(test))])
ccf_rv_std = np.array([test[j]["ccf_gaussian"].get("rv_std") for j in range(len(test))])
ccf_fwhm = (
np.array([test[j]["ccf_gaussian"].get("fwhm") for j in range(len(test))]) * 2.355
)
ccf_fwhm_std = (
np.array([test[j]["ccf_gaussian"].get("fwhm_std") for j in range(len(test))]) * 2.355
)
ccf_offset = np.array([test[j]["ccf_gaussian"].get("offset") for j in range(len(test))])
ccf_offset_std = np.array(
[test[j]["ccf_gaussian"].get("offset_std") for j in range(len(test))]
)
ccf_vspan = np.array([test[j]["ccf_gaussian"].get("vspan") for j in range(len(test))])
ccf_vspan_std = np.array([test[j]["ccf_gaussian"].get("rv_std") for j in range(len(test))])
ccf_svrad_phot = np.array(
[test[j]["ccf_gaussian"].get("rv_std_phot") for j in range(len(test))]
)
except KeyError:
ccf_ew = np.zeros(len(snr))
ccf_contrast = np.zeros(len(snr))
ccf_contrast_std = np.zeros(len(snr))
ccf_rv = np.zeros(len(snr))
ccf_rv_std = np.zeros(len(snr))
ccf_fwhm = np.zeros(len(snr))
ccf_fwhm_std = np.zeros(len(snr))
ccf_offset = np.zeros(len(snr))
ccf_offset_std = np.zeros(len(snr))
ccf_vspan = np.zeros(len(snr))
ccf_vspan_std = np.zeros(len(snr))
ccf_svrad_phot = np.zeros(len(snr))
dico = pd.DataFrame(
{
"filename": names,
"ins": ins,
"snr": snr,
"snr_computed": snr2,
"flux_balance": flux_balance,
"jdb": jdb,
"mjd": mjd,
"berv": berv,
"berv_planet": berv_planet,
"lamp_offset": lamp_offset,
"rv": rv,
"rv_shift": rv_shift,
"rv_sec": rv_sec,
"rv_moon": rv_moon,
"rv_planet": rv_planet,
"rv_dace": rv_dace,
"rv_dace_std": rv_dace_std,
"rv_drift": night_drift,
"rv_drift_std": night_drift_std,
"airmass": airmass,
"texp": texp,
"seeing": seeing,
"humidity": humidity,
"transit_in": transit,
"RHK": rhk,
"RHK_std": rhk_std,
"Kernel_CaII": kernel_rhk,
"Kernel_CaII_std": kernel_rhk_std,
"CaIIK": ca2k,
"CaIIK_std": ca2k_std,
"CaIIH": ca2h,
"CaIIH_std": ca2h_std,
"CaII": ca2,
"CaII_std": ca2_std,
"CaI": ca1,
"CaI_std": ca1_std,
"MgI": mg1,
"MgI_std": mg1_std,
"MgIa": mg1a,
"MgIa_std": mg1a_std,
"MgIb": mg1b,
"MgIb_std": mg1b_std,
"MgIc": mg1c,
"MgIc_std": mg1c_std,
"HeID3": d3,
"HeID3_std": d3_std,
"NaD": nad,
"NaD_std": nad_std,
"NaD1": nad1,
"NaD1_std": nad1_std,
"NaD2": nad2,
"NaD2_std": nad2_std,
"Ha": ha,
"Ha_std": ha_std,
"Hb": hb,
"Hb_std": hb_std,
"Hc": hc,
"Hc_std": hc_std,
"Hd": hd,
"Hd_std": hd_std,
"Heps": heps,
"Heps_std": heps_std,
"WBK": wbk,
"WBK_std": wbk_std,
"WBH": wbh,
"WBH_std": wbh_std,
"WB": wb,
"WB_std": wb_std,
"WB_H1": wb_h1,
"WB_H1_std": wb_h1_std,
"Kernel_WB": kernel_wb,
"Kernel_WB_std": kernel_wb_std,
"CB": cb,
"CB_std": cb_std,
"CB2": cb2,
"CB2_std": cb2_std,
"shell_fitted": shell_compo,
"shell1": shell1,
"shell2": shell2,
"shell3": shell3,
"shell4": shell4,
"shell5": shell5,
"pca_fitted": pca_compo,
"pca1": pca1,
"pca2": pca2,
"pca3": pca3,
"pca4": pca4,
"pca5": pca5,
"telluric_ew": tell_ew,
"telluric_contrast": tell_contrast,
"telluric_rv": tell_rv,
"telluric_fwhm": tell_fwhm,
"telluric_center": tell_center,
"telluric_depth": tell_depth,
"h2o_ew": h2o_ew,
"h2o_contrast": h2o_contrast,
"h2o_rv": h2o_rv,
"h2o_fwhm": h2o_fwhm,
"h2o_center": h2o_center,
"h2o_depth": h2o_depth,
"o2_ew": o2_ew,
"o2_contrast": o2_contrast,
"o2_rv": o2_rv,
"o2_fwhm": o2_fwhm,
"o2_center": o2_center,
"o2_depth": o2_depth,
"proxy_background": vec_background,
"ccf_rv_para": para_rv,
"ccf_depth_para": para_depth,
"ccf_ew": ccf_ew,
"ccf_contrast": ccf_contrast,
"ccf_contrast_std": ccf_contrast_std,
"ccf_vspan": ccf_vspan,
"ccf_vspan_std": ccf_vspan_std,
"ccf_rv": ccf_rv,
"ccf_rv_std": ccf_rv_std,
"ccf_rv_std_phot": ccf_svrad_phot,
"ccf_fwhm": ccf_fwhm,
"ccf_fwhm_std": ccf_fwhm_std,
"ccf_offset": ccf_offset,
"ccf_offset_std": ccf_offset_std,
"Teff": t_eff,
"Teff_std": t_eff_std,
"ghost": ghost,
"SAS": sas,
"SAS_std": sas_std,
"SAS1Y": sas1y,
"SAS1Y_std": sas1y_std,
"BIS": bis,
"BIS_std": bis_std,
"BIS2": bis2,
"BIS2_std": bis2_std,
}
)
if os.path.exists(directory + "Analyse_summary.p"):
summary = pd.read_pickle(directory + "Analyse_summary.p")
merge = summary.merge(dico, on="filename", how="outer", indicator=True)
update = pd.DataFrame({"filename": np.array(merge["filename"])})
merge = merge.drop(columns="filename")
summary = summary.drop(columns="filename")
dico = dico.drop(columns="filename")
all_keys = np.array(summary.keys())
for j in all_keys:
try:
merge.loc[merge["_merge"] != "left_only", j + "_x"] = merge.loc[
merge["_merge"] != "left_only", j + "_y"
]
update[j] = merge[j + "_x"]
except KeyError:
update[j] = merge[j]
all_keys = np.setdiff1d(np.array(dico.keys()), np.array(summary.keys()))
if len(all_keys) != 0:
for j in all_keys:
update[j] = merge[j]
dico = update
dico = dico.sort_values(by="jdb").reset_index(drop=True)
io.save_pickle(directory + "/Analyse_summary.p", dico)
dico.to_csv(directory + "/Analyse_summary.csv")
print("\n [INFO] Summary table updated")
# ===
# Other things
# ===
[docs]def yarara_obs_info(
self: spec_time_series,
kw: DataFrame=[None, None],
jdb: None=None,
berv: None=None,
rv: None=None,
airmass: None=None,
texp: None=None,
seeing: None=None,
humidity: None=None,
) -> None:
"""
Add some observationnal information in the RASSINE files and produce a summary table
Parameters
----------
kw: list-like with format [keyword,array]
jdb : array-like with same size than the number of files in the directory
berv : array-like with same size than the number of files in the directory
rv : array-like with same size than the number of files in the directory
airmass : array-like with same size than the number of files in the directory
texp : array-like with same size than the number of files in the directory
seeing : array-like with same size than the number of files in the directory
humidity : array-like with same size than the number of files in the directory
"""
directory = self.directory
files = glob.glob(directory + "RASSI*.p")
files = np.sort(files)
nb = len(files)
if type(kw) == pd.core.frame.DataFrame: # in case of a pandas dataframe
kw = [list(kw.keys()), [i for i in np.array(kw).T]]
else:
try:
if len(kw[1]) == 1:
kw[1] = [kw[1][0]] * nb
except TypeError:
kw[1] = [kw[1]] * nb
kw[0] = [kw[0]]
kw[1] = [kw[1]]
for i, j in enumerate(files):
file = pd.read_pickle(j)
for kw1, kw2 in zip(kw[0], kw[1]):
if kw1 is not None:
if len(kw1.split("ccf_")) - 1:
file["ccf_gaussian"][kw1.split("ccf_")[1]] = kw2[i]
else:
file["parameters"][kw1] = kw2[i]
if jdb is not None:
file["parameters"]["jdb"] = jdb[i]
if berv is not None:
file["parameters"]["berv"] = berv[i]
if rv is not None:
file["parameters"]["rv"] = rv[i]
if airmass is not None:
file["parameters"]["airmass"] = airmass[i]
if texp is not None:
file["parameters"]["texp"] = texp[i]
if seeing is not None:
file["parameters"]["seeing"] = seeing[i]
if humidity is not None:
file["parameters"]["humidity"] = humidity[i]
io.save_pickle(j, file)
self.yarara_analyse_summary()
[docs]def supress_time_spectra(
self: spec_time_series,
liste: Optional[ndarray]=None,
jdb_min: None=None,
jdb_max: None=None,
num_min: None=None,
num_max: None=None,
supress: bool=False,
name_ext: str="temp",
) -> None:
"""
Supress spectra according to time
Parameters
----------
jdb or num are both inclusive in lower and upper limit
snr_cutoff : treshold value of the cutoff below which spectra are removed
supress : True/False to delete of simply hide the files
"""
self.import_table()
jdb = self.table["jdb"]
name = self.table["filename"]
directory = "/".join(np.array(name)[0].split("/")[:-1])
if num_min is not None:
jdb_min = jdb[num_min]
if num_max is not None:
jdb_max = jdb[num_max]
if jdb_min is None:
jdb_min = jdb_max
if jdb_max is None:
jdb_max = jdb_min
if (num_min is None) & (num_max is None) & (jdb_min is None) & (jdb_max is None):
jdb_min = 1e9
jdb_max = -1e9
if liste is None:
mask = np.array((jdb >= jdb_min) & (jdb <= jdb_max))
else:
if type(liste[0]) == np.bool_:
mask = liste
else:
mask = np.in1d(np.arange(len(jdb)), liste)
if sum(mask):
idx = np.arange(len(jdb))[mask]
print(" [INFO] Following spectrum indices will be supressed : ", idx)
print(" [INFO] Number of spectrum supressed : %.0f \n" % (sum(mask)))
maps = glob.glob(self.dir_root + "CORRECTION_MAP/*.p")
if len(maps):
for names in maps:
correction_map = pd.read_pickle(names)
correction_map["correction_map"] = np.delete(
correction_map["correction_map"], idx, axis=0
)
io.pickle_dump(correction_map, open(names, "wb"))
print("%s modified" % (names.split("/")[-1]))
name = name[mask]
files_to_process = np.sort(np.array(name))
for j in files_to_process:
if supress:
print("File deleted : %s " % (j))
os.system("rm " + j)
else:
new_name = name_ext + "_" + j.split("/")[-1]
print("File renamed : %s " % (directory + "/" + new_name))
os.system("mv " + j + " " + directory + "/" + new_name)
os.system("rm " + directory + "/Analyse_summary.p")
os.system("rm " + directory + "/Analyse_summary.csv")
self.yarara_analyse_summary()
self.supress_time_RV(mask)