from cluster_experiments.washover import *
¶
ConstantWashover (Washover)
¶
Constant washover - we drop all rows in the washover period when there is a switch where the treatment is different.
Source code in cluster_experiments/washover.py
class ConstantWashover(Washover):
    """Constant washover - we drop all rows in the washover period when
    there is a switch where the treatment is different."""

    def __init__(self, washover_time_delta: datetime.timedelta):
        """Store the length of the washover period.

        Args:
            washover_time_delta (datetime.timedelta): Length of the washover
                period. Rows are kept after a switch only when the time elapsed
                since the start of their time bin is strictly greater than
                this value.
        """
        self.washover_time_delta = washover_time_delta

    def washover(
        self,
        df: pd.DataFrame,
        truncated_time_col: str,
        treatment_col: str,
        cluster_cols: List[str],
        original_time_col: Optional[str] = None,
    ) -> pd.DataFrame:
        """Constant washover - we drop all rows in the washover period when
        there is a switch where the treatment is different.

        The first time bin of every group has no previous treatment to compare
        with, so it is treated as a switch and its washover period is dropped
        as well.

        Args:
            df (pd.DataFrame): Input dataframe.
            truncated_time_col (str): Name of the truncated time column.
            treatment_col (str): Name of the treatment column.
            cluster_cols (List[str]): List of clusters of experiment.
            original_time_col (Optional[str], optional): Name of the original time column.

        Returns:
            pd.DataFrame: Same dataframe as input without the rows in the washover period.

        Usage:
        ```python
        import numpy as np
        import pandas as pd
        from datetime import datetime, timedelta
        from cluster_experiments import ConstantWashover

        np.random.seed(42)

        num_rows = 10

        def random_timestamp(start_time, end_time):
            time_delta = end_time - start_time
            random_seconds = np.random.randint(0, time_delta.total_seconds())
            return start_time + timedelta(seconds=random_seconds)

        def generate_data(start_time, end_time, treatment):
            data = {
                'order_id': np.random.randint(10**9, 10**10, size=num_rows),
                'city_code': 'VAL',
                'activation_time_local': [random_timestamp(start_time, end_time) for _ in range(num_rows)],
                'bin_start_time_local': start_time,
                'treatment': treatment
            }
            return pd.DataFrame(data)

        start_times = [datetime(2024, 1, 22, 9, 0), datetime(2024, 1, 22, 11, 0),
                       datetime(2024, 1, 22, 13, 0), datetime(2024, 1, 22, 15, 0)]

        treatments = ['control', 'variation', 'variation', 'control']

        dataframes = [generate_data(start, start + timedelta(hours=2), treatment) for start, treatment in zip(start_times, treatments)]

        df = pd.concat(dataframes).sort_values(by='activation_time_local').reset_index(drop=True)

        ## Define washover with 30 min duration
        washover = ConstantWashover(washover_time_delta=timedelta(minutes=30))

        ## Apply washover to the dataframe, the orders with activation time within the first 30 minutes after every change in the treatment column, clustering by city and 2h time bin, will be dropped
        df_analysis_washover = washover.washover(
            df=df,
            truncated_time_col='bin_start_time_local',
            treatment_col='treatment',
            cluster_cols=['city_code','bin_start_time_local'],
            original_time_col='activation_time_local',
        )
        ```
        """
        # Set original time column: any falsy value (None, "") falls back to
        # the name produced by _original_time_column
        original_time_col = (
            original_time_col
            if original_time_col
            else _original_time_column(truncated_time_col)
        )

        # Validate columns
        self._validate_columns(df, truncated_time_col, cluster_cols, original_time_col)

        # Cluster columns that do not involve time
        non_time_cols = list(set(cluster_cols) - {truncated_time_col})

        # For each cluster, we need to check if treatment has changed wrt last time.
        # Sorting by the original time makes shift(1) refer to the previous period;
        # one row per (cluster, treatment) is enough for that comparison.
        df_agg = df.sort_values([original_time_col]).copy()
        df_agg = df_agg.drop_duplicates(subset=cluster_cols + [treatment_col])

        # The first row (of each group) has no predecessor: shift(1) gives a
        # missing value, which compares unequal and is flagged as changed.
        if non_time_cols:
            df_agg["__changed"] = (
                df_agg.groupby(non_time_cols)[treatment_col].shift(1)
                != df_agg[treatment_col]
            )
        else:
            df_agg["__changed"] = (
                df_agg[treatment_col].shift(1) != df_agg[treatment_col]
            )
        df_agg = df_agg.loc[:, cluster_cols + ["__changed"]]

        return (
            df.merge(df_agg, on=cluster_cols, how="inner")
            .assign(
                __time_since_switch=lambda x: x[original_time_col].astype(
                    "datetime64[ns]"
                )
                - x[truncated_time_col].astype("datetime64[ns]"),
                # strict comparison: a row exactly at the washover boundary
                # still counts as inside the washover period
                __after_washover=lambda x: x["__time_since_switch"]
                > self.washover_time_delta,
            )
            # keep rows past the washover period, plus every row of a
            # period in which the treatment did not change
            .query("__after_washover or not __changed")
            .drop(columns=["__time_since_switch", "__after_washover", "__changed"])
        )

    @classmethod
    def from_config(cls, config) -> "Washover":
        """Build the washover from a config object.

        Args:
            config: Object exposing a ``washover_time_delta`` attribute. An
                ``int`` value is interpreted as a number of minutes; any other
                value is passed to the constructor unchanged.

        Returns:
            Washover: Instance of ``cls`` configured with the washover length.

        Raises:
            ValueError: If ``config.washover_time_delta`` is falsy (for
                instance ``None`` or ``0``).
        """
        if not config.washover_time_delta:
            raise ValueError(
                f"Washover time delta must be specified for ConstantWashover, while it is {config.washover_time_delta = }"
            )

        washover_time_delta = config.washover_time_delta
        if isinstance(washover_time_delta, int):
            washover_time_delta = datetime.timedelta(minutes=config.washover_time_delta)
        return cls(washover_time_delta=washover_time_delta)
