Source code for knockpy.knockoff_filter

import warnings
import numpy as np
from . import constants
from . import utilities
from . import mrc
from . import knockoffs
from . import metro
from . import mlr
from . import knockoff_stats as kstats


class KnockoffFilter:
    """
    Performs knockoff-based inference, from start to finish.

    This wraps both the ``knockoffs.KnockoffSampler`` and
    ``knockoff_stats.FeatureStatistic`` classes.

    Parameters
    ----------
    fstat : str or knockpy.knockoff_stats.FeatureStatistic
        The feature statistic to use in the knockoff filter.
        Defaults to a lasso statistic.
        This may also be a string identifier, including:

        - 'lasso' or 'lcd': cross-validated lasso coefficient differences
        - 'mlr': the masked likelihood ratio (MLR) statistic, which has
          provable optimality properties for knockoffs
        - 'lsm': signed maximum of the lasso path statistic as in
          Barber and Candes 2015
        - 'dlasso': cross-validated debiased lasso coefficients
        - 'ridge': cross-validated ridge coefficients
        - 'ols': ordinary least squares coefficients
        - 'margcorr': marginal correlations between features and response
        - 'deeppink': the deepPINK statistic as in Lu et al. 2018
        - 'mlr_spline' or 'spline': the MLR statistic applied to regression
          spline basis functions
        - 'randomforest': a random forest with swap importances

        Note that when using FX knockoffs, the `lcd` statistic does not use
        cross-validation and sets ``alphas = sigma2 * sqrt(log(p) / n)``.
    ksampler : str or knockpy.knockoffs.KnockoffSampler
        The knockoff sampler to use in the knockoff filter.
        This may also be a string identifier, including:

        - 'gaussian': Gaussian Model-X knockoffs
        - 'fx': fixed-X knockoffs
        - 'metro': generic metropolized knockoff sampler
        - 'artk': t-tailed Markov chain
        - 'blockt': blocks of t-distributed variables
        - 'gibbs_grid': discrete Gibbs grid

        An alternative to specifying the ksampler is to simply pass in
        a knockoff matrix during the ``forward`` call.
    fstat_kwargs : dict
        Kwargs to pass to the feature statistic ``fit`` function,
        excluding the required arguments. Defaults to {}.
    knockoff_kwargs : dict
        Kwargs for instantiating the knockoff sampler argument if the
        ksampler argument is a string identifier. This can be the empty
        dict for some identifiers such as "gaussian" or "fx", but
        additional keyword arguments are required for complex samplers
        such as the "metro" identifier. Defaults to {}.

    Attributes
    ----------
    fstat : knockpy.knockoff_stats.FeatureStatistic
        The feature statistic to use for inference. This inherits
        from ``knockoff_stats.FeatureStatistic``.
    ksampler : knockpy.knockoffs.KnockoffSampler
        The knockoff sampler to use during inference. This eventually
        inherits from ``knockoffs.KnockoffSampler``.
    fstat_kwargs : dict
        Dictionary of kwargs to pass to the ``fit`` call of ``self.fstat``.
    knockoff_kwargs : dict
        If ``ksampler`` is not yet initialized, kwargs to pass to ``ksampler``.
    Z : np.ndarray
        a ``2p``-dimensional array of feature and knockoff importances.
        The first p coordinates correspond to features, the last p
        correspond to knockoffs.
    W : np.ndarray
        an array of feature statistics. This is ``(p,)``-dimensional for
        regular knockoffs and ``(num_groups,)``-dimensional for group
        knockoffs.
    S : np.ndarray
        the ``(p, p)``-shaped knockoff S-matrix used to generate knockoffs.
    X : np.ndarray
        the ``(n, p)``-shaped design matrix.
    Xk : np.ndarray
        the ``(n, p)``-shaped matrix of knockoffs.
    groups : np.ndarray
        For group knockoffs, a p-length array of integers from 1 to
        num_groups such that ``groups[j] == i`` indicates that variable `j`
        is a member of group `i`. Defaults to None (regular knockoffs).
    rejections : np.ndarray
        a ``(p,)``-shaped boolean array where rejections[j] == 1 iff the
        knockoff filter rejects the jth feature.
    G : np.ndarray
        the ``(2p, 2p)``-shaped feature-knockoff covariance matrix.
    threshold : float
        the knockoff data-dependent threshold used to select variables.

    Examples
    --------
    Here we fit the KnockoffFilter on fake data from a Gaussian
    linear model: ::

        # Fake data-generating process for Gaussian linear model
        import knockpy as kpy
        dgprocess = kpy.dgp.DGP()
        dgprocess.sample_data(n=500, p=500, sparsity=0.1)

        # LCD statistic with Gaussian MX knockoffs
        # This uses LedoitWolf covariance estimation by default.
        from knockpy.knockoff_filter import KnockoffFilter
        kfilter = KnockoffFilter(
            fstat='lcd',
            ksampler='gaussian',
            knockoff_kwargs={"method": "mvr"},
        )
        rejections = kfilter.forward(X=dgprocess.X, y=dgprocess.y)
    """

    def __init__(
        self,
        fstat="lasso",
        ksampler="gaussian",
        fstat_kwargs={},
        knockoff_kwargs={},
    ):
        """
        Initialize the class.
        """
        ### Preprocessing for knockoffs
        self.knockoff_kwargs = knockoff_kwargs.copy()
        if isinstance(ksampler, str):
            self.knockoff_type = ksampler.lower()
            self.ksampler = None
        if isinstance(ksampler, knockoffs.KnockoffSampler):
            self.knockoff_type = None
            self.ksampler = ksampler
        if isinstance(ksampler, knockoffs.FXSampler):
            self._mx = False
        elif self.knockoff_type == "fx":
            self._mx = False
        else:
            self._mx = True
        # Initialize
        self.S = None

        ### Parse feature statistic
        self.fstat_kwargs = fstat_kwargs.copy()
        if isinstance(fstat, str):
            fstat = fstat.lower()
        # Save feature statistic if class-based
        if isinstance(fstat, kstats.FeatureStatistic):
            pass
        # Else parse flags
        elif fstat == "lasso" or fstat == "lcd":
            fstat = kstats.LassoStatistic()
        elif fstat == "lsm":
            fstat = kstats.LassoStatistic()
            self.fstat_kwargs["zstat"] = "lars_path"
            self.fstat_kwargs["antisym"] = "sm"
        elif fstat == "dlasso":
            fstat = kstats.LassoStatistic()
            self.fstat_kwargs["debias"] = True
        elif fstat == "ridge":
            fstat = kstats.RidgeStatistic()
        elif fstat == "ols":
            fstat = kstats.OLSStatistic()
        elif fstat == "margcorr":
            fstat = kstats.MargCorrStatistic()
        elif fstat == "randomforest":
            fstat = kstats.RandomForestStatistic()
        elif fstat == "deeppink":
            fstat = kstats.DeepPinkStatistic()
        elif fstat == 'mlr':
            if self._mx:
                fstat = mlr.MLR_Spikeslab()
            else:
                fstat = mlr.MLR_FX_Spikeslab()
        elif fstat in ['mlr_spline', 'spline']:
            fstat = mlr.MLR_Spikeslab_Splines()
        else:
            raise ValueError(f"Unrecognized fstat {fstat}")
        self.fstat = fstat
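
    # Usage sketch (illustrative comment, not executed): both constructor
    # arguments accept string identifiers, so e.g. the MLR statistic with
    # fixed-X knockoffs needs no extra imports beyond the filter itself:
    #
    #     from knockpy.knockoff_filter import KnockoffFilter
    #     kfilter_fx = KnockoffFilter(fstat='mlr', ksampler='fx')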
    def sample_knockoffs(self):
        """
        Samples knockoffs during ``forward``.
        """
        # If we have already computed S, pass it to the sampler,
        # since recomputing it is expensive
        if self.S is not None:
            if "S" not in self.knockoff_kwargs:
                self.knockoff_kwargs["S"] = self.S
        # Possibly initialize ksampler
        if self.ksampler is None:
            # Args common to all ksamplers except fx
            args = {"X": self.X, "Sigma": self.Sigma}
            if self.knockoff_type == "gaussian":
                self.ksampler = knockoffs.GaussianSampler(
                    groups=self.groups, mu=self.mu, **args, **self.knockoff_kwargs
                )
            elif self.knockoff_type == "fx":
                self.ksampler = knockoffs.FXSampler(
                    X=self.X, groups=self.groups, **self.knockoff_kwargs,
                )
            elif self.knockoff_type == "artk":
                self.ksampler = metro.ARTKSampler(**args, **self.knockoff_kwargs)
            elif self.knockoff_type == "blockt":
                self.ksampler = metro.BlockTSampler(**args, **self.knockoff_kwargs)
            elif self.knockoff_type == "metro":
                self.ksampler = metro.MetropolizedKnockoffSampler(
                    **args, mu=self.mu, **self.knockoff_kwargs
                )
            elif self.knockoff_type == "gibbs_grid":
                self.ksampler = metro.GibbsGridSampler(
                    **args, mu=self.mu, **self.knockoff_kwargs
                )
            else:
                raise ValueError(f"Unrecognized ksampler string {self.knockoff_type}")
        Xk = self.ksampler.sample_knockoffs()
        self.S = self.ksampler.fetch_S()

        # Possibly use recycling
        if self.recycle_up_to is not None:
            # Split
            rec_Xk = self.X[: self.recycle_up_to]
            new_Xk = Xk[self.recycle_up_to :]
            # Combine
            Xk = np.concatenate((rec_Xk, new_Xk), axis=0)
        self.Xk = Xk

        # Construct the feature-knockoff covariance matrix, or estimate
        # it if construction is not possible
        if self.S is not None and self.Sigma is not None:
            self.G = np.concatenate(
                [
                    np.concatenate([self.Sigma, self.Sigma - self.S]),
                    np.concatenate([self.Sigma - self.S, self.Sigma]),
                ],
                axis=1,
            )
            self.Ginv = None
        else:
            self.G = None
            self.Ginv = None
        return self.Xk
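
    # The block structure assembled above can be sketched directly in numpy
    # (a minimal toy example with p = 2; the Sigma and S values below are
    # arbitrary placeholders):
    #
    #     import numpy as np
    #     Sigma = np.array([[1.0, 0.3], [0.3, 1.0]])
    #     S = np.diag([0.5, 0.5])
    #     G = np.concatenate(
    #         [np.concatenate([Sigma, Sigma - S]),
    #          np.concatenate([Sigma - S, Sigma])],
    #         axis=1,
    #     )  # (2p, 2p) covariance of the stacked [features, knockoffs]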
    def make_selections(self, W, fdr):
        """
        Calculates the data-dependent threshold and the selections.
        """
        self.threshold = kstats.data_dependent_threshhold(W=W, fdr=fdr)
        selected_flags = (W >= self.threshold).astype("float32")
        return selected_flags
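
    # For reference, ``kstats.data_dependent_threshhold`` computes the
    # knockoff+ threshold of Barber and Candes 2015. A naive equivalent
    # (``naive_threshold`` is a hypothetical helper, shown only to make
    # the selection rule concrete):
    #
    #     import numpy as np
    #     def naive_threshold(W, fdr):
    #         # Smallest t with (1 + #{W_j <= -t}) / max(#{W_j >= t}, 1) <= fdr
    #         for t in np.sort(np.abs(W[W != 0])):
    #             if (1 + np.sum(W <= -t)) / max(np.sum(W >= t), 1) <= fdr:
    #                 return t
    #         return np.inf  # no threshold controls FDR at this level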
    def forward(
        self,
        X,
        y,
        Xk=None,
        mu=None,
        Sigma=None,
        groups=None,
        fdr=0.10,
        fstat_kwargs={},
        knockoff_kwargs={},
        shrinkage="ledoitwolf",
        num_factors=None,
        recycle_up_to=None,
    ):
        """
        Runs the knockoff filter; returns whether each feature was rejected.

        Parameters
        ----------
        X : np.ndarray
            ``(n, p)``-shaped design matrix.
        y : np.ndarray
            ``(n,)``-shaped response vector.
        Xk : np.ndarray
            ``(n, p)``-shaped knockoff matrix. If ``None``, this will
            construct knockoffs using ``self.ksampler``.
        mu : np.ndarray
            ``(p, )``-shaped mean of the features. If ``None``, this
            defaults to the empirical mean of the features.
        Sigma : np.ndarray
            ``(p, p)``-shaped covariance matrix of the features. If ``None``,
            this is estimated using the ``shrinkage`` option. This is
            ignored for fixed-X knockoffs.
        groups : np.ndarray
            For group knockoffs, a p-length array of integers from 1 to
            num_groups such that ``groups[j] == i`` indicates that variable
            `j` is a member of group `i`. Defaults to ``None``
            (regular knockoffs).
        fdr : float
            The desired level of false discovery rate control.
        fstat_kwargs : dict
            Extra kwargs to pass to the feature statistic ``fit`` function,
            excluding the required arguments.
        knockoff_kwargs : dict
            Extra kwargs for instantiating the knockoff sampler argument if
            the ksampler argument is a string identifier. This can be the
            empty dict for some identifiers such as "gaussian" or "fx", but
            additional keyword arguments are required for complex samplers
            such as the "metro" identifier. Defaults to {}.
        shrinkage : str
            Shrinkage method if estimating the covariance matrix. Defaults
            to "LedoitWolf". Other options are "MLE" and "glasso"
            (graphical lasso).
        num_factors : int
            If num_factors is not ``None`` and Sigma is estimated, assumes
            that the ground-truth Sigma is a factor model with rank
            ``num_factors`` to speed up computation.
        recycle_up_to : int or float
            Three options:

            - if ``None``, does nothing.
            - if an integer > 1, uses the first ``recycle_up_to`` rows of X
              as the first ``recycle_up_to`` rows of knockoffs.
            - if a float between 0 and 1 (inclusive), interpreted as the
              proportion of rows to recycle.

            For more on recycling, see https://arxiv.org/abs/1602.03574
        """
        # Preliminaries - infer covariance matrix for MX
        if Sigma is None and self._mx:
            Sigma, _ = utilities.estimate_covariance(X, 1e-2, shrinkage)

        # Possible factor model approximation
        if num_factors is not None and Sigma is not None:
            self.D, self.U = utilities.estimate_factor(
                Sigma, num_factors=num_factors
            )
            Sigma = np.diag(self.D) + np.dot(self.U, self.U.T)
            self.knockoff_kwargs['how_approx'] = 'factor'
            self.knockoff_kwargs['D'] = self.D
            self.knockoff_kwargs['U'] = self.U
        else:
            self.D = None
            self.U = None
        if not self._mx:
            Sigma = None

        # Save objects
        self.X = X
        self.Xk = Xk
        # Center if we are going to fit FX knockoffs
        if self.Xk is None and not self._mx:
            self.X = self.X - self.X.mean(axis=0)
        self.y = y
        self.mu = mu
        self.Sigma = Sigma
        self.groups = groups
        for key in fstat_kwargs:
            self.fstat_kwargs[key] = fstat_kwargs[key]
        for key in knockoff_kwargs:
            self.knockoff_kwargs[key] = knockoff_kwargs[key]

        # Save n, p, groups
        n = X.shape[0]
        p = X.shape[1]
        if groups is None:
            groups = np.arange(1, p + 1, 1)

        # Parse recycle_up_to
        if recycle_up_to is None:
            pass
        elif recycle_up_to < 1:
            recycle_up_to = int(recycle_up_to * n)
        else:
            recycle_up_to = int(recycle_up_to)
        self.recycle_up_to = recycle_up_to

        # Sample knockoffs
        if self.Xk is None:
            self.Xk = self.sample_knockoffs()

        # When using FX knockoffs and the lcd statistic, we cannot use
        # cross-validation. Thus, we pick a default regularization parameter.
        if not self._mx and isinstance(self.fstat, kstats.LassoStatistic):
            if self.fstat_kwargs.get('zstat', 'coef') != 'lars_path':
                hatsigma2 = kstats.compute_residual_variance(
                    self.X, self.Xk, self.y
                )
                self.fstat_kwargs['alphas'] = hatsigma2 * np.sqrt(np.log(p) / n)

        # As an edge case, pass Ginv to debiased lasso
        if "debias" in self.fstat_kwargs:
            if self.fstat_kwargs["debias"]:
                if self.Ginv is None:
                    if self.G is not None:
                        self.Ginv = np.linalg.inv(self.G)
                    else:
                        self.G, self.Ginv = utilities.estimate_covariance(
                            np.concatenate([self.X, self.Xk], axis=1)
                        )
                self.fstat_kwargs["Ginv"] = self.Ginv

        # Feature statistics
        self.fstat.fit(
            X=self.X, Xk=self.Xk, y=self.y, groups=self.groups, **self.fstat_kwargs
        )
        # Inherit some attributes
        self.fdr = fdr
        self.Z = self.fstat.Z
        self.W = self.fstat.W
        self.score = self.fstat.score
        self.score_type = self.fstat.score_type
        self.rejections = self.make_selections(self.W, self.fdr)
        return self.rejections
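
    # Usage sketch: ``forward`` is typically the only method users call
    # directly. Continuing the class docstring example, and assuming the
    # DGP object exposes its true covariance as ``dgprocess.Sigma``:
    #
    #     rejections = kfilter.forward(
    #         X=dgprocess.X,
    #         y=dgprocess.y,
    #         Sigma=dgprocess.Sigma,  # optional: skips covariance estimation
    #         fdr=0.2,
    #         recycle_up_to=0.5,  # recycle the first half of the rows
    #     )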
    def seqstep_plot(
        self,
        max_rank=None,
    ):
        """
        Visualizes the knockoff filter.

        Parameters
        ----------
        max_rank : int
            Only plot the top ``max_rank`` feature statistics.
            Defaults to plotting all feature statistics.
        """
        # Import matplotlib
        try:
            import matplotlib.pyplot as plt
        except ImportError as err:
            raise ImportError(
                f"matplotlib is required for plotting, but importing it raised {err}."
            )
        if max_rank is None:
            max_rank = self.X.shape[1]

        # Sort W statistics in decreasing order of absolute value
        inds = np.argsort(-1 * np.abs(self.W))
        sortW = self.W[inds][0:max_rank]
        xvals = np.arange(max_rank)
        # Rank of the data-dependent threshold
        threshold_ind = np.sum(np.abs(self.W) >= self.threshold)

        # Plot and show
        plt.bar(xvals[sortW < 0], sortW[sortW < 0], color='red')
        plt.bar(xvals[sortW >= 0], sortW[sortW >= 0], color='blue')
        if threshold_ind >= max_rank:
            print("Not plotting the threshold since its rank exceeds max_rank.")
        else:
            plt.axvline(
                threshold_ind,
                color='black',
                linestyle='dashed',
                label=f'Threshold\n(q={self.fdr})'
            )
        plt.legend()
        plt.xlabel("Rank")
        plt.ylabel("Feature statistic")
        plt.show()
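
    # Usage sketch: after a ``forward`` call, the fitted filter can be
    # visualized directly (requires matplotlib):
    #
    #     kfilter.forward(X=dgprocess.X, y=dgprocess.y)
    #     kfilter.seqstep_plot(max_rank=50)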