API Reference

multivariate_ecdf(data_a, data_b, n_nodes = 1000, verbose = True, random_seed = None)

Computes a multivariate ECDF from the proportions of rows matched by dynamically generated pandas queries on a dataset.

It returns a tuple containing the query list and the computed ECDFs of both input datasets.

The query list is generated using the first dataset as reference. Each entry is a query string that combines a condition for every column. For example, given a DataFrame with two columns x0 and x1, a query string takes the form x0 <= z0 and x1 <= z1, where z0 and z1 are the feature quantile values at random quantile levels drawn uniformly from [0, 1]. Many such query strings are generated, and the proportion of rows satisfying each one is computed on both input datasets to obtain the two ECDFs.
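As a rough sketch (not part of the library's API), the snippet below illustrates how a single query string and its row-count proportion could be derived from random quantile levels, mirroring the description above; the toy DataFrame and column names are made up for the illustration.

import numpy as np
import pandas as pd

# Toy data for illustration only
rng = np.random.default_rng(0)
data_a = pd.DataFrame({"x0": rng.normal(size=100), "x1": rng.normal(size=100)})

# Random quantile levels, one per feature (raised to 1/n_features as in the source below)
n_features = data_a.shape[1]
levels = rng.uniform(0, 1, n_features) ** (1 / n_features)

# Quantile values z0, z1 taken from the reference dataset
z_vals = [np.quantile(data_a.iloc[:, k], q) for k, q in enumerate(levels)]

# One query string of the form "x0 <= z0 and x1 <= z1" and its count proportion
query_str = " and ".join(f"{col} <= {z}" for col, z in zip(data_a.columns, z_vals))
proportion = len(data_a.query(query_str)) / len(data_a)
print(query_str, proportion)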

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data_a | DataFrame | Pandas DataFrame | required |
| data_b | DataFrame | Pandas DataFrame | required |
| n_nodes | int | Number of nodes, i.e. the query strings to be generated. This is a hyperparameter and can be tuned. | 1000 |
| verbose | bool | Flag to display progress of the operations. | True |
| random_seed | int | Random seed to be set before the operations. If provided, it is applied with np.random.seed(random_seed). | None |

Raises:

| Type | Description |
| --- | --- |
| TypeError | Raised if the input datasets are not pandas DataFrames |
| ValueError | Raised if either input dataset is empty |
| ValueError | Raised if one or more column names differ between the two input DataFrames |

Returns:

| Type | Description |
| --- | --- |
| Tuple | Tuple of (query_lst, ecdf_a, ecdf_b): the list of query strings and the computed ECDFs of both input datasets |
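A minimal usage sketch is shown below; the import path is an assumption and may need adjusting to match how the package exposes the function (the source lives in genai_evaluation\genai_evaluation.py).

import numpy as np
import pandas as pd
from genai_evaluation import multivariate_ecdf  # import path assumed

# Two example datasets with matching column names
rng = np.random.default_rng(42)
real = pd.DataFrame(rng.normal(size=(500, 3)), columns=["x0", "x1", "x2"])
synthetic = pd.DataFrame(rng.normal(size=(500, 3)), columns=["x0", "x1", "x2"])

query_lst, ecdf_real, ecdf_synth = multivariate_ecdf(
    real, synthetic, n_nodes=500, verbose=False, random_seed=7
)
print(len(query_lst), ecdf_real[:3], ecdf_synth[:3])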

Source code in genai_evaluation\genai_evaluation.py
def multivariate_ecdf(data_a: pd.DataFrame,
                      data_b: pd.DataFrame,
                      n_nodes: int = 1000,
                      verbose: bool = True,
                      random_seed: int = None) -> Tuple:
    """
    Args:
        data_a (pd.DataFrame): Pandas DataFrame
        data_b (pd.DataFrame): Pandas DataFrame
        n_nodes (int, optional): Specifies the number of nodes
                                i.e. the query strings to be generated. 
                                This is a hyperparameter and can be tuned.
                                Defaults to 1000.
        verbose (bool): Flag to display progress of the operations. 
                        Defaults to True 
        random_seed (int, optional): Random seed to be set before the
                                    operations. If provided, it is set using `np.random.seed(random_seed)`. Defaults to None

    Raises:
        TypeError: Throws error if Input Datasets are not Pandas DataFrames
        ValueError: Throws error if any Input Dataset is empty
        ValueError: Throws error if one or more column names between the
                    Input DataFrames do not match


    Returns:
        Tuple: Returns a tuple of the query string list & computed ECDFs of both Input Datasets
    """

    if not isinstance(data_a, pd.DataFrame) or not isinstance(data_b, pd.DataFrame):
        raise TypeError("Input Datasets should be Pandas DataFrames!!")

    if data_a.empty or data_b.empty:
        raise ValueError("Input Datasets should not be empty!!")

    data_a = data_a.copy()
    data_b = data_b.copy()

    if re.search(r'[^a-zA-Z0-9_]', "".join(data_a.columns)):
        if verbose:
            print("Dataset A columns have special characters. Will be cleaned before processing!!")
        data_a_cols_cleaned = [re.sub(r'[^a-zA-Z0-9_]', '', col).lower() 
                               for col in data_a.columns]
        data_a.columns = data_a_cols_cleaned

    if re.search(r'[^a-zA-Z0-9_]', "".join(data_b.columns)):
        if verbose:
            print("Dataset B columns have special characters. Will be cleaned before processing!!")
        data_b_cols_cleaned = [re.sub(r'[^a-zA-Z0-9_]', '', col).lower() 
                               for col in data_b.columns]
        data_b.columns = data_b_cols_cleaned

    if not data_a.columns.equals(data_b.columns):
        raise ValueError("One or more column names do not match in both DataFrames!!")    

    eps = 0.0000000001
    query_val = []
    features = data_a.columns
    n_features = len(features)
    if random_seed is not None:
        np.random.seed(random_seed)

    for point in range(n_nodes):
        if point % 100 == 0 and verbose:
            print(f"Sampling ecdf, location = {point}")

        # Get random percentiles
        percentiles = np.random.uniform(0, 1, n_features)
        percentiles = percentiles**(1/n_features)

        # Get the percentile values from the dataset a for each column
        perc_vals = [eps + np.quantile(data_a.iloc[:, k], perc)
                     for k, perc in enumerate(percentiles)]

        # Create the query string combined for each column
        query_str = " and ".join([f"{features[k]} <= {perc_val}"
                                  for k, perc_val in enumerate(perc_vals)])

        # From dataset a, get the counts of rows which
        # satisfy the conditions in the query string
        filter_count_a = len(data_a.query(query_str))

        # For counts > 0, create key: str of the list of perc_vals
        # Append key, query_str & the normalized filter count for dataset
        if filter_count_a > 0:
            key = ', '.join(map(str, perc_vals))
            query_val.append((key, query_str, filter_count_a/len(data_a)))

    # Sort the list based on the items (third element of each tuple)
    query_val.sort(key=lambda item: item[2])

    query_lst = []
    ecdf_a = []
    ecdf_b = []

    # for each entry in the query_val list
    # Retrieve the query_str and run on both datasets to get the filter counts
    # Normalize the filter count for dataset b

    for e_val in query_val:
        query_str = e_val[1]
        value_data_a = e_val[2]
        filter_count_b = len(data_b.query(query_str))
        value_data_b = filter_count_b / len(data_b)
        query_lst.append(query_str)
        ecdf_a.append(value_data_a)
        ecdf_b.append(value_data_b)

    return query_lst, ecdf_a, ecdf_b

ks_statistic(ecdf_a, ecdf_b)

Function to calculate the KS statistic between the two input ECDFs. It computes the maximum separation (distance) between the ECDFs and yields a result ranging from 0 (indicating the best fit) to 1 (indicating the worst fit).
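As a quick illustration of the calculation (it simply takes the largest pointwise gap between the two ECDF vectors):

import numpy as np

ecdf_a = [0.10, 0.35, 0.60, 0.90]
ecdf_b = [0.12, 0.30, 0.72, 0.88]

# The KS statistic is the maximum absolute difference between the vectors
ks = np.max(np.abs(np.array(ecdf_a) - np.array(ecdf_b)))
print(ks)  # ~0.12, from |0.60 - 0.72| at the third point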

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ecdf_a | List | ECDF generated by the multivariate_ecdf function | required |
| ecdf_b | List | ECDF generated by the multivariate_ecdf function | required |

Raises:

| Type | Description |
| --- | --- |
| ValueError | Raised if the input ECDFs are empty or their lengths differ |

Returns:

| Type | Description |
| --- | --- |
| float | The KS statistic |
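Continuing the earlier sketch, the two functions are typically chained (again, the import path is an assumption):

from genai_evaluation import ks_statistic, multivariate_ecdf  # import path assumed

# `real` and `synthetic` are the example DataFrames from the sketch above
_, ecdf_real, ecdf_synth = multivariate_ecdf(real, synthetic, n_nodes=500,
                                             verbose=False, random_seed=7)
print(f"KS statistic: {ks_statistic(ecdf_real, ecdf_synth):.3f}")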

Source code in genai_evaluation\genai_evaluation.py
def ks_statistic(ecdf_a: List, ecdf_b: List) -> float:
    """
    Args:
        ecdf_a (List): ECDF Generated through the Multivariate ECDF function
        ecdf_b (List): ECDF Generated through the Multivariate ECDF function

    Raises:
        ValueError: Throws error if the input ECDFs are empty or their lengths are not equal

    Returns:
        float: Returns KS Statistic
    """
    if len(ecdf_a) != len(ecdf_b):
        raise ValueError("Both Input ECDFs should be of the same length!!")

    if len(ecdf_a) == 0 or len(ecdf_b) == 0:
        raise ValueError("ECDFs should not be empty!!")

    np_ecdf_a = np.array(ecdf_a)
    np_ecdf_b = np.array(ecdf_b)

    # Compute the absolute difference between ECDFs
    abs_diff = np.abs(np_ecdf_a - np_ecdf_b)

    # Find the maximum absolute difference (KS statistic)
    ks_statistic = np.max(abs_diff)

    return ks_statistic