import warnings
import numpy as np
# Model-fitters
import sklearn
from sklearn import linear_model, model_selection, ensemble
from . import utilities
from .constants import DEFAULT_REG_VALS
def calc_mse(model, X, y):
""" Computes the sum of squared standardized residuals of a model """
preds = model.predict(X)
resids = (preds - y) / y.std()
return np.sum((resids) ** 2)
def parse_y_dist(y):
""" Checks if y is binary; else assumes it is continuous """
n = y.shape[0]
if np.unique(y).shape[0] == 2:
return "binomial"
elif np.unique(y).shape[0] == n:
return "gaussian"
else:
#warnings.warn("Treating y data as continuous even though it may be discrete.")
return "gaussian"
def parse_logistic_flag(kwargs):
""" Checks whether y_dist is binomial """
if "y_dist" in kwargs:
if kwargs["y_dist"] == "binomial":
return True
return False
def combine_Z_stats(Z, groups=None, antisym="cd", group_agg="sum"):
"""
Given Z scores (variable importances), returns (grouped) feature statistics
Parameters
----------
Z : np.ndarray
``(2p,)``-shaped numpy array of Z-statistics. The first p values
correspond to true features, and the last p correspond to knockoffs
(in the same order as the true features).
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
antisym : str
The antisymmetric function used to create (ungrouped) feature
statistics. Three options:
- "CD" (Difference of absolute vals of coefficients),
- "SM" (signed maximum).
- "SCD" (Simple difference of coefficients - NOT recommended)
group_agg : str
For group knockoffs, specifies how to turn individual feature
statistics into grouped feature statistics. Two options:
"sum" and "avg".
Returns
-------
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
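Examples
--------
A minimal sketch with ``p = 2``: the first two entries of ``Z`` are feature
importances and the last two are the corresponding knockoff importances.

>>> import numpy as np
>>> Z = np.array([3., 1., 1., 2.])
>>> combine_Z_stats(Z, antisym="cd")
array([ 2., -1.])
>>> W_grouped = combine_Z_stats(Z, groups=np.array([1, 1]), group_agg="avg")  # array([0.5])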
"""
# Step 1: Pairwise W statistics.
p = int(Z.shape[0] / 2)
if Z.shape[0] != 2 * p:
raise ValueError(
f"Unexpected shape {Z.shape} for Z statistics (expected ({2*p},))"
)
if groups is None:
groups = np.arange(1, p + 1)
else:
groups = utilities.preprocess_groups(groups)
antisym = str(antisym).lower()
# Absolute coefficient differences
if antisym == "cd":
pair_W = np.abs(Z[0:p]) - np.abs(Z[p:])
# Signed maxes
elif antisym == "sm":
inds = np.arange(0, p, 1)
pair_W = np.maximum(np.abs(Z[inds]), np.abs(Z[inds + p]))
pair_W = pair_W * np.sign(np.abs(Z[inds]) - np.abs(Z[inds + p]))
# Simple coefficient differences
elif antisym == "scd":
pair_W = Z[0:p] - Z[p:]
else:
raise ValueError(f'antisym ({antisym}) must be one of "cd", "sm", "scd"')
# Step 2: Group statistics
m = np.unique(groups).shape[0]
W_group = np.zeros(m)
for j in range(p):
W_group[groups[j] - 1] += pair_W[j]
# If averaging...
if group_agg == "sum":
pass
elif group_agg == "avg":
group_sizes = utilities.calc_group_sizes(groups)
W_group = W_group / group_sizes
else:
raise ValueError(f'group_agg ({group_agg}) must be one of "sum", "avg"')
# Return
return W_group
def compute_residual_variance(X, Xk, y):
"""
Estimates sigma2 using residual variance of y
given [X, Xk]. Uses a penalized linear model
if n <= 2p + 10 or p >= 1500 (for efficiency).
Returns
-------
sigma2 : float
Estimated residual variance
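Examples
--------
A rough sanity check on synthetic data (the estimate should be close to the
true noise variance of 1):

>>> import numpy as np
>>> np.random.seed(123)
>>> n, p = 200, 20
>>> X = np.random.randn(n, p)
>>> Xk = np.random.randn(n, p)
>>> y = X[:, 0] + np.random.randn(n)
>>> sigma2 = compute_residual_variance(X, Xk, y)  # roughly 1.0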
"""
# Use (penalized) linear model to predict y
n, p = X.shape
if n - 2 * p > 10 and p < 1500:
model = linear_model.LinearRegression()
else:
model = linear_model.Lasso(
alpha=0.1 * np.sqrt(np.log(p) / n),
max_iter=100
)
# Concatenate [X, Xk] and randomly permute columns so the fit shows no preference between X and Xk
features = np.concatenate([X, Xk], axis=1)
perm_inds, _ = utilities.random_permutation_inds(2 * p)
features = features[:, perm_inds]
with warnings.catch_warnings():
warnings.simplefilter("ignore")
model.fit(features, y)
resid = np.power(model.predict(features) - y, 2).sum()
return resid / max(10, n - np.sum(model.coef_ != 0))
# ------------------------------ Lasso Stuff ---------------------------------------#
def calc_lars_path(X, Xk, y, groups=None, **kwargs):
"""
Calculates the lambda values at which the features/knockoffs
enter the lasso path when [X, Xk] is regressed on y.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
**kwargs
kwargs for ``sklearn.linear_model.lars_path``
Returns
-------
Z : np.ndarray
``(2p,)``-shaped array indicating the lasso path statistic
for each variable. (This means the maximum lambda such that
the lasso coefficient on variable j is nonzero.)
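Examples
--------
A minimal usage sketch (assuming ``X``, ``Xk``, and ``y`` are numpy arrays of
the shapes described above):

>>> Z = calc_lars_path(X, Xk, y)  # Z has shape (2p,)
>>> # Larger Z[j] means variable j entered the lasso path at a larger lambda.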
"""
# Ignore leftover y_dist kwarg (not used by the LARS path)
if "y_dist" in kwargs:
kwargs.pop("y_dist")
# Bind data
p = X.shape[1]
features = np.concatenate([X, Xk], axis=1)
# Randomize coordinates to make sure everything is symmetric
inds, rev_inds = utilities.random_permutation_inds(2 * p)
features = features[:, inds]
# By default, all variables are their own group
if groups is None:
groups = np.arange(1, p + 1, 1)
# Fit
alphas, _, coefs = linear_model.lars_path(features, y, method="lasso", **kwargs,)
# Calculate places where features enter the model
Z = np.zeros(2 * p)
for i in range(2 * p):
if (coefs[i] != 0).sum() == 0:
Z[i] = 0
else:
Z[i] = alphas[np.where(coefs[i] != 0)[0][0]]
return Z[rev_inds]
def fit_lasso(X, Xk, y, y_dist=None, alphas=None, use_lars=False, **kwargs):
"""
Fits cross-validated lasso on [X, Xk] and y.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix.
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
y_dist : str
One of "binomial" or "gaussian"
alphas : float or list
The regularization parameter(s) to try. Selects one
alpha using cross-validation by default.
use_lars : bool
If True, uses a LARS-based solver for Gaussian data.
If False, uses a gradient based solver (default).
**kwargs
kwargs for sklearn model.
Notes
-----
To avoid FDR control violations, this randomly permutes
the features before fitting the lasso.
Returns
-------
gl : sklearn Lasso/LassoCV/LassoLarsCV/LogisticRegressionCV object
The sklearn model fit through cross-validation.
inds : np.ndarray
``(2p,)``-dimensional array of indices representing the random
permutation applied to the concatenation of [X, Xk] before fitting
``gl``.
rev_inds : np.ndarray
Indices which reverse the effect of ``inds``. In particular, if
M is any ``(n, 2p)``-dimensional array, then ``M == M[:, inds][:, rev_inds]``.
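Examples
--------
A minimal sketch on Gaussian data; the permutation must be undone (via
``rev_inds``) before reading off coefficients in the original [X, Xk] order:

>>> gl, inds, rev_inds = fit_lasso(X, Xk, y, y_dist="gaussian")
>>> Z = gl.coef_[rev_inds]  # coefficients for [X, Xk] in the original order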
"""
# Parse some kwargs/defaults
max_iter = kwargs.pop("max_iter", 500)
tol = kwargs.pop("tol", 1e-3)
cv = kwargs.pop("cv", 5)
if alphas is None:
alphas = DEFAULT_REG_VALS
if isinstance(alphas, float) or isinstance(alphas, int):
alphas = [alphas]
if y_dist is None:
y_dist = parse_y_dist(y)
# Bind data
p = X.shape[1]
features = np.concatenate([X, Xk], axis=1)
# Randomize coordinates to make sure everything is symmetric
inds, rev_inds = utilities.random_permutation_inds(2 * p)
features = features[:, inds]
# Fit lasso
with warnings.catch_warnings():
warnings.simplefilter("ignore")
if y_dist == "gaussian":
if not use_lars and len(alphas) == 1:
gl = linear_model.Lasso(
alpha=alphas[0],
max_iter=max_iter,
tol=tol,
**kwargs
).fit(features, y)
elif not use_lars:
gl = linear_model.LassoCV(
alphas=alphas,
cv=cv,
verbose=False,
max_iter=max_iter,
tol=tol,
**kwargs,
).fit(features, y)
elif use_lars:
gl = linear_model.LassoLarsCV(
cv=cv, verbose=False, max_iter=max_iter, **kwargs,
).fit(features, y)
elif y_dist == "binomial":
gl = linear_model.LogisticRegressionCV(
Cs=1 / np.array(alphas),
penalty="l1",
max_iter=max_iter,
tol=tol,
cv=cv,
verbose=False,
solver="liblinear",
**kwargs,
).fit(features, y)
else:
raise ValueError(f"y_dist must be one of gaussian, binomial, not {y_dist}")
return gl, inds, rev_inds
def fit_ridge(X, Xk, y, y_dist=None, **kwargs):
"""
Fits cross-validated ridge on [X, Xk] and y.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
y_dist : str
One of "binomial" or "gaussian"
**kwargs
kwargs for sklearn model.
Notes
-----
To avoid FDR control violations, this randomly permutes
the features before fitting the ridge.
Returns
-------
gl : sklearn.linear_model.RidgeCV/LogisticRegressionCV
The sklearn model fit through cross-validation.
inds : np.ndarray
``(2p,)``-dimensional array of indices representing the random
permutation applied to the concatenation of [X, Xk] before fitting
``gl``.
rev_inds : np.ndarray
Indices which reverse the effect of ``inds``. In particular, if
M is any ``(n, 2p)``-dimensional array, then ``M == M[:, inds][:, rev_inds]``.
"""
# Bind data
p = X.shape[1]
features = np.concatenate([X, Xk], axis=1)
# Randomize coordinates to ensure antisymmetry
inds, rev_inds = utilities.random_permutation_inds(2 * p)
features = features[:, inds]
# Parse y_dist if not provided (mirrors fit_lasso)
y_dist = parse_y_dist(y) if y_dist is None else y_dist
# Fit ridge
with warnings.catch_warnings():
warnings.simplefilter('ignore')
if y_dist == "gaussian":
ridge = linear_model.RidgeCV(
alphas=DEFAULT_REG_VALS,
store_cv_values=True,
scoring="neg_mean_squared_error",
**kwargs,
).fit(features, y)
elif y_dist == "binomial":
ridge = linear_model.LogisticRegressionCV(
Cs=1 / DEFAULT_REG_VALS, penalty="l2", solver="liblinear", **kwargs,
).fit(features, y)
else:
raise ValueError(f"y_dist must be one of gaussian, binomial, not {y_dist}")
return ridge, inds, rev_inds
class FeatureStatistic:
"""
The base knockoff feature statistic class, which can wrap any
generic prediction algorithm.
Parameters
----------
model :
An instance of a class with a "train" or "fit" method
and a "predict" method.
Attributes
----------
model :
A (predictive) model class underlying the variable importance
measures.
inds : np.ndarray
``(2p,)``-dimensional array of indices representing the random
permutation applied to the concatenation of [X, Xk] before fitting
the model.
rev_inds : np.ndarray
Indices which reverse the effect of ``inds``. In particular, if
M is any ``(n, 2p)``-dimensional array, then ``M == M[:, inds][:, rev_inds]``.
score : float
A metric of the model's performance, as defined by ``score_type``.
score_type : str
One of "mse", "mse_cv", "accuracy_cv", or "log_likelihood"
("cv" denotes a cross-validated score).
Z : np.ndarray
a ``(2p,)``-dimensional array of feature and knockoff importances. The
first p coordinates correspond to features, the last p correspond
to knockoffs.
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
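Examples
--------
A minimal sketch wrapping a generic sklearn regressor (any object with a
``fit``/``train`` method and a ``predict`` method can be wrapped):

>>> from sklearn.ensemble import GradientBoostingRegressor
>>> fstat = FeatureStatistic(model=GradientBoostingRegressor())
>>> W = fstat.fit(X, Xk, y)  # (p,)-shaped feature statistics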
"""
def __init__(self, model=None):
self.model = model # sklearn/statsmodels/pyglmnet model
self.inds = None # permutation of features
self.rev_inds = None # reverse permutation of features
self.score = None # E.g. MSE or cross-validated MSE of the model
self.score_type = None # E.g. "mse", "mse_cv", or "accuracy_cv"
self.Z = None # Z statistic
self.groups = None # Grouping of features for group knockoffs
self.W = None # W statistic
def fit(
self,
X,
Xk,
y,
groups=None,
feature_importance="swap",
antisym="cd",
group_agg="avg",
**kwargs,
):
"""
Trains the model and creates feature importances.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
feature_importance : str
Specifies how to create feature importances from ``model``.
Two options:
- "swap": The default swap-statistic from
http://proceedings.mlr.press/v89/gimenez19a/gimenez19a.pdf.
These are good measures of feature importance but
slightly slower.
- "swapint": The swap-integral defined from
http://proceedings.mlr.press/v89/gimenez19a/gimenez19a.pdf
Defaults to 'swap'
antisym : str
The antisymmetric function used to create (ungrouped) feature
statistics. Three options:
- "CD" (Difference of absolute vals of coefficients),
- "SM" (signed maximum).
- "SCD" (Simple difference of coefficients - NOT recommended)
group_agg : str
For group knockoffs, specifies how to turn individual feature
statistics into grouped feature statistics. Two options:
"sum" and "avg".
**kwargs : dict
kwargs to pass to the 'train' or 'fit' method of the model.
Returns
-------
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
"""
if self.model is None:
raise ValueError(
"For base feature statistic class, must provide a trainable model class instance."
)
# Permute features to prevent FDR control violations
n = X.shape[0]
p = X.shape[1]
features = np.concatenate([X, Xk], axis=1)
self.inds, self.rev_inds = utilities.random_permutation_inds(2 * p)
features = features[:, self.inds]
# Train model
if hasattr(self.model, "train"):
self.model.train(features, y, **kwargs)
elif hasattr(self.model, "fit"):
self.model.fit(features, y, **kwargs)
else:
raise ValueError(
f"model {self.model} must have either a 'fit' or 'train' method"
)
# Score using swap importances
if feature_importance == "swap":
self.Z = self.swap_feature_importances(features, y)
elif feature_importance == "swapint":
self.Z = self.swap_path_feature_importances(features, y)
else:
raise ValueError(f"Unrecognized feature_importance {feature_importance}")
# Combine Z statistics
self.groups = groups
self.W = combine_Z_stats(
self.Z, self.groups, antisym=antisym, group_agg=group_agg
)
return self.W
def swap_feature_importances(self, features, y):
"""
Given a model of the features and y, calculates feature importances
as follows.
For feature i, replace the feature with its knockoff and calculate
the relative increase in the loss. Similarly, for knockoff i,
replace the knockoff with its feature and calculate the relative
increase in the loss.
Parameters
----------
features : np.ndarray
``(n, 2p)``-shaped array of concatenated features and knockoffs,
which must be permuted by ``self.inds``.
y : np.ndarray
``(n,)``-shaped response vector
Returns
-------
Z_swap : np.ndarray
``(2p,)``-shaped array of variable importances, given in the same
(permuted) column order as ``features``.
"""
# Parse aspects of the DGP
n = features.shape[0]
p = int(features.shape[1] / 2)
y_dist = parse_y_dist(y)
# Iteratively replace columns with their knockoffs/features
Z_swap = np.zeros(2 * p)
for i in range(p):
for knockoff in [0, 1]:
# Unshuffle features and replace column
new_features = features[:, self.rev_inds].copy()
col = i + knockoff * p # The column we calculate the score for
partner = i + (1 - knockoff) * p # its corresponding feature/knockoff
new_features[:, col] = new_features[:, partner]
new_features = new_features[:, self.inds] # Reshuffle cols for model
# Calculate loss
Z_swap[col] = self.score_model(new_features, y, y_dist=y_dist)
return Z_swap
def swap_path_feature_importances(self, features, y, step_size=0.5, max_lambda=5):
"""
Similar to ``swap_feature_importances``; see
http://proceedings.mlr.press/v89/gimenez19a/gimenez19a.pdf
"""
# Parse aspects of the DGP
n = features.shape[0]
p = int(features.shape[1] / 2)
y_dist = parse_y_dist(y)
# Baseline loss
baseline_loss = self.score_model(features, y, y_dist=y_dist)
# Iteratively replace columns with their knockoffs/features
lambda_vals = []
lambd = step_size
while lambd <= max_lambda:
lambda_vals.append(lambd)
lambd += step_size
lambda_vals = np.array(lambda_vals)
# DP approach to calculating area
Z_swap_lambd_prev = np.zeros(2 * p) + baseline_loss
Z_swap_int = np.zeros(2 * p)
for lambd in lambda_vals:
for i in range(p):
for knockoff in [0, 1]:
# Unshuffle features and replace column
new_features = features[:, self.rev_inds].copy()
col = i + knockoff * p # The column we calculate the score for
partner = (
i + (1 - knockoff) * p
) # its corresponding feature/knockoff
# Calc new col as linear interpolation btwn col and partner
fk_diff = new_features[:, partner] - new_features[:, col]
new_col = new_features[:, col] + lambd * (fk_diff)
# Set new column and reshuffle
new_features[:, col] = new_col
new_features = new_features[:, self.inds] # Reshuffle
# Calculate area under curve
loss = self.score_model(new_features, y, y_dist=y_dist)
avg_trapezoid_height = (Z_swap_lambd_prev[col] + loss) / 2
Z_swap_int[col] += step_size * avg_trapezoid_height
# Cache for DP
Z_swap_lambd_prev[col] = loss
return Z_swap_int
def score_model(self, features, y, y_dist=None):
"""
Computes mean-squared error of self.model on
(features, y) when y is nonbinary, and computes
1 - accuracy otherwise.
Returns
-------
loss : float
Either the MSE or one minus the accuracy of the model,
depending on whether y is continuous or binary.
"""
# Make sure model exists
if self.model is None:
raise ValueError("Must train self.model before calling model_training_loss")
# Parse y distribution
if y_dist is None:
y_dist = parse_y_dist(y)
# MSE for gaussian data
if y_dist == "gaussian":
preds = self.model.predict(features)
loss = np.power(preds - y, 2).mean()
# 1- accuracy for binomial data
elif y_dist == "binomial":
preds = self.model.predict(features)
accuracy = (preds == y).mean()
loss = 1 - accuracy
# log_probs = self.model.predict_log_proba(features)
# log_probs[log_probs == -np.inf] = -10 # Numerical errors
# # TODO: should normalize
# loss = -1*log_probs[:, y.astype('int32')].mean()
else:
raise ValueError(f"Unexpected y_dist = {y_dist}")
return loss
def cv_score_model(self, features, y, cv_score, logistic_flag=False):
"""
Similar to score_model, but uses cross-validated scoring if cv_score=True.
"""
# Possibly, compute CV MSE/Accuracy, although this
# can be very expensive (e.g. for LARS solver)
if cv_score:
if isinstance(self.model, sklearn.base.RegressorMixin):
# Parse whether to use MSE or Accuracy
if logistic_flag:
self.score_type = "accuracy_cv"
scoring = "accuracy"
else:
self.score_type = "mse_cv"
scoring = "neg_mean_squared_error"
cv_scores = model_selection.cross_val_score(
self.model, features, y, cv=5, scoring=scoring,
)
# Adjust negative mse to be positive
if scoring == "neg_mean_squared_error":
cv_scores = -1 * cv_scores
# Take the mean
self.score = cv_scores.mean()
else:
raise ValueError(
f"Model is of {type(self.model)}, must be sklearn estimator for cvscoring"
)
else:
if logistic_flag:
y_dist = "binomial"
self.score_type = "log_likelihood"
else:
y_dist = "gaussian"
self.score_type = "mse"
self.score = -1 * self.score_model(features, y, y_dist=y_dist)
class RidgeStatistic(FeatureStatistic):
""" Ridge statistic wrapper class """
def __init__(self):
super().__init__()
def fit(
self,
X,
Xk,
y,
groups=None,
antisym="cd",
group_agg="avg",
cv_score=False,
**kwargs,
):
"""
Wraps the FeatureStatistic class but uses cross-validated Ridge
coefficients as variable importances.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
antisym : str
The antisymmetric function used to create (ungrouped) feature
statistics. Three options:
- "CD" (Difference of absolute vals of coefficients),
- "SM" (signed maximum).
- "SCD" (Simple difference of coefficients - NOT recommended)
group_agg : str
For group knockoffs, specifies how to turn individual feature
statistics into grouped feature statistics. Two options:
"sum" and "avg".
cv_score : bool
If true, score the feature statistic's predictive accuracy
using cross validation.
kwargs : dict
Extra kwargs to pass to the underlying ridge classes
Returns
-------
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
"""
# Possibly set default groups
n = X.shape[0]
p = X.shape[1]
if groups is None:
groups = np.arange(1, p + 1, 1)
# Check if y_dist is gaussian, binomial
y_dist = parse_y_dist(y)
kwargs["y_dist"] = y_dist
# Step 1: Calculate Z stats by fitting ridge
self.model, self.inds, self.rev_inds = fit_ridge(X=X, Xk=Xk, y=y, **kwargs,)
# Retrieve Z statistics and save cv scores
if y_dist == "gaussian":
Z = self.model.coef_[self.rev_inds]
self.score = -1 * self.model.cv_values_.mean(axis=1).min()
self.score_type = "mse_cv"
elif y_dist == "binomial":
Z = self.model.coef_[0, self.rev_inds]
self.score = self.model.scores_[1].mean(axis=0).max()
self.score_type = "accuracy_cv"
else:
raise ValueError(
f"y_dist must be one of gaussian, binomial, not {kwargs['y_dist']}"
)
# Combine Z statistics
W_group = combine_Z_stats(Z, groups, antisym=antisym, group_agg=group_agg)
# Save values for later use
self.Z = Z
self.groups = groups
self.W = W_group
return W_group
class LassoStatistic(FeatureStatistic):
""" Lasso Statistic wrapper class """
def __init__(self):
super().__init__()
def fit(
self,
X,
Xk,
y,
groups=None,
zstat="coef",
antisym="cd",
group_agg="avg",
cv_score=False,
debias=False,
Ginv=None,
**kwargs,
):
"""
Wraps the FeatureStatistic class but uses cross-validated Lasso
coefficients or Lasso path statistics as variable importances.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
zstat : str
Two options for the variable importance measure:
- If 'coef', uses the cross-validated (group) lasso coefficients.
- If 'lars_path', uses the lambda value where each feature/knockoff
enters the lasso path (meaning becomes nonzero).
This defaults to coef.
antisym : str
The antisymmetric function used to create (ungrouped) feature
statistics. Three options:
- "CD" (Difference of absolute vals of coefficients),
- "SM" (signed maximum).
- "SCD" (Simple difference of coefficients - NOT recommended)
group_agg : str
For group knockoffs, specifies how to turn individual feature
statistics into grouped feature statistics. Two options:
"sum" and "avg".
cv_score : bool
If true, score the feature statistic's predictive accuracy
using cross validation.
debias : bool
If true, debias the lasso. See https://arxiv.org/abs/1508.02757
Ginv : np.ndarray
``(2p, 2p)``-shaped precision matrix for the feature-knockoff
covariate distribution. This must be specified if ``debias=True``.
kwargs : dict
Extra kwargs to pass to underlying Lasso classes.
Notes
-----
When not using the group lasso, one can specify choice(s) of
regularization parameter using the ``alphas`` keyword argument.
Otherwise, use the ``reg_vals`` keyword argument.
Returns
-------
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
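Examples
--------
A minimal usage sketch (assuming ``X``, ``Xk``, and ``y`` are numpy arrays of
the shapes described above):

>>> W = LassoStatistic().fit(X, Xk, y, zstat="coef", antisym="cd")
>>> W_path = LassoStatistic().fit(X, Xk, y, zstat="lars_path")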
"""
# Possibly set default groups
n = X.shape[0]
p = X.shape[1]
if groups is None:
groups = np.arange(1, p + 1, 1)
# Check if y_dist is gaussian or binomial
kwargs["y_dist"] = kwargs.get("y_dist", parse_y_dist(y))
# Step 1: Calculate Z statistics
zstat = str(zstat).lower()
if zstat == "coef":
# Fit (possibly group) lasso
gl, inds, rev_inds = fit_lasso(
X=X,
Xk=Xk,
y=y,
**kwargs,
)
# Parse the expected output format
logistic_flag = parse_logistic_flag(kwargs)
# Retrieve Z statistics
if logistic_flag:
if gl.coef_.shape[0] != 1:
raise ValueError(
"Unexpected shape for logistic lasso coefficients (sklearn)"
)
Z = gl.coef_[0, rev_inds]
else:
Z = gl.coef_[rev_inds]
# Possibly debias the lasso
if debias:
if Ginv is None:
raise ValueError(f"To debias the lasso, Ginv must be provided")
elif logistic_flag:
raise ValueError(
f"Debiased lasso is not implemented for binomial data"
)
else:
features = np.concatenate([X, Xk], axis=1)
debias_term = np.dot(Ginv, features.T)
debias_term = np.dot(debias_term, y - np.dot(features, Z))
Z = Z + debias_term / n
# Save lasso class and reverse inds
self.model = gl
self.inds = inds
self.rev_inds = rev_inds
# Try to save cv accuracy for logistic lasso
if isinstance(self.model, linear_model.LogisticRegressionCV):
self.score = self.model.scores_[1].mean(axis=0).max()
self.score_type = "accuracy_cv"
# Save cv mse for lasso
elif isinstance(self.model, linear_model.LassoCV):
self.score = self.model.mse_path_.mean(axis=1).min()
self.score_type = "mse_cv"
# Else compute the score
else:
features = np.concatenate([X, Xk], axis=1)[:, inds]
self.cv_score_model(
features=features,
y=y,
cv_score=cv_score,
logistic_flag=logistic_flag,
)
elif zstat == "lars_path":
Z = calc_lars_path(X, Xk, y, groups, **kwargs)
else:
raise ValueError(f'zstat ({zstat}) must be one of "coef", "lars_path"')
# Combine Z statistics
W_group = combine_Z_stats(Z, groups, antisym=antisym, group_agg=group_agg)
# Save values for later use
self.Z = Z
self.groups = groups
self.W = W_group
return W_group
class MargCorrStatistic(FeatureStatistic):
""" Marginal correlation statistic wrapper class """
def __init__(self):
super().__init__()
def fit(
self, X, Xk, y, groups=None, **kwargs,
):
"""
Wraps the FeatureStatistic class using marginal correlations
between X, Xk and y as variable importances.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
kwargs : dict
Extra kwargs to pass to underlying ``combine_Z_stats``
Returns
-------
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
"""
# Calc correlations
features = np.concatenate([X, Xk], axis=1)
correlations = np.corrcoef(features, y.reshape(-1, 1), rowvar=False)[-1][0:-1]
# Combine
W = combine_Z_stats(correlations, groups, **kwargs)
# Cache
self.Z = correlations
self.groups = groups
self.W = W
return W
class OLSStatistic(FeatureStatistic):
""" OLS statistic wrapper class """
def __init__(self):
super().__init__()
def fit(self, X, Xk, y, groups=None, cv_score=False, **kwargs):
"""
Wraps the FeatureStatistic class with OLS coefs as variable importances.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
cv_score : bool
If true, score the feature statistic's predictive accuracy
using cross validation.
kwargs : dict
Extra kwargs to pass to ``combine_Z_stats``.
Returns
-------
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
"""
# Run linear regression, permute indexes to prevent FDR violations
p = X.shape[1]
features = np.concatenate([X, Xk], axis=1)
inds, rev_inds = utilities.random_permutation_inds(2 * p)
features = features[:, inds]
lm = linear_model.LinearRegression(fit_intercept=False).fit(features, y)
Z = lm.coef_
# Play with shape
Z = Z.reshape(-1)
if Z.shape[0] != 2 * p:
raise ValueError(
f"Unexpected shape {Z.shape} for sklearn LinearRegression coefs (expected ({2*p},))"
)
# Undo random permutation
Z = Z[rev_inds]
# Combine with groups to create W-statistic
W = combine_Z_stats(Z, groups, **kwargs)
# Save
self.model = lm
self.inds = inds
self.rev_inds = rev_inds
self.Z = Z
self.groups = groups
self.W = W
# Score model
self.cv_score_model(
features=features, y=y, cv_score=cv_score,
)
return W
class RandomForestStatistic(FeatureStatistic):
def fit(
self,
X,
Xk,
y,
groups=None,
feature_importance="swap",
antisym="cd",
group_agg="sum",
cv_score=False,
**kwargs,
):
"""
Wraps the FeatureStatistic class using a Random Forest to
generate variable importances.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
feature_importance : str
Specifies how to create feature importances from ``model``.
Three options:
- "sklearn": Use sklearn feature importances. These
are very poor measures of feature importance, but
very fast.
- "swap": The default swap-statistic from
http://proceedings.mlr.press/v89/gimenez19a/gimenez19a.pdf.
These are good measures of feature importance but
slightly slower.
- "swapint": The swap-integral defined from
http://proceedings.mlr.press/v89/gimenez19a/gimenez19a.pdf
Defaults to 'swap'
antisym : str
The antisymmetric function used to create (ungrouped) feature
statistics. Three options:
- "CD" (Difference of absolute vals of coefficients),
- "SM" (signed maximum).
- "SCD" (Simple difference of coefficients - NOT recommended)
group_agg : str
For group knockoffs, specifies how to turn individual feature
statistics into grouped feature statistics. Two options:
"sum" and "avg".
cv_score : bool
If true, score the feature statistic's predictive accuracy
using cross validation. This is extremely expensive for random
forests.
kwargs : dict
Extra kwargs to pass to underlying RandomForest class
Returns
-------
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
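Examples
--------
A minimal usage sketch (extra kwargs such as ``n_estimators`` are forwarded to
the underlying sklearn random forest):

>>> rf_stat = RandomForestStatistic()
>>> W = rf_stat.fit(X, Xk, y, feature_importance="swap", n_estimators=25)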
"""
# Bind data
p = X.shape[1]
features = np.concatenate([X, Xk], axis=1)
# Randomize coordinates to make sure everything is symmetric
self.inds, self.rev_inds = utilities.random_permutation_inds(2 * p)
features = features[:, self.inds]
# By default, all variables are their own group
if groups is None:
groups = np.arange(1, p+1)
self.groups = groups
# Parse y_dist, initialize model
y_dist = parse_y_dist(y)
# Avoid future warnings
if "n_estimators" not in kwargs:
kwargs["n_estimators"] = 10
if y_dist == "gaussian":
self.model = ensemble.RandomForestRegressor(**kwargs)
else:
self.model = ensemble.RandomForestClassifier(**kwargs)
# Fit model, get Z statistics.
# Note this does the randomization of features by itself.
self.model.fit(features, y)
feature_importance = str(feature_importance).lower()
if feature_importance == "default":
self.Z = self.model.feature_importances_[self.rev_inds]
elif feature_importance == "swap":
self.Z = self.swap_feature_importances(features, y)
elif feature_importance == "swapint":
self.Z = self.swap_path_feature_importances(features, y)
else:
raise ValueError(
f"feature_importance {feature_importance} must be one of 'swap', 'swapint', 'default'"
)
# Get W statistics
self.W = combine_Z_stats(
self.Z, self.groups, antisym=antisym, group_agg=group_agg
)
# Possibly score model
self.cv_score_model(features=features, y=y, cv_score=cv_score)
return self.W
class DeepPinkStatistic(FeatureStatistic):
def fit(
self,
X,
Xk,
y,
groups=None,
feature_importance="deeppink",
antisym="cd",
group_agg="sum",
cv_score=False,
train_kwargs={"verbose": False},
**kwargs,
):
"""
Wraps the FeatureStatistic class using DeepPINK to generate
variable importances.
Parameters
----------
X : np.ndarray
the ``(n, p)``-shaped design matrix
Xk : np.ndarray
the ``(n, p)``-shaped matrix of knockoffs
y : np.ndarray
``(n,)``-shaped response vector
groups : np.ndarray
For group knockoffs, a p-length array of integers from 1 to
num_groups such that ``groups[j] == i`` indicates that variable `j`
is a member of group `i`. Defaults to None (regular knockoffs).
feature_importance : str
Specifies how to create feature importances from ``model``.
Four options:
- "deeppink": Use the deeppink feature importance
defined in https://arxiv.org/abs/1809.01185
- "unweighted": Use the Z weights from the deeppink
paper without weighting them using the layers from
the MLP. Deeppink usually outperforms this feature
importance (but not always).
- "swap": The default swap-statistic from
http://proceedings.mlr.press/v89/gimenez19a/gimenez19a.pdf
- "swapint": The swap-integral defined from
http://proceedings.mlr.press/v89/gimenez19a/gimenez19a.pdf
Defaults to deeppink, which is often both the most powerful and
the most computationally efficient.
antisym : str
The antisymmetric function used to create (ungrouped) feature
statistics. Three options:
- "CD" (Difference of absolute vals of coefficients),
- "SM" (signed maximum).
- "SCD" (Simple difference of coefficients - NOT recommended)
group_agg : str
For group knockoffs, specifies how to turn individual feature
statistics into grouped feature statistics. Two options:
"sum" and "avg".
cv_score : bool
If true, score the feature statistic's predictive accuracy
using cross validation. This is extremely expensive for deeppink.
kwargs : dict
Extra kwargs to pass to underlying deeppink class (in kpytorch)
Returns
-------
W : np.ndarray
an array of feature statistics. This is ``(p,)``-dimensional
for regular knockoffs and ``(num_groups,)``-dimensional for
group knockoffs.
"""
# Check if kpytorch (and therefore deeppink) is available.
utilities.check_kpytorch_available(purpose="deepPINK statistics")
from .kpytorch import deeppink
# Bind data
n = X.shape[0]
p = X.shape[1]
features = np.concatenate([X, Xk], axis=1)
# The deeppink model class will shuffle statistics,
# but for compatibility we create indices anyway
self.inds = np.arange(2*p)
self.rev_inds = self.inds
# By default, all variables are their own group
if groups is None:
groups = np.arange(1, p+1)
self.groups = groups
# Parse y_dist, hidden_sizes, initialize model
y_dist = parse_y_dist(y)
if "hidden_sizes" not in kwargs:
kwargs["hidden_sizes"] = [min(n, p)]
self.model = deeppink.DeepPinkModel(p=p, **kwargs)
# Train model
self.model.train()
self.model = deeppink.train_deeppink(self.model, features, y, **train_kwargs)
self.model.eval()
# Get Z statistics
feature_importance = str(feature_importance).lower()
if feature_importance == "deeppink":
self.Z = self.model.feature_importances(weight_scores=True)
elif feature_importance == "unweighted":
self.Z = self.model.feature_importances(weight_scores=False)
elif feature_importance == "swap":
self.Z = self.swap_feature_importances(features, y)
elif feature_importance == "swapint":
self.Z = self.swap_path_feature_importances(features, y)
else:
raise ValueError(
f"feature_importance {feature_importance} must be one of 'deeppink', 'unweighted', 'swap', 'swapint'"
)
# Get W statistics
self.W = combine_Z_stats(
self.Z, self.groups, antisym=antisym, group_agg=group_agg
)
# Possibly score model
self.cv_score_model(features=features, y=y, cv_score=cv_score)
return self.W
def data_dependent_threshhold(W, fdr=0.10, offset=1):
"""
Calculate the data-dependent threshold given W statistics.
Parameters
----------
W : np.ndarray
p-length numpy array of feature statistics OR (p, batch_length)
shaped array.
fdr : float
desired level of false discovery rate control
offset : int
If offset = 0, control the modified FDR.
If offset = 1 (default), controls the FDR exactly.
Returns
-------
T : float or np.ndarray
The data-dependent threshold. Either a float or a (batch_length,)
dimensional array.
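Examples
--------
A small worked example at a (deliberately loose) nominal FDR of 50%; features
with ``W >= T`` are the knockoff discoveries:

>>> import numpy as np
>>> W = np.array([1., 2., -3., 4., 5.])
>>> T = data_dependent_threshhold(W, fdr=0.5)  # T == 1.0 here
>>> selected = np.where(W >= T)[0]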
"""
# Non-batched result.
if len(W.shape) == 1:
# sort by abs values
absW = np.abs(W)
inds = np.argsort(-absW)
negatives = np.cumsum(W[inds] <= 0)
positives = np.cumsum(W[inds] > 0)
positives[positives == 0] = 1 # Don't divide by 0
# calc hat fdrs
hat_fdrs = (negatives + offset) / positives
# Maximum threshold such that hat_fdr <= nominal level
if np.any(hat_fdrs <= fdr):
T = absW[inds[np.where(hat_fdrs <= fdr)[0].max()]]
if T == 0:
T = np.min(W[W > 0])
else:
T = np.inf
return T
# Batched case (may be deprecated)
else:
return np.array([
data_dependent_threshhold(W[:, j], fdr=fdr, offset=offset)
for j in range(W.shape[1])
])