washover(self, df, truncated_time_col, treatment_col, cluster_cols, original_time_col=None)
¶
Constant washover - we drop all rows in the washover period when there is a switch where the treatment is different.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | pd.DataFrame | Input dataframe. | required |
| truncated_time_col | str | Name of the truncated time column. | required |
| treatment_col | str | Name of the treatment column. | required |
| cluster_cols | List[str] | List of clusters of experiment. | required |
| original_time_col | Optional[str] | Name of the original time column. | None |
Returns:
| Type | Description |
|---|---|
| pd.DataFrame | Same dataframe as input without the rows in the washover period. |
Usage:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from cluster_experiments import ConstantWashover
np.random.seed(42)
num_rows = 10
def random_timestamp(start_time, end_time):
time_delta = end_time - start_time
random_seconds = np.random.randint(0, time_delta.total_seconds())
return start_time + timedelta(seconds=random_seconds)
def generate_data(start_time, end_time, treatment):
data = {
'order_id': np.random.randint(10**9, 10**10, size=num_rows),
'city_code': 'VAL',
'activation_time_local': [random_timestamp(start_time, end_time) for _ in range(num_rows)],
'bin_start_time_local': start_time,
'treatment': treatment
}
return pd.DataFrame(data)
start_times = [datetime(2024, 1, 22, 9, 0), datetime(2024, 1, 22, 11, 0),
datetime(2024, 1, 22, 13, 0), datetime(2024, 1, 22, 15, 0)]
treatments = ['control', 'variation', 'variation', 'control']
dataframes = [generate_data(start, start + timedelta(hours=2), treatment) for start, treatment in zip(start_times, treatments)]
df = pd.concat(dataframes).sort_values(by='activation_time_local').reset_index(drop=True)
## Define washover with 30 min duration
washover = ConstantWashover(washover_time_delta=timedelta(minutes=30))
## Apply washover to the dataframe, the orders with activation time within the first 30 minutes after every change in the treatment column, clustering by city and 2h time bin, will be dropped
df_analysis_washover = washover.washover(
df=df,
truncated_time_col='bin_start_time_local',
treatment_col='treatment',
cluster_cols=['city_code','bin_start_time_local'],
original_time_col='activation_time_local',
)
Source code in cluster_experiments/washover.py
def washover(
    self,
    df: pd.DataFrame,
    truncated_time_col: str,
    treatment_col: str,
    cluster_cols: List[str],
    original_time_col: Optional[str] = None,
) -> pd.DataFrame:
    """Constant washover - we drop all rows in the washover period when
    there is a switch where the treatment is different.

    The first time bin of every group has no previous treatment to compare
    with, so it is treated as a switch and its washover period is dropped
    as well.

    Args:
        df (pd.DataFrame): Input dataframe.
        truncated_time_col (str): Name of the truncated time column.
        treatment_col (str): Name of the treatment column.
        cluster_cols (List[str]): List of clusters of experiment.
        original_time_col (Optional[str], optional): Name of the original time column.

    Returns:
        pd.DataFrame: Same dataframe as input without the rows in the washover period.

    Usage:
    ```python
    import numpy as np
    import pandas as pd
    from datetime import datetime, timedelta
    from cluster_experiments import ConstantWashover

    np.random.seed(42)

    num_rows = 10

    def random_timestamp(start_time, end_time):
        time_delta = end_time - start_time
        random_seconds = np.random.randint(0, time_delta.total_seconds())
        return start_time + timedelta(seconds=random_seconds)

    def generate_data(start_time, end_time, treatment):
        data = {
            'order_id': np.random.randint(10**9, 10**10, size=num_rows),
            'city_code': 'VAL',
            'activation_time_local': [random_timestamp(start_time, end_time) for _ in range(num_rows)],
            'bin_start_time_local': start_time,
            'treatment': treatment
        }
        return pd.DataFrame(data)

    start_times = [datetime(2024, 1, 22, 9, 0), datetime(2024, 1, 22, 11, 0),
                   datetime(2024, 1, 22, 13, 0), datetime(2024, 1, 22, 15, 0)]

    treatments = ['control', 'variation', 'variation', 'control']

    dataframes = [generate_data(start, start + timedelta(hours=2), treatment) for start, treatment in zip(start_times, treatments)]

    df = pd.concat(dataframes).sort_values(by='activation_time_local').reset_index(drop=True)

    ## Define washover with 30 min duration
    washover = ConstantWashover(washover_time_delta=timedelta(minutes=30))

    ## Apply washover to the dataframe, the orders with activation time within the first 30 minutes after every change in the treatment column, clustering by city and 2h time bin, will be dropped
    df_analysis_washover = washover.washover(
        df=df,
        truncated_time_col='bin_start_time_local',
        treatment_col='treatment',
        cluster_cols=['city_code','bin_start_time_local'],
        original_time_col='activation_time_local',
    )
    ```
    """
    # Set original time column: any falsy value (None, "") falls back to
    # the name produced by _original_time_column
    original_time_col = (
        original_time_col
        if original_time_col
        else _original_time_column(truncated_time_col)
    )

    # Validate columns
    self._validate_columns(df, truncated_time_col, cluster_cols, original_time_col)

    # Cluster columns that do not involve time
    non_time_cols = list(set(cluster_cols) - {truncated_time_col})

    # For each cluster, we need to check if treatment has changed wrt last time.
    # Sorting by the original time makes shift(1) refer to the previous period;
    # one row per (cluster, treatment) is enough for that comparison.
    df_agg = df.sort_values([original_time_col]).copy()
    df_agg = df_agg.drop_duplicates(subset=cluster_cols + [treatment_col])

    # The first row (of each group) has no predecessor: shift(1) gives a
    # missing value, which compares unequal and is flagged as changed.
    if non_time_cols:
        df_agg["__changed"] = (
            df_agg.groupby(non_time_cols)[treatment_col].shift(1)
            != df_agg[treatment_col]
        )
    else:
        df_agg["__changed"] = (
            df_agg[treatment_col].shift(1) != df_agg[treatment_col]
        )
    df_agg = df_agg.loc[:, cluster_cols + ["__changed"]]

    return (
        df.merge(df_agg, on=cluster_cols, how="inner")
        .assign(
            __time_since_switch=lambda x: x[original_time_col].astype(
                "datetime64[ns]"
            )
            - x[truncated_time_col].astype("datetime64[ns]"),
            # strict comparison: a row exactly at the washover boundary
            # still counts as inside the washover period
            __after_washover=lambda x: x["__time_since_switch"]
            > self.washover_time_delta,
        )
        # keep rows past the washover period, plus every row of a
        # period in which the treatment did not change
        .query("__after_washover or not __changed")
        .drop(columns=["__time_since_switch", "__after_washover", "__changed"])
    )
