Source code for twiser.twiser

# Copyright 2021 Twitter, Inc.
# SPDX-License-Identifier: Apache-2.0

"""The goal of this package is to make hypothesis testing using variance reduction methods as easy
as using :func:`scipy.stats.ttest_ind` and :func:`scipy.stats.ttest_ind_from_stats`. A lot of the
API is designed to match that simplicity as much as possible.

The publication in [1]_ was implented using this package. The variance reduction ideas here are
built on top of the CUPED method ideas in [2]_ and [3]_.

The package currently supports three kinds of tests:

* basic :math:`z`-test: This is the one from the intro stats textbooks.
* held out: This is a held out control variate method (train the predictor on a held out set).
* cross val: This is a :math:`k`-fold cross validation type setup when training the predictor.

The distinction between basic, held out (aka cv), and cross val (aka stacked) is discussed in [4]_.

Each method has a few different ways to call it:

* basic: Call the method using the raw data and the control variate predictions.
* from stats: Call the method using sufficient statistics of the data and predictions only.
* train: Pass in a predictor object to train and evaluate the predictor in the routine.

  * For lack of a better choice, I assume the model has a sklearn-style `fit()` and `predict()` API.

Every statistical test in this package returns the same set of variables:

* A best estimate (of the difference of means)
* A confidence interval (on the difference of means)
* A p-value under the H0 that the two means are equal

  * The p-value and confidence interval are tested to be consistent with each under inversion.

References
----------
.. [1] `R. Turner, U. Pavalanathan, S. Webb, N. Hammerla, B. Cohn, and A. Fu. Isotonic regression
   adjustment for variance reduction. In CODE@MIT, 2021
   <https://ide.mit.edu/events/2021-conference-on-digital-experimentation-mit-codemit/>`_.
.. [2] `A. Deng, Y. Xu, R. Kohavi, and T. Walker. Improving the sensitivity of online controlled
   experiments by utilizing pre-experiment data. In Proceedings of the Sixth ACM International
   Conference on Web Search and Data Mining, pages 123--132, 2013
   <https://www.exp-platform.com/Documents/2013-02-CUPED-ImprovingSensitivityOfControlledExperiments.pdf>`_.
.. [3] `A. Poyarkov, A. Drutsa, A. Khalyavin, G. Gusev, and P. Serdyukov. Boosted decision tree
   regression adjustment for variance reduction in online controlled experiments. In Proceedings of
   the 22nd ACM SIGKDD International Conference on Knowledge Discovery and Data Mining, pages
   235--244, 2016 <https://www.kdd.org/kdd2016/papers/files/adf0653-poyarkovA.pdf>`_.
.. [4] `I. Barr. Reducing the variance of A/B tests using prior information. Degenerate State, Jun
   2018
   <https://www.degeneratestate.org/posts/2018/Jan/04/reducing-the-variance-of-ab-test-using-prior-information/>`_.
"""
import warnings
from copy import deepcopy
from typing import Any, Callable, Optional, Sequence, Tuple

import numpy as np
import numpy.typing as npt
import scipy.stats as ss
from sklearn.base import clone
from sklearn.linear_model import LogisticRegression

# Defaults
ALPHA = 0.05
K_FOLD = 5
TRAIN_FRAC = 0.2
HEALTH_CHK_PVAL = 1e-6

MIN_SPLIT = 2  # Min data size so we can estimate mean and variance
MIN_FOLD = 2  # At least need a train and test in K-fold

# Some standard types this package uses
TestResult = Tuple[float, Tuple[float, float], float]
DataGen = Callable[[], Tuple[npt.ArrayLike, npt.ArrayLike, npt.ArrayLike, npt.ArrayLike]]
# Some placeholders that we can later make more restrictive
Model = Any
Rng = Optional[np.random.RandomState]

# Access default numpy rng in way that is short and sphinx friendly
np_random = np.random.random.__self__


class PassThruPred(object):
  def __init__(self):
    pass

  def fit(self, x: npt.ArrayLike, y: npt.ArrayLike) -> None:
    n, = np.shape(y)
    assert x.shape == (n, 1)

  def predict(self, x: npt.ArrayLike) -> np.ndarray:
    n, _ = np.shape(x)
    assert x.shape == (n, 1)
    yp = x[:, 0]
    return yp


class Cuped(object):
  def __init__(self, ddof: int = 1):
    self.mean_x = None
    self.mean_y = None
    self.theta = None
    self.ddof = ddof

  def fit(self, x: npt.ArrayLike, y: npt.ArrayLike) -> None:
    n, = np.shape(y)
    assert n >= MIN_SPLIT
    assert x.shape == (n, 1)
    x = x[:, 0]

    self.mean_x = np.mean(x).item()
    self.mean_y = np.mean(y).item()
    var_x = np.var(x, ddof=self.ddof)
    if np.isclose(var_x, 0.0):
      self.theta = 0.0
    else:
      self.theta = (np.cov(x, y, ddof=self.ddof)[0, 1] / var_x).item()

  def predict(self, x: npt.ArrayLike) -> np.ndarray:
    n, _ = np.shape(x)
    assert x.shape == (n, 1)
    yp = (x[:, 0] - self.mean_x) * self.theta + self.mean_y
    assert yp.shape == (n,)
    return yp


