Skip to content

from cluster_experiments.washover import *

ConstantWashover

Bases: Washover

Constant washover - we drop all rows in the washover period when there is a switch where the treatment is different.

Source code in cluster_experiments/washover.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class ConstantWashover(Washover):
    """Constant washover - we drop all rows in the washover period when
    there is a switch where the treatment is different."""

    def __init__(self, washover_time_delta: datetime.timedelta):
        self.washover_time_delta = washover_time_delta

    def washover(
        self,
        df: pd.DataFrame,
        truncated_time_col: str,
        treatment_col: str,
        cluster_cols: List[str],
        original_time_col: Optional[str] = None,
    ) -> pd.DataFrame:
        """Constant washover - we drop all rows in the washover period when
        there is a switch where the treatment is different.

        Args:
            df (pd.DataFrame): Input dataframe.
            truncated_time_col (str): Name of the truncated time column.
            treatment_col (str): Name of the treatment column.
            cluster_cols (List[str]): List of clusters of experiment.
            original_time_col (Optional[str], optional): Name of the original time column.

        Returns:
            pd.DataFrame: Same dataframe as input without the rows in the washover period.

        Usage:
        ```python
        import numpy as np
        import pandas as pd
        from datetime import datetime, timedelta

        from cluster_experiments import ConstantWashover

        np.random.seed(42)

        num_rows = 10

        def random_timestamp(start_time, end_time):
            time_delta = end_time - start_time
            random_seconds = np.random.randint(0, time_delta.total_seconds())
            return start_time + timedelta(seconds=random_seconds)

        def generate_data(start_time, end_time, treatment):
            data = {
                'order_id': np.random.randint(10**9, 10**10, size=num_rows),
                'city_code': 'VAL',
                'activation_time_local': [random_timestamp(start_time, end_time) for _ in range(num_rows)],
                'bin_start_time_local': start_time,
                'treatment': treatment
            }
            return pd.DataFrame(data)

        start_times = [datetime(2024, 1, 22, 9, 0), datetime(2024, 1, 22, 11, 0),
                    datetime(2024, 1, 22, 13, 0), datetime(2024, 1, 22, 15, 0)]

        treatments = ['control', 'variation', 'variation', 'control']

        dataframes = [generate_data(start, start + timedelta(hours=2), treatment) for start, treatment in zip(start_times, treatments)]

        df = pd.concat(dataframes).sort_values(by='activation_time_local').reset_index(drop=True)

        ## Define washover with 30 min duration
        washover = ConstantWashover(washover_time_delta=timedelta(minutes=30))

        ## Apply washover to the dataframe, the orders with activation time within the first 30 minutes after every change in the treatment column, clustering by city and 2h time bin, will be dropped
        df_analysis_washover = washover.washover(
            df=df,
            truncated_time_col='bin_start_time_local',
            treatment_col='treatment',
            cluster_cols=['city_code','bin_start_time_local'],
            original_time_col='activation_time_local',
        )
        ```
        """
        # Set original time column
        original_time_col = (
            original_time_col
            if original_time_col
            else _original_time_column(truncated_time_col)
        )

        # Validate columns
        self._validate_columns(df, truncated_time_col, cluster_cols, original_time_col)

        # Cluster columns that do not involve time
        non_time_cols = list(set(cluster_cols) - set([truncated_time_col]))
        # For each cluster, we need to check if treatment has changed wrt last time
        df_agg = df.sort_values([original_time_col]).copy()
        df_agg = df_agg.drop_duplicates(subset=cluster_cols + [treatment_col])

        if non_time_cols:
            df_agg["__changed"] = (
                df_agg.groupby(non_time_cols)[treatment_col].shift(1)
                != df_agg[treatment_col]
            )
        else:
            df_agg["__changed"] = (
                df_agg[treatment_col].shift(1) != df_agg[treatment_col]
            )
        df_agg = df_agg.loc[:, cluster_cols + ["__changed"]]
        return (
            df.merge(df_agg, on=cluster_cols, how="inner")
            .assign(
                __time_since_switch=lambda x: x[original_time_col].astype(
                    "datetime64[ns]"
                )
                - x[truncated_time_col].astype("datetime64[ns]"),
                __after_washover=lambda x: x["__time_since_switch"]
                > self.washover_time_delta,
            )
            # add not changed in query
            .query("__after_washover or not __changed")
            .drop(columns=["__time_since_switch", "__after_washover", "__changed"])
        )

    @classmethod
    def from_config(cls, config) -> "Washover":
        if not config.washover_time_delta:
            raise ValueError(
                f"Washover time delta must be specified for ConstantWashover, while it is {config.washover_time_delta = }"
            )

        washover_time_delta = config.washover_time_delta
        if isinstance(washover_time_delta, int):
            washover_time_delta = datetime.timedelta(minutes=config.washover_time_delta)
        return cls(washover_time_delta=washover_time_delta)