EmptyWashover (Washover)
¶
No washover - assumes no spill-over effects from one treatment to another.
Source code in cluster_experiments/washover.py
class EmptyWashover(Washover):
    """No washover - assumes no spill-over effects from one treatment to another."""

    def washover(
        self,
        df: pd.DataFrame,
        truncated_time_col: str,
        treatment_col: str,
        cluster_cols: List[str],
        original_time_col: Optional[str] = None,
    ) -> pd.DataFrame:
        """No washover - returns the same dataframe as input.

        The column arguments are accepted only to match the washover
        interface; none of them is read.

        Args:
            df (pd.DataFrame): Input dataframe.
            truncated_time_col (str): Name of the truncated time column.
            treatment_col (str): Name of the treatment column.
            cluster_cols (List[str]): List of clusters of experiment.
            original_time_col (Optional[str], optional): Name of the original time column.

        Returns:
            pd.DataFrame: Same dataframe as input.

        Usage:
        ```python
        import random

        import numpy as np
        import pandas as pd

        from cluster_experiments import SwitchbackSplitter
        from cluster_experiments import EmptyWashover

        washover = EmptyWashover()

        n = 10
        df = pd.DataFrame(
            {
                # Random time each minute in 2022-01-01, length 10
                "time": pd.date_range("2022-01-01", "2022-01-02", freq="1min")[
                    np.random.randint(24 * 60, size=n)
                ],
                "city": random.choices(["TGN", "NYC", "LON", "REU"], k=n),
            }
        )

        splitter = SwitchbackSplitter(
            washover=washover,
            time_col="time",
            cluster_cols=["city", "time"],
            treatment_col="treatment",
            switch_frequency="30T",
        )

        out_df = splitter.assign_treatment_df(df=df)
        ```
        """
        # The very same object is handed back (no copy is made)
        return df