# ==== Validation ====


def _is_psd2(cov: npt.ArrayLike) -> bool:
  # This works for 2x2 but not in general, unlike calling cholesky this is happy with semi-def:
  is_psd = (np.trace(cov) >= -1e-8) and (np.linalg.det(cov) >= -1e-8)
  return is_psd


def _validate_alpha(alpha: float) -> None:
  # Only scalars coming in, so no need to pass back an np version
  assert np.shape(alpha) == ()
  assert 0.0 <= alpha
  assert alpha < 1.0


def _validate_ddof(ddof: int) -> None:
  # Only scalars coming in, so no need to pass back an np version
  assert np.shape(ddof) == ()
  assert ddof >= 0


def _validate_moments_1(mean: float, std: float, n: int) -> None:
  # Only scalars coming in, so no need to pass back an np version
  assert np.shape(mean) == ()
  assert np.shape(std) == ()
  assert np.shape(n) == ()
  assert std >= 0
  assert n > 0


def _validate_moments_2(
  mean: npt.ArrayLike, cov: npt.ArrayLike, n: int
) -> Tuple[np.ndarray, np.ndarray, int]:
  mean = np.asarray_chkfinite(mean)
  cov = np.asarray_chkfinite(cov)

  assert np.shape(mean) == (2,)
  assert np.shape(cov) == (2, 2)
  assert np.shape(n) == ()
  assert _is_psd2(cov)
  assert n > 0
  return mean, cov, n


def _validate_data(
  x: npt.ArrayLike, y: npt.ArrayLike, *, paired: bool = False, dtypes: str = "if"
) -> Tuple[np.ndarray, np.ndarray]:
  # We can always generalize to allow non-finite input later
  x = np.asarray_chkfinite(x)
  y = np.asarray_chkfinite(y)

  assert np.ndim(x) == 1
  assert np.ndim(y) == 1
  assert len(x) >= MIN_SPLIT
  assert len(y) >= MIN_SPLIT
  # Some routines do not work with uint or bool
  assert x.dtype.kind in dtypes
  assert y.dtype.kind in dtypes
  if paired:
    assert np.shape(x) == np.shape(y)
  return x, y


def _validate_train_data(
  x: npt.ArrayLike,
  x_covariates: npt.ArrayLike,
  y: npt.ArrayLike,
  y_covariates: npt.ArrayLike,
  *,
  k_fold: int = 2,
) -> Tuple[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray], Tuple[int, int, int]]:
  x = np.asarray_chkfinite(x)
  x_covariates = np.asarray(x_covariates)
  y = np.asarray_chkfinite(y)
  y_covariates = np.asarray(y_covariates)

  n_x, d = x_covariates.shape
  n_y, = y.shape
  assert x.shape == (n_x,)
  assert y_covariates.shape == (n_y, d)
  assert d >= 1
  assert k_fold >= MIN_FOLD
  assert n_x >= MIN_SPLIT * k_fold
  assert n_y >= MIN_SPLIT * k_fold
  return (x, x_covariates, y, y_covariates), (n_x, n_y, d)


def _validate_train_data_block(
  x: npt.ArrayLike, x_covariates: npt.ArrayLike, y: npt.ArrayLike, y_covariates: npt.ArrayLike
) -> Tuple[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray], Tuple[int, int, int]]:
  x = np.asarray_chkfinite(x)
  x_covariates = np.asarray(x_covariates)
  y = np.asarray_chkfinite(y)
  y_covariates = np.asarray(y_covariates)

  n_x, d = x_covariates.shape
  n_y, = y.shape
  assert x.shape == (n_x,)
  assert y_covariates.shape == (n_y, d)
  assert d >= 1
  assert n_x >= MIN_SPLIT
  assert n_y >= MIN_SPLIT
  return (x, x_covariates, y, y_covariates), (n_x, n_y, d)


# ==== Health Check Features ====