washover(df, truncated_time_col, treatment_col, cluster_cols, original_time_col=None)

Constant washover - we drop all rows in the washover period when there is a switch where the treatment is different.

Parameters:

Name Type Description Default
df DataFrame

Input dataframe.

required
truncated_time_col str

Name of the truncated time column.

required
treatment_col str

Name of the treatment column.

required
cluster_cols List[str]

List of clusters of experiment.

required
original_time_col Optional[str]

Name of the original time column.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Same dataframe as input without the rows in the washover period.

Usage:

import numpy as np
import pandas as pd
from datetime import datetime, timedelta

from cluster_experiments import ConstantWashover

np.random.seed(42)

num_rows = 10

def random_timestamp(start_time, end_time):
    time_delta = end_time - start_time
    random_seconds = np.random.randint(0, time_delta.total_seconds())
    return start_time + timedelta(seconds=random_seconds)

def generate_data(start_time, end_time, treatment):
    data = {
        'order_id': np.random.randint(10**9, 10**10, size=num_rows),
        'city_code': 'VAL',
        'activation_time_local': [random_timestamp(start_time, end_time) for _ in range(num_rows)],
        'bin_start_time_local': start_time,
        'treatment': treatment
    }
    return pd.DataFrame(data)

start_times = [datetime(2024, 1, 22, 9, 0), datetime(2024, 1, 22, 11, 0),
            datetime(2024, 1, 22, 13, 0), datetime(2024, 1, 22, 15, 0)]

treatments = ['control', 'variation', 'variation', 'control']

dataframes = [generate_data(start, start + timedelta(hours=2), treatment) for start, treatment in zip(start_times, treatments)]

df = pd.concat(dataframes).sort_values(by='activation_time_local').reset_index(drop=True)

## Define washover with 30 min duration
washover = ConstantWashover(washover_time_delta=timedelta(minutes=30))

## Apply washover to the dataframe, the orders with activation time within the first 30 minutes after every change in the treatment column, clustering by city and 2h time bin, will be dropped
df_analysis_washover = washover.washover(
    df=df,
    truncated_time_col='bin_start_time_local',
    treatment_col='treatment',
    cluster_cols=['city_code','bin_start_time_local'],
    original_time_col='activation_time_local',
)