washover(self, df, truncated_time_col, treatment_col, cluster_cols, original_time_col=None)
¶
No washover - returns the same dataframe as input.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | pd.DataFrame | Input dataframe. | required |
| truncated_time_col | str | Name of the truncated time column. | required |
| treatment_col | str | Name of the treatment column. | required |
| cluster_cols | List[str] | List of clusters of experiment. | required |
| original_time_col | Optional[str] | Name of the original time column. | None |
Returns:
| Type | Description |
|---|---|
| pd.DataFrame | Same dataframe as input. |
Usage:
from cluster_experiments import SwitchbackSplitter
from cluster_experiments import EmptyWashover
washover = EmptyWashover()
n = 10
df = pd.DataFrame(
{
# Random time each minute in 2022-01-01, length 10
"time": pd.date_range("2022-01-01", "2022-01-02", freq="1min")[
np.random.randint(24 * 60, size=n)
],
"city": random.choices(["TGN", "NYC", "LON", "REU"], k=n),
}
)
splitter = SwitchbackSplitter(
washover=washover,
time_col="time",
cluster_cols=["city", "time"],
treatment_col="treatment",
switch_frequency="30T",
)
out_df = splitter.assign_treatment_df(df=df)
Source code in cluster_experiments/washover.py
def washover(
    self,
    df: pd.DataFrame,
    truncated_time_col: str,
    treatment_col: str,
    cluster_cols: List[str],
    original_time_col: Optional[str] = None,
) -> pd.DataFrame:
    """No washover - returns the same dataframe as input.

    The column arguments are accepted only to match the washover
    interface; none of them is read.

    Args:
        df (pd.DataFrame): Input dataframe.
        truncated_time_col (str): Name of the truncated time column.
        treatment_col (str): Name of the treatment column.
        cluster_cols (List[str]): List of clusters of experiment.
        original_time_col (Optional[str], optional): Name of the original time column.

    Returns:
        pd.DataFrame: Same dataframe as input.

    Usage:
    ```python
    import random

    import numpy as np
    import pandas as pd

    from cluster_experiments import SwitchbackSplitter
    from cluster_experiments import EmptyWashover

    washover = EmptyWashover()

    n = 10
    df = pd.DataFrame(
        {
            # Random time each minute in 2022-01-01, length 10
            "time": pd.date_range("2022-01-01", "2022-01-02", freq="1min")[
                np.random.randint(24 * 60, size=n)
            ],
            "city": random.choices(["TGN", "NYC", "LON", "REU"], k=n),
        }
    )

    splitter = SwitchbackSplitter(
        washover=washover,
        time_col="time",
        cluster_cols=["city", "time"],
        treatment_col="treatment",
        switch_frequency="30T",
    )

    out_df = splitter.assign_treatment_df(df=df)
    ```
    """
    # The very same object is handed back (no copy is made)
    return df
