"""
The ``risk_models`` module provides functions for estimating the covariance matrix given
historical returns.
The format of the data input is the same as that in :ref:`expected-returns`.
**Currently implemented:**
- fix non-positive semidefinite matrices
- general risk matrix function, allowing you to run any risk model from one function.
- sample covariance
- semicovariance
- exponentially weighted covariance
- minimum covariance determinant
- shrunk covariance matrices:
- manual shrinkage
- Ledoit Wolf shrinkage
- Oracle Approximating shrinkage
- covariance to correlation matrix
"""
import warnings
import numpy as np
import pandas as pd
from .expected_returns import returns_from_prices
def _is_positive_semidefinite(matrix):
"""
Helper function to check if a given matrix is positive semidefinite.
Any method that requires inverting the covariance matrix will struggle
with a non-positive semidefinite matrix
:param matrix: (covariance) matrix to test
:type matrix: np.ndarray, pd.DataFrame
:return: whether matrix is positive semidefinite
:rtype: bool
"""
try:
# Significantly more efficient than checking eigenvalues (stackoverflow.com/questions/16266720)
np.linalg.cholesky(matrix + 1e-16 * np.eye(len(matrix)))
return True
except np.linalg.LinAlgError:
return False
[docs]def fix_nonpositive_semidefinite(matrix, fix_method="spectral"):
"""
Check if a covariance matrix is positive semidefinite, and if not, fix it
with the chosen method.
The ``spectral`` method sets negative eigenvalues to zero then rebuilds the matrix,
while the ``diag`` method adds a small positive value to the diagonal.
:param matrix: raw covariance matrix (may not be PSD)
:type matrix: pd.DataFrame
:param fix_method: {"spectral", "diag"}, defaults to "spectral"
:type fix_method: str, optional
:raises NotImplementedError: if a method is passed that isn't implemented
:return: positive semidefinite covariance matrix
:rtype: pd.DataFrame
"""
if _is_positive_semidefinite(matrix):
return matrix
warnings.warn(
"The covariance matrix is non positive semidefinite. Amending eigenvalues."
)
# Eigendecomposition
q, V = np.linalg.eigh(matrix)
if fix_method == "spectral":
# Remove negative eigenvalues
q = np.where(q > 0, q, 0)
# Reconstruct matrix
fixed_matrix = V @ np.diag(q) @ V.T
elif fix_method == "diag":
min_eig = np.min(q)
fixed_matrix = matrix - 1.1 * min_eig * np.eye(len(matrix))
else:
raise NotImplementedError("Method {} not implemented".format(fix_method))
if not _is_positive_semidefinite(fixed_matrix): # pragma: no cover
warnings.warn(
"Could not fix matrix. Please try a different risk model.", UserWarning
)
# Rebuild labels if provided
if isinstance(matrix, pd.DataFrame):
tickers = matrix.index
return pd.DataFrame(fixed_matrix, index=tickers, columns=tickers)
else:
return fixed_matrix
[docs]def risk_matrix(prices, method="sample_cov", **kwargs):
"""
Compute a covariance matrix, using the risk model supplied in the ``method``
parameter.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param method: the risk model to use. Should be one of:
- ``sample_cov``
- ``semicovariance``
- ``exp_cov``
- ``ledoit_wolf``
- ``ledoit_wolf_constant_variance``
- ``ledoit_wolf_single_factor``
- ``ledoit_wolf_constant_correlation``
- ``oracle_approximating``
:type method: str, optional
:raises NotImplementedError: if the supplied method is not recognised
:return: annualised sample covariance matrix
:rtype: pd.DataFrame
"""
if method == "sample_cov":
return sample_cov(prices, **kwargs)
elif method == "semicovariance" or method == "semivariance":
return semicovariance(prices, **kwargs)
elif method == "exp_cov":
return exp_cov(prices, **kwargs)
elif method == "ledoit_wolf" or method == "ledoit_wolf_constant_variance":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf()
elif method == "ledoit_wolf_single_factor":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf(
shrinkage_target="single_factor"
)
elif method == "ledoit_wolf_constant_correlation":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf(
shrinkage_target="constant_correlation"
)
elif method == "oracle_approximating":
return CovarianceShrinkage(prices, **kwargs).oracle_approximating()
else:
raise NotImplementedError("Risk model {} not implemented".format(method))
[docs]def sample_cov(prices, returns_data=False, frequency=252, **kwargs):
"""
Calculate the annualised sample covariance matrix of (daily) asset returns.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:return: annualised sample covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices)
return fix_nonpositive_semidefinite(
returns.cov() * frequency, kwargs.get("fix_method", "spectral")
)
[docs]def semicovariance(
prices, returns_data=False, benchmark=0.000079, frequency=252, **kwargs
):
"""
Estimate the semicovariance matrix, i.e the covariance given that
the returns are less than the benchmark.
.. semicov = E([min(r_i - B, 0)] . [min(r_j - B, 0)])
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param benchmark: the benchmark return, defaults to the daily risk-free rate, i.e
:math:`1.02^{(1/252)} -1`.
:type benchmark: float
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year). Ensure that you use the appropriate
benchmark, e.g if ``frequency=12`` use the monthly risk-free rate.
:type frequency: int, optional
:return: semicovariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices)
drops = np.fmin(returns - benchmark, 0)
T = drops.shape[0]
return fix_nonpositive_semidefinite(
(drops.T @ drops) / T * frequency, kwargs.get("fix_method", "spectral")
)
def _pair_exp_cov(X, Y, span=180):
"""
Calculate the exponential covariance between two timeseries of returns.
:param X: first time series of returns
:type X: pd.Series
:param Y: second time series of returns
:type Y: pd.Series
:param span: the span of the exponential weighting function, defaults to 180
:type span: int, optional
:return: the exponential covariance between X and Y
:rtype: float
"""
covariation = (X - X.mean()) * (Y - Y.mean())
# Exponentially weight the covariation and take the mean
if span < 10:
warnings.warn("it is recommended to use a higher span, e.g 30 days")
return covariation.ewm(span=span).mean().iloc[-1]
[docs]def exp_cov(prices, returns_data=False, span=180, frequency=252, **kwargs):
"""
Estimate the exponentially-weighted covariance matrix, which gives
greater weight to more recent data.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param span: the span of the exponential weighting function, defaults to 180
:type span: int, optional
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:return: annualised estimate of exponential covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
assets = prices.columns
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices)
N = len(assets)
# Loop over matrix, filling entries with the pairwise exp cov
S = np.zeros((N, N))
for i in range(N):
for j in range(i, N):
S[i, j] = S[j, i] = _pair_exp_cov(
returns.iloc[:, i], returns.iloc[:, j], span
)
cov = pd.DataFrame(S * frequency, columns=assets, index=assets)
return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
def min_cov_determinant(
prices, returns_data=False, frequency=252, random_state=None, **kwargs
): # pragma: no cover
warnings.warn("min_cov_determinant is deprecated and will be removed in v1.5")
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
# Extra dependency
try:
import sklearn.covariance
except (ModuleNotFoundError, ImportError):
raise ImportError("Please install scikit-learn via pip or poetry")
assets = prices.columns
if returns_data:
X = prices
else:
X = returns_from_prices(prices)
# X = np.nan_to_num(X.values)
X = X.dropna().values
raw_cov_array = sklearn.covariance.fast_mcd(X, random_state=random_state)[1]
cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * frequency
return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
[docs]def cov_to_corr(cov_matrix):
"""
Convert a covariance matrix to a correlation matrix.
:param cov_matrix: covariance matrix
:type cov_matrix: pd.DataFrame
:return: correlation matrix
:rtype: pd.DataFrame
"""
if not isinstance(cov_matrix, pd.DataFrame):
warnings.warn("cov_matrix is not a dataframe", RuntimeWarning)
cov_matrix = pd.DataFrame(cov_matrix)
Dinv = np.diag(1 / np.sqrt(np.diag(cov_matrix)))
corr = np.dot(Dinv, np.dot(cov_matrix, Dinv))
return pd.DataFrame(corr, index=cov_matrix.index, columns=cov_matrix.index)
[docs]def corr_to_cov(corr_matrix, stdevs):
"""
Convert a correlation matrix to a covariance matrix
:param corr_matrix: correlation matrix
:type corr_matrix: pd.DataFrame
:param stdevs: vector of standard deviations
:type stdevs: array-like
:return: covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(corr_matrix, pd.DataFrame):
warnings.warn("corr_matrix is not a dataframe", RuntimeWarning)
corr_matrix = pd.DataFrame(corr_matrix)
return corr_matrix * np.outer(stdevs, stdevs)
[docs]class CovarianceShrinkage:
"""
Provide methods for computing shrinkage estimates of the covariance matrix, using the
sample covariance matrix and choosing the structured estimator to be an identity matrix
multiplied by the average sample variance. The shrinkage constant can be input manually,
though there exist methods (notably Ledoit Wolf) to estimate the optimal value.
Instance variables:
- ``X`` - pd.DataFrame (returns)
- ``S`` - np.ndarray (sample covariance matrix)
- ``delta`` - float (shrinkage constant)
- ``frequency`` - int
"""
[docs] def __init__(self, prices, returns_data=False, frequency=252):
"""
:param prices: adjusted closing prices of the asset, each row is a date and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param frequency: number of time periods in a year, defaults to 252 (the number of trading days in a year)
:type frequency: int, optional
"""
# Optional import
try:
from sklearn import covariance
self.covariance = covariance
except (ModuleNotFoundError, ImportError): # pragma: no cover
raise ImportError("Please install scikit-learn via pip or poetry")
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
self.frequency = frequency
if returns_data:
self.X = prices.dropna(how="all")
else:
self.X = prices.pct_change().dropna(how="all")
self.S = self.X.cov().values
self.delta = None # shrinkage constant
def _format_and_annualize(self, raw_cov_array):
"""
Helper method which annualises the output of shrinkage calculations,
and formats the result into a dataframe
:param raw_cov_array: raw covariance matrix of daily returns
:type raw_cov_array: np.ndarray
:return: annualised covariance matrix
:rtype: pd.DataFrame
"""
assets = self.X.columns
cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * self.frequency
return fix_nonpositive_semidefinite(cov, fix_method="spectral")
[docs] def shrunk_covariance(self, delta=0.2):
"""
Shrink a sample covariance matrix to the identity matrix (scaled by the average
sample variance). This method does not estimate an optimal shrinkage parameter,
it requires manual input.
:param delta: shrinkage parameter, defaults to 0.2.
:type delta: float, optional
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
self.delta = delta
N = self.S.shape[1]
# Shrinkage target
mu = np.trace(self.S) / N
F = np.identity(N) * mu
# Shrinkage
shrunk_cov = delta * F + (1 - delta) * self.S
return self._format_and_annualize(shrunk_cov)
[docs] def ledoit_wolf(self, shrinkage_target="constant_variance"):
"""
Calculate the Ledoit-Wolf shrinkage estimate for a particular
shrinkage target.
:param shrinkage_target: choice of shrinkage target, either ``constant_variance``,
``single_factor`` or ``constant_correlation``. Defaults to
``constant_variance``.
:type shrinkage_target: str, optional
:raises NotImplementedError: if the shrinkage_target is unrecognised
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
if shrinkage_target == "constant_variance":
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = self.covariance.ledoit_wolf(X)
elif shrinkage_target == "single_factor":
shrunk_cov, self.delta = self._ledoit_wolf_single_factor()
elif shrinkage_target == "constant_correlation":
shrunk_cov, self.delta = self._ledoit_wolf_constant_correlation()
else:
raise NotImplementedError(
"Shrinkage target {} not recognised".format(shrinkage_target)
)
return self._format_and_annualize(shrunk_cov)
def _ledoit_wolf_single_factor(self):
"""
Helper method to calculate the Ledoit-Wolf shrinkage estimate
with the Sharpe single-factor matrix as the shrinkage target.
See Ledoit and Wolf (2001).
:return: shrunk sample covariance matrix, shrinkage constant
:rtype: np.ndarray, float
"""
X = np.nan_to_num(self.X.values)
# De-mean returns
t, n = np.shape(X)
Xm = X - X.mean(axis=0)
xmkt = X.mean(axis=1).reshape(t, 1)
# compute sample covariance matrix
sample = np.cov(np.append(Xm, xmkt, axis=1), rowvar=False) * (t - 1) / t
betas = sample[0:n, n].reshape(n, 1)
varmkt = sample[n, n]
sample = sample[:n, :n]
F = np.dot(betas, betas.T) / varmkt
F[np.eye(n) == 1] = np.diag(sample)
# compute shrinkage parameters
c = np.linalg.norm(sample - F, "fro") ** 2
y = Xm ** 2
p = 1 / t * np.sum(np.dot(y.T, y)) - np.sum(sample ** 2)
# r is divided into diagonal
# and off-diagonal terms, and the off-diagonal term
# is itself divided into smaller terms
rdiag = 1 / t * np.sum(y ** 2) - sum(np.diag(sample) ** 2)
z = Xm * np.tile(xmkt, (n,))
v1 = 1 / t * np.dot(y.T, z) - np.tile(betas, (n,)) * sample
roff1 = (
np.sum(v1 * np.tile(betas, (n,)).T) / varmkt
- np.sum(np.diag(v1) * betas.T) / varmkt
)
v3 = 1 / t * np.dot(z.T, z) - varmkt * sample
roff3 = (
np.sum(v3 * np.dot(betas, betas.T)) / varmkt ** 2
- np.sum(np.diag(v3).reshape(-1, 1) * betas ** 2) / varmkt ** 2
)
roff = 2 * roff1 - roff3
r = rdiag + roff
# compute shrinkage constant
k = (p - r) / c
delta = max(0, min(1, k / t))
# compute the estimator
shrunk_cov = delta * F + (1 - delta) * sample
return shrunk_cov, delta
def _ledoit_wolf_constant_correlation(self):
"""
Helper method to calculate the Ledoit-Wolf shrinkage estimate
with the constant correlation matrix as the shrinkage target.
See Ledoit and Wolf (2003)
:return: shrunk sample covariance matrix, shrinkage constant
:rtype: np.ndarray, float
"""
X = np.nan_to_num(self.X.values)
t, n = np.shape(X)
S = self.S # sample cov matrix
# Constant correlation target
var = np.diag(S).reshape(-1, 1)
std = np.sqrt(var)
_var = np.tile(var, (n,))
_std = np.tile(std, (n,))
r_bar = (np.sum(S / (_std * _std.T)) - n) / (n * (n - 1))
F = r_bar * (_std * _std.T)
F[np.eye(n) == 1] = var.reshape(-1)
# Estimate pi
Xm = X - X.mean(axis=0)
y = Xm ** 2
pi_mat = np.dot(y.T, y) / t - 2 * np.dot(Xm.T, Xm) * S / t + S ** 2
pi_hat = np.sum(pi_mat)
# Theta matrix, expanded term by term
term1 = np.dot((Xm ** 3).T, Xm) / t
help_ = np.dot(Xm.T, Xm) / t
help_diag = np.diag(help_)
term2 = np.tile(help_diag, (n, 1)).T * S
term3 = help_ * _var
term4 = _var * S
theta_mat = term1 - term2 - term3 + term4
theta_mat[np.eye(n) == 1] = np.zeros(n)
rho_hat = sum(np.diag(pi_mat)) + r_bar * np.sum(
np.dot((1 / std), std.T) * theta_mat
)
# Estimate gamma
gamma_hat = np.linalg.norm(S - F, "fro") ** 2
# Compute shrinkage constant
kappa_hat = (pi_hat - rho_hat) / gamma_hat
delta = max(0.0, min(1.0, kappa_hat / t))
# Compute shrunk covariance matrix
shrunk_cov = delta * F + (1 - delta) * S
return shrunk_cov, delta
[docs] def oracle_approximating(self):
"""
Calculate the Oracle Approximating Shrinkage estimate
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = self.covariance.oas(X)
return self._format_and_annualize(shrunk_cov)