Source code in cluster_experiments/washover.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def washover(
    self,
    df: pd.DataFrame,
    truncated_time_col: str,
    treatment_col: str,
    cluster_cols: List[str],
    original_time_col: Optional[str] = None,
) -> pd.DataFrame:
    """Constant washover - we drop all rows in the washover period when
    there is a switch where the treatment is different.

    Args:
        df (pd.DataFrame): Input dataframe.
        truncated_time_col (str): Name of the truncated time column.
        treatment_col (str): Name of the treatment column.
        cluster_cols (List[str]): List of clusters of experiment.
        original_time_col (Optional[str], optional): Name of the original time column.

    Returns:
        pd.DataFrame: Same dataframe as input without the rows in the washover period.

    Usage:
    ```python
    import numpy as np
    import pandas as pd
    from datetime import datetime, timedelta

    from cluster_experiments import ConstantWashover

    np.random.seed(42)

    num_rows = 10

    def random_timestamp(start_time, end_time):
        time_delta = end_time - start_time
        random_seconds = np.random.randint(0, time_delta.total_seconds())
        return start_time + timedelta(seconds=random_seconds)

    def generate_data(start_time, end_time, treatment):
        data = {
            'order_id': np.random.randint(10**9, 10**10, size=num_rows),
            'city_code': 'VAL',
            'activation_time_local': [random_timestamp(start_time, end_time) for _ in range(num_rows)],
            'bin_start_time_local': start_time,
            'treatment': treatment
        }
        return pd.DataFrame(data)

    start_times = [datetime(2024, 1, 22, 9, 0), datetime(2024, 1, 22, 11, 0),
                datetime(2024, 1, 22, 13, 0), datetime(2024, 1, 22, 15, 0)]

    treatments = ['control', 'variation', 'variation', 'control']

    dataframes = [generate_data(start, start + timedelta(hours=2), treatment) for start, treatment in zip(start_times, treatments)]

    df = pd.concat(dataframes).sort_values(by='activation_time_local').reset_index(drop=True)

    ## Define washover with 30 min duration
    washover = ConstantWashover(washover_time_delta=timedelta(minutes=30))

    ## Apply washover to the dataframe, the orders with activation time within the first 30 minutes after every change in the treatment column, clustering by city and 2h time bin, will be dropped
    df_analysis_washover = washover.washover(
        df=df,
        truncated_time_col='bin_start_time_local',
        treatment_col='treatment',
        cluster_cols=['city_code','bin_start_time_local'],
        original_time_col='activation_time_local',
    )
    ```
    """
    # Set original time column
    original_time_col = (
        original_time_col
        if original_time_col
        else _original_time_column(truncated_time_col)
    )

    # Validate columns
    self._validate_columns(df, truncated_time_col, cluster_cols, original_time_col)

    # Cluster columns that do not involve time
    non_time_cols = list(set(cluster_cols) - set([truncated_time_col]))
    # For each cluster, we need to check if treatment has changed wrt last time
    df_agg = df.sort_values([original_time_col]).copy()
    df_agg = df_agg.drop_duplicates(subset=cluster_cols + [treatment_col])

    if non_time_cols:
        df_agg["__changed"] = (
            df_agg.groupby(non_time_cols)[treatment_col].shift(1)
            != df_agg[treatment_col]
        )
    else:
        df_agg["__changed"] = (
            df_agg[treatment_col].shift(1) != df_agg[treatment_col]
        )
    df_agg = df_agg.loc[:, cluster_cols + ["__changed"]]
    return (
        df.merge(df_agg, on=cluster_cols, how="inner")
        .assign(
            __time_since_switch=lambda x: x[original_time_col].astype(
                "datetime64[ns]"
            )
            - x[truncated_time_col].astype("datetime64[ns]"),
            __after_washover=lambda x: x["__time_since_switch"]
            > self.washover_time_delta,
        )
        # add not changed in query
        .query("__after_washover or not __changed")
        .drop(columns=["__time_since_switch", "__after_washover", "__changed"])
    )

EmptyWashover

Bases: Washover

No washover - assumes no spill-over effects from one treatment to another.

Source code in cluster_experiments/washover.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class EmptyWashover(Washover):
    """No washover - assumes no spill-over effects from one treatment to another."""

    def washover(
        self,
        df: pd.DataFrame,
        truncated_time_col: str,
        treatment_col: str,
        cluster_cols: List[str],
        original_time_col: Optional[str] = None,
    ) -> pd.DataFrame:
        """No washover - returns the same dataframe as input.

        Args:
            df (pd.DataFrame): Input dataframe.
            truncated_time_col (str): Name of the truncated time column.
            treatment_col (str): Name of the treatment column.
            cluster_cols (List[str]): List of clusters of experiment.
            original_time_col (Optional[str], optional): Name of the original time column.

        Returns:
            pd.DataFrame: Same dataframe as input.

        Usage:
        ```python
        from cluster_experiments import SwitchbackSplitter
        from cluster_experiments import EmptyWashover

        washover = EmptyWashover()

        n = 10
        df = pd.DataFrame(
            {
                # Random time each minute in 2022-01-01, length 10
                "time": pd.date_range("2022-01-01", "2022-01-02", freq="1min")[
                    np.random.randint(24 * 60, size=n)
                ],
                "city": random.choices(["TGN", "NYC", "LON", "REU"], k=n),
            }
        )


        splitter = SwitchbackSplitter(
            washover=washover,
            time_col="time",
            cluster_cols=["city", "time"],
            treatment_col="treatment",
            switch_frequency="30T",
        )

        out_df = splitter.assign_treatment_df(df=washover_split_df)
        ```
        """
        return df

washover(df, truncated_time_col, treatment_col, cluster_cols, original_time_col=None)

No washover - returns the same dataframe as input.

Parameters:

Name Type Description Default
df DataFrame

Input dataframe.

required
truncated_time_col str

Name of the truncated time column.

required
treatment_col str

Name of the treatment column.

required
cluster_cols List[str]

List of clusters of experiment.