def _health_check_features(
  x: np.ndarray, y: np.ndarray, *, train_frac: float = TRAIN_FRAC, discriminator: Model = None
) -> None:
  random = np.random.RandomState(0)

  n = min(len(x), len(y))

  if len(x) < n:
    x = x[_subset_idx(n, len(x), random=random), :]
  if len(y) < n:
    y = y[_subset_idx(n, len(y), random=random), :]
  assert len(x) == n
  assert len(y) == n

  z = np.concatenate((x, y), axis=0)
  target = np.concatenate((np.ones(n, dtype=bool), np.zeros(n, dtype=bool)), axis=0)

  train_idx = _make_train_idx(train_frac, len(z), random=random)

  # Just hard coding default discriminator for now
  if discriminator is None:
    discriminator = LogisticRegression()
  discriminator.fit(z[train_idx, :], target[train_idx])
  pred = discriminator.predict(z[~train_idx, :])

  n_correct_guess = np.sum(pred == target[~train_idx])
  n_guess = len(target[~train_idx])

  pval = ss.binom_test(n_correct_guess, n_guess, 0.5)

  if pval <= HEALTH_CHK_PVAL:
    warnings.warn(f"Input features have different distribution with p = {pval}", UserWarning)


def _health_check_output(x: np.ndarray, y: np.ndarray) -> None:
  # KS test is not valid in the presence of dupes
  z = np.concatenate((x, y), axis=0)
  # This is faster than set operations for large z, but bloom-filter would be more mem efficient
  all_unique = len(np.unique(z)) == len(z)

  if all_unique:
    _, pval = ss.ks_2samp(x, y)
  else:
    _, _, pval = ztest(x, y)  # only check the means

  if pval <= HEALTH_CHK_PVAL:
    warnings.warn(f"Predictors have different distribution with p = {pval}", UserWarning)


# ==== Basic ====


