"""
Knockoffs for detecting edges in gaussian graphical models.
See https://arxiv.org/pdf/1908.11611.pdf.
"""
import copy
import numpy as np
from .knockoff_filter import KnockoffFilter as KF
[docs]def discovered_edges(W, T, logic='and'):
"""
Parameters
----------
W : np.array
``(p,p)``-shaped array of knockoff statistics
with zeros along the diagonal. The rows of W must obey
the flip-sign property.
T : np.array
``(p,)``-shaped array of knockoff thresholds
logic : string
One of 'and' or 'or'. This is a hyperparameter used to
determine the rejection set. Defaults to "and".
Returns
-------
edges : np.array
``(p,p)``-shaped symmetric boolean array where
edges[i,j] is true if and only if edge (i,j) has been
discovered.
"""
edges = W >= T.reshape(-1, 1)
if logic == 'and':
edges = edges & edges.T
elif logic == 'or':
edges = edges | edges.T
else:
raise ValueError(f"logic={logic} must be one of 'and', 'or'")
return edges
[docs]def compute_ggm_threshold(W, fdr=0.1, logic='and', a=1, offset=1):
"""
Parameters
----------
W : np.array
``(p,p)``-shaped array of knockoff statistics
with zeros along the diagonal. The rows of W must obey
the flip-sign property.
fdr : float
Desired level of FDR control.
logic : string
One of 'and' or 'or'. This is a hyperparameter used to
determine the rejection set. Defaults to "and".
a : float
One of 0.01 or 1.
Hyperparameter used to determine the rejection threshold.
See Li and Maathuis (2019) for discussion.
offset : int
If offset = 0, control the modified FDR.
If offset = 1 (default), controls the FDR exactly.
Returns
-------
T : np.array
``(p,)``-shaped array of knockoff thresholds
Notes
-----
See Algorithm 2, https://arxiv.org/pdf/1908.11611.pdf
"""
p = W.shape[0]
eps = W[W > 0].min() / 2 # smallest threshold
if np.any(np.diag(W) != 0):
raise ValueError("W matrix must have a diagonal of all zeros")
if a == 1.0:
ca = 1.93
elif a == 0.01:
ca = 102
else:
raise ValueError(f"a={a} must be one of [0.01,1.0]")
# Compute start of loop
logic = str(logic).lower()
if logic == 'and':
mmax = fdr * (p-1) / ca - a * offset
qi = 2 * fdr / (ca * p)
elif logic == 'or':
mmax = fdr * (p-1) / (2*ca) - a * offset
qi = fdr / (ca * p)
else:
raise ValueError(f"logic={logic} must be one of 'and', 'or'")
mmax = int(np.floor(mmax))
if mmax < 0:
return np.zeros(p) + np.inf
# Preprocess signs
inds = np.argsort(-1*np.abs(W), axis=1)
sortW = np.take_along_axis(W, inds, axis=1)
cumpos = np.cumsum(sortW > 0, axis=1)
cumneg = np.cumsum(sortW < 0, axis=1)
# Loop through thresholds. `ms` indexes the number
# of negatives allowed.
ms = np.flip(np.arange(0, mmax+1, 1))
for m in ms:
# This gives the last index less than or equal to m
Tinds = np.argmin(cumneg <= m, axis=1) - 1
# Construct thresholds based on Tinds
T = np.abs(sortW[np.arange(p), Tinds])
# if all indices are <= m
all_leq_m = np.all(cumneg <= m, axis=1)
T[Tinds == -1] = np.inf # if no index is less than or equal to m
T[(Tinds == -1) & all_leq_m] = eps
# Create set of discovered edges
edges = discovered_edges(W=W, T=T, logic=logic)
ndisc = edges.sum() # is this double-counting? I don't think so, but am not sure. If so divide by 2.
#print(m, (a + m) / max(ndisc, 1.0), qi)
if (a + m) / max(ndisc, 1.0) < qi:
return T
# If no feasible solution, return nothing
return np.zeros(p) + np.inf
[docs]class KnockoffGGM:
"""
Tests for edges in a Gaussian Graphical Model.
See Li and Maathuis (2019) for details.
Parameters
----------
fstat : str
The feature statistic to use in the knockoff filter: this
must be a string and must be a valid fixed-X knockoff
feature statistic. Identifiers include:
- 'lasso' or 'lcd': lasso coefficients differences
- 'lsm': signed maximum of the lasso path statistic as
in Barber and Candes 2015
- 'ols': Ordinary least squares coefficients
- 'margcorr': marginal correlations between features and response
fstat_kwargs : dict
Kwargs to pass to the feature statistic ``fit`` function,
excluding the required arguments, defaults to {}
knockoff_kwargs : dict
Kwargs for instantiating the knockoff sampler argument if
the ksampler argument is a string identifier. Defaults to {}
Attributes
----------
W : np.array
``(p,p)``-shaped array of knockoff statistics
with zeros along the diagonal. The rows of W obey
the flip-sign property, i.e., W[0] obeys the flip-sign
property.
kfs : list
A list of KnockoffFilter classes corresponding to the regression
run on each covariate.
Notes
-----
There is no known way to use model-X knockoffs for this application.
Examples
--------
Here we fit KnockoffGGM under the global null when the true Gaussian
graphical model has no edges:
# Fake data-generating process for Gaussian graphical model
import numpy as np
X = np.random.randn(300, 30)
# LCD statistic with FX knockoffs
from knockpy.ggm import KnockoffGGM
gkf = KnockoffGGM(
fstat='lcd',
knockoff_kwargs={"method":"mvr"},
)
edges = gkf.forward(X=X, verbose=True)
"""
def __init__(self, fstat='lcd', fstat_kwargs=None, knockoff_kwargs=None):
self.fstat = fstat
self.fstat_kwargs = fstat_kwargs if fstat_kwargs is not None else {}
self.knockoff_kwargs = knockoff_kwargs if knockoff_kwargs is not None else {}
[docs] def forward(self, X, logic='and', fdr=0.1, ggm_kwargs=None, verbose=True):
"""
Runs the GGM filter by applying fixed-X knockoffs
to each column of X using the other columns as covariates.
Parameters
----------
X : np.ndarray
``(n, p)``-shaped design matrix.
fdr : float
Nominal level at which to control the FDR.
logic : string
One of 'and' or 'or'. This is a hyperparameter used to
determine the rejection set. Defaults to "and".
ggm_kwargs : dict
Dictionary of hyperparameters to pass to the
``ggm.compute_ggm_threshold`` function. Defaults to {}.
verbose : bool
If true, log progress over time.
Returns
-------
edges : np.array
``(p,p)``-shaped symmetric boolean array where
edges[i,j] is true if and only if edge (i,j) has been
discovered.
Notes
-----
This requires fitting knockoffs p times, so it is quite expensive.
"""
self.n, self.p = X.shape
self.X = X
self.logic = logic
self.fdr = fdr
self.ggm_kwargs = ggm_kwargs if ggm_kwargs is not None else {}
self.kfs = []
self.Ws = np.zeros((self.p, self.p))
# Loop through columns to fit knockoffs
for j in range(self.p):
kf = KF(
ksampler='fx',
fstat=self.fstat,
knockoff_kwargs=copy.copy(self.knockoff_kwargs),
fstat_kwargs=copy.copy(self.fstat_kwargs)
)
negj = [i for i in range(self.p) if i != j]
Xnegj = X[:, negj]
kf.forward(
X=Xnegj, y=X[:, j],
)
self.kfs.append(kf)
self.Ws[j][negj] = kf.W
if verbose:
print(f"Finished feature {j} of {self.p}.")
# Find threshold
self.T = compute_ggm_threshold(W=self.Ws, logic=self.logic, **self.ggm_kwargs)
# Compute rejections
self.edges = discovered_edges(W=self.Ws, T=self.T, logic=self.logic)
return self.edges