import numpy as np
from scipy.optimize import least_squares as lsq
from scipy.stats import binom
import pandas as pd
from fast_poibin import PoiBin
from joblib import Parallel, delayed
import numba
from numba import jit,prange
from statsmodels.stats.multitest import multipletests
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, BoundaryNorm
import seaborn as sns
from scipy.stats import ks_2samp
#Set number of threads for parallel computation in input
[docs]class TSeries:
"""
TSeries: Graph-Based Time Series Analysis Class
The TSeries class provides a comprehensive framework for analyzing time series data using graph-based methods.
It is designed to process a weighted adjacency matrix (2D numpy array) representing a N x T time series and extract a rich set of statistics,
including binary signatures, motif statistics, model fitting, signed graph projection, and signed community detection.
Key Features:
-------------
- Standardizes input time series data (row-wise mean ~0, std ~1).
- Computes binary time series representations (positive/negative motifs).
- Calculates motif-based signatures and concordance/discordance matrices.
- Supports model fitting for binary Signed Random Graph Model (bSRGM) and binary Signed Configuration Model (bSCM).
- Predicts event probabilities for fitted models.
- Validates signature distributions via ensemble simulations and analytical methods (KS test).
- Builds filtered graphs based on statistical significance (with FDR correction).
- Detects communities using greedy minimization of BIC or frustration objectives.
- Visualizes graphs, communities, and block matrices.
data : np.ndarray
2D numpy array (N x T) representing the time series matrix (N nodes, T time steps).
n_jobs : int, optional
Number of parallel jobs for computations (default: 1).
Attributes
n_jobs : int
Number of parallel jobs used.
N : int
Number of nodes (rows in data).
T : int
Number of time steps (columns in data).
tseries : np.ndarray
Standardized time series matrix.
binary_tseries : np.ndarray
Binary sign matrix of time series.
binary_tseries_positive : np.ndarray
Matrix of positive binary motifs.
binary_tseries_negative : np.ndarray
Matrix of negative binary motifs.
ai_plus, ai_minus : np.ndarray
Row-wise sums of positive/negative motifs.
kt_plus, kt_minus : np.ndarray
Column-wise sums of positive/negative motifs.
a_plus, a_minus : float
Total sum of positive/negative motifs.
binary_concordant_motifs : np.ndarray
Matrix of concordant motif counts.
binary_discordant_motifs : np.ndarray
Matrix of discordant motif counts.
binary_signature : np.ndarray
Matrix of binary signature values.
params : np.ndarray
Fitted model parameters.
ll : float
Log-likelihood of the fitted model.
jac : np.ndarray
Jacobian of the fitted model.
norm : float
Norm of the Jacobian.
aic : float
Akaike Information Criterion of the fitted model.
norm_rel_error : float
Relative error of the fitted model.
Name of the fitted model.
x0 : np.ndarray
Initial guess for model parameters.
tol : float
Tolerance for optimization.
eps : float
Step size for numerical approximation.
maxiter : int
Maximum number of optimization iterations.
verbose : int
Verbosity level.
pit_plus, pit_minus : np.ndarray
Predicted probabilities for positive/negative events.
n_ensemble : int
Number of ensemble realizations for validation.
ensemble_signature : np.ndarray
Ensemble signature matrix.
analytical_signature : np.ndarray
Analytical signature matrix.
analytical_signature_dist : np.ndarray
Analytical signature distribution.
ks_score : float
Kolmogorov-Smirnov score for signature validation.
p_values_corrected : np.ndarray
FDR-corrected p-values matrix.
cdfx_condition : np.ndarray
Matrix indicating direction of statistical significance.
graph : np.ndarray
Filtered adjacency matrix (projection graph).
communities : np.ndarray
Community labels for each node.
Methods
__init__(self, data=None, n_jobs=1)
Initialize the TSeries instance and compute marginals.
compute_signature(self)
Compute binary signatures and motif statistics.
fit(self, model, ...)
Fit a specified model ('bSRGM', 'bSCM') to the data.
predict(self)
Predict event probabilities for the fitted model.
check_distribution_signature(self, n_ensemble=1000, ks_score=True, alpha=0.05)
Validate signature distribution using ensemble and analytical methods.
build_graph(self, fdr_correction_flag=True, alpha=0.05)
Build filtered graph using statistical significance (with FDR correction).
plot_graph(self, export_path='', show=True)
Plot the adjacency matrix as a heatmap.
community_detection(self, trials=500, n_jobs=None, method="bic", show=False, ...)
Detect communities using greedy minimization of BIC or frustration.
plot_communities(self, export_path="", show=True)
Plot reordered adjacency matrix with community blocks.
plot_block_matrix(self, export_path="", show=True)
- The class is optimized for parallel computation and large time series datasets.
- All statistical tests and corrections are performed on the upper triangular part of the projection matrices for efficiency.
- Community detection supports robust initialization strategies for reproducibility.
- Visualization methods use discrete colormaps for signed graphs.
If input data is missing or incorrectly formatted, or if required computations are not performed.
If input types are incorrect or unsupported.
"""
def __init__(
self,
data = None, n_jobs=1
):
#Initialization
self.n_jobs = n_jobs
self.params = None
self.ll = None
self.jac = None
self.norm = None
self.aic = None
self.args = None
self.norm_rel_error = None
self.binary_signature = None
self.ensemble_signature = None
self.model = None
self.args = None
self.x0 = None
self.tol = None
self.eps = None
self.maxiter = None
self.verbose = None
self.pit_plus = None
self.pit_minus = None
self.n_ensemble = None
#Check on data
if data is None:
raise ValueError('Time Series matrix is missing!')
elif type(data) != np.ndarray:
raise TypeError('Time Series matrix must be a numpy array!')
if np.issubdtype(data.dtype, np.integer):
raise ValueError('Time Series matrix must be a float matrix!')
numba.set_num_threads(self.n_jobs)
#Implemented models
self.implemented_models = ['naive','bSRGM','bSCM']
# Check if data is standardized on rows (mean ~0, std ~1), if not, standardize
row_means = np.mean(data, axis=1)
row_stds = np.std(data, axis=1)
if not (np.allclose(row_means, 0, atol=1e-6) and np.allclose(row_stds, 1, atol=1e-6)):
data = (data - row_means[:, None]) / (row_stds[:, None] + 1e-12)
# print("Data standardized on rows.")
# Inizialization of data and computation of marginals
self.N = data.shape[0]
self.T = data.shape[1]
#Binary time series
self.tseries = data
self.binary_tseries = np.sign(data)
#Marginals for positive weights
self.binary_tseries_positive = np.where(self.binary_tseries > 0, self.binary_tseries, 0)
self.binary_tseries_negative = np.abs(np.where(self.binary_tseries < 0, self.binary_tseries, 0))
self.ai_plus = self.binary_tseries_positive.sum(axis=1).astype(float) # row-wise sum of binary positive weights
self.kt_plus = self.binary_tseries_positive.sum(axis=0).astype(float) # column-wise sum of binary positive weights
self.a_plus = self.binary_tseries_positive.sum().astype(float)
self.ai_minus = self.binary_tseries_negative.sum(axis=1).astype(float) # row-wise sum of binary negative weights
self.kt_minus = self.binary_tseries_negative.sum(axis=0).astype(float) # column-wise sum of binary negative weights
self.a_minus = self.binary_tseries_negative.sum().astype(float)
[docs] def compute_signature(self):
"""
Computes the binary signatures of time series data.
This method calculates the concordant and discordant motifs for binary time series data.
It then computes the binary signature by subtracting the discordant motifs from the concordant motifs.
The method performs the following steps:
1. Computes pairwise motifs for binary time series data (positive-positive, positive-negative, negative-positive, negative-negative).
2. Calculates the binary concordant motifs as the sum of positive-positive and negative-negative motifs.
3. Calculates the binary discordant motifs as the sum of positive-negative and negative-positive motifs.
4. Computes the binary signature as the difference between binary concordant and discordant motifs.
Attributes:
binary_concordant_motifs (int): Sum of concordant motifs for binary time series data.
binary_discordant_motifs (int): Sum of discordant motifs for binary time series data.
binary_signature (int): Difference between binary concordant and discordant motifs.
"""
@jit(nopython=True)
def pairwise_motif(data1, data2):
"""
Compute the cofluctuation dynamic matrix for two time series datasets.
This function calculates the cofluctuation dynamic matrix, which is an NxN matrix for each time interval.
The matrix element C_ij is defined as:
- 1 if series 'i' and series 'j' fluctuate with the same sign,
- -1 if they fluctuate with opposite signs,
- 0 otherwise.
Parameters:
data1 (numpy.ndarray): A 2D array of shape (N, T) representing the first time series dataset.
data2 (numpy.ndarray): A 2D array of shape (N, T) representing the second time series dataset.
Returns:
numpy.ndarray: An NxN matrix representing the cofluctuation dynamic matrix.
"""
# Use matrix multiplication for efficient computation
motif = np.dot(data1, data2.T)
return motif
motif_plus_plus = pairwise_motif(self.binary_tseries_positive,self.binary_tseries_positive)
motif_plus_minus = pairwise_motif(self.binary_tseries_positive,self.binary_tseries_negative)
motif_minus_plus = pairwise_motif(self.binary_tseries_negative,self.binary_tseries_positive)
motif_minus_minus = pairwise_motif(self.binary_tseries_negative,self.binary_tseries_negative)
self.binary_concordant_motifs = motif_plus_plus + motif_minus_minus
self.binary_discordant_motifs = motif_plus_minus + motif_minus_plus
self.binary_signature = self.binary_concordant_motifs - self.binary_discordant_motifs
return self.binary_signature
[docs] def fit(
self,
model,
x0 = None,
maxiter = 1000,
max_nfev = 1000,
verbose= 0,
tol = 1e-8,
eps = 1e-8,
output_params_path = None, imported_params = None, solver_type = 'fixed_point'
):
"""
Fit the specified model to the data.
Parameters:
-----------
model : str
The model to be fitted. Must be one of the implemented models: 'bSRGM', 'bSCM'.
x0 : array-like, optional
Initial guess for the parameters. If None, a random initialization will be used.
maxiter : int, optional
Maximum number of iterations for the optimization algorithm. Default is 1000.
max_nfev : int, optional
Maximum number of function evaluations for the optimization algorithm. Default is 1000.
verbose : int, optional
Verbosity level of the optimization algorithm. Default is 0.
tol : float, optional
Tolerance for termination by the optimization algorithm. Default is 1e-8.
eps : float, optional
Step size used for numerical approximation of the Jacobian. Default is 1e-8.
output_params_path : str, optional
Path to save the fitted parameters. If None, the parameters will not be saved.
Raises:
-------
ValueError
If the model is not initialized or not implemented.
TypeError
If output_params_path is not a string.
Returns:
--------
None
"""
### Initialization
self.x0 = x0
self.tol = tol
self.eps = eps
self.maxiter = maxiter
self.verbose = verbose
self.model = model
self.solver_type = solver_type
### Input Validation
if model is None:
raise ValueError('model must be initialized')
elif model not in self.implemented_models:
raise ValueError('model is not implemented! Inspect "self.implemented_models"')
elif model == 'naive':
pass
### Inizialization of arguments for each model and of x0 if not initialized in input.
if self.model == 'bSRGM': # Binary Bipartite-Signed Random Graph Model (for Time Series)
self.args = (self.a_plus,self.a_minus, (self.N,self.T))
self.x0 = np.random.random(2)
elif self.model == 'bSCM': # Binary Bipartite-Signed Configuration Model (for Time Series)
self.args = (self.ai_plus,self.kt_plus,self.ai_minus,self.kt_minus)
self.x0 = np.random.random(2*self.N + 2*self.T)
if self.model == 'bSRGM':
@jit(nopython=True)
def loglikelihood_bsr_model(params,a_plus,a_minus,shape):
"""Log-likelihood function for the Binary Bipartite-Signed Random Graph Model (for Time Series)"""
N = shape[0]
T = shape[1]
alpha = params[0]
gamma = params[1]
ll = - alpha * a_plus - gamma * a_minus - N*T*np.log(np.exp(-alpha) + np.exp(-gamma))
return - ll
@jit(nopython=True)
def jacobian_bsr_model(params,a_plus,a_minus,shape):
"""Jacobian function for the Binary Bipartite-Signed Random Graph Model (for Time Series)"""
N = shape[0]
T = shape[1]
alpha = params[0]
gamma = params[1]
au_alpha = np.exp(-alpha)
au_gamma = np.exp(-gamma)
jac = np.empty(len(params))
a_plus_th = N*T*au_alpha/(au_alpha+au_gamma)
a_minus_th = N*T*au_gamma/(au_alpha+au_gamma)
jac[0] = -a_plus + a_plus_th
jac[1] = -a_minus + a_minus_th
return - jac
@jit(nopython=True)
def relative_error_bsr_model(params,a_plus,a_minus,shape,tol=1e-10):
"""Relative error function for the Binary Bipartite-Signed Random Graph Model (for Time Series)"""
N = shape[0]
T = shape[1]
alpha = params[0]
gamma = params[1]
au_alpha = np.exp(-alpha)
au_gamma = np.exp(-gamma)
jac = np.empty(len(params))
a_plus_th = N*T*au_alpha/(au_alpha+au_gamma)
a_minus_th = N*T*au_gamma/(au_alpha+au_gamma)
jac[0] = (-a_plus + a_plus_th)/(a_plus+tol)
jac[1] = (-a_minus + a_minus_th)/(a_minus+tol)
return - jac
if imported_params is None:
self.params = lsq(relative_error_bsr_model,x0=self.x0,args=self.args,
gtol=self.tol,xtol=self.eps,max_nfev=max_nfev,verbose=self.verbose,tr_solver='lsmr').x
else:
self.params = imported_params
self.ll = - loglikelihood_bsr_model(self.params,*self.args)
self.jac = - jacobian_bsr_model(self.params,*self.args)
self.norm = np.linalg.norm(self.jac,ord=np.inf)
rel_error = - relative_error_bsr_model(self.params,*self.args)
self.norm_rel_error = np.linalg.norm(rel_error,ord=np.inf)
self.aic = 2*len(self.params) - 2*self.ll
elif self.model == 'bSCM':
@jit(nopython=True)
def loglikelihood_bscm_model(params,ai_plus,kt_plus,ai_minus,kt_minus):
"""Log-likelihood function for the Binary Bipartite Signed Configuration Model (for Time Series)"""
N = len(ai_plus)
T = len(kt_plus)
alphai = params[:N]
betat = params[N:N+T]
gammai = params[N+T:N+T+N]
deltat = params[N+N+T:]
H = np.sum(alphai*ai_plus+gammai*ai_minus) + np.sum(betat*kt_plus + deltat*kt_minus)
lnZ = 0
for i in range(N):
for t in range(T):
aut1 = np.exp(-alphai[i]-betat[t])
aut2 = np.exp(-gammai[i]-deltat[t])
lnZ += np.log(aut1+aut2)
ll = - H - lnZ
return - ll
@jit(nopython=True, parallel=True)
def jacobian_bscm_model(params,ai_plus,kt_plus,ai_minus,kt_minus):
"""Jacobian function for the Binary Bipartite-Signed Configuration Model (for Time Series)"""
N = len(ai_plus)
T = len(kt_plus)
alphai = params[:N]
betat = params[N:N+T]
gammai = params[N+T:N+T+N]
deltat = params[N+N+T:]
jac = np.empty(len(params))
jac[:N] = -ai_plus
jac[N:N+T] = -kt_plus
jac[N+T:N+N+T] = -ai_minus
jac[N+N+T:] = -kt_minus
for i in prange(N):
for t in range(T):
aut1 = np.exp(-alphai[i]-betat[t])
aut2 = np.exp(-gammai[i]-deltat[t])
jac[i] += aut1 / ((aut1 + aut2))
jac[N+T+i] += aut2 / (aut1 + aut2)
for t in prange(T):
for i in range(N):
aut1 = np.exp(-alphai[i]-betat[t])
aut2 = np.exp(-gammai[i]-deltat[t])
jac[N+t] += aut1 / ((aut1 + aut2))
jac[N+N+T+t] += aut2 / (aut1 + aut2)
return - jac
@jit(nopython=True, parallel=True)
def relative_error_bscm_model(params,ai_plus,kt_plus,ai_minus,kt_minus,tol=1e-10):
"""Relative error function for the Binary Bipartite-Signed Configuration Model (for Time Series)"""
N = len(ai_plus)
T = len(kt_plus)
alphai = params[:N]
betat = params[N:N+T]
gammai = params[N+T:N+T+N]
deltat = params[N+N+T:]
jac = np.empty(len(params))
jac[:N] = -ai_plus
jac[N:N+T] = -kt_plus
jac[N+T:N+N+T] = -ai_minus
jac[N+N+T:] = -kt_minus
for i in prange(N):
for t in range(T):
aut1 = np.exp(-alphai[i]-betat[t])
aut2 = np.exp(-gammai[i]-deltat[t])
jac[i] += aut1 / ((aut1 + aut2))
jac[N+T+i] += aut2 / (aut1 + aut2)
for t in prange(T):
for i in range(N):
aut1 = np.exp(-alphai[i]-betat[t])
aut2 = np.exp(-gammai[i]-deltat[t])
jac[N+t] += aut1 / ((aut1 + aut2))
jac[N+N+T+t] += aut2 / (aut1 + aut2)
jac[:N] /= (ai_plus+tol)
jac[N:N+T] /= (kt_plus+tol)
jac[N+T:N+N+T] /= (ai_minus+tol)
jac[N+N+T:] /= (kt_minus+tol)
return - jac
@jit(nopython=True)
def fixed_point_solver_bscm_model(ai_plus, kt_plus, ai_minus, kt_minus, max_iterations=10000, diff=1e-08, tol=1e-06, print_steps=100):
"""Jacobian function for the Binary Bipartite-Signed Configuration Model (for Time Series)"""
N = len(ai_plus)
T = len(kt_plus)
xi = ai_plus / np.sqrt(np.sum(ai_plus))
zt = kt_plus / np.sqrt(np.sum(kt_plus))
yi = ai_minus / np.sqrt(np.sum(ai_minus))
vt = kt_minus / np.sqrt(np.sum(kt_minus))
xi_new = xi.copy()
zt_new = zt.copy()
yi_new = yi.copy()
vt_new = vt.copy()
for it in range(max_iterations):
xi = xi_new.copy()
yi = yi_new.copy()
zt = zt_new.copy()
vt = vt_new.copy()
for i in range(N):
den_xi_new = np.sum(zt / (1. + xi[i] * zt + yi[i] * vt))
den_yi_new = np.sum(vt / (1. + xi[i] * zt + yi[i] * vt))
xi_new[i] = ai_plus[i] / den_xi_new
yi_new[i] = ai_minus[i] / den_yi_new
for t in range(T):
den_zt_new = np.sum(xi / (1. + xi * zt[t] + yi * vt[t]))
den_vt_new = np.sum(yi / (1. + xi * zt[t] + yi * vt[t]))
zt_new[t] = kt_plus[t] / den_zt_new
vt_new[t] = kt_minus[t] / den_vt_new
normies = np.empty(4)
normies[0] = np.linalg.norm(xi - xi_new)
normies[1] = np.linalg.norm(yi - yi_new)
normies[2] = np.linalg.norm(zt - zt_new)
normies[3] = np.linalg.norm(vt - vt_new)
if it % print_steps == 0:
alphai = -np.log(xi_new)
betat = -np.log(zt_new)
gammai = -np.log(yi_new)
deltat = -np.log(vt_new)
whole_params = np.concatenate((alphai, betat, gammai, deltat))
rel_error = np.linalg.norm(relative_error_bscm_model(whole_params, ai_plus,kt_plus,ai_minus,kt_minus),ord=np.inf)
if rel_error < tol:
# print(f"Convergence reached at Iteration {it} for gtol.")
break
if np.max(normies) < diff:
# print(f"Convergence reached at Iteration {it} for xtol.")
break
alphai = -np.log(xi_new)
betat = -np.log(zt_new)
gammai = -np.log(yi_new)
deltat = -np.log(vt_new)
whole_params = np.concatenate((alphai, betat, gammai, deltat))
return whole_params
if imported_params is None:
if self.solver_type == 'lsq':
self.params = lsq(relative_error_bscm_model,x0=self.x0,args=self.args,
gtol=self.tol,xtol=self.eps,max_nfev=max_nfev,verbose=self.verbose,tr_solver='lsmr').x
elif self.solver_type == 'fixed_point':
self.params = fixed_point_solver_bscm_model(*self.args)
else:
raise TypeError('wrong solver type!')
else:
self.params = imported_params
self.ll = - loglikelihood_bscm_model(self.params,*self.args)
self.jac = - jacobian_bscm_model(self.params,*self.args)
self.norm = np.linalg.norm(self.jac,ord=np.inf)
rel_error = - relative_error_bscm_model(self.params,*self.args)
self.norm_rel_error = np.linalg.norm(rel_error,ord=np.inf)
self.aic = 2*len(self.params) - 2*self.ll
if verbose > 0:
print('Fitting Completed with infinite norm:',self.norm, 'and infinite relative norm:',self.norm_rel_error)
if output_params_path is not None:
if isinstance(output_params_path,str):
output_path = output_params_path
params = pd.DataFrame(self.params)
params.to_csv(output_path)
else:
raise TypeError('output_params_path must be a string')
[docs] def predict(self):
"""
Predict the probabilities of events based on the specified model.
This method computes the probabilities of the occurrence of events for the implemented models:
- binary Signed Random Graph Model (bSRGM)
- binary Signed Configuration Model (bSCM)
Returns:
tuple: For "bSRGM" and "bSCM", returns the computed probabilities:
- (pit_plus, pit_minus)
"""
if self.model == "bSRGM":
def bsr_model_proba_events(params, shape):
"""Compute the probabilities of the occurrence of the events for the Binary Bipartite-Signed Random Graph Model (for Time Series)"""
alpha = np.exp(-params[0])
gamma = np.exp(-params[1])
N = shape[0]
T = shape[1]
pit_plus = np.ones((N,T))*alpha/(alpha+gamma)
pit_minus = np.ones((N,T))*gamma/(alpha+gamma)
return pit_plus,pit_minus
self.pit_plus,self.pit_minus = bsr_model_proba_events(self.params,(self.N,self.T))
return self.pit_plus,self.pit_minus
elif self.model == "bSCM":
def bscm_model_proba_events(params,shape):
"""Compute the probabilities of the occurrence of the events for the Binary Bipartite-Signed Configuration Model (for Time Series)"""
N = shape[0]
T = shape[1]
exp_betai = np.exp(-params[:N])
exp_deltat = np.exp(-params[N:N+T])
exp_gammai = np.exp(-params[N+T:N+N+T])
exp_etat = np.exp(-params[N+N+T:])
pit_plus = np.empty((N,T))
pit_minus = np.empty((N,T))
for i in range(N):
for t in range(T):
aut1 = exp_betai[i]*exp_deltat[t]
aut2 = exp_gammai[i]*exp_etat[t]
pit_plus[i,t] = aut1/(aut1+aut2)
pit_minus[i,t] = aut2/(aut1+aut2)
return pit_plus,pit_minus
self.pit_plus,self.pit_minus = bscm_model_proba_events(self.params,(self.N,self.T))
return self.pit_plus,self.pit_minus
elif self.model == "naive":pass
[docs] def check_distribution_signature(self, n_ensemble = 1000, ks_score=True, alpha = 0.05):
"""
Validate the signature of the model using either ensemble or analytical methods.
Parameters:
-----------
n_ensemble : int, optional
Number of ensemble realizations used to build the empirical signature distribution. Default is 1000.
ks_score : bool, optional
If True, compute the Kolmogorov–Smirnov agreement score between empirical and analytical
signature distributions. Default is True.
alpha : float, optional
Significance level used in the KS test when computing the KS score. Default is 0.05.
Flag to indicate whether to use analytical methods for validation. Default is True.
Raises:
-------
ValueError
If the predicted probabilities and conditional weights are not computed before validation.
If the model specified is not valid.
Notes:
------
This function validates the signature of the model by computing p-values and applying FDR correction.
Depending on the model type and the analytical flag, it uses different methods for validation:
- For ensemble-based validation, it computes ensemble signatures and elaborates statistics.
- For analytical validation, it computes p-values using specific analytical models for different types of models.
"""
if self.pit_plus is None:
raise ValueError("Predict probabilities and conditional weights first!")
self.n_ensemble = n_ensemble
@jit(nopython=True)
def pairwise_motif(data1, data2):
"""
Compute the cofluctuation dynamic matrix for two time series datasets.
This function calculates the cofluctuation dynamic matrix, which is an NxN matrix for each time interval.
The matrix element C_ij is defined as:
- 1 if series 'i' and series 'j' fluctuate with the same sign,
- -1 if they fluctuate with opposite signs,
- 0 otherwise.
Parameters:
data1 (numpy.ndarray): A 2D array of shape (N, T) representing the first time series dataset.
data2 (numpy.ndarray): A 2D array of shape (N, T) representing the second time series dataset.
Returns:
numpy.ndarray: An NxN matrix representing the cofluctuation dynamic matrix.
"""
# Use matrix multiplication for efficient computation
motif = np.dot(data1, data2.T)
return motif
@jit(nopython=True)
def sample_single_realization_binary(pit_plus):
"""
Generates a single realization of binary data based on the given probabilities.
Parameters:
pit_plus (numpy.ndarray): A 2D array of shape (N, T) containing the probabilities of the positive outcome (1) for each element.
pit_minus (numpy.ndarray): A 2D array of shape (N, T) containing the probabilities of the negative outcome (-1) for each element.
Returns:
numpy.ndarray: A 2D array of shape (N, T) containing the generated binary data, where each element is either 1 or -1 based on the given probabilities.
"""
N = pit_plus.shape[0]
T = pit_plus.shape[1]
realization_data = np.zeros((N,T))
for i in range(N):
for t in range(T):
p_plus = pit_plus[i,t]
ran = np.random.rand()
if ran < p_plus:
realization_data[i,t] = 1
else:
realization_data[i,t] = -1
return realization_data
@jit(nopython=True,parallel=True)
def ensemble_signature_computation_binary(pit_plus,n_ensemble):
"""Compute the ensemble statistics for the co-fluctuation matrices and correlation matrices."""
N = pit_plus.shape[0]
ensemble_signature = np.empty((n_ensemble,N,N))
for n_ens in prange(n_ensemble):
realization_data = sample_single_realization_binary(pit_plus)
positive_realization = np.where(realization_data > 0, realization_data, 0)
negative_realization = np.abs(np.where(realization_data < 0, realization_data, 0))
motif_plus_plus = pairwise_motif(positive_realization,positive_realization)
motif_plus_minus = pairwise_motif(positive_realization,negative_realization)
motif_minus_plus = pairwise_motif(negative_realization,positive_realization)
motif_minus_minus = pairwise_motif(negative_realization,negative_realization)
ensemble_signature[n_ens,:,:] = motif_plus_plus + motif_minus_minus - motif_plus_minus - motif_minus_plus
return ensemble_signature
if self.model == 'bSRGM':
@jit(nopython=True,parallel=True)
def sample_analytical_bsr_model(pit_plus,pit_minus,n_ensemble):
"""Compute the p-values for the Binary Bipartite-Signed Random Graph Model (for Time Series)"""
N = pit_plus.shape[0]
T = pit_plus.shape[1]
q_plus = (pit_plus**2 + pit_minus**2)[0]
signature_ens = np.empty((N,N,n_ensemble))
for i in prange(N):
for j in range(N):
if j != i:
c_ens = np.random.binomial(T,q_plus[0],n_ensemble)
signature_ens[i,j,:] = 2*c_ens - T
return signature_ens
def compute_c_ens(i, j, T, ensemble_signature, q_plus):
"""Helper function to compute c_ens for a specific (i, j)."""
ensemble_cij = (T + ensemble_signature[i, j, :]) / 2
return binom.pmf(ensemble_cij.astype(int), T, q_plus[0])
def sample_analytical_bsr_model_2(pit_plus, pit_minus, ensemble_signature, n_jobs):
"""Compute the p-values for the Binary Bipartite-Signed Random Graph Model (for Time Series)"""
N = pit_plus.shape[0]
T = pit_plus.shape[1]
q_plus = (pit_plus**2 + pit_minus**2)[0]
signature_ens = ensemble_signature.copy()
# Parallel computation for (i, j) pairs where i != j
results = Parallel(n_jobs=n_jobs)(
delayed(compute_c_ens)(i, j, T, ensemble_signature, q_plus)
for i in range(N) for j in range(N) if i != j
)
# Update signature_ens with the computed results
index = 0
for i in range(N):
for j in range(N):
if i != j:
signature_ens[i, j, :] = results[index]
index += 1
return signature_ens
ensemble_signature = ensemble_signature_computation_binary(self.pit_plus,self.n_ensemble).transpose(1,2,0)
analytical_signature = sample_analytical_bsr_model(self.pit_plus,self.pit_minus,self.n_ensemble)
analytical_signature_dist = sample_analytical_bsr_model_2(self.pit_plus,self.pit_minus,ensemble_signature, self.n_jobs)
elif self.model == 'bSCM':
def compute_pair_signature(i, j, pit_plus, pit_minus, n_ensemble, T):
"""
Compute the ensemble signature for a single node pair (i, j).
Parameters:
i (int): Row index.
j (int): Column index.
pit_plus (numpy.ndarray): N x T matrix of probabilities for positive interactions.
pit_minus (numpy.ndarray): N x T matrix of probabilities for negative interactions.
n_ensemble (int): Number of ensemble samples to generate.
T (int): Number of trials (time steps).
Returns:
tuple: (i, j, ensemble_signature), where ensemble_signature is an array of size n_ensemble.
"""
if i == j:
# Self-loop case: Set signature to T
return i, j, np.full(n_ensemble, T)
# Calculate Poisson Binomial probabilities
probabilities = pit_plus[i, :] * pit_plus[j, :] + pit_minus[i, :] * pit_minus[j, :]
# Initialize the Poisson Binomial distribution
pb = PoiBin(probabilities)
# Precompute the CDF for all possible outcomes
cdf_values = pb.cdf
# Generate uniform random numbers
uniform_samples = np.random.uniform(0, 1, n_ensemble)
# Map uniform samples to Poisson Binomial outcomes
poibin_samples = np.searchsorted(cdf_values, uniform_samples)
# Compute ensemble signature
ensemble_signature = 2 * poibin_samples - T
return i, j, ensemble_signature
def sample_analytical_bscm_model_poibin(pit_plus, pit_minus, n_ensemble, n_jobs=-1):
"""
Compute ensemble signatures for the Binary Bipartite-Signed Random Graph Model with parallelization.
Parameters:
pit_plus (numpy.ndarray): N x T matrix of probabilities for positive interactions.
pit_minus (numpy.ndarray): N x T matrix of probabilities for negative interactions.
n_ensemble (int): Number of ensemble samples to generate.
n_jobs (int): Number of parallel jobs (-1 to use all available cores).
Returns:
numpy.ndarray: N x N x n_ensemble array of ensemble signatures.
"""
N, T = pit_plus.shape
ensemble_signature = np.empty((N, N, n_ensemble))
# Outer loop over i with progress tracking
for i in range(N):
# Parallel processing of inner loop over j
results = Parallel(n_jobs=n_jobs)(
delayed(compute_pair_signature)(i, j, pit_plus, pit_minus, n_ensemble, T)
for j in range(N)
)
for _, j, signature in results:
ensemble_signature[i, j, :] = signature
return ensemble_signature
def compute_pair_iteration(i, j, pit_plus, pit_minus, T):
"""
Compute the ensemble PMF for a single node pair (i, j).
Parameters:
i (int): Row index.
j (int): Column index.
pit_plus (numpy.ndarray): N x T matrix of probabilities for positive interactions.
pit_minus (numpy.ndarray): N x T matrix of probabilities for negative interactions.
T (int): Number of trials (time steps).
Returns:
tuple: (i, j, pmf_values), where pmf_values is the PMF array.
"""
if i == j:
# Self-loop case: Set PMF to T
return i, j, np.full(T + 1, T)
# Calculate Poisson Binomial probabilities
probabilities = pit_plus[i, :] * pit_plus[j, :] + pit_minus[i, :] * pit_minus[j, :]
# Initialize the Poisson Binomial distribution
pb = PoiBin(probabilities)
# Compute the PMF
pmf_values = pb.pmf
return i, j, pmf_values
def sample_analytical_bscm_model_poibin_dist(pit_plus, pit_minus, n_jobs=-1):
"""
Compute ensemble PMFs for the Binary Bipartite-Signed Random Graph Model with parallelization.
Parameters:
pit_plus (numpy.ndarray): N x T matrix of probabilities for positive interactions.
pit_minus (numpy.ndarray): N x T matrix of probabilities for negative interactions.
ensemble_signature (numpy.ndarray): Precomputed ensemble signature array.
n_jobs (int): Number of parallel jobs (-1 to use all available cores).
Returns:
numpy.ndarray: N x N x (T+1) array of PMFs.
"""
N, T = pit_plus.shape
dist_signature = np.empty((N, N, T + 1), dtype=float)
# Outer loop over i with progress tracking
for i in range(N):
results = Parallel(n_jobs=n_jobs)(
delayed(compute_pair_iteration)(i, j, pit_plus, pit_minus, T)
for j in range(N)
)
for _, j, pmf_values in results:
dist_signature[i, j, :] = pmf_values
return dist_signature
ensemble_signature = ensemble_signature_computation_binary(self.pit_plus,self.n_ensemble).transpose(1,2,0)
analytical_signature = np.array(sample_analytical_bscm_model_poibin(self.pit_plus,self.pit_minus,self.n_ensemble))
analytical_signature_dist = np.array(sample_analytical_bscm_model_poibin_dist(self.pit_plus,self.pit_minus,self.n_jobs))
self.ensemble_signature = ensemble_signature
self.analytical_signature = analytical_signature
self.analytical_signature_dist = analytical_signature_dist
if ks_score==True:
### Statistical KS_scores
def compute_ks_score(ensemble_signature, analytical_signature, alpha):
"""
Compute the Kolmogorov-Smirnov (KS) scores between ensemble and analytical signatures.
Parameters:
ensemble_signature (numpy.ndarray): N x N x n_ensemble array of ensemble signatures.
analytical_signature (numpy.ndarray): N x N x n_analytical array of analytical signatures.
alpha (float): threshold for kS test
Returns:
Returns (float): fraction of node pairs (i, j) for which the KS test p-value ≥ alpha
"""
N = ensemble_signature.shape[0]
ks_score = 0
num_tot = 0
for i in range(N):
for j in range(i,N):
if i != j:
num_tot += 1
_, p_KS = ks_2samp(ensemble_signature[i, j, :], analytical_signature[i, j, :], alternative='two-sided', mode='auto')
if p_KS >= alpha:
ks_score += 1
ks_score_normalized = ks_score / num_tot
return ks_score_normalized
self.ks_score = compute_ks_score(self.ensemble_signature, self.analytical_signature, alpha)
return self.ks_score
[docs] def build_graph(self, fdr_correction_flag = True, alpha = 0.05):
"""
This function validates the signature of the model by computing p-values and applying
False Discovery Rate (FDR) correction. Depending on the model type, it uses analytical
methods for validation. The function supports two model types: 'bSRGM' and 'bSCM'.
A filtered signature matrix where elements are retained based on the significance level.
- For the 'bSRGM' model, p-values are computed using a binomial cumulative distribution function.
- For the 'bSCM' model, p-values are computed using the Poisson Binomial distribution.
- The FDR correction is applied to the upper triangular part of the p-values matrix, and the
corrected matrix is made symmetric.
- The filtered signature matrix is computed by retaining elements of the empirical signature
matrix where the corrected p-values are below the significance level.
Validate the signature of the model using analytical methods.
Parameters:
-----------
fdr_correction_flag : bool, optional
Flag to indicate whether to apply False Discovery Rate (FDR) correction. Default is True.
alpha : float, optional
Significance level for statistical tests. Default is 0.05.
Raises:
-------
ValueError
If the predicted probabilities and conditional weights are not computed before validation.
If the model specified is not valid.
Notes:
------
This function validates the signature of the model by computing p-values and applying FDR correction.
Depending on the model type, it uses analytical methods for validation:
- It computes p-values using specific analytical models for different types of models.
"""
if self.model == 'naive':
self.graph = np.sign(self.binary_signature)
return self.graph
if self.pit_plus is None:
raise ValueError("Predict probabilities and conditional weights first!")
self.alpha = alpha
self.fdr_correction_flag = fdr_correction_flag
def fdr_correction(p_values, alpha=0.05):
"""
Apply False Discovery Rate (FDR) correction to the upper triangular part of a matrix of p-values,
and ensure the corrected matrix is symmetric.
Parameters:
p_values (numpy.ndarray): A square numpy matrix of p-values to be corrected.
alpha (float, optional): Significance level for the FDR correction. Default is 0.05.
Returns:
numpy.ndarray: A symmetric numpy array of p-values after FDR correction, with the same shape as the input array.
"""
# Get the upper triangular indices (excluding the diagonal)
triu_indices = np.triu_indices(p_values.shape[0], k=1)
# Flatten the upper triangular part of the p-values matrix
p_values_upper = p_values[triu_indices]
# Apply the FDR correction using multipletests on the upper triangular part
_, p_values_corrected, _, _ = multipletests(p_values_upper, alpha=alpha, method='fdr_bh')
# Rebuild the corrected matrix
corrected_p_values = p_values.copy()
corrected_p_values[triu_indices] = p_values_corrected
# Make the matrix symmetric by copying the upper triangular part to the lower triangular part
corrected_p_values.T[triu_indices] = p_values_corrected
return corrected_p_values
if self.model == 'bSRGM':
def p_values_analytical_bsr_model(pit_plus,pit_minus,concordant_motifs):
"""
Compute the p-values for the Binary Bipartite-Signed Random Graph Model (for Time Series).
This function calculates the p-values for a given binary bipartite-signed random graph model
using analytical methods. It evaluates the statistical significance of concordant motifs
between pairs of nodes in a time series dataset.
Args:
pit_plus (numpy.ndarray): A 2D array of shape (N, T) representing the positive PIT (Probability Integral Transform) values
for N nodes over T time steps.
pit_minus (numpy.ndarray): A 2D array of shape (N, T) representing the negative PIT values
for N nodes over T time steps.
concordant_motifs (numpy.ndarray): A 2D array of shape (N, N) representing the number of concordant motifs
between pairs of nodes.
Returns:
numpy.ndarray: A 2D array of shape (N, N) containing the computed p-values for each pair of nodes.
The diagonal elements are set to 1.0 as self-comparisons are not meaningful.
Notes:
- The p-values are computed using the cumulative distribution function (CDF) of the binomial distribution.
- The function assumes that the input arrays `pit_plus` and `pit_minus` are properly normalized.
- The p-values are two-tailed, calculated as `2 * min(CDF, 1 - CDF)`.
"""
N = pit_plus.shape[0]
T = pit_plus.shape[1]
q_plus = (pit_plus**2 + pit_minus**2)[0]
cdfx_condition = np.zeros((N,N))
p_values = np.empty((N,N))
for i in range(N):
for j in range(N):
if i != j:
cdfx = binom.cdf(concordant_motifs[i,j],T,q_plus[0])
p_values[i,j] = 2.*min(cdfx,1.-cdfx)
cdfx_condition[i,j] = 1 if cdfx > 0.5 else -1
else:
p_values[i,j] = 1.0
return p_values, cdfx_condition
model_p_values, cdfx_condition = p_values_analytical_bsr_model(self.pit_plus,self.pit_minus,self.binary_concordant_motifs)
if self.fdr_correction_flag: #the flag was not used otherwise
self.p_values_corrected = fdr_correction(model_p_values, alpha=self.alpha)
else:
self.p_values_corrected = model_p_values
self.cdfx_condition = cdfx_condition
elif self.model == 'bSCM':
def p_values_analytical_bscm_model(pit_plus,pit_minus,concordant_motifs):
"""
Compute the p-values for a given analytical bSCM (binary Signed Configuration Model) model.
This function calculates the p-values for concordant motifs between pairs of nodes
based on the provided PIT (Probability Integral Transform) matrices and concordant motifs matrix.
Args:
pit_plus (numpy.ndarray): A 2D array of shape (N, T) representing the PIT values for positive motifs.
N is the number of nodes, and T is the number of time steps.
pit_minus (numpy.ndarray): A 2D array of shape (N, T) representing the PIT values for negative motifs.
N is the number of nodes, and T is the number of time steps.
concordant_motifs (numpy.ndarray): A 2D array of shape (N, N) representing the concordant motifs
between pairs of nodes.
Returns:
numpy.ndarray: A 2D array of shape (N, N) containing the computed p-values for each pair of nodes.
The diagonal elements are set to 1.0 as self-comparisons are not meaningful.
"""
N = pit_plus.shape[0]
cdfx_condition = np.zeros((N,N))
p_values = np.empty((N,N))
for i in range(N):
for j in range(N):
if i != j:
probabilities = pit_plus[i,:] * pit_plus[j,:] + pit_minus[i,:] * pit_minus[j,:]
pb = PoiBin(list(probabilities))
cdfx = pb.cdf[int(concordant_motifs[i,j])]
if cdfx > 0.5:
cdfx_condition[i,j] = 1
else:
cdfx_condition[i,j] = -1
p_values[i,j] = 2.*min(cdfx,1.-cdfx)
else:
p_values[i,j] = 1.0
return p_values, cdfx_condition
model_p_values, cdfx_condition = p_values_analytical_bscm_model(self.pit_plus,self.pit_minus,self.binary_concordant_motifs)
if self.fdr_correction_flag:
self.p_values_corrected = fdr_correction(model_p_values, alpha=self.alpha)
else:
self.p_values_corrected = model_p_values
self.cdfx_condition = cdfx_condition
else:
raise ValueError('The model is not valid!')
self.graph = np.where(self.p_values_corrected < self.alpha, self.cdfx_condition, 0) # filtered graph without sign information
return self.graph
[docs] def plot_graph(self, export_path='', show=True):
"""
Plots the naive and filtered adjacency matrices as heatmaps.
Parameters:
-----------
export_path : str, optional
The file path (excluding extension) where the plot will be saved as a PDF.
If not provided, the plot will not be saved. Default is an empty string.
show : bool, optional
If True, displays the plot. Default is True.
Raises:
-------
ValueError
If `self.filtered_graph` is None, indicating that the graph has not been built.
Notes:
------
- The naive adjacency matrix is plotted on the left, and the filtered adjacency
matrix is plotted on the right.
- The heatmaps use a discrete colormap with three colors: red (-1), white (0),
and blue (1).
- If `export_path` is provided, the plot is saved as a PDF with the suffix
"_adjacency.pdf".
"""
if self.graph is None:
raise ValueError("Build the graph first!")
# Define a discrete colormap
colors = ["red", "white", "blue"] # Colors for -1, 0, and 1
cmap = ListedColormap(colors)
bounds = [-1.5, -0.5, 0.5, 1.5] # Boundaries for discrete values
norm = BoundaryNorm(bounds, cmap.N)
def plot_heatmap(matrix, title, ax):
# Plot heatmap using the discrete colormap
cax = ax.matshow(matrix, cmap=cmap, norm=norm)
ax.set_title(title)
# Place colorbar horizontally below the figure
plt.colorbar(
cax, ax=ax, orientation='horizontal', ticks=[-1, 0, 1],
fraction=0.046, pad=0.15, label='Link Value'
)
fig, ax = plt.subplots(figsize=(5, 5))
plot_heatmap(self.graph, 'Projection Matrix', ax)
plt.tight_layout()
if export_path:
export_path_corr_matrix = f"{export_path}_adjacency.pdf"
plt.savefig(export_path_corr_matrix, dpi=600)
if show:
plt.show()
plt.close()
def _reorder_graph(self, graph, labels):
"""Reorder adjacency matrix by community labels."""
sorted_idx = np.argsort(labels)
return graph[sorted_idx][:, sorted_idx], labels[sorted_idx]
def _draw_community_blocks(self, ax, labels, color='black', linewidth=1.5):
"""Draw lines to separate communities."""
boundaries = np.cumsum(np.unique(labels, return_counts=True)[1])[:-1]
for b in boundaries:
ax.axhline(b - 0.1, color=color, linewidth=linewidth)
ax.axvline(b - 0.1, color=color, linewidth=linewidth)
[docs] def plot_communities(self, export_path="", show=True):
"""
Plot reordered adjacency matrix by community labels with boxes.
Parameters:
-----------
graph_type : str, optional
Either "naive" or "filtered" (default="filtered").
export_path : str, optional
Path to save the PDF figure. If empty, the plot is not saved.
show : bool, optional
If True, display the figure.
"""
graph = self.graph
labels = self.communities
if graph is None or labels is None:
raise ValueError(f"Graph or communities not available. Run build_graph() and community_detection().")
reordered_graph, reordered_labels = self._reorder_graph(graph, labels)
cmap = ListedColormap(["red", "white", "blue"])
bounds = [-1.5, -0.5, 0.5, 1.5]
norm = BoundaryNorm(bounds, cmap.N)
fig, ax = plt.subplots(figsize=(5, 5))
sns.heatmap(reordered_graph, cmap=cmap, norm=norm,
cbar_kws={'ticks': [-1, 0, 1], 'orientation':'horizontal','fraction':0.046}, ax=ax, square=True)
self._draw_community_blocks(ax, reordered_labels, color='black', linewidth=1.5)
ax.set_title(f"Graph Reordered by Communities", fontsize=14, weight='bold')
ax.set_xlabel("ROI (Reordered)", fontsize=12)
ax.set_ylabel("ROI (Reordered)", fontsize=12)
ax.set_xticklabels([])
ax.set_yticklabels([])
plt.tight_layout()
if export_path:
plt.savefig(f"{export_path}_communities.pdf", dpi=600)
if show:
plt.show()
plt.close()
[docs] def plot_block_matrix(self, export_path="", show=True):
"""
Plot block matrix of the graph based on detected communities.
Parameters:
-----------
export_path : str, optional
Path to save the PDF figure. If empty, the plot is not saved.
show : bool, optional
If True, display the figure.
"""
if self.graph is None or self.communities is None:
raise ValueError(f"Graph or communities not available. Run build_graph() and community_detection().")
def compute_diagonal_block_probabilities(adj_matrix, community_labels):
"""
Compute the probabilities of positive and negative links in the diagonal blocks of an adjacency matrix.
Parameters:
- adj_matrix: numpy array, adjacency matrix of the graph
- community_labels: numpy array, community labels for each node
Returns:
- dict: A dictionary with probabilities of positive and negative links for each community
"""
unique_labels = np.unique(community_labels)
probabilities = {}
for label in unique_labels:
# Get indices of nodes in the current community
community_indices = np.where(community_labels == label)[0]
N_c = len(community_indices)
if N_c > 1: # Only compute if the community has more than one node
# Extract the diagonal block for the current community
community_block = adj_matrix[np.ix_(community_indices, community_indices)]
# Count positive and negative links
num_positive_links = np.sum(community_block == 1)
num_negative_links = np.sum(community_block == -1)
# Total possible links in the diagonal block
total_links = N_c * (N_c - 1)
# Compute probabilities
prob_positive_links = num_positive_links / total_links
prob_negative_links = num_negative_links / total_links
probabilities[label] = {
'prob_positive_links': prob_positive_links,
'prob_negative_links': prob_negative_links
}
else:
probabilities[label] = {
'prob_positive_links': 0,
'prob_negative_links': 0
}
return probabilities
def compute_off_diagonal_block_probabilities(adj_matrix, community_labels):
"""
Compute the probabilities of positive and negative links in the off-diagonal blocks of an adjacency matrix.
Parameters:
- adj_matrix: numpy array, adjacency matrix of the graph
- community_labels: numpy array, community labels for each node
Returns:
- dict: A dictionary with probabilities of positive and negative links for each pair of communities
"""
unique_labels = np.unique(community_labels)
probabilities = {}
for i, label_i in enumerate(unique_labels):
for j, label_j in enumerate(unique_labels):
if i < j: # Only consider off-diagonal blocks
# Get indices of nodes in the two communities
indices_i = np.where(community_labels == label_i)[0]
indices_j = np.where(community_labels == label_j)[0]
# Extract the off-diagonal block
block = adj_matrix[np.ix_(indices_i, indices_j)]
# Count positive and negative links
num_positive_links = np.sum(block == 1)
num_negative_links = np.sum(block == -1)
# Total possible links in the off-diagonal block
total_links = len(indices_i) * len(indices_j)
# Compute probabilities
prob_positive_links = num_positive_links / total_links if total_links > 0 else 0
prob_negative_links = num_negative_links / total_links if total_links > 0 else 0
probabilities[(label_i, label_j)] = {
'prob_positive_links': prob_positive_links,
'prob_negative_links': prob_negative_links
}
return probabilities
diag_probs = compute_diagonal_block_probabilities(self.graph, self.communities)
off_probs = compute_off_diagonal_block_probabilities(self.graph, self.communities)
unique_labels = np.unique(self.communities)
M = np.empty((len(unique_labels), len(unique_labels)))
for i, li in enumerate(unique_labels):
for j, lj in enumerate(unique_labels):
if i == j:
p_pos = diag_probs[li]['prob_positive_links']
p_neg = diag_probs[li]['prob_negative_links']
else:
key = (li, lj) if (li, lj) in off_probs else (lj, li)
p_pos = off_probs[key]['prob_positive_links']
p_neg = off_probs[key]['prob_negative_links']
M[i, j] = 1 if p_pos > p_neg else -1 if p_pos < p_neg else 0
fig,ax = plt.subplots(figsize=(5,5))
cmap = ListedColormap(["red", "white", "blue"])
bounds = [-1.5, -0.5, 0.5, 1.5]
norm = BoundaryNorm(bounds, cmap.N)
sns.heatmap(M, cmap=cmap, norm=norm, linecolor='gray', linewidths=0.5,
cbar_kws={'ticks': [-1, 0, 1], 'orientation':'horizontal','fraction':0.046,'label':'Dominant Link Type'}, ax=ax, square=True)
ax.set_title(f"Block Matrix based on Communities", fontsize=14, weight='bold')
ax.set_xlabel("Community", fontsize=12)
ax.set_ylabel("Community", fontsize=12)
print(M.shape)
num_labels = M.shape[0]
ax.xaxis.set_major_locator(plt.FixedLocator(range(num_labels)))
ax.yaxis.set_major_locator(plt.FixedLocator(range(num_labels)))
ax.set_xticklabels([str(j+1) for j in range(num_labels)])
ax.set_yticklabels([str(j+1) for j in range(num_labels)], rotation=0)
plt.tight_layout()
if export_path:
plt.savefig(f"{export_path}_block_matrix.pdf", dpi=600)
if show:
plt.show()
plt.close()
return M