[docs]def ztest_from_stats( mean1: float, std1: float, nobs1: int, mean2: float, std2: float, nobs2: int, *, alpha: float = ALPHA, ) -> TestResult: r"""Version of :func:`ztest` that works off the sufficient statistics of the data. Parameters ---------- mean1 : float The sample mean of the treatment group outcome :math:`x`. std1 : float The sample standard deviation of the treatment group outcome. nobs1 : int The number of samples in the treatment group. mean2 : float The sample mean of the control group outcome :math:`y`. std2 : float The sample standard deviation of the control group outcome. nobs2 : int The number of samples in the control group. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ _validate_moments_1(mean1, std1, nobs1) _validate_moments_1(mean2, std2, nobs2) _validate_alpha(alpha) estimate = mean1 - mean2 var_1 = std1 ** 2 var_2 = std2 ** 2 std_err = np.sqrt((var_1 / nobs1) + (var_2 / nobs2)) # ss.norm seems pretty robust except exactly at std_err = 0 (even np.nextafter(0, 1) is ok) if std_err == 0.0: lb, ub = estimate, estimate pval = np.float_(estimate == 0.0) else: lb, ub = ss.norm.interval(1.0 - alpha, loc=estimate, scale=std_err) pval = 2 * ss.norm.cdf(-np.abs(estimate), loc=0.0, scale=std_err) return estimate, (lb, ub), pval
[docs]def ztest(x: npt.ArrayLike, y: npt.ArrayLike, *, alpha: float = ALPHA, ddof: int = 1) -> TestResult: r"""Standard two-sample unpaired :math:`z`-test. It does not assume equal sample sizes or variances. Parameters ---------- x : :class:`numpy:numpy.ndarray` of shape of shape (n,) Outcomes for the treatment group. y : :class:`numpy:numpy.ndarray` of shape (m,) Outcomes for the control group. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. ddof : int The "Delta Degrees of Freedom" argument for computing sample variances. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ x, y = _validate_data(x, y, dtypes="buif") _validate_alpha(alpha) _validate_ddof(ddof) R = ztest_from_stats( np.mean(x), np.std(x, ddof=ddof), len(x), np.mean(y), np.std(y, ddof=ddof), len(y), alpha=alpha ) return R
# ==== Control Variate ==== def _delta_moments(mean: np.ndarray, cov: np.ndarray) -> Tuple[float, float]: delta_mean = mean[0] - mean[1] delta_std = np.sqrt(cov[0, 0] + cov[1, 1] - 2 * cov[0, 1]) return delta_mean, delta_std def _subset_idx(m: int, n: int, random: Rng = np_random) -> np.ndarray: idx = np.zeros(n, dtype=bool) idx[:m] = True random.shuffle(idx) return idx def _make_train_idx(frac: float, n: int, random: Rng = np_random) -> np.ndarray: # There are functions in sklearn we could use to avoid needing to implement this, but we are # trying to avoid needing sklearn as a dep outside of the unit tests. assert n >= 2 * MIN_SPLIT n_train = int(np.ceil(np.clip(frac * n, MIN_SPLIT, n - MIN_SPLIT)).item()) assert n_train >= MIN_SPLIT assert n_train <= n - MIN_SPLIT train_idx = _subset_idx(n_train, n, random=random) assert np.sum(train_idx) >= MIN_SPLIT assert np.sum(~train_idx) >= MIN_SPLIT return train_idx
[docs]def ztest_held_out_from_stats( mean1: npt.ArrayLike, cov1: npt.ArrayLike, nobs1: int, mean2: npt.ArrayLike, cov2: npt.ArrayLike, nobs2: int, *, alpha: float = ALPHA, ) -> TestResult: r"""Version of :func:`ztest_held_out` that works off the sufficient statistics of the data. Parameters ---------- mean1 : :class:`numpy:numpy.ndarray` of shape (2,) The sample mean of the treatment group outcome and its prediction: ``[mean(x), mean(xp)]``. cov1 : :class:`numpy:numpy.ndarray` of shape (2, 2) The sample covariance matrix of the treatment group outcome and its prediction: ``cov([x, xp])``. nobs1 : int The number of samples in the treatment group. mean2 : :class:`numpy:numpy.ndarray` of shape (2,) The sample mean of the control group outcome and its prediction: ``[mean(y), mean(yp)]``. cov2 : :class:`numpy:numpy.ndarray` of shape (2, 2) The sample covariance matrix of the control group outcome and its prediction: ``cov([y, yp])``. nobs2 : int The number of samples in the control group. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ mean1, cov1, nobs1 = _validate_moments_2(mean1, cov1, nobs1) mean2, cov2, nobs2 = _validate_moments_2(mean2, cov2, nobs2) _validate_alpha(alpha) mean1, std1 = _delta_moments(mean1, cov1) mean2, std2 = _delta_moments(mean2, cov2) R = ztest_from_stats(mean1, std1, nobs1, mean2, std2, nobs2, alpha=alpha) return R
[docs]def ztest_held_out( x: npt.ArrayLike, xp: npt.ArrayLike, y: npt.ArrayLike, yp: npt.ArrayLike, *, alpha: float = ALPHA, health_check_output: bool = True, ddof: int = 1, ) -> TestResult: r"""Two-sample unpaired :math:`z`-test with variance reduction using control variarates. It does not assume equal sample sizes or variances. The predictions (control variates) must be derived from features that are independent of assignment to treatment or control. If the predictions in treatment and control have a different distribution then the test may be invalid. Parameters ---------- x : :class:`numpy:numpy.ndarray` of shape (n,) Outcomes for the treatment group. xp : :class:`numpy:numpy.ndarray` of shape (n,) Predicted outcomes for the treatment group. y : :class:`numpy:numpy.ndarray` of shape (m,) Outcomes for the control group. yp : :class:`numpy:numpy.ndarray` of shape (m,) Predicted outcomes for the control group. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. health_check_output : bool If ``True`` perform a health check that ensures the predictions have the same distribution in treatment and control. If not, issue a warning. ddof : int The "Delta Degrees of Freedom" argument for computing sample variances. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ x, xp = _validate_data(x, xp, paired=True) y, yp = _validate_data(y, yp, paired=True) _validate_alpha(alpha) _validate_ddof(ddof) if health_check_output: _health_check_output(xp, yp) R = ztest(x - xp, y - yp, alpha=alpha, ddof=ddof) return R
[docs]def ztest_held_out_train( x: npt.ArrayLike, x_covariates: npt.ArrayLike, y: npt.ArrayLike, y_covariates: npt.ArrayLike, *, alpha: float = ALPHA, train_frac: float = TRAIN_FRAC, health_check_input: bool = False, health_check_output: bool = True, predictor: Model = None, random: Rng = None, ddof: int = 1, ) -> TestResult: r"""Version of :func:`ztest_held_out` that also trains the control variate predictor. The covariates/features must be independent of assignment to treatment or control. If the features in treatment and control have a different distribution then the test may be invalid. Parameters ---------- x : :class:`numpy:numpy.ndarray` of shape (n,) Outcomes for the treatment group. x_covariates : :class:`numpy:numpy.ndarray` of shape (n, d) Covariates/features for the treatment group. y : :class:`numpy:numpy.ndarray` of shape (m,) Outcomes for the control group. y_covariates : :class:`numpy:numpy.ndarray` of shape (m, d) Covariates/features for the control group. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. train_frac : float The fraction of data to hold out for training the predictors. To ensure test validity, we do not use the same data for training the predictors and performing the test. This must be inside the interval range ``[0, 1]``. health_check_input : bool If ``True`` perform a health check that ensures the features have the same distribution in treatment and control. If not, issue a warning. It works by training a classifier to predict if a data point is in training or control. This can be slow for a large data set since it requires training a classifier. health_check_output : bool If ``True`` perform a health check that ensures the predictions have the same distribution in treatment and control. If not, issue a warning. predictor : sklearn-like regression object An object that has a `fit` and `predict` routine to make predictions. The object does not need to be fit yet. It will be fit in this method. random : :class:`numpy:numpy.random.RandomState` An optional numpy random stream can be passed in for reproducibility. ddof : int The "Delta Degrees of Freedom" argument for computing sample variances. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ (x, x_covariates, y, y_covariates), (n_x, n_y, _) = _validate_train_data( x, x_covariates, y, y_covariates ) _validate_alpha(alpha) assert 0.0 <= train_frac assert train_frac <= 1.0 _validate_ddof(ddof) if predictor is None: predictor = PassThruPred() if random is None: random = np_random if health_check_input: _health_check_features(x_covariates, y_covariates) # The MC calibration tests fail when we fit in-sample => we need to split. train_idx_x = _make_train_idx(train_frac, n_x, random=random) train_idx_y = _make_train_idx(train_frac, n_y, random=random) z_covariates = np.concatenate( (x_covariates[train_idx_x, :], y_covariates[train_idx_y, :]), axis=0 ) z = np.concatenate((x[train_idx_x], y[train_idx_y]), axis=0) predictor.fit(z_covariates, z) xp = predictor.predict(x_covariates[~train_idx_x, :]) assert np.all(np.isfinite(xp)) yp = predictor.predict(y_covariates[~train_idx_y, :]) assert np.all(np.isfinite(yp)) R = ztest_held_out( x[~train_idx_x], xp, y[~train_idx_y], yp, alpha=alpha, ddof=ddof, health_check_output=health_check_output, ) return R
[docs]def ztest_in_sample_train( x: npt.ArrayLike, x_covariates: npt.ArrayLike, y: npt.ArrayLike, y_covariates: npt.ArrayLike, *, alpha: float = ALPHA, health_check_input: bool = False, health_check_output: bool = False, predictor: Model = None, random: Rng = None, ddof: int = 1, ) -> TestResult: r"""Version of :func:`ztest_held_out` that also trains the control variate predictor. The covariates/features must be independent of assignment to treatment or control. If the features in treatment and control have a different distribution then the test may be invalid. Parameters ---------- x : :class:`numpy:numpy.ndarray` of shape (n,) Outcomes for the treatment group. x_covariates : :class:`numpy:numpy.ndarray` of shape (n, d) Covariates/features for the treatment group. y : :class:`numpy:numpy.ndarray` of shape (m,) Outcomes for the control group. y_covariates : :class:`numpy:numpy.ndarray` of shape (m, d) Covariates/features for the control group. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. health_check_input : bool If ``True`` perform a health check that ensures the features have the same distribution in treatment and control. If not, issue a warning. It works by training a classifier to predict if a data point is in training or control. This can be slow for a large data set since it requires training a classifier. health_check_output : bool If ``True`` perform a health check that ensures the predictions have the same distribution in treatment and control. If not, issue a warning. predictor : sklearn-like regression object An object that has a `fit` and `predict` routine to make predictions. The object does not need to be fit yet. It will be fit in this method. random : :class:`numpy:numpy.random.RandomState` An optional numpy random stream can be passed in for reproducibility. ddof : int The "Delta Degrees of Freedom" argument for computing sample variances. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ (x, x_covariates, y, y_covariates), (n_x, n_y, _) = _validate_train_data( x, x_covariates, y, y_covariates ) _validate_alpha(alpha) _validate_ddof(ddof) if predictor is None: predictor = PassThruPred() if random is None: random = np_random if health_check_input: _health_check_features(x_covariates, y_covariates) z_covariates = np.concatenate((x_covariates, y_covariates), axis=0) z = np.concatenate((x, y), axis=0) predictor.fit(z_covariates, z) xp = predictor.predict(x_covariates) assert np.all(np.isfinite(xp)) yp = predictor.predict(y_covariates) assert np.all(np.isfinite(yp)) R = ztest_held_out(x, xp, y, yp, alpha=alpha, ddof=ddof, health_check_output=health_check_output) return R
# ==== Implement cross val version ==== def _pool_moments( mean: npt.ArrayLike, cov: npt.ArrayLike, nobs: npt.ArrayLike ) -> Tuple[np.ndarray, np.ndarray, int]: """Warning: this routine is currently only correct for ddof=0.""" mean = np.asarray_chkfinite(mean) cov = np.asarray_chkfinite(cov) nobs = np.asarray_chkfinite(nobs) n_g, d = mean.shape assert nobs.shape == (n_g,) assert cov.shape == (n_g, d, d) assert np.all(nobs >= MIN_SPLIT) assert all(_is_psd2(cc) for cc in cov) w = nobs / float(np.sum(nobs)) mean_ = np.sum(w[:, None] * mean, axis=0) cov_ = np.sum(w[:, None, None] * cov, axis=0) + np.cov(mean.T, ddof=0, aweights=w) nobs_ = np.sum(nobs) return mean_, cov_, nobs_ def _fold_idx(n: int, k: int, random: Rng = np_random) -> np.ndarray: # There are functions in sklearn we could use to avoid needing to implement this, but we are # trying to avoid needing sklearn as a dep outside of the unit tests. assert n >= k assert k >= 1 idx = np.arange(n) % k random.shuffle(idx) return idx
[docs]def ztest_cross_val_from_stats( mean1: npt.ArrayLike, cov1: npt.ArrayLike, nobs1: npt.ArrayLike, mean2: npt.ArrayLike, cov2: npt.ArrayLike, nobs2: npt.ArrayLike, *, alpha: float = ALPHA, ) -> TestResult: r"""Version of :func:`ztest_cross_val` that works off the sufficient statistics of the data. Parameters ---------- mean1 : :class:`numpy:numpy.ndarray` of shape (k, 2) The sample mean of the treatment group outcome and its prediction: ``[mean(x), mean(xp)]``, for each fold in the :math:`k`-fold cross validation. cov1 : :class:`numpy:numpy.ndarray` of shape (k, 2, 2) The sample covariance matrix of the treatment group outcome and its prediction: ``cov([x, xp])``, for each fold in the :math:`k`-fold cross validation. nobs1 : :class:`numpy:numpy.ndarray` of shape (k,) The number of samples in the treatment group, for each fold in the :math:`k`-fold cross validation. mean2 : :class:`numpy:numpy.ndarray` of shape (k, 2) The sample mean of the control group outcome and its prediction: ``[mean(y), mean(yp)]``, for each fold in the :math:`k`-fold cross validation. cov2 : :class:`numpy:numpy.ndarray` of shape (k, 2, 2) The sample covariance matrix of the control group outcome and its prediction: ``cov([y, yp])``, for each fold in the :math:`k`-fold cross validation. nobs2 : :class:`numpy:numpy.ndarray` of shape (k,) The number of samples in the control group, for each fold in the :math:`k`-fold cross validation. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ # _pool_moments will validate the moments _validate_alpha(alpha) mean1, cov1, nobs1 = _pool_moments(mean1, cov1, nobs1) mean2, cov2, nobs2 = _pool_moments(mean2, cov2, nobs2) R = ztest_held_out_from_stats(mean1, cov1, nobs1, mean2, cov2, nobs2, alpha=alpha) return R
[docs]def ztest_cross_val( x: npt.ArrayLike, xp: npt.ArrayLike, x_fold: npt.ArrayLike, y: npt.ArrayLike, yp: npt.ArrayLike, y_fold: npt.ArrayLike, *, alpha: float = ALPHA, health_check_output: bool = True, ) -> TestResult: r"""Two-sample unpaired :math:`z`-test with variance reduction using the cross validated control variarates method. It does not assume equal sample sizes or variances. The predictions (control variates) must be derived from features that are independent of assignment to treatment or control. If the predictions in treatment and control have a different distribution then the test may be invalid. Parameters ---------- x : :class:`numpy:numpy.ndarray` of shape (n,) Outcomes for the treatment group. xp : :class:`numpy:numpy.ndarray` of shape (n,) Predicted outcomes for the treatment group derived from a cross-validation routine. x_fold : :class:`numpy:numpy.ndarray` of shape (n,) The cross validation fold assignment for each data point in treatment (of `dtype` `int`). y : :class:`numpy:numpy.ndarray` of shape (m,) Outcomes for the control group. yp : :class:`numpy:numpy.ndarray` of shape (m,) Predicted outcomes for the control group derived from a cross-validation routine. y_fold : :class:`numpy:numpy.ndarray` of shape (n,) The cross validation fold assignment for each data point in control (of `dtype` `int`). alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. health_check_output : bool If ``True`` perform a health check that ensures the predictions have the same distribution in treatment and control. If not, issue a warning. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ x, xp = _validate_data(x, xp, paired=True) y, yp = _validate_data(y, yp, paired=True) _validate_alpha(alpha) # Current method ignores fold index => we won't validate for now R = ztest_held_out(x, xp, y, yp, alpha=alpha, ddof=0, health_check_output=health_check_output) return R
[docs]def ztest_cross_val_train( x: npt.ArrayLike, x_covariates: npt.ArrayLike, y: npt.ArrayLike, y_covariates: npt.ArrayLike, *, alpha: float = ALPHA, k_fold: int = K_FOLD, health_check_input: bool = False, health_check_output: bool = True, predictor: Model = None, random: Rng = None, ) -> TestResult: r"""Version of :func:`ztest_cross_val` that also trains the control variate predictor. The covariates/features must be independent of assignment to treatment or control. If the features in treatment and control have a different distribution then the test may be invalid. Parameters ---------- x : :class:`numpy:numpy.ndarray` of shape (n,) Outcomes for the treatment group. x_covariates : :class:`numpy:numpy.ndarray` of shape (n, d) Covariates/features for the treatment group. y : :class:`numpy:numpy.ndarray` of shape (m,) Outcomes for the control group. y_covariates : :class:`numpy:numpy.ndarray` of shape (m, d) Covariates/features for the control group. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. k_fold : int The number of folds in the cross validation: :math:`k`. health_check_input : bool If ``True`` perform a health check that ensures the features have the same distribution in treatment and control. If not, issue a warning. It works by training a classifier to predict if a data point is in training or control. This can be slow for a large data set since it requires training a classifier. health_check_output : bool If ``True`` perform a health check that ensures the predictions have the same distribution in treatment and control. If not, issue a warning. predictor : sklearn-like regression object An object that has a `fit` and `predict` routine to make predictions. The object does not need to be fit yet. It will be fit in this method. random : :class:`numpy:numpy.random.RandomState` An optional numpy random stream can be passed in for reproducibility. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ (x, x_covariates, y, y_covariates), (n_x, n_y, _) = _validate_train_data( x, x_covariates, y, y_covariates, k_fold=k_fold ) _validate_alpha(alpha) if predictor is None: predictor = PassThruPred() if random is None: random = np_random if health_check_input: _health_check_features(x_covariates, y_covariates) fold_idx_x = _fold_idx(n_x, k_fold, random=random) fold_idx_y = _fold_idx(n_y, k_fold, random=random) xp = np.zeros(n_x) yp = np.zeros(n_y) for kk in range(k_fold): z_covariates = np.concatenate( (x_covariates[fold_idx_x != kk, :], y_covariates[fold_idx_y != kk, :]), axis=0 ) z = np.concatenate((x[fold_idx_x != kk], y[fold_idx_y != kk]), axis=0) predictor.fit(z_covariates, z) xp[fold_idx_x == kk] = predictor.predict(x_covariates[fold_idx_x == kk]) yp[fold_idx_y == kk] = predictor.predict(y_covariates[fold_idx_y == kk]) R = ztest_cross_val( x, xp, fold_idx_x, y, yp, fold_idx_y, alpha=alpha, health_check_output=health_check_output ) return R
[docs]def ztest_cross_val_train_blockwise( x: npt.ArrayLike, x_covariates: npt.ArrayLike, y: npt.ArrayLike, y_covariates: npt.ArrayLike, *, alpha: float = ALPHA, k_fold: int = K_FOLD, health_check_input: bool = False, health_check_output: bool = True, predictor: Model = None, random: Rng = None, ) -> TestResult: r"""Version of :func:`ztest_cross_val_train` that is more efficient if the fit routine scales worse than O(N), otherwise this will not be more efficient. Parameters ---------- x : :class:`numpy:numpy.ndarray` of shape (n,) Outcomes for the treatment group. x_covariates : :class:`numpy:numpy.ndarray` of shape (n, d) Covariates/features for the treatment group. y : :class:`numpy:numpy.ndarray` of shape (m,) Outcomes for the control group. y_covariates : :class:`numpy:numpy.ndarray` of shape (m, d) Covariates/features for the control group. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. k_fold : int The number of folds in the cross validation: :math:`k`. health_check_input : bool If ``True`` perform a health check that ensures the features have the same distribution in treatment and control. If not, issue a warning. It works by training a classifier to predict if a data point is in training or control. This can be slow for a large data set since it requires training a classifier. health_check_output : bool If ``True`` perform a health check that ensures the predictions have the same distribution in treatment and control. If not, issue a warning. predictor : sklearn-like regression object An object that has a `fit` and `predict` routine to make predictions. The object does not need to be fit yet. It will be fit in this method. random : :class:`numpy:numpy.random.RandomState` An optional numpy random stream can be passed in for reproducibility. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ (x, x_covariates, y, y_covariates), (n_x, n_y, _) = _validate_train_data( x, x_covariates, y, y_covariates, k_fold=k_fold ) _validate_alpha(alpha) if predictor is None: predictor = PassThruPred() if random is None: random = np_random if health_check_input: _health_check_features(x_covariates, y_covariates) fold_idx_x = _fold_idx(n_x, k_fold, random=random) fold_idx_y = _fold_idx(n_y, k_fold, random=random) xp = np.nan + np.zeros((n_x, k_fold)) yp = np.nan + np.zeros((n_y, k_fold)) for kk in range(k_fold): z_covariates = np.concatenate( (x_covariates[fold_idx_x == kk, :], y_covariates[fold_idx_y == kk, :]), axis=0 ) z = np.concatenate((x[fold_idx_x == kk], y[fold_idx_y == kk]), axis=0) predictor.fit(z_covariates, z) xp[fold_idx_x != kk, kk] = predictor.predict(x_covariates[fold_idx_x != kk]) yp[fold_idx_y != kk, kk] = predictor.predict(y_covariates[fold_idx_y != kk]) # Now average the predictions assert np.all(np.sum(np.isnan(xp), axis=1) == 1) assert np.all(np.sum(np.isnan(yp), axis=1) == 1) xp = np.nanmean(xp, axis=1) yp = np.nanmean(yp, axis=1) R = ztest_cross_val( x, xp, fold_idx_x, y, yp, fold_idx_y, alpha=alpha, health_check_output=health_check_output ) return R
[docs]def ztest_cross_val_train_load_blockwise( data_iter: Sequence[DataGen], *, alpha: float = ALPHA, predictor: Model = None, callback: Optional[Callable[[Model], None]] = None, ) -> TestResult: r"""Version of :func:`ztest_cross_val_train_blockwise` that loads the data in blocks to avoid overflowing memory. Using :func:`ztest_cross_val_train_blockwise` is faster if all the data fits in memory. Parameters ---------- data_iter : Sequence[Callable[[], Tuple[ndarray, ndarray, ndarray, ndarray]]] An iterable of functions, where each function returns a different cross validation fold. The functions should return data in the format of a tuple: ``(x, x_covariates, y, y_covariates)``. See the parameters of :func:`ztest_cross_val_train_blockwise` for details on the shapes of these variables. alpha : float Required confidence level, typically this should be 0.05, and must be inside the interval range :math:`[0, 1)`. predictor : sklearn-like regression object An object that has a `fit` and `predict` routine to make predictions. The object does not need to be fit yet. It will be fit in this method. callback : An optional callback that gets called for each cross validation fold in the format ``callback(predictor)``. This is sometimes useful for logging. Returns ------- estimate : Estimate of the difference in means: :math:`\mathbb{E}[x] - \mathbb{E}[y]`. ci : Confidence interval (with coverage :math:`1 - \alpha`) for the estimate. pval : The p-value under the null hypothesis H0 that :math:`\mathbb{E}[x] = \mathbb{E}[y]`. """ # The dtype for best sphinx generation on data_iter is: # Sequence[Callable[[], Tuple[:class:`numpy:numpy.ndarray`, :class:`numpy:numpy.ndarray`, # :class:`numpy:numpy.ndarray`, :class:`numpy:numpy.ndarray`]] # but that messes up the max line length. k_fold = len(data_iter) assert k_fold >= MIN_FOLD _validate_alpha(alpha) if predictor is None: predictor = PassThruPred() try: predictor = [clone(predictor) for _ in range(k_fold)] except TypeError: predictor = [deepcopy(predictor) for _ in range(k_fold)] # Train a model for each block for predictor_, data_gen in zip(predictor, data_iter): (x, x_covariates, y, y_covariates) = data_gen() (x, x_covariates, y, y_covariates), _ = _validate_train_data_block( x, x_covariates, y, y_covariates ) z_covariates = np.concatenate((x_covariates, y_covariates), axis=0) z = np.concatenate((x, y), axis=0) predictor_.fit(z_covariates, z) if callback is not None: callback(predictor_) # Now the prediction for each block mean1 = np.zeros((k_fold, 2)) cov1 = np.zeros((k_fold, 2, 2)) nobs1 = np.zeros(k_fold) mean2 = np.zeros((k_fold, 2)) cov2 = np.zeros((k_fold, 2, 2)) nobs2 = np.zeros(k_fold) for kk, data_gen in enumerate(data_iter): (x, x_covariates, y, y_covariates) = data_gen() (x, x_covariates, y, y_covariates), (n_x, n_y, _) = _validate_train_data_block( x, x_covariates, y, y_covariates ) # Get predictions from each fold predictor xp = np.nan + np.zeros((n_x, k_fold)) yp = np.nan + np.zeros((n_y, k_fold)) for kk_, predictor_ in enumerate(predictor): if kk != kk_: xp[:, kk_] = predictor_.predict(x_covariates) yp[:, kk_] = predictor_.predict(y_covariates) # Now average the predictions assert np.all(np.sum(np.isnan(xp), axis=1) == 1) assert np.all(np.sum(np.isnan(yp), axis=1) == 1) xp = np.nanmean(xp, axis=1) yp = np.nanmean(yp, axis=1) # Now save the suff stats mean1[kk, 0] = np.mean(x) mean1[kk, 1] = np.mean(xp) cov1[kk, :, :] = np.cov((x, xp), ddof=0) nobs1[kk] = n_x mean2[kk, 0] = np.mean(y) mean2[kk, 1] = np.mean(yp) cov2[kk, :, :] = np.cov((y, yp), ddof=0) nobs2[kk] = n_y R = ztest_cross_val_from_stats(mean1, cov1, nobs1, mean2, cov2, nobs2, alpha=alpha) return R