dataframe_validators.py 3.61 KB
Newer Older
1
from typing import Tuple, Callable, Iterable, List
2
3
4
5
import re

import pandas as pd

6
from app.bulk_persistence.dask.utils import WDMS_INDEX_NAME
7
from app.bulk_persistence.dask.errors import BulkNotProcessable
8
9
from app.conf import Config

10

Yannick's avatar
Yannick committed
11
# Result type returned by the validators below; failure_reason is an empty string on success
ValidationResult = Tuple[bool, str]  # Tuple (is_dataframe_valid, failure_reason)
12

Yannick's avatar
Yannick committed
13
# Result returned by a validator when the dataframe is valid (no failure reason)
ValidationSuccess = (True, '')
14

Yannick's avatar
Yannick committed
15
# Signature of a validation function: takes the dataframe and returns a ValidationResult
DataFrameValidationFunc = Callable[[pd.DataFrame], ValidationResult]
16
17


18
19
def assert_df_validate(dataframe: pd.DataFrame,
                       validation_funcs: List[DataFrameValidationFunc]):
    """ Call one or more validation functions on the given dataframe and raise if any of them fails.

    Every validation function is run before the outcome is inspected, so the raised error carries
    all the failure reasons at once rather than only the first one.

    dataframe: dataframe handed to each validation function
    validation_funcs: validation functions to run, each returning (is_dataframe_valid, failure_reason).
        Any iterable is accepted; None or an empty one makes the call a no-op.
    raise: BulkNotProcessable if at least one validation fails, its message being the failure
        reasons joined with ','
    """
    # Materialise the results first: each validator runs exactly once, and one-shot iterables
    # (generators, iterators), whose emptiness cannot be tested up front, are handled as well.
    results = [validate(dataframe) for validate in (validation_funcs or ())]

    failure_reasons = [reason for is_valid, reason in results if not is_valid]
    if failure_reasons:
        # raise exception with all invalid reasons
        raise BulkNotProcessable(message=",".join(failure_reasons))


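# The following functions are stateless and free of side effects (apart from auto_cast_columns_to_string,
# which rewrites the column labels of the dataframe it receives), so they can easily be used in a
# parallel/cross-process context.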

def no_validation(_) -> ValidationResult:
    """
    Validator accepting anything: the argument is ignored and no error/warning is ever reported
    return: ValidationSuccess, i.e. (True, '')
    """
    return ValidationSuccess


Yannick's avatar
Yannick committed
41
def auto_cast_columns_to_string(df: pd.DataFrame) -> ValidationResult:
    """
    Cast the column labels of the given dataframe to string; the dataframe is modified in place
    return: always ValidationSuccess
    """
    stringified_columns = df.columns.astype(str)
    df.columns = stringified_columns
    return ValidationSuccess


Yannick's avatar
Yannick committed
50
def columns_type_must_be_string(df: pd.DataFrame) -> ValidationResult:
    """ Check that every column name of the given dataframe is exactly a str, as described by WellLog schemas """
    for column_name in df.columns:
        # exact type check: the first name which is not a plain str invalidates the dataframe
        if type(column_name) is not str:
            return False, 'All columns type should be string'
    return ValidationSuccess


Yannick's avatar
Yannick committed
58
def validate_index(df: pd.DataFrame) -> ValidationResult:
    """ Ensure the index of the given dataframe is not empty, is numeric or datetime, and has no duplicates.

    The checks are done in that order and only the first failing one is reported.
    return: ValidationSuccess, or (False, failure_reason) for the first failed check
    """
    index = df.index
    if len(index) == 0:
        return False, "Empty data"

    # Index.is_numeric() is deprecated since pandas 2.0. It was implemented as this very test on the
    # inferred type, so checking it directly gives the same outcome without the deprecated call.
    has_numeric_index = index.inferred_type in ("integer", "floating")
    if not has_numeric_index and not isinstance(index, pd.DatetimeIndex):
        return False, "Index should be numeric or datetime"
    if not index.is_unique:
        return False, "Duplicated index found"
    return ValidationSuccess


69
70
71
72
73
74
75
def validate_number_of_columns(df: pd.DataFrame) -> ValidationResult:
    """ Reject a dataframe holding more columns than Config.max_columns_per_chunk_write allows """
    max_columns = Config.max_columns_per_chunk_write.value
    column_count = len(df.columns)
    if column_count > max_columns:
        return False, f"Too many columns : maximum allowed '{max_columns}'"
    return ValidationSuccess


Yannick's avatar
Yannick committed
76
77
# Pattern of the column names reserved by Pandas/PyArrow to save the index, e.g. '__index_level_0__'
PandasReservedIndexColRegexp = re.compile(r'__index_level_\d+__')


def is_reserved_column_name(name: str) -> bool:
    """Return True if the name is a reserved column name by Pandas/Dask with PyArrow"""
    # bool(): Pattern.match() yields a Match object or None, while the signature promises a real boolean.
    # NOTE(review): match() only anchors at the start of the name, so '__index_level_0__suffix' is
    # reported as reserved too - confirm whether an exact match (fullmatch) was intended.
    return bool(PandasReservedIndexColRegexp.match(name)
                or name == '__null_dask_index__'
                or name == WDMS_INDEX_NAME)
Jeremie Hallal's avatar
Jeremie Hallal committed
84

Yannick's avatar
Yannick committed
85
86

def any_reserved_column_name(names: Iterable[str]) -> bool:
    """
    Some column names are reserved because Pandas/Dask with PyArrow use them internally to save the index.
    Saving a dataframe that holds such a name as a regular column makes the parquet file unreadable afterwards.

    At this stage, columns used as index are already marked as index and are not considered as columns by Pandas.
    return: True if any column uses a reserved name
    """
    for candidate in names:
        # names which are not plain str are skipped
        if type(candidate) is str and is_reserved_column_name(candidate):
            return True
    return False
Yannick's avatar
Yannick committed
95

96

Yannick's avatar
Yannick committed
97
98
99
def columns_not_in_reserved_names(df: pd.DataFrame) -> ValidationResult:
    """ Ensure no column of the given dataframe uses a reserved name (see any_reserved_column_name) """
    has_reserved_name = any_reserved_column_name(df.columns)
    return (False, 'Invalid column name') if has_reserved_name else ValidationSuccess