required
original_time_col Optional[str]

Name of the original time column.

None

Returns:

Type Description
DataFrame

pd.DataFrame: Same dataframe as input.

Usage:

from cluster_experiments import SwitchbackSplitter
from cluster_experiments import EmptyWashover

washover = EmptyWashover()

n = 10
df = pd.DataFrame(
    {
        # Random time each minute in 2022-01-01, length 10
        "time": pd.date_range("2022-01-01", "2022-01-02", freq="1min")[
            np.random.randint(24 * 60, size=n)
        ],
        "city": random.choices(["TGN", "NYC", "LON", "REU"], k=n),
    }
)


splitter = SwitchbackSplitter(
    washover=washover,
    time_col="time",
    cluster_cols=["city", "time"],
    treatment_col="treatment",
    switch_frequency="30T",
)

out_df = splitter.assign_treatment_df(df=washover_split_df)

Source code in cluster_experiments/washover.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def washover(
    self,
    df: pd.DataFrame,
    truncated_time_col: str,
    treatment_col: str,
    cluster_cols: List[str],
    original_time_col: Optional[str] = None,
) -> pd.DataFrame:
    """No washover - returns the same dataframe as input.

    Args:
        df (pd.DataFrame): Input dataframe.
        truncated_time_col (str): Name of the truncated time column.
        treatment_col (str): Name of the treatment column.
        cluster_cols (List[str]): List of clusters of experiment.
        original_time_col (Optional[str], optional): Name of the original time column.

    Returns:
        pd.DataFrame: Same dataframe as input.

    Usage:
    ```python
    from cluster_experiments import SwitchbackSplitter
    from cluster_experiments import EmptyWashover

    washover = EmptyWashover()

    n = 10
    df = pd.DataFrame(
        {
            # Random time each minute in 2022-01-01, length 10
            "time": pd.date_range("2022-01-01", "2022-01-02", freq="1min")[
                np.random.randint(24 * 60, size=n)
            ],
            "city": random.choices(["TGN", "NYC", "LON", "REU"], k=n),
        }
    )


    splitter = SwitchbackSplitter(
        washover=washover,
        time_col="time",
        cluster_cols=["city", "time"],
        treatment_col="treatment",
        switch_frequency="30T",
    )

    out_df = splitter.assign_treatment_df(df=washover_split_df)
    ```
    """
    return df

Washover

Bases: ABC

Abstract class to model washovers in the switchback splitter.

Source code in cluster_experiments/washover.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Washover(ABC):
    """Abstract class to model washovers in the switchback splitter."""

    def _validate_columns(
        self,
        df: pd.DataFrame,
        truncated_time_col: str,
        cluster_cols: List[str],
        original_time_col: str,
    ):
        """Validate that all the columns required for the washover are present in the dataframe.

        Args:
            df (pd.DataFrame): Input dataframe.
            truncated_time_col (str): Name of the truncated time column.
            cluster_cols (List[str]): List of clusters of experiment.
            original_time_col (str): Name of the original time column.

        Returns:
            None: This method does not return any data; it only performs validation.

        """
        if original_time_col not in df.columns:
            raise ValueError(
                f"{original_time_col = } is not in the dataframe columns and/or not specified as an input."
            )
        if truncated_time_col not in cluster_cols:
            raise ValueError(f"{truncated_time_col = } is not in the cluster columns.")
        for col in cluster_cols:
            if col not in df.columns:
                raise ValueError(f"{col = } cluster is not in the dataframe columns.")

    @abstractmethod
    def washover(
        self,
        df: pd.DataFrame,
        truncated_time_col: str,
        treatment_col: str,
        cluster_cols: List[str],
        original_time_col: Optional[str] = None,
    ) -> pd.DataFrame:
        """Abstract method to add washvover to the dataframe."""

    @classmethod
    def from_config(cls, config) -> "Washover":
        return cls()

washover(df, truncated_time_col, treatment_col, cluster_cols, original_time_col=None) abstractmethod

Abstract method to add washvover to the dataframe.

Source code in cluster_experiments/washover.py
42
43
44
45
46
47
48
49
50
51
@abstractmethod
def washover(
    self,
    df: pd.DataFrame,
    truncated_time_col: str,
    treatment_col: str,
    cluster_cols: List[str],
    original_time_col: Optional[str] = None,
) -> pd.DataFrame:
    """Abstract method to add washvover to the dataframe."""