Washover (ABC)
¶
Abstract class to model washovers in the switchback splitter.
Source code in cluster_experiments/washover.py
class Washover(ABC):
    """Abstract class to model washovers in the switchback splitter."""

    def _validate_columns(
        self,
        df: pd.DataFrame,
        truncated_time_col: str,
        cluster_cols: List[str],
        original_time_col: str,
    ) -> None:
        """Validate that all the columns required for the washover are present in the dataframe.

        Checks run in this order and stop at the first failure: the original
        time column must be in the dataframe, the truncated time column must be
        one of the cluster columns, and every cluster column must be in the
        dataframe.

        Args:
            df (pd.DataFrame): Input dataframe.
            truncated_time_col (str): Name of the truncated time column.
            cluster_cols (List[str]): List of clusters of experiment.
            original_time_col (str): Name of the original time column.

        Returns:
            None: This method does not return any data; it only performs validation.

        Raises:
            ValueError: If any of the checks above fails.
        """
        if original_time_col not in df.columns:
            raise ValueError(
                f"{original_time_col = } is not in the dataframe columns and/or not specified as an input."
            )
        if truncated_time_col not in cluster_cols:
            raise ValueError(f"{truncated_time_col = } is not in the cluster columns.")
        for col in cluster_cols:
            if col not in df.columns:
                raise ValueError(f"{col = } cluster is not in the dataframe columns.")

    @abstractmethod
    def washover(
        self,
        df: pd.DataFrame,
        truncated_time_col: str,
        treatment_col: str,
        cluster_cols: List[str],
        original_time_col: Optional[str] = None,
    ) -> pd.DataFrame:
        """Abstract method to add washover to the dataframe."""

    @classmethod
    def from_config(cls, config) -> "Washover":
        """Build an instance from a config object.

        This default implementation ignores ``config`` and calls the
        constructor without arguments.
        """
        return cls()
washover(self, df, truncated_time_col, treatment_col, cluster_cols, original_time_col=None)
¶
Abstract method to add washover to the dataframe.
Source code in cluster_experiments/washover.py
@abstractmethod
def washover(
    self,
    df: pd.DataFrame,
    truncated_time_col: str,
    treatment_col: str,
    cluster_cols: List[str],
    original_time_col: Optional[str] = None,
) -> pd.DataFrame:
    """Abstract method to add washover to the dataframe.

    Declared with ``@abstractmethod`` and has no implementation of its own.

    Args:
        df (pd.DataFrame): Input dataframe.
        truncated_time_col (str): Name of the truncated time column.
        treatment_col (str): Name of the treatment column.
        cluster_cols (List[str]): List of clusters of experiment.
        original_time_col (Optional[str], optional): Name of the original time column.

    Returns:
        pd.DataFrame: Dataframe after the washover has been applied.
    """