From fd0b5820a40b9175c91e4f577dc7b58d6b1f93f3 Mon Sep 17 00:00:00 2001 From: Mads Christian Lund Date: Wed, 20 Sep 2023 09:31:25 +0200 Subject: [PATCH] Finished StaticQC module * static_qc default threshold * Added debug logging to static_qc.py * Cleaned whitespaces in L1toL2.py --- src/pypromice/process/L1toL2.py | 436 ++++++++++++++--------------- src/pypromice/qc/static_qc.py | 66 ++++- src/pypromice/qc/static_qc_test.py | 218 ++++++++------- 3 files changed, 376 insertions(+), 344 deletions(-) diff --git a/src/pypromice/process/L1toL2.py b/src/pypromice/process/L1toL2.py index 79d08833..2dd1f024 100644 --- a/src/pypromice/process/L1toL2.py +++ b/src/pypromice/process/L1toL2.py @@ -12,10 +12,7 @@ from pypromice.qc.static_qc import apply_static_qc -# from IPython import embed - -def toL2(L1, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., - eps_clear=9.36508e-6, emissivity=0.97): +def toL2(L1, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., eps_clear=9.36508e-6, emissivity=0.97): '''Process one Level 1 (L1) product to Level 2 Parameters @@ -25,10 +22,10 @@ def toL2(L1, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., T_0 : float, optional Ice point temperature in K. The default is 273.15. ews : float, optional - Saturation pressure (normal atmosphere) at steam point temperature. + Saturation pressure (normal atmosphere) at steam point temperature. The default is 1013.246. ei0 : float, optional - Saturation pressure (normal atmosphere) at ice-point temperature. The + Saturation pressure (normal atmosphere) at ice-point temperature. The default is 6.1071. eps_overcast : int, optional Cloud overcast. The default is 1.. @@ -47,86 +44,83 @@ def toL2(L1, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., ds = adjustTime(ds) # Adjust time after a user-defined csv files ds = flagNAN(ds) # Flag NaNs after a user-defined csv files ds = adjustData(ds) # Adjust data after a user-defined csv files - except Exception as e: + except Exception as e: print('Flagging and fixing failed:') print(e) - - #stid = ds.station_id - - - + + ds = apply_static_qc(ds) # Flag and remove percentile outliers - T_100 = _getTempK(T_0) - ds['rh_u_cor'] = correctHumidity(ds['rh_u'], ds['t_u'], - T_0, T_100, ews, ei0) - + T_100 = _getTempK(T_0) + ds['rh_u_cor'] = correctHumidity(ds['rh_u'], ds['t_u'], + T_0, T_100, ews, ei0) + # Determiune cloud cover for on-ice stations if not ds.attrs['bedrock']: cc = calcCloudCoverage(ds['t_u'], T_0, eps_overcast, eps_clear, # Calculate cloud coverage - ds['dlr'], ds.attrs['station_id']) + ds['dlr'], ds.attrs['station_id']) ds['cc'] = (('time'), cc.data) else: # Default cloud cover for bedrock station for which tilt should be 0 anyway. cc = 0.8 - + # Determine surface temperature ds['t_surf'] = calcSurfaceTemperature(T_0, ds['ulr'], ds['dlr'], # Calculate surface temperature emissivity) if not ds.attrs['bedrock']: ds['t_surf'] = ds['t_surf'].where(ds['t_surf'] <= 0, other = 0) - - # Determine station position relative to sun + + # Determine station position relative to sun doy = ds['time'].to_dataframe().index.dayofyear.values # Gather variables to calculate sun pos hour = ds['time'].to_dataframe().index.hour.values minute = ds['time'].to_dataframe().index.minute.values - + if hasattr(ds, 'latitude') and hasattr(ds, 'longitude'): lat = ds.attrs['latitude'] # TODO Why is mean GPS lat lon not preferred for calcs? 
lon = ds.attrs['longitude'] else: lat = ds['gps_lat'].mean() lon = ds['gps_lon'].mean() - - deg2rad, rad2deg = _getRotation() # Get degree-radian conversions - phi_sensor_rad, theta_sensor_rad = calcTilt(ds['tilt_x'], ds['tilt_y'], # Calculate station tilt - deg2rad) - + + deg2rad, rad2deg = _getRotation() # Get degree-radian conversions + phi_sensor_rad, theta_sensor_rad = calcTilt(ds['tilt_x'], ds['tilt_y'], # Calculate station tilt + deg2rad) + Declination_rad = calcDeclination(doy, hour, minute) # Calculate declination - HourAngle_rad = calcHourAngle(hour, minute, lon) # Calculate hour angle + HourAngle_rad = calcHourAngle(hour, minute, lon) # Calculate hour angle ZenithAngle_rad, ZenithAngle_deg = calcZenith(lat, Declination_rad, # Calculate zenith - HourAngle_rad, deg2rad, + HourAngle_rad, deg2rad, rad2deg) - + # Correct Downwelling shortwave radiation - DifFrac = 0.2 + 0.8 * cc + DifFrac = 0.2 + 0.8 * cc CorFac_all = calcCorrectionFactor(Declination_rad, phi_sensor_rad, # Calculate correction - theta_sensor_rad, HourAngle_rad, - ZenithAngle_rad, ZenithAngle_deg, - lat, DifFrac, deg2rad) + theta_sensor_rad, HourAngle_rad, + ZenithAngle_rad, ZenithAngle_deg, + lat, DifFrac, deg2rad) ds['dsr_cor'] = ds['dsr'].copy(deep=True) * CorFac_all # Apply correction AngleDif_deg = calcAngleDiff(ZenithAngle_rad, HourAngle_rad, # Calculate angle between sun and sensor - phi_sensor_rad, theta_sensor_rad) - + phi_sensor_rad, theta_sensor_rad) + ds['albedo'], OKalbedos = calcAlbedo(ds['usr'], ds['dsr_cor'], # Determine albedo AngleDif_deg, ZenithAngle_deg) # Correct upwelling and downwelling shortwave radiation sunonlowerdome =(AngleDif_deg >= 90) & (ZenithAngle_deg <= 90) # Determine when sun is in FOV of lower sensor, assuming sensor measures only diffuse radiation - ds['dsr_cor'] = ds['dsr_cor'].where(~sunonlowerdome, + ds['dsr_cor'] = ds['dsr_cor'].where(~sunonlowerdome, other=ds['dsr'] / DifFrac) # Apply to downwelling ds['usr_cor'] = ds['usr'].copy(deep=True) - ds['usr_cor'] = ds['usr_cor'].where(~sunonlowerdome, + ds['usr_cor'] = ds['usr_cor'].where(~sunonlowerdome, other=ds['albedo'] * ds['dsr'] / DifFrac) # Apply to upwelling bad = (ZenithAngle_deg > 95) | (ds['dsr_cor'] <= 0) | (ds['usr_cor'] <= 0) # Set to zero for solar zenith angles larger than 95 deg or either values are (less than) zero ds['dsr_cor'][bad] = 0 ds['usr_cor'][bad] = 0 - ds['dsr_cor'] = ds['usr_cor'].copy(deep=True) / ds['albedo'] # Correct DWR using more reliable USWR when sun not in sight of upper sensor - ds['albedo'] = ds['albedo'].where(OKalbedos) #TODO remove? - + ds['dsr_cor'] = ds['usr_cor'].copy(deep=True) / ds['albedo'] # Correct DWR using more reliable USWR when sun not in sight of upper sensor + ds['albedo'] = ds['albedo'].where(OKalbedos) #TODO remove? 
+ # Remove data where TOA shortwave radiation invalid - isr_toa = calcTOA(ZenithAngle_deg, ZenithAngle_rad) # Calculate TOA shortwave radiation + isr_toa = calcTOA(ZenithAngle_deg, ZenithAngle_rad) # Calculate TOA shortwave radiation TOA_crit_nopass = (ds['dsr_cor'] > (0.9 * isr_toa + 10)) # Determine filter ds['dsr_cor'][TOA_crit_nopass] = np.nan # Apply filter and interpolate ds['usr_cor'][TOA_crit_nopass] = np.nan @@ -135,36 +129,35 @@ def toL2(L1, T_0=273.15, ews=1013.246, ei0=6.1071, eps_overcast=1., # # Check sun position # sundown = ZenithAngle_deg >= 90 - # _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass) + # _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass) if hasattr(ds, 'correct_precip'): # Correct precipitation precip_flag=ds.attrs['correct_precip'] else: precip_flag=True - if ~ds['precip_u'].isnull().all() and precip_flag: - ds['precip_u_cor'], ds['precip_u_rate'] = correctPrecip(ds['precip_u'], + if ~ds['precip_u'].isnull().all() and precip_flag: + ds['precip_u_cor'], ds['precip_u_rate'] = correctPrecip(ds['precip_u'], ds['wspd_u']) - if ds.attrs['number_of_booms']==2: + if ds.attrs['number_of_booms']==2: ds['rh_l_cor'] = correctHumidity(ds['rh_l'], ds['t_l'], # Correct relative humidity - T_0, T_100, ews, ei0) - + T_0, T_100, ews, ei0) + if ~ds['precip_l'].isnull().all() and precip_flag: # Correct precipitation - ds['precip_l_cor'], ds['precip_l_rate']= correctPrecip(ds['precip_l'], + ds['precip_l_cor'], ds['precip_l_rate']= correctPrecip(ds['precip_l'], ds['wspd_l']) - - if hasattr(ds,'t_i'): + + if hasattr(ds,'t_i'): if ~ds['t_i'].isnull().all(): # Instantaneous msg processing ds['rh_i_cor'] = correctHumidity(ds['rh_i'], ds['t_i'], # Correct relative humidity - T_0, T_100, ews, ei0) + T_0, T_100, ews, ei0) return ds - def flagNAN(ds_in, flag_url='https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/flags/', flag_dir='local/flags/'): - '''Read flagged data from .csv file. For each variable, and downstream + '''Read flagged data from .csv file. For each variable, and downstream dependents, flag as invalid (or other) if set in the flag .csv - + Parameters ---------- ds_in : xr.Dataset @@ -173,7 +166,7 @@ def flagNAN(ds_in, URL to directory where .csv flag files can be found flag_dir : str File directory where .csv flag files can be found - + Returns ------- ds : xr.Dataset @@ -181,20 +174,20 @@ def flagNAN(ds_in, ''' ds = ds_in.copy() df = None - + df = _getDF(flag_url + ds.attrs["station_id"] + ".csv", os.path.join(flag_dir, ds.attrs["station_id"] + ".csv"), # download = False, # only for working on draft local flag'n'fix files ) - + if isinstance(df, pd.DataFrame): df.t0 = pd.to_datetime(df.t0).dt.tz_localize(None) df.t1 = pd.to_datetime(df.t1).dt.tz_localize(None) - - if df.shape[0] > 0: + + if df.shape[0] > 0: for i in df.index: t0, t1, avar = df.loc[i,['t0','t1','variable']] - + if avar == '*': # Set to all vars if var is "*" varlist = list(ds.keys()) @@ -202,31 +195,31 @@ def flagNAN(ds_in, # Reads as regex if contains "*" and other characters (e.g. 
't_i_.*($)') varlist = pd.DataFrame(columns = list(ds.keys())).filter(regex=(avar)).columns else: - varlist = avar.split() - + varlist = avar.split() + if 'time' in varlist: varlist.remove("time") - + # Set to all times if times are "n/a" - if pd.isnull(t0): + if pd.isnull(t0): t0 = ds['time'].values[0] - if pd.isnull(t1): + if pd.isnull(t1): t1 = ds['time'].values[-1] - + for v in varlist: if v in list(ds.keys()): print('---> flagging',t0, t1, v) ds[v] = ds[v].where((ds['time'] < t0) | (ds['time'] > t1)) else: - print('---> could not flag', v,', not in dataset') + print('---> could not flag', v,', not in dataset') return ds -def adjustTime(ds, - adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", - adj_dir='local/adjustments/', +def adjustTime(ds, + adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", + adj_dir='local/adjustments/', var_list=[], skip_var=[]): '''Read adjustment data from .csv file. Only applies the "time_shift" adjustment - + Parameters ---------- ds : xr.Dataset @@ -235,7 +228,7 @@ def adjustTime(ds, URL to directory where .csv adjustment files can be found adj_dir : str File directory where .csv adjustment files can be found - + Returns ------- ds : xr.Dataset @@ -243,22 +236,22 @@ def adjustTime(ds, ''' ds_out = ds.copy() adj_info=None - + adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"), # download = False, verbose = False) - + if isinstance(adj_info, pd.DataFrame): - + if "time_shift" in adj_info.adjust_function.values: time_shifts = adj_info.loc[adj_info.adjust_function == "time_shift", :] # if t1 is left empty, then adjustment is applied until the end of the file time_shifts.loc[time_shifts.t1.isnull(), "t1"] = pd.to_datetime(ds_out.time.values[-1]).isoformat() time_shifts.t0 = pd.to_datetime(time_shifts.t0).dt.tz_localize(None) time_shifts.t1 = pd.to_datetime(time_shifts.t1).dt.tz_localize(None) - + for t0, t1, val in zip( time_shifts.t0, time_shifts.t1, @@ -266,12 +259,12 @@ def adjustTime(ds, ): ds_shifted = ds_out.sel(time=slice(t0,t1)) ds_shifted['time'] = ds_shifted.time.values + pd.Timedelta(days = val) - + # here we concatenate what was before the shifted part, the shifted # part and what was after the shifted part - # note that if any data was already present in the target period + # note that if any data was already present in the target period # (where the data lands after the shift), it is overwritten - + ds_out = xr.concat( ( ds_out.sel(time=slice(pd.to_datetime(ds_out.time.values[0]), @@ -288,13 +281,13 @@ def adjustTime(ds, return ds_out -def adjustData(ds, - adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", - adj_dir='local/adjustments/', +def adjustData(ds, + adj_url="https://raw.githubusercontent.com/GEUS-Glaciology-and-Climate/PROMICE-AWS-data-issues/master/adjustments/", + adj_dir='local/adjustments/', var_list=[], skip_var=[]): - '''Read adjustment data from .csv file. For each variable, and downstream + '''Read adjustment data from .csv file. 
For each variable, and downstream dependents, adjust data accordingly if set in the adjustment .csv - + Parameters ---------- ds : xr.Dataset @@ -303,30 +296,30 @@ def adjustData(ds, URL to directory where .csv adjustment files can be found adj_dir : str File directory where .csv adjustment files can be found - + Returns ------- ds : xr.Dataset Level 0 data with flagged data ''' ds_out = ds.copy() - adj_info=None + adj_info=None adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"), # download = False, # only for working on draft local flag'n'fix files ) - + if isinstance(adj_info, pd.DataFrame): # removing potential time shifts from the adjustment list adj_info = adj_info.loc[adj_info.adjust_function != "time_shift", :] - + # if t1 is left empty, then adjustment is applied until the end of the file adj_info.loc[adj_info.t0.isnull(), "t0"] = ds_out.time.values[0] adj_info.loc[adj_info.t1.isnull(), "t1"] = ds_out.time.values[-1] # making all timestamps timezone naive (compatibility with xarray) adj_info.t0 = pd.to_datetime(adj_info.t0).dt.tz_localize(None) adj_info.t1 = pd.to_datetime(adj_info.t1).dt.tz_localize(None) - + # if "*" is in the variable name then we interpret it as regex selec = adj_info['variable'].str.contains('\*') & (adj_info['variable'] != "*") for ind in adj_info.loc[selec, :].index: @@ -337,20 +330,20 @@ def adjustData(ds, line_template.name = adj_info.index.max() + 1 adj_info = pd.concat((adj_info, line_template.to_frame().transpose()),axis=0) adj_info = adj_info.drop(labels=ind, axis=0) - + adj_info = adj_info.sort_values(by=["variable", "t0"]) adj_info.set_index(["variable", "t0"], drop=False, inplace=True) - + if len(var_list) == 0: var_list = np.unique(adj_info.variable) else: adj_info = adj_info.loc[np.isin(adj_info.variable, var_list), :] var_list = np.unique(adj_info.variable) - + if len(skip_var) > 0: adj_info = adj_info.loc[~np.isin(adj_info.variable, skip_var), :] var_list = np.unique(adj_info.variable) - + for var in var_list: if var not in list(ds_out.keys()): print('could not adjust',var,', not in dataset') @@ -372,7 +365,7 @@ def adjustData(ds, # msk = ds_out[var].loc[dict(time=slice(t0, t1))])].notnull() # ind = ds_out[var].loc[dict(time=slice(t0, t1))])].loc[msk].time # ds_out.loc[ind, var + "_adj_flag"] = 1 - + if func == "multiply": ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values * val if "DW" in var: @@ -383,16 +376,16 @@ def adjustData(ds, # msk = ds_out[var].loc[dict(time=slice(t0, t1))].notnull() # ind = ds_out[var].loc[dict(time=slice(t0, t1))].loc[msk].time # ds_out.loc[ind, var + "_adj_flag"] = 1 - + if func == "min_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values tmp[tmp < val] = np.nan - + if func == "max_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values tmp[tmp > val] = np.nan ds_out[var].loc[dict(time=slice(t0, t1))] = tmp - + if func == "upper_perc_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").quantile(1 - val / 100) @@ -402,9 +395,9 @@ def adjustData(ds, values_month = tmp.loc[msk].values values_month[values_month < df_w.loc[m_start]] = np.nan tmp.loc[msk] = values_month - + ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - + if func == "biweekly_upper_range_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() df_max = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").max() @@ -422,33 +415,33 @@ def 
adjustData(ds, tmp.loc[msk] = values_month # updating original pandas ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - + if func == "hampel_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))] tmp = _hampel(tmp, k=7 * 24, t0=val) ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values - + if func == "grad_filter": tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() msk = ds_out[var].loc[dict(time=slice(t0, t1))].copy().diff() tmp[np.roll(msk.abs() > val, -1)] = np.nan ds_out[var].loc[dict(time=slice(t0, t1))] = tmp - + if "swap_with_" in func: var2 = func[10:] val_var = ds_out[var].loc[dict(time=slice(t0, t1))].values.copy() val_var2 = ds_out[var2].loc[dict(time=slice(t0, t1))].values.copy() ds_out[var2].loc[dict(time=slice(t0, t1))] = val_var ds_out[var].loc[dict(time=slice(t0, t1))] = val_var2 - + if func == "rotate": ds_out[var].loc[dict(time=slice(t0, t1))] = (ds_out[var].loc[dict(time=slice(t0, t1))].values + val) % 360 - + return ds_out def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): '''Calculate cloud cover from T and T_0 - + Parameters ---------- T : xarray.DataArray @@ -466,7 +459,7 @@ def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): overcast and cloud clear assumptions are pre-defined. Currently KAN_M and KAN_U are special cases, but this will need to be done for all stations eventually - + Returns ------- cc : xarray.DataArray @@ -479,17 +472,17 @@ def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): LR_overcast = 305 + 4*T LR_clear = 220 + 3.5*T else: - LR_overcast = eps_overcast * 5.67e-8 *(T + T_0)**4 - LR_clear = eps_clear * 5.67e-8 * (T + T_0)**6 + LR_overcast = eps_overcast * 5.67e-8 *(T + T_0)**4 + LR_clear = eps_clear * 5.67e-8 * (T + T_0)**6 cc = (dlr - LR_clear) / (LR_overcast - LR_clear) cc[cc > 1] = 1 cc[cc < 0] = 0 return cc def calcSurfaceTemperature(T_0, ulr, dlr, emissivity): - '''Calculate surface temperature from air temperature, upwelling and + '''Calculate surface temperature from air temperature, upwelling and downwelling radiation and emissivity - + Parameters ---------- T_0 : xarray.DataArray @@ -500,7 +493,7 @@ def calcSurfaceTemperature(T_0, ulr, dlr, emissivity): Downwelling longwave radiation emissivity : int Assumed emissivity - + Returns ------- xarray.DataArray @@ -508,10 +501,10 @@ def calcSurfaceTemperature(T_0, ulr, dlr, emissivity): ''' t_surf = ((ulr - (1 - emissivity) * dlr) / emissivity / 5.67e-8)**0.25 - T_0 return t_surf - + def calcTilt(tilt_x, tilt_y, deg2rad): '''Calculate station tilt - + Parameters ---------- tilt_x : xarray.DataArray @@ -520,7 +513,7 @@ def calcTilt(tilt_x, tilt_y, deg2rad): Y tilt inclinometer measurements deg2rad : float Degrees to radians conversion - + Returns ------- phi_sensor_rad : xarray.DataArray @@ -531,12 +524,12 @@ def calcTilt(tilt_x, tilt_y, deg2rad): # Tilt as radians tx = tilt_x * deg2rad ty = tilt_y * deg2rad - + # Calculate cartesian coordinates X = np.sin(tx) * np.cos(tx) * np.sin(ty)**2 + np.sin(tx) * np.cos(ty)**2 Y = np.sin(ty) * np.cos(ty) * np.sin(tx)**2 + np.sin(ty) * np.cos(tx)**2 Z = np.cos(tx) * np.cos(ty) + np.sin(tx)**2 * np.sin(ty)**2 - + # Calculate spherical coordinates phi_sensor_rad = -np.pi /2 - np.arctan(Y/X) phi_sensor_rad[X > 0] += np.pi @@ -545,10 +538,10 @@ def calcTilt(tilt_x, tilt_y, deg2rad): phi_sensor_rad[phi_sensor_rad < 0] += 2*np.pi # Total tilt of the sensor, i.e. 
0 when horizontal - theta_sensor_rad = np.arccos(Z / (X**2 + Y**2 + Z**2)**0.5) + theta_sensor_rad = np.arccos(Z / (X**2 + Y**2 + Z**2)**0.5) # phi_sensor_deg = phi_sensor_rad * rad2deg #TODO take these out if not needed # theta_sensor_deg = theta_sensor_rad * rad2deg - return phi_sensor_rad, theta_sensor_rad + return phi_sensor_rad, theta_sensor_rad def correctHumidity(rh, T, T_0, T_100, ews, ei0): #TODO figure out if T replicate is needed '''Correct relative humidity using Groff & Gratch method, where values are @@ -568,15 +561,15 @@ def correctHumidity(rh, T, T_0, T_100, ews, ei0): #TODO f Saturation pressure (normal atmosphere) at steam point temperature ei0 : float Saturation pressure (normal atmosphere) at ice-point temperature - + Returns ------- rh_cor : xarray.DataArray Corrected relative humidity - ''' - # Convert to hPa (Groff & Gratch) + ''' + # Convert to hPa (Groff & Gratch) e_s_wtr = 10**(-7.90298 * (T_100 / (T + T_0) - 1) - + 5.02808 * np.log10(T_100 / (T + T_0)) + + 5.02808 * np.log10(T_100 / (T + T_0)) - 1.3816E-7 * (10**(11.344 * (1 - (T + T_0) / T_100)) - 1) + 8.1328E-3 * (10**(-3.49149 * (T_100/(T + T_0) - 1)) -1) + np.log10(ews)) @@ -584,38 +577,38 @@ def correctHumidity(rh, T, T_0, T_100, ews, ei0): #TODO f - 3.56654 * np.log10(T_0 / (T + T_0)) + 0.876793 * (1 - (T + T_0) / T_0) + np.log10(ei0)) - - # Define freezing point. Why > -100? - freezing = (T < 0) & (T > -100).values - # Set to Groff & Gratch values when freezing, otherwise just rh - rh_cor = rh.where(~freezing, other = rh*(e_s_wtr / e_s_ice)) - return rh_cor + # Define freezing point. Why > -100? + freezing = (T < 0) & (T > -100).values + + # Set to Groff & Gratch values when freezing, otherwise just rh + rh_cor = rh.where(~freezing, other = rh*(e_s_wtr / e_s_ice)) + return rh_cor def correctPrecip(precip, wspd): - '''Correct precipitation with the undercatch correction method used in + '''Correct precipitation with the undercatch correction method used in Yang et al. (1999) and Box et al. (2022), based on Goodison et al. (1998) - - Yang, D., Ishida, S., Goodison, B. E., and Gunther, T.: Bias correction of - daily precipitation measurements for Greenland, + + Yang, D., Ishida, S., Goodison, B. E., and Gunther, T.: Bias correction of + daily precipitation measurements for Greenland, https://doi.org/10.1029/1998jd200110, 1999. - + Box, J., Wehrle, A., van As, D., Fausto, R., Kjeldsen, K., Dachauer, A., - Ahlstrom, A. P., and Picard, G.: Greenland Ice Sheet rainfall, heat and - albedo feedback imapacts from the Mid-August 2021 atmospheric river, - Geophys. Res. Lett. 49 (11), e2021GL097356, + Ahlstrom, A. P., and Picard, G.: Greenland Ice Sheet rainfall, heat and + albedo feedback imapacts from the Mid-August 2021 atmospheric river, + Geophys. Res. Lett. 49 (11), e2021GL097356, https://doi.org/10.1029/2021GL097356, 2022. - - Goodison, B. E., Louie, P. Y. T., and Yang, D.: Solid Precipitation + + Goodison, B. E., Louie, P. Y. 
T., and Yang, D.: Solid Precipitation Measurement Intercomparison, WMO, 1998 - + Parameters ---------- precip : xarray.DataArray Cumulative precipitation measurements wspd : xarray.DataArray Wind speed measurements - + Returns ------- precip_cor : xarray.DataArray @@ -629,27 +622,27 @@ def correctPrecip(precip, wspd): # Fix all values below 1.02 to 1.02 corr = corr.where(corr>1.02, other=1.02) - # Fill nan values in precip with preceding value - precip = precip.ffill(dim='time') - + # Fill nan values in precip with preceding value + precip = precip.ffill(dim='time') + # Calculate precipitation rate precip_rate = precip.diff(dim='time', n=1) - + # Apply correction to rate precip_rate = precip_rate*corr - + # Flag rain bucket reset - precip_rate = precip_rate.where(precip_rate>-0.01, other=np.nan) + precip_rate = precip_rate.where(precip_rate>-0.01, other=np.nan) b = precip_rate.to_dataframe('precip_flag').notna().to_xarray() - + # Get corrected cumulative precipitation, reset if rain bucket flag precip_cor = precip_rate.cumsum()-precip_rate.cumsum().where(~b['precip_flag']).ffill(dim='time').fillna(0).astype(float) - + return precip_cor, precip_rate - + def calcDeclination(doy, hour, minute): '''Calculate sun declination based on time - + Parameters ---------- doy : int @@ -658,7 +651,7 @@ def calcDeclination(doy, hour, minute): Hour of day minute : int Minute of hour - + Returns ------- float @@ -677,7 +670,7 @@ def calcHourAngle(hour, minute, lon): '''Calculate hour angle of sun based on time and longitude. Make sure that time is set to UTC and longitude is positive when west. Hour angle should be 0 at noon - + Parameters ---------- hour : int @@ -686,7 +679,7 @@ def calcHourAngle(hour, minute, lon): Minute of hour lon : float Longitude - + Returns ------- float @@ -696,15 +689,15 @@ def calcHourAngle(hour, minute, lon): # ; - 15.*timezone/360.) def calcDirectionDeg(HourAngle_rad): #TODO remove if not plan to use this - '''Calculate sun direction as degrees. This is an alternative to + '''Calculate sun direction as degrees. This is an alternative to _calcHourAngle that is currently not implemented into the offical L0>>L3 workflow. 
Here, 180 degrees is at noon (NH), as opposed to HourAngle - + Parameters ---------- HourAngle_rad : float Sun hour angle in radians - + Returns ------- DirectionSun_deg @@ -717,7 +710,7 @@ def calcDirectionDeg(HourAngle_rad): #T def calcZenith(lat, Declination_rad, HourAngle_rad, deg2rad, rad2deg): '''Calculate sun zenith in radians and degrees - + Parameters ---------- lat : float @@ -730,7 +723,7 @@ def calcZenith(lat, Declination_rad, HourAngle_rad, deg2rad, rad2deg): Degrees to radians conversion rad2deg : float Radians to degrees conversion - + Returns ------- ZenithAngle_rad : float @@ -743,32 +736,32 @@ def calcZenith(lat, Declination_rad, HourAngle_rad, deg2rad, rad2deg): * np.cos(HourAngle_rad) + np.sin(lat * deg2rad) * np.sin(Declination_rad)) - - ZenithAngle_deg = ZenithAngle_rad * rad2deg + + ZenithAngle_deg = ZenithAngle_rad * rad2deg return ZenithAngle_rad, ZenithAngle_deg -def calcAngleDiff(ZenithAngle_rad, HourAngle_rad, phi_sensor_rad, +def calcAngleDiff(ZenithAngle_rad, HourAngle_rad, phi_sensor_rad, theta_sensor_rad): '''Calculate angle between sun and upper sensor (to determine when sun is in sight of upper sensor - + Parameters ---------- ZenithAngle_rad : float Zenith angle in radians - HourAngle_rad : float + HourAngle_rad : float Sun hour angle in radians phi_sensor_rad : xarray.DataArray Spherical tilt coordinates theta_sensor_rad : xarray.DataArray Total tilt of sensor, where 0 is horizontal - + Returns ------- float Angle between sun and sensor ''' - return 180 / np.pi * np.arccos(np.sin(ZenithAngle_rad) + return 180 / np.pi * np.arccos(np.sin(ZenithAngle_rad) * np.cos(HourAngle_rad + np.pi) * np.sin(theta_sensor_rad) * np.cos(phi_sensor_rad) @@ -777,12 +770,12 @@ def calcAngleDiff(ZenithAngle_rad, HourAngle_rad, phi_sensor_rad, * np.sin(theta_sensor_rad) * np.sin(phi_sensor_rad) + np.cos(ZenithAngle_rad) - * np.cos(theta_sensor_rad)) + * np.cos(theta_sensor_rad)) def calcAlbedo(usr, dsr_cor, AngleDif_deg, ZenithAngle_deg): - '''Calculate surface albedo based on upwelling and downwelling shorwave + '''Calculate surface albedo based on upwelling and downwelling shorwave flux, the angle between the sun and sensor, and the sun zenith - + Parameters ---------- usr : xarray.DataArray @@ -793,7 +786,7 @@ def calcAlbedo(usr, dsr_cor, AngleDif_deg, ZenithAngle_deg): Angle between sun and sensor in degrees ZenithAngle_deg: float Zenith angle in degrees - + Returns ------- albedo : xarray.DataArray @@ -801,55 +794,55 @@ def calcAlbedo(usr, dsr_cor, AngleDif_deg, ZenithAngle_deg): OKalbedos : xarray.DataArray Valid albedo measurements ''' - albedo = usr / dsr_cor - + albedo = usr / dsr_cor + # NaN bad data - OKalbedos = (AngleDif_deg < 70) & (ZenithAngle_deg < 70) & (albedo < 1) & (albedo > 0) - albedo[~OKalbedos] = np.nan - - # Interpolate all. Note "use_coordinate=False" is used here to force - # comparison against the GDL code when that is run with *only* a TX file. - # Should eventually set to default (True) and interpolate based on time, - # not index. - albedo = albedo.interpolate_na(dim='time', use_coordinate=False) + OKalbedos = (AngleDif_deg < 70) & (ZenithAngle_deg < 70) & (albedo < 1) & (albedo > 0) + albedo[~OKalbedos] = np.nan + + # Interpolate all. Note "use_coordinate=False" is used here to force + # comparison against the GDL code when that is run with *only* a TX file. + # Should eventually set to default (True) and interpolate based on time, + # not index. 
+ albedo = albedo.interpolate_na(dim='time', use_coordinate=False) albedo = albedo.ffill(dim='time').bfill(dim='time') #TODO remove this line and one above? return albedo, OKalbedos - + def calcTOA(ZenithAngle_deg, ZenithAngle_rad): '''Calculate incoming shortwave radiation at the top of the atmosphere, accounting for sunset periods - + Parameters ---------- ZenithAngle_deg : float Zenith angle in degrees ZenithAngle_rad : float Zenith angle in radians - + Returns ------- isr_toa : float Incoming shortwave radiation at the top of the atmosphere ''' sundown = ZenithAngle_deg >= 90 - + # Incoming shortware radiation at the top of the atmosphere - isr_toa = 1372 * np.cos(ZenithAngle_rad) + isr_toa = 1372 * np.cos(ZenithAngle_rad) isr_toa[sundown] = 0 - return isr_toa - -def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, - HourAngle_rad, ZenithAngle_rad, ZenithAngle_deg, - lat, DifFrac, deg2rad): - '''Calculate correction factor for direct beam radiation, as described + return isr_toa + +def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, + HourAngle_rad, ZenithAngle_rad, ZenithAngle_deg, + lat, DifFrac, deg2rad): + '''Calculate correction factor for direct beam radiation, as described here: http://solardat.uoregon.edu/SolarRadiationBasics.html - + Offset correction (where solar zenith angles larger than 110 degrees) not implemented as it should not improve the accuracy of well-calibrated instruments. It would go something like this: ds['dsr'] = ds['dsr'] - ds['dwr_offset'] SRout = SRout - SRout_offset - + Parameters ---------- Declination_rad : float @@ -858,11 +851,11 @@ def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, Spherical tilt coordinates theta_sensor_rad : xarray.DataArray Total tilt of sensor, where 0 is horizontal - HourAngle_rad : float + HourAngle_rad : float Sun hour angle in radians - ZenithAngle_rad : float + ZenithAngle_rad : float Zenith angle in radians - ZenithAngle_deg : float + ZenithAngle_deg : float Zenith Angle in degrees lat : float Latitude @@ -870,7 +863,7 @@ def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, Fractional cloud cover deg2rad : float Degrees to radians conversion - + Returns ------- CorFac_all : xarray.DataArray @@ -895,20 +888,20 @@ def calcCorrectionFactor(Declination_rad, phi_sensor_rad, theta_sensor_rad, * np.sin(theta_sensor_rad) \ * np.sin(phi_sensor_rad + np.pi) \ * np.sin(HourAngle_rad) \ - + CorFac = np.cos(ZenithAngle_rad) / CorFac # sun out of field of view upper sensor CorFac[(CorFac < 0) | (ZenithAngle_deg > 90)] = 1 - + # Calculating ds['dsr'] over a horizontal surface corrected for station/sensor tilt CorFac_all = CorFac / (1 - DifFrac + CorFac * DifFrac) - + return CorFac_all def _getDF(flag_url, flag_file, download=True, verbose=True): - '''Get dataframe from flag or adjust file. First attempt to retrieve from - URL. If this fails then attempt to retrieve from local file - + '''Get dataframe from flag or adjust file. First attempt to retrieve from + URL. 
If this fails then attempt to retrieve from local file + Parameters ---------- flag_url : str @@ -917,17 +910,17 @@ def _getDF(flag_url, flag_file, download=True, verbose=True): Local path to file download : bool Flag to download file from URL - + Returns ------- df : pd.DataFrame Flag or adjustment dataframe ''' - + # Download local copy as csv if download==True: - os.makedirs(os.path.dirname(flag_file), exist_ok = True) - + os.makedirs(os.path.dirname(flag_file), exist_ok = True) + try: urllib.request.urlretrieve(flag_url, flag_file) if verbose: print('Downloaded a', @@ -944,10 +937,10 @@ def _getDF(flag_url, flag_file, download=True, verbose=True): if os.path.isfile(flag_file): df = pd.read_csv( - flag_file, - comment="#", + flag_file, + comment="#", skipinitialspace=True, - ).dropna(how='all', axis='rows') + ).dropna(how='all', axis='rows') else: df=None if verbose: print('No', flag_file.split('/')[-2][:-1], 'file to read.') @@ -956,20 +949,20 @@ def _getDF(flag_url, flag_file, download=True, verbose=True): def _hampel(vals_orig, k=7*24, t0=3): '''Hampel filter - + Parameters ---------- vals : pd.DataSeries Series of values from which to remove outliers k : int - Size of window, including the sample. For example, 7 is equal to 3 on + Size of window, including the sample. For example, 7 is equal to 3 on either side of value. The default is 7*24. t0 : int Threshold value. The default is 3. ''' #Make copy so original not edited vals=vals_orig.copy() - + #Hampel Filter L= 1.4826 rolling_median=vals.rolling(k).median() @@ -979,11 +972,11 @@ def _hampel(vals_orig, k=7*24, t0=3): outlier_idx=difference>threshold outlier_idx[0:round(k/2)]=False vals.loc[outlier_idx]=np.nan - return(vals) + return(vals) def _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass): '''Check sun position - + Parameters ---------- ds : xarray.Dataset @@ -1009,15 +1002,15 @@ def _checkSunPos(ds, OKalbedos, sundown, sunonlowerdome, TOA_crit_nopass): print('Mean net SR change by corrections:', (ds['dsr_cor']-ds['usr_cor']-ds['dsr']+ds['usr']).sum().values/valid.values, "W/m2") - + def _getTempK(T_0): #TODO same as L2toL3._getTempK() '''Return steam point temperature in Kelvins - + Parameters ---------- T_0 : float Ice point temperature in K - + Returns ------- float @@ -1025,12 +1018,11 @@ def _getTempK(T_0): # return T_0+100 def _getRotation(): #TODO same as L2toL3._getRotation() - '''Return degrees-to-radians and radians-to-degrees''' + '''Return degrees-to-radians and radians-to-degrees''' deg2rad = np.pi / 180 rad2deg = 1 / deg2rad return deg2rad, rad2deg - -if __name__ == "__main__": - # unittest.main() - pass +if __name__ == "__main__": + # unittest.main() + pass diff --git a/src/pypromice/qc/static_qc.py b/src/pypromice/qc/static_qc.py index 4d9c2e59..daabdfd5 100644 --- a/src/pypromice/qc/static_qc.py +++ b/src/pypromice/qc/static_qc.py @@ -1,18 +1,22 @@ +import logging + import numpy as np import pandas as pd import xarray as xr -from typing import Mapping, Optional - +from typing import Mapping, Optional, Union __all__ = [ "apply_static_qc", "find_static_regions", ] +logger = logging.getLogger(__name__) + DEFAULT_VARIABLE_THRESHOLDS = { - "t": {"max_diff": 0.001, "period": 1}, - "p": {"max_diff": 0.0001 / 24, "period": 24}, - "rh": {"max_diff": 0.0001 / 24, "period": 24}, + "t": {"max_diff": 0.0001, "period": 2}, + "p": {"max_diff": 0.0001, "period": 2}, + # Relative humidity can be very stable around 100%. 
+    #"rh": {"max_diff": 0.0001, "period": 2},
 }
@@ -52,6 +56,8 @@ def apply_static_qc(
     if variable_thresholds is None:
         variable_thresholds = DEFAULT_VARIABLE_THRESHOLDS
 
+    logger.debug(f"Running apply_static_qc using {variable_thresholds}")
+
     for k in variable_thresholds.keys():
         var_all = [
             k + "_u",
@@ -64,6 +70,9 @@
         for v in var_all:
             if v in df:
                 mask = find_static_regions(df[v], period, max_diff)
+                n_masked = mask.sum()
+                n_samples = len(mask)
+                logger.debug(f"Applying static QC in {v}. Filtering {n_masked}/{n_samples} samples")
                 # setting outliers to NaN
                 df.loc[mask, v] = np.nan
@@ -78,18 +87,45 @@
 def find_static_regions(
     data: pd.Series,
-    period: int,
+    min_repeats: int,
     max_diff: float,
 ) -> pd.Series:
     """
     Algorithm that ensures values can stay the same within the outliers_mask
     """
-    diff = data.diff().fillna(method="ffill").abs()  # forward filling all NaNs!
-    # Indexing is significantly faster on numpy arrays that pandas series
-    diff = np.array(diff)
-    outliers_mask = np.zeros_like(diff, dtype=bool)
-    for i in range(len(outliers_mask) - period + 1):
-        i_end = i + period
-        if max(diff[i:i_end]) < max_diff:
-            outliers_mask[i:i_end] = True
-    return pd.Series(index=data.index, data=outliers_mask)
+    diff = data.ffill().diff().abs()  # forward filling all NaNs!
+    mask: pd.Series = diff < max_diff
+    consecutive_true_df = count_consecutive_true(mask)
+    static_regions = consecutive_true_df >= min_repeats
+    # Ignore entries which are already NaN in the input data
+    static_regions[data.isna()] = False
+    return static_regions
+
+
+def count_consecutive_true(
+    series: Union[pd.Series, pd.DataFrame]
+) -> Union[pd.Series, pd.DataFrame]:
+    """
+    Convert boolean series to integer series where the values represent the number of consecutive true values.
+
+    Examples
+    --------
+    >>> count_consecutive_true(pd.Series([False, True, False, False, True, True, True, False, True]))
+    pd.Series([0, 1, 0, 0, 1, 2, 3, 0, 1])
+
+    Parameters
+    ----------
+    series
+        Boolean pandas Series or DataFrame
+
+    Returns
+    -------
+    consecutive_true_count
+        Integer pandas Series or DataFrame with values representing the number of consecutive true values.
+ + """ + # assert series.dtype == bool + cumsum = series.cumsum() + is_first = series.astype("int").diff() == 1 + offset = (is_first * cumsum).replace(0, np.nan).fillna(method="ffill").fillna(0) + return ((cumsum - offset + 1) * series).astype("int") diff --git a/src/pypromice/qc/static_qc_test.py b/src/pypromice/qc/static_qc_test.py index b556f565..e9aac828 100644 --- a/src/pypromice/qc/static_qc_test.py +++ b/src/pypromice/qc/static_qc_test.py @@ -1,142 +1,146 @@ -import datetime -import random import unittest -from typing import Union import numpy as np +import numpy.testing import pandas as pd from pypromice.qc.static_qc import find_static_regions -def get_random_datetime() -> datetime.datetime: - # Select random timestamp in the period 1970-2030 - seconds_per_year = 365.25 * 24 * 3600 - timestamp = 60 * seconds_per_year * random.random() - return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) - - -def get_random_timeseries( - start: Union[str, datetime.datetime], - period: datetime.timedelta = datetime.timedelta(days=5), - freq="1h", - offset: float = -20, - amplitude: float = 5, -) -> pd.Series: - end = start + period - time_range = pd.date_range(start=start, end=end, freq=freq, tz="utc") - steps_per_day = "1d" / time_range.freq - phase = np.random.random() * 2 * np.pi - x = 2 * np.pi * np.arange(len(time_range)) / steps_per_day - data = amplitude * np.sin(x + phase) + offset - return pd.Series(index=time_range, data=data, name="data") - - class StaticQATestCase(unittest.TestCase): def test_1_hour_static(self): - series = get_random_timeseries( - start=get_random_datetime(), - period=datetime.timedelta(days=5), - freq="1h", - ) - index = 24 - series.iloc[index] = series.iloc[index - 1] - - mask = find_static_regions(series, period=1, max_diff=0.001) - - self.assertEqual(1, mask.sum()) - self.assertTrue(1, mask.iloc[index]) + self._test_1_hour_repeat(10) def test_1_hour_second_index(self): - series = get_random_timeseries( - start=get_random_datetime(), - period=datetime.timedelta(days=5), - freq="1h", - ) - index = 1 - series.iloc[index] = series.iloc[index - 1] - - mask = find_static_regions(series, period=1, max_diff=0.001) - - self.assertEqual(1, mask.sum()) - self.assertTrue(1, mask.iloc[index]) + self._test_1_hour_repeat(0) def test_1_hour_last_index(self): - series = get_random_timeseries( - start=get_random_datetime(), - period=datetime.timedelta(days=5), - freq="1h", - ) - index = -1 - series.iloc[index] = series.iloc[index - 1] + self._test_1_hour_repeat(-2) - mask = find_static_regions(series, period=1, max_diff=0.001) + def _test_1_hour_repeat(self, index: int): + self.assertTrue(index < -1 or index >= 0) + time_range = pd.date_range( + start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left" + ) + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + input_series[index + 1] = input_series[index] + min_repeats = 1 + expected_output = input_series.map(lambda _: False) + expected_output[index + 1] = True + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 + ) - self.assertEqual(1, mask.sum()) - self.assertTrue(1, mask.iloc[index]) + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) def test_no_static_period(self): - series = get_random_timeseries( - start=get_random_datetime(), - period=datetime.timedelta(days=5), - freq="1h", + time_range = pd.date_range( + start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left" ) + 
input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + min_repeats = 1 + expected_output = input_series.map(lambda _: False) - static_mask = find_static_regions(series, period=1, max_diff=0.001) - - pd.testing.assert_series_equal( - pd.Series(index=static_mask.index, data=False), - static_mask, - check_names=False, + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 ) + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) + def test_static_period_longer_than_period_threshold(self): - series = get_random_timeseries( - start=get_random_datetime(), - period=datetime.timedelta(days=100), - freq="1h", + time_range = pd.date_range( + start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left" + ) + index_start = 23 + index_end = 33 + min_repeats = 4 + expected_filter_start = 27 + expected_filter_end = 33 + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + input_series[index_start:index_end] = input_series[index_start] + expected_output = input_series.map(lambda _: False) + expected_output[expected_filter_start:expected_filter_end] = True + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 ) - index_start = 41 - index_length = 30 - index_end = index_start + index_length - series.iloc[index_start:index_end] = series.iloc[index_start - 1] - static_mask = find_static_regions(series, period=24, max_diff=0.001) + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) - self.assertEqual( - index_length, - static_mask.sum(), + def test_period_threshold_longer_than_static_period(self): + time_range = pd.date_range( + start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left" ) - self.assertEqual( - index_length, - static_mask.iloc[index_start:index_end].sum(), + index_start = 23 + index_end = 27 + min_repeats = 10 + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + input_series[index_start:index_end] = input_series[index_start] + expected_output = input_series.map(lambda _: False) + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 ) - def test_period_threshold_longer_than_static_period(self): - series = get_random_timeseries( - start=get_random_datetime(), - period=datetime.timedelta(days=100), - freq="1h", - ) - index_start = 41 - index_length = 30 - index_end = index_start + index_length - series.iloc[index_start:index_end] = series.iloc[index_start - 1] + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) - static_mask = find_static_regions(series, period=31, max_diff=0.001) + def test_static_period_at_the_end(self): + time_range = pd.date_range( + start="2023-01-26", end="2023-01-28", freq="h", tz="utc", inclusive="left" + ) + index_start = 23 + min_repeats = 4 + expected_filter_start = 27 + input_series = pd.Series(index=time_range, data=np.arange(0, len(time_range))) + input_series[index_start:] = input_series[index_start] + expected_output = input_series.map(lambda _: False) + expected_output[expected_filter_start:] = True + + static_mask = find_static_regions( + input_series, min_repeats=min_repeats, max_diff=0.001 + ) - self.assertEqual(0, static_mask.sum()) + pd.testing.assert_series_equal(expected_output, static_mask, check_names=False) - def test_static_period_at_the_end(self): - series = get_random_timeseries( - start=get_random_datetime(), - 
period=datetime.timedelta(days=5),
-            freq="1h",
+    def test_dont_filter_nan_values(self):
+        time_range = pd.date_range(
+            start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left"
+        )
+        input_series = pd.Series(
+            index=time_range, data=np.zeros_like(time_range, dtype="float")
+        )
+        min_repeats = 4
+        input_series[:] = np.nan
+        input_series[9] = -11
+        input_series[10:12] = -10
+        input_series[15] = -9
+        # There are >= 4 repeats if the NaN values are forward filled: [10:15] == -10.
+        # The output mask shouldn't filter nan values.
+        expected_output = input_series.map(lambda _: False)
+
+        static_mask = find_static_regions(
+            input_series, min_repeats=min_repeats, max_diff=0.001
         )
-        index_length = 14
-        series.iloc[-index_length:] = series.iloc[-index_length - 1]
+        pd.testing.assert_series_equal(expected_output, static_mask, check_names=False)
-        static_mask = find_static_regions(series, period=10, max_diff=0.001)
+    def test_series_with_nan_values_between_static_values(self):
+        time_range = pd.date_range(
+            start="2023-01-26", end="2023-01-27", freq="h", tz="utc", inclusive="left"
+        )
+        values = np.zeros_like(time_range, dtype="float")
+        values[:] = np.nan
+        values[9] = -11
+        values[16] = -11
+        values[17] = -9
+        series = pd.Series(index=time_range, data=values)
+        expected_mask = np.zeros_like(values, dtype="bool")
+        period = 4
+        # The value at index 16 is the same as at index 9, and the forward-filled NaN gap makes the repeat run longer than period
+        # Note: The static region mask shall not filter nan values
+        expected_mask[16] = True
-        self.assertEqual(index_length, static_mask.sum())
-        self.assertEqual(index_length, static_mask.iloc[-index_length:].sum())
+        output_mask = find_static_regions(series, min_repeats=period, max_diff=0.01)
+
+        np.testing.assert_equal(expected_mask, output_mask)
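
Usage sketch (illustrative only, not part of the patch): assuming the module as introduced above, where pypromice.qc.static_qc exposes find_static_regions and count_consecutive_true, the snippet below shows how the new consecutive-repeat logic behaves on a toy series. The data, thresholds and printed values are hypothetical, not taken from the PR.

import numpy as np
import pandas as pd

from pypromice.qc.static_qc import count_consecutive_true, find_static_regions

# Hypothetical hourly series: the value 2.0 repeats four times, then a NaN gap.
time = pd.date_range("2023-01-26", periods=8, freq="h", tz="utc")
data = pd.Series(index=time, data=[1.0, 2.0, 2.0, 2.0, 2.0, 3.0, np.nan, 4.0])

# count_consecutive_true turns a boolean mask into run lengths,
# e.g. [F, F, T, T, T, F, T, F] -> [0, 0, 1, 2, 3, 0, 1, 0].
repeats = count_consecutive_true(data.ffill().diff().abs() < 0.001)
print(repeats.tolist())  # [0, 0, 1, 2, 3, 0, 1, 0]

# find_static_regions flags samples that repeat the previous value (within
# max_diff) for at least min_repeats consecutive steps; pre-existing NaNs
# are never flagged.
mask = find_static_regions(data, min_repeats=2, max_diff=0.001)
print(data[mask])  # the third and fourth consecutive occurrences of 2.0

In the pipeline, apply_static_qc runs this per variable (e.g. t_u, p_u) using the thresholds configured in DEFAULT_VARIABLE_THRESHOLDS of this patch.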