Source code for disdrodb.l0.template_tools

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import pandas as pd
from typing import Union
from disdrodb.l0.standards import (
    get_l0a_dtype,
    get_field_nchar_dict,
    get_field_ndigits_dict,
    get_field_ndigits_decimals_dict,
    get_field_ndigits_natural_dict,
)


def check_column_names(column_names: list, sensor_name: str) -> None:
    """Report column names that do not follow the DISDRODB standards.

    The check is advisory: apart from the input type validation, problems
    are printed to stdout and no exception is raised.

    Parameters
    ----------
    column_names : list
        List of column names.
    sensor_name : str
        Name of the sensor, forwarded to ``get_l0a_dtype``.

    Raises
    ------
    TypeError
        If ``column_names`` is not a list.
    """
    if not isinstance(column_names, list):
        raise TypeError("'column_names' must be a list of strings.")

    # Accepted names: the entries of the sensor L0A dtype mapping plus 'time'
    accepted_names = set(list(get_l0a_dtype(sensor_name)) + ["time"])
    provided_names = set(column_names)

    # Names that are provided but not accepted
    unexpected_names = list(provided_names - accepted_names)
    if unexpected_names:
        print(
            f"The following columns do no met the DISDRODB standards: {unexpected_names}."
        )
        print("Please remove such columns within the df_sanitizer_fun")

    # Remind the user to create the 'time' column when it is missing
    if "time" not in provided_names:
        print("Please be sure to create the 'time' column within the df_sanitizer_fun.")
        print(
            "The 'time' column must be datetime with resolution in seconds (dtype='M8[s]')."
        )
    return None
def _check_valid_column_index(column_idx, n_columns): if column_idx > (n_columns - 1): raise ValueError("'column_idx' must be between 0 and {}".format(n_columns - 1)) if column_idx < 0: raise ValueError("'column_idx' must be between 0 and {}".format(n_columns - 1)) def _check_columns_indices(column_indices, n_columns): if not isinstance(column_indices, (int, list, type(None), slice)): raise TypeError( "'column_indices' must be an integer, a list of integers, or None." ) if column_indices is None: column_indices = list(range(0, n_columns)) if isinstance(column_indices, slice): start = column_indices.start stop = column_indices.stop step = column_indices.step step = 1 if step is None else step column_indices = list(range(start, stop, step)) if isinstance(column_indices, list): [_check_valid_column_index(idx, n_columns) for idx in column_indices] if isinstance(column_indices, int): _check_valid_column_index(column_indices, n_columns) column_indices = [column_indices] return column_indices
def get_df_columns_unique_values_dict(
    df: pd.DataFrame,
    column_indices: Union[int, slice, list] = None,
    column_names: bool = True,
):
    """Map each selected column to the sorted list of its unique values.

    Parameters
    ----------
    df : pd.DataFrame
        Input dataframe.
    column_indices : Union[int,slice,list], optional
        Positions of the columns to include. By default None, which is
        expanded to every column by ``_check_columns_indices``.
    column_names : bool, optional
        If True, the dictionary keys are the column names, otherwise they
        are ``"Column <position>"``. By default True.

    Returns
    -------
    dict
        Dictionary ``{key: sorted unique values}``.
    """
    all_columns = list(df.columns)
    # Validate the selection and normalize it to a list of positions
    selected_indices = _check_columns_indices(column_indices, len(all_columns))

    unique_values = {}
    for idx in selected_indices:
        name = all_columns[idx]
        label = name if column_names else "Column " + str(idx)
        unique_values[label] = sorted(df[name].unique().tolist())
    return unique_values
####--------------------------------------------------------------------------. #### Character checks
def arr_has_constant_nchar(arr: np.array) -> bool:
    """Tell whether every element of an array has the same number of characters.

    Parameters
    ----------
    arr : numpy.ndarray
        The array to analyse. It must have object (O) or unicode string (U)
        dtype once passed through ``np.asarray``; object arrays are cast to
        string before counting.

    Returns
    -------
    bool
        True if exactly one distinct string length is found.

    Raises
    ------
    TypeError
        If the array dtype is neither object nor unicode string.
    """
    values = np.asarray(arr)
    dtype_code = values.dtype.char
    if dtype_code not in ("O", "U"):
        raise TypeError("Expecting object (O) or string (U) dtype.")
    if dtype_code == "O":
        # Python objects: compare their string representation
        values = values.astype(str)
    # Character counts include any '.' present in the strings
    lengths = np.char.str_len(values)
    return np.unique(lengths).size == 1
def str_is_number(string: str) -> bool:
    """Tell whether a string can be converted with ``float``.

    Parameters
    ----------
    string : str
        Input string.

    Returns
    -------
    bool
        True if ``float(string)`` succeeds, False if it raises ValueError.
    """
    try:
        float(string)
    except ValueError:
        return False
    return True
