Source code for sevnpy.io.logschema

"""
==============================================================
Logfile Handling, (:mod:`sevny.io.logschema`)
==============================================================

This module contains the class LogSchema that is used to bring at a code-level the SEVN-like log structure and the interface with regex pattern.
The SEVN-like log structure is defined as a series of  items of different  types.
The class LogSchema define each item with five attributes:

    - name
    - kind
    - type
    - pattern
    - description


"""

import warnings
from typing import Dict, Union, List, Type, Tuple, Literal

try:
    from typing import TypedDict
except ImportError:
    from typing_extensions import TypedDict

from .regexutility import ReTypeMatch, capturing, notcapturing
from .. import utility as ut

GTUnion = Union[Type[int], Type[float], Type[float]]
IPDict = Dict[str, Union[Union[str, GTUnion], Tuple[Union[str, GTUnion], str]]]


[docs] class LogSchema: """ This class is used to define and handle the structure of a given log file and produce the correspondent matching pattern There are four important keyword used in the class: - **name:** is the label used to identify the data stored in a given position of the log structure, this can be used as the name of the column on a dataframe. - **description:** short description of the data stored at a given name - **kind:** is the type of the data identified by the name, it could be "int","id","float","name",int,str,float or a string. If it is a string (but not "int","id","float","name","type"), it is called *named_kind* and the kind specifies exactly the searching pattern otherwise the searching pattern will defined by the value stored in kind. For example if kind="S", the searching pattern will contain exactly "S", but if kind="id" the searching pattern will be "[0-9]+" - **pattern:** is the regex searching pattern associated with name and kind - **type:** data type associated with each name, it depends on the kind value: - "id", "type", "int" or int -> int - "float" or float -> float - "name" -> str In all the other cases, i.e. when kind is a string (but not "int","id","float","name","type"), the matching pattern will be inferred: - If the string can be transformed to an integer -> int - If the string can be transformed to a float -> float - Otherwise -> str So, each element in a LogSchema is defined by the name, kind and pattern. Name and kind are defined by the user at the class instantiation or using the method add_item (see example below). The pattern and the type is instead generated by the class based on the kind value (see above). Actually there is a fifth element that is the regex_pattern that is estimated only when the method regex_pattern is called and it includes the pattern + the extra paranthesis needed to create a capturing or non capturing group based on method input Examples ---------- The class is used to define an obejct containing the information about the structure of a SEVN-like logfile, for example assume that we have the following log structure B;<name>;<id>;CIRC;<tiem>;<semimajor_axis_ini>:<eccentricity_ini>:<semimajor_axis_post>:eccentricity_post> e.g.: B;857175750378006;0;CIRC;1.874849e+01;38.2411:0.000633313:38.2411:0 We can use initialize a LogSchema for the header like >>> header=LogSchema({"logtype":("B",""), "name":("name","unique identifier"), "ID":("id",""),"event":("CIRC",""),"Worldtime":(float,"time in Myr")}) While for the body we can use another LogSchema, let'start from a empty initilisation >>> body=LogSchema() >>> body.add_item("semimajor_axis_ini",float,"pre circularisation semimajor axis Rsun") >>> body.add_item("eccentriciy_ini",float,"pre circularisation eccentricity") >>> body.add_item("semimajor_axis_post",float,"post circularisation semimajor axis Rsun") >>> body.add_item("eccentriciy_post",float,"pre circularisation eccentricity") If we want to check the different schema >>> body.kind_schema >>> {'semimajor_axis_ini': <class 'float'>, 'eccentriciy_ini': <class 'float'>, 'semimajor_axis_post': <class 'float'>, 'eccentriciy_post': <class 'float'>} >>> body.type_schema >>> {'semimajor_axis_ini': <class 'float'>, 'eccentriciy_ini': <class 'float'>, 'semimajor_axis_post': <class 'float'>, 'eccentriciy_post': <class 'float'>} >>> body.pattern_schema >>> {'semimajor_axis_ini': '[+|-]?[0-9]+\\.?[0-9]*(?i:e)?[+|-]?[0-9]*|(?i:nan)', 'eccentriciy_ini': '[+|-]?[0-9]+\\.?[0-9]*(?i:e)?[+|-]?[0-9]*|(?i:nan)', 'semimajor_axis_post': '[+|-]?[0-9]+\\.?[0-9]*(?i:e)?[+|-]?[0-9]*|(?i:nan)', 'eccentriciy_post': '[+|-]?[0-9]+\\.?[0-9]*(?i:e)?[+|-]?[0-9]*|(?i:nan)'} Finally, if we want to get the regex pattern considering only the semimajor axis properteis as capturing items >>> body.regex_pattern(capturing_names=('semimajor_axis_ini','semimajor_axis_post')) >>> {'semimajor_axis_ini': '([+|-]?[0-9]+\\.?[0-9]*(?i:e)?[+|-]?[0-9]*|(?i:nan))', 'eccentriciy_ini': '(?:[+|-]?[0-9]+\\.?[0-9]*(?i:e)?[+|-]?[0-9]*|(?i:nan))', 'semimajor_axis_post': '([+|-]?[0-9]+\\.?[0-9]*(?i:e)?[+|-]?[0-9]*|(?i:nan))', 'eccentriciy_post': '(?:[+|-]?[0-9]+\\.?[0-9]*(?i:e)?[+|-]?[0-9]*|(?i:nan))'} """
[docs] def __init__(self, schema: IPDict = {}): """ Parameters ---------- schema: dictionary containing the schema with the triple name:kind:description """ # Auxiliary dictionary to map from kind to type self._kind_to_type_map = { "int": int, "id": int, "type": int, int: int, float: float, "float": float, "name": str, str: str, } # Check input self._check_input_dict(schema) # Fundametal schema self._schema = schema # These are derived schemas self._kind_schema = None self._type_schema = None self._pattern_schema = None self._description_schema = None
class _ItemDict(TypedDict): """ Auxiliary TypedDict containing the full keywords for a given name """ name: str description: str kind: Union[GTUnion, str] type: GTUnion pattern: str def _check_input(self, name: str, kind: Union[str, GTUnion]): # Check if the kind is in the allowed kinds if isinstance(kind, str) == False and kind not in self._kind_to_type_map: raise ValueError(f"kind {kind} not allowed. Possible values are {list(self._kind_to_type_map.keys())}") else: pass def _check_input_dict(self, input_dict: IPDict): """ Check if the input dictionary can be used as a item for the LogSchema. If it cannot, the method raises an error. Parameters ---------- input_dict: input dictionary """ # First check if the dictionary is in the right format for key in input_dict: if not isinstance(key, str): raise ValueError(f"input key {key} is not a string this is not allowed") # second try if the value is a len 2 collection, in case just add an empty description if not len(input_dict[key]) == 2: raise ValueError( f"Item of key {key} is not a collection with two elements") # check if all the kind in the input schema are present in the allowed kinds. # Otherwise, return a meaningful error not_allowed_names = [] not_allowed_kinds = set() for name, (kind, _) in input_dict.items(): if isinstance(kind, str) == False and kind not in self._kind_to_type_map: not_allowed_names.append(name) not_allowed_kinds.add(kind) if len(not_allowed_names) > 0: err_mess = f"The following names {not_allowed_names} contain not allowed kinds:{not_allowed_kinds}." err_mess += f"The allowed links are: {list(self._kind_to_type_map.keys())}" raise ValueError(err_mess) def update(self, input_dict: IPDict): # Check input self._check_input_dict(input_dict) # Update self._schema.update(input_dict) # Reset derivative schema so that they are updated at the first new call self._reset_derivate_schema()
[docs] def add_item(self, name: str, kind: Union[str, GTUnion], description: str = ""): """ Add an item to the Schema Parameters ---------- name: name of the new item kind: kind of the new item description: a short description of the item """ # Check input self._check_input(name, kind) self._schema.update({name: (kind, description)}) # Reset derivative schema so that they are updated at the first new call self._reset_derivate_schema()
def _kind_to_type(self, kind: Union[str, int, float]) -> Union[Type[int], Type[float], Type[str]]: """ Get a type from the kind Parameters ---------- kind: kind param Returns ------- type: int|float|str the type """ if kind in self._kind_to_type_map: return self._kind_to_type_map[kind] elif ut.str_is_int(kind): return int elif ut.str_is_float(kind): return float else: return str def _kind_to_pattern(self, kind: Union[str, int, float]) -> str: """ Get a pattern from the type If kind is a string (but not "int","id","float","name"), the kind specify exactly the searching pattern otherwise the searching pattern will be defined by the value stored in kind. Parameters ---------- kind: the kind Returns ------- pattern: str the regex pattern """ if kind in self._kind_to_type_map: return ReTypeMatch[kind] else: return kind @property def schema(self) -> IPDict: """ The schema of the log reader """ return self._schema @property def kind_schema(self) -> Dict: """ dictionary containing the pair name:kind """ if self._kind_schema is None: self._kind_schema = {name: kind for name, (kind, _) in self._schema.items()} return self._kind_schema @property def description_schema(self) -> Dict: """ d Dctionary containing the pair name:description """ if self._description_schema is None: self._description_schema = {name: desc for name, (_, desc) in self._schema.items()} return self._description_schema @property def type_schema(self): # Check if empty if self._type_schema is None: self._type_schema = {name: self._kind_to_type(kind) for name, kind in self.kind_schema.items()} return self._type_schema @property def pattern_schema(self): """ Dictionary containing the pair name:type """ # Check if empty if self._pattern_schema is None: self._pattern_schema_dict = {name: self._kind_to_pattern(kind) for name, kind in self.kind_schema.items()} return self._pattern_schema_dict
[docs] def default_capturing_names(self) -> List[str]: """ Get the default capturing name, i.e. the item with a kind that is a string but not "name","id","str","float","int". Returns ------- capturing_names_temp: List A list with the default capturing names (already sorted) """ # Lambda function to check if is a "named" kind, i.e. a kind that is a string but not "name","id","int","float" _check_named_kind = lambda _kind: _kind == self._kind_to_pattern(_kind) # Cycle over the kind schema (name:kind) to check if it is a named item or not. # If it is not a named item include it in the capturing names _capturing_names_temp = [] for name, kind in self.kind_schema.items(): if not _check_named_kind(kind): _capturing_names_temp.append(name) return _capturing_names_temp
[docs] def regex_pattern(self, capturing_names: Union[Literal["default", "all"], List[str], Tuple[str, ...]] = "default") -> \ Tuple[Dict[str, str], List[str]]: """ Parameters ---------- capturing_names: "default","all", iterable A list of names to be included as capturing members. If the string "default" is used all the items in the schema will be captured except for the named item, i.e. the item with a kind that is a string but not "name","id","str","float","int". If the string "all" is used all the items in the schema will be captured Returns ------- regex_pattern: Dictionary a string containing the pair name:regex_pattern capturing_names: List A sorted list of the capturing names """ if capturing_names == "default": capturing_names = self.default_capturing_names() elif capturing_names == "all": # Include all the names in the capturing names capturing_names = self.names elif not isinstance(capturing_names, str): # Use the user provided list of names # First check if it is actually a collection try: len(capturing_names) except TypeError: raise TypeError(f"The input parameter capturing_names need to be a Collectable, or the string \'all\'" f"or the string \'default\', it is instead {capturing_names}") # Chek if a name in the capturing names are not present in the schema and raise a warning. _capturing_names = [] for name in capturing_names: if name not in self.names: warnings.warn(f"Name {name} is not present in the schema names. " f"This capturing name will not be included") else: _capturing_names.append(name) capturing_names = _capturing_names _searching_pattern = {} _sorted_capturing_names = [] for name, pattern in self.pattern_schema.items(): if name in capturing_names: _searching_pattern[name] = capturing(pattern) _sorted_capturing_names.append(name) else: _searching_pattern[name] = notcapturing(pattern) return (_searching_pattern, _sorted_capturing_names)
[docs] def reset(self): """ Reset the schema """ self._schema.clear()
def _reset_derivate_schema(self): """ Reset the other dictionary derived from the main schema """ self._pattern_schema.clear() self._type_schema.clear() self._kind_schema.clear() self._description_schema.clear()
[docs] def pop(self, name: str): """ Remove an item of given name form the schema Parameters ---------- name: Name of the item in the schema to remove """ # Remove the item with a given name self._schema.pop(name)
@property def names(self) -> List[str]: """ List of al the names """ # Return all the names return list(self._schema.keys()) @property def kinds(self) -> List[Union[str, GTUnion]]: """ List of all the kinds """ # Return all the kinds return list(self.kind_schema.values()) @property def types(self) -> List[GTUnion]: """ List of all the types """ # Return all the types return list(self.type_schema.values()) @property def patterns(self) -> List[str]: """ List of all the patterns """ # Return all the patterns return list(self.pattern_schema.values()) @property def descriptions(self) -> List[str]: """ List of all the descriptions """ return list(self.description_schema.values())
[docs] def column_schema(self, offset: int = 0) -> Dict[int, str]: """ Return a dictionary to map the index of the column in the logschema to the correspondent name Parameters ---------- offset: int Offset to use to the define the columns Returns ------- column_schema: Dictionary A Dictionary containg index:name pairs """ return {idx + offset: name for idx, name in enumerate(self.names)}
[docs] def full_schema(self) -> Dict[str, _ItemDict]: """ Create and return a dictionary containing all the information about the schema Returns ------- full_schema: Dictionary A dictionary in which each item is a pair name:dictionary structured as follow {<item_name>: {"name":<item_name>, "description":<item_description>, "kind":<item_kind>, "type":<item_type>, "pattern":<item_pattern>}} """ return {name: self.__getitem__(name) for name in self.names}
def __getitem__(self, locator: Union[str, int]) -> _ItemDict: if isinstance(locator, int): try: locator = self.column_schema()[locator] except KeyError: raise KeyError( f"Column index {locator} do not have a correspondence in the LogSchema {self.column_schema()}") try: return {"name": locator, "kind": self.kind_schema[locator], "description": self.description_schema[locator], "type": self.type_schema[locator], "pattern": self.pattern_schema[locator]} except KeyError: raise KeyError(f"Name \'{locator}\' not available.Available names are {self.names}") def __len__(self): return len(self.names)
[docs] def __repr__(self) -> str: return str(self.full_schema())
[docs] def __str__(self) -> str: fs = "" for key1, int_dic in self.full_schema().items(): fs += f"\n{key1}:\n " for key2, value in int_dic.items(): fs += f"{key2}={value}, " return fs