def str_is_not_number(string: str) -> bool:
    """Tell whether a string is not numeric.

    Parameters
    ----------
    string : str
        Input string.

    Returns
    -------
    bool
        The negation of ``str_is_number(string)``.
    """
    is_numeric = str_is_number(string)
    return not is_numeric
def str_is_integer(string: str) -> bool:
    """Tell whether a string can be converted with ``int``.

    Parameters
    ----------
    string : str
        Input string.

    Returns
    -------
    bool
        True if ``int(string)`` succeeds, False if it raises ValueError.
    """
    try:
        int(string)
    except ValueError:
        return False
    return True
def str_has_decimal_digits(string: str) -> bool:
    """Tell whether a string contains exactly one ``.`` separator.

    Parameters
    ----------
    string : str
        Input string.

    Returns
    -------
    bool
        True if the string holds a single ``.``, False otherwise.
    """
    # Exactly one '.' means the string splits into two parts around it
    return string.count(".") == 1
def get_decimal_ndigits(string: str) -> int:
    """Count the characters found after the decimal separator.

    Parameters
    ----------
    string : str
        Input string.

    Returns
    -------
    int
        Length of the part following ``.``, or 0 when
        ``str_has_decimal_digits`` reports no decimal part.
    """
    if not str_has_decimal_digits(string):
        return 0
    # A single '.' is guaranteed here, so the split yields exactly two parts
    _, decimal_part = string.split(".")
    return len(decimal_part)
def get_natural_ndigits(string: str) -> int:
    """Count the characters of the natural (integer) part of a string.

    Parameters
    ----------
    string : str
        Input string.

    Returns
    -------
    int
        The full string length when the string parses as an integer, the
        length of the part preceding ``.`` when it has a decimal part,
        and 0 otherwise.
    """
    if str_is_integer(string):
        return len(string)
    if not str_has_decimal_digits(string):
        return 0
    natural_part = string.split(".")[0]
    return len(natural_part)
def get_ndigits(string: str) -> int:
    """Count the characters of a numeric string, excluding the ``.`` separator.

    Parameters
    ----------
    string : str
        Input string.

    Returns
    -------
    int
        0 if the string is not numeric; otherwise the string length,
        minus one when a decimal separator is present.
    """
    if str_is_not_number(string):
        return 0
    n_characters = len(string)
    # The '.' separator is not counted
    return n_characters - 1 if str_has_decimal_digits(string) else n_characters
def get_nchar(string: str) -> int:
    """Return the number of characters of a string.

    Parameters
    ----------
    string : str
        Input string.

    Returns
    -------
    int
        Number of characters.
    """
    n_characters = len(string)
    return n_characters
def get_possible_keys(dict_options: dict, desired_value: str) -> set:
    """Collect the keys of a dictionary whose value equals the desired one.

    Parameters
    ----------
    dict_options : dict
        Input dictionary.
    desired_value : str
        Value to look for (compared with ``==``).

    Returns
    -------
    set
        Keys whose value matches ``desired_value``.
    """
    return {key for key, value in dict_options.items() if value == desired_value}
def search_possible_columns(string: str, sensor_name: str) -> list:
    """List the sensor fields whose string characteristics match a string.

    A field is retained when its total digits, number of characters,
    decimal digits and natural digits all equal those measured on
    ``string``.

    Parameters
    ----------
    string : str
        Input string.
    sensor_name : str
        Name of the sensor.

    Returns
    -------
    list
        List of possible columns.
    """
    # Per-field reference characteristics of the sensor
    ndigits_lookup = get_field_ndigits_dict(sensor_name)
    nchar_lookup = get_field_nchar_dict(sensor_name)
    decimals_lookup = get_field_ndigits_decimals_dict(sensor_name)
    naturals_lookup = get_field_ndigits_natural_dict(sensor_name)

    # Keep only the fields matching every characteristic of the string
    candidates = get_possible_keys(ndigits_lookup, get_ndigits(string)).intersection(
        get_possible_keys(nchar_lookup, get_nchar(string)),
        get_possible_keys(decimals_lookup, get_decimal_ndigits(string)),
        get_possible_keys(naturals_lookup, get_natural_ndigits(string)),
    )
    return list(candidates)
def infer_column_names(df: pd.DataFrame, sensor_name: str, row_idx: int = 1):
    """Try to guess the dataframe column names based on string characteristics.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to analyse.
    sensor_name : str
        Name of the sensor.
    row_idx : int, optional
        Position of the row whose values are used for the guess, by default 1.

    Returns
    -------
    dict
        Dictionary with the keys being the column position and the values
        being the list of guessed column names.
    """
    candidates_by_column = {}
    for position in range(len(df.columns)):
        # Column values as an array of strings
        values = np.asarray(df.iloc[:, position]).astype(str)
        # Only a notice is printed when lengths vary; the guess still proceeds
        if not arr_has_constant_nchar(values):
            print("Column", position, "has non-unique number of characters")
        # Guess from a single sample string
        sample = values[row_idx]
        candidates_by_column[position] = search_possible_columns(
            sample, sensor_name=sensor_name
        )
    return candidates_by_column