Source code for DyGyS.Directed_class

import numpy as np
from . import ll_Functions_conditional as lfC
from . import ll_Functions_integrated as lfI
from . import ll_Functions_econometrics as lfE
from . import solver_Functions as sF
from . import netstats_Functions as nF
from . import ensemble_Functions as eF
from numba.typed import List

[docs]class DirectedGraph: """"Directed Graph instance must be initialised with adjacency weighted matrix. An exploratory analysis is conducted during initialization. See .implemented_models, .implemented_network_statistics and .implemented_classifier_statistics for available methods and statistics. :params adjacency (np.ndarray,list): Weighted adjacency matrix in numpy 1D, numpy 2D or list format. """ def __init__( self, adjacency=None, ): if adjacency is not None: if isinstance(adjacency,(np.ndarray,)): if len(adjacency.shape) == 2: adjacency = nF.flatten(adjacency) elif len(adjacency.shape) > 2: raise TypeError("Adjacency must be a 1-D or 2-D numpy array") if not isinstance(adjacency,(np.ndarray,list)): raise TypeError("Adjacency must be a 1-D or 2-D numpy array") elif adjacency.size > 0: if np.sum(adjacency < 0): raise TypeError( "The adjacency matrix entries must be semi-definite positive." ) if isinstance( adjacency, list ): self.adjacency = np.array(adjacency) elif isinstance(adjacency, np.ndarray): self.adjacency = adjacency else: raise TypeError("Adjacency matrix must be initialized.") self.binary_adjacency = nF.binarize(adjacency) self.degree = nF.deg(self.binary_adjacency) self.degree_in = nF.deg_in(self.binary_adjacency) self.annd = nF.knn_single(self.binary_adjacency) self.clust = nF.clust_single_fast(self.binary_adjacency) self.strength = nF.strength(adjacency) self.strength_in = nF.strength_in(adjacency) self.anns = nF.stnn_single(adjacency,self.binary_adjacency) self.clust_w = nF.clust_w_single_fast(adjacency,self.binary_adjacency) self.n_edges = self.binary_adjacency.sum() self.n_nodes = int(np.sqrt(len(adjacency))) self.implemented_models = ["POIS","ZIP","NB2","ZINB","L-CGeom","k-CGeom","L-IGeom","k-IGeom", "L-CExp","k-CExp","L-IExp","k-IExp","L-CPareto","k-CPareto","L-CGamma","k-CGamma","L-CLognormal","k-CLognormal"] self.discrete_models = ["POIS","ZIP","NB2","ZINB","L-IGeom","k-IGeom","L-CGeom","k-CGeom"] self.continuous_models = ["L-IExp","k-IExp","k-CExp","L-CExp","L-CPareto","k-CPareto","L-CGamma","k-CGamma","L-CLognormal","k-CLognormal"] self.implemented_network_statistics = ["degree","degree_in","annd","clust","strength","strength_in","anns","clust_w"] self.implemented_classifier_statistics = ["TPR","SPC","PPV","ACC","BACC","F1_score"] # Model Matrices self.model_weighted_adjacency = None self.model_binary_adjacency = None # Problem solutions self.params = None self.model = None self.optimization_time = None # Problem (reduced) residuals self.ll = None self.ll_binary = None self.jacobian = None self.norm = None self.aic = None self.aic_binary = None self.bic = None self.export_name = None # function self.args = None #classifier ensembler self.avg_nlinks = None self.std_nlinks = None self.avg_TPR = None self.std_TPR = None self.avg_SPC = None self.std_SPC = None self.avg_PPV = None self.std_PPV = None self.avg_ACC = None self.std_ACC = None self.avg_BACC = None self.std_BACC = None self.avg_F1score = None self.std_F1score = None #netstats ensembler self.avg_degree = None self.avg_annd = None self.avg_clust = None self.avg_strength = None self.avg_anns = None self.avg_clust_w = None self.std_degree = None self.std_annd = None self.std_clust = None self.std_strength = None self.std_anns = None self.std_clust_w = None #netstats_direct self.model_degree = None self.model_annd = None self.model_clust = None self.model_strength = None self.model_anns = None self.model_clust_w = None self.adjacency = adjacency #weighted_prediction self.cond_w_score = None
[docs] def solver( self, model, exogenous_variables = np.array([]), selection_variables = np.array([]), fixed_selection_params = np.array([]), verbose = False, print_steps = 1, imported_params = np.array([]), use_guess = np.array([]), maxiter = 10, tol = 1e-5 ): """Optimize chosen model for Directed Graphs and compute model selection measures such as AIC and BIC. :param model: Chosen model :type model: string :param exogenous_variables: Exogenous Variables used for the weighted step, in numpy 2-D format of the type N X k where N are the observations and k are the params, :type exogenous_variables: np.ndarray :param selection_variables: Exogenous Variables used for the topological step, in numpy 2-D format of the type N X k where N are the observations and k are the params, :type selection_variables: np.ndarray :param fixed_selection_params: Optional Constant Parameters for the topological step. When initialized different from null vector, it fixes the last "n" topological parameters, when selection_variables are used and n is the size of fixed_selection_params :type fixed_selection_params: np.ndarray :param verbose: True if you want to see every n*print_steps iterations, default is False :type verbose: boolean :param print_steps: If verbose is True, you print on screen every n*print_steps iterations. default is 1 :type print_steps: int :param print_steps: number of steps for which you can investigate iterations, if Verbose is True :type print_steps: int :param imported_params: If used, uses wanted params as solution. :type imported_params: np.ndarray :param use_guess: If used, uses wanted params as starters in the optimization. :type use_guess: np.ndarray :param maxiter: Maximum Iterations of solver function. :type maxiter: int :param tol: tolerance for infinite norm in the optimization process :type tol: float """ self.model = model self.exogenous_variables = exogenous_variables self.selection_variables = selection_variables self.fixed_selection_params = fixed_selection_params n_countries = int(np.sqrt(len(self.adjacency))) if model in ["ZIP", "ZINB", "L-CGeom", "L-CExp", "L-CPareto", "L-CGamma", "L-CLognormal"]: self.args = (self.adjacency,self.selection_variables,self.exogenous_variables,self.fixed_selection_params) elif model == "Logit": self.args = (self.adjacency,self.selection_variables,self.fixed_selection_params) elif model in ["UBCM","DBCM"]: self.args = (self.adjacency,) else: self.args = (self.adjacency,self.exogenous_variables) self.conditional_models = ["L-CGeom","k-CGeom","L-CExp","k-CExp","L-CPareto","k-CPareto","L-CGamma","k-CGamma","L-CLognormal","k-CLognormal"] ll_fun = { "Logit": lambda x: -lfC.ll_logit(x, self.adjacency,self.selection_variables,self.fixed_selection_params), "UBCM": lambda x: -lfC.ll_BCM( x, self.adjacency), "DBCM": lambda x: -lfC.ll_DBCM( x, self.adjacency), "POIS": lambda x: -lfE.ll_POIS(x, *self.args), "ZIP": lambda x: -lfE.ll_ZIP( x, *self.args), "NB2": lambda x: -lfE.ll_NB2(x, *self.args), "ZINB": lambda x: -lfE.ll_ZINB( x, *self.args), "L-IGeom": lambda x: -lfI.ll_LIGeom(x, *self.args), "k-IGeom": lambda x: -lfI.ll_kIGeom_directed( x, *self.args), "L-IExp": lambda x: -lfI.ll_LIExp(x, *self.args), "k-IExp": lambda x: -lfI.ll_kIExp_directed( x, *self.args), "CGeom": lambda x: -lfC.ll_CGeom(x, self.adjacency,self.exogenous_variables), "CExp": lambda x: -lfC.ll_CExp(x, self.adjacency,self.exogenous_variables), "CPareto": lambda x: -lfC.ll_CPareto(x, self.adjacency,self.exogenous_variables), "CGamma": lambda x: -lfC.ll_CGamma(x, self.adjacency,self.exogenous_variables), "CLognormal": lambda x: -lfC.ll_CLognormal(x, self.adjacency,self.exogenous_variables), } ll_fun_binary = { "Logit": lambda x: -lfC.ll_logit(x, self.adjacency,self.selection_variables,self.fixed_selection_params), "UBCM": lambda x: -lfC.ll_BCM( x, self.adjacency), "DBCM": lambda x: -lfC.ll_DBCM( x, self.adjacency), "POIS": lambda x: -lfE.ll_POIS_binary(x, *self.args), "ZIP": lambda x: -lfE.ll_ZIP_binary( x, *self.args), "NB2": lambda x: -lfE.ll_NB2_binary(x, *self.args), "ZINB": lambda x: -lfE.ll_ZINB_binary( x, *self.args), "L-IGeom": lambda x: -lfI.ll_LIGeom_binary(x, *self.args), "k-IGeom": lambda x: -lfI.ll_kIGeom_binary_directed( x, *self.args), "L-IExp": lambda x: -lfI.ll_LIExp_binary(x, *self.args), "k-IExp": lambda x: -lfI.ll_kIExp_binary_directed( x, *self.args), } jac_fun = { "Logit": lambda x: -lfC.jac_logit(x, self.adjacency,self.selection_variables,self.fixed_selection_params), "UBCM": lambda x: -lfC.jac_BCM( x, self.adjacency), "DBCM": lambda x: -lfC.jac_DBCM( x, self.adjacency), "POIS": lambda x: -lfE.jac_POIS(x, *self.args), "ZIP": lambda x: -lfE.jac_ZIP( x, *self.args), "NB2": lambda x: -lfE.jac_NB2(x, *self.args), "ZINB": lambda x: -lfE.jac_ZINB( x, *self.args), "L-IGeom": lambda x: -lfI.jac_LIGeom(x, *self.args), "k-IGeom": lambda x: -lfI.jac_kIGeom_directed( x, *self.args), "L-IExp": lambda x: -lfI.jac_LIExp(x, *self.args), "k-IExp": lambda x: -lfI.jac_kIExp_directed( x, *self.args), "CGeom": lambda x: -lfC.jac_CGeom(x, self.adjacency,self.exogenous_variables), "CExp": lambda x: -lfC.jac_CExp(x, self.adjacency,self.exogenous_variables), "CPareto": lambda x: -lfC.jac_CPareto(x, self.adjacency,self.exogenous_variables), "CGamma": lambda x: -lfC.jac_CGamma(x, self.adjacency,self.exogenous_variables), "CLognormal": lambda x: -lfC.jac_CLognormal(x, self.adjacency,self.exogenous_variables), } hess_fun = { "UBCM": lambda x: -lfC.hess_BCM( x, self.adjacency), "DBCM": lambda x: -lfC.hess_DBCM( x, self.adjacency), "L-IGeom": lambda x: -lfI.hess_LIGeom(x, *self.args), "L-IExp": lambda x: -lfI.hess_LIExp(x, *self.args), "CGeom": lambda x: -lfC.hess_CGeom(x, self.adjacency,self.exogenous_variables), "CExp": lambda x: -lfC.hess_CExp(x, self.adjacency,self.exogenous_variables), } if model in self.conditional_models: if model[:3] == "L-C": if len(imported_params) ==0: solution = sF.solver(model=self.model,Wij=self.adjacency,selection_variables=self.selection_variables,exogenous_variables=self.exogenous_variables, fixed_selection_params=self.fixed_selection_params,tol=tol,use_guess= use_guess, verbose=verbose, print_steps= print_steps, maxiter=maxiter ) self.params = solution else: self.params = imported_params n_selection_params = self.selection_variables.shape[1] - len(self.fixed_selection_params) ll_topological = ll_fun["Logit"](self.params[:n_selection_params]) #lfC.ll_logit(self.params[:n_selection_params],self.adjacency,self.selection_variables,self.fixed_selection_params) ll_weighted = ll_fun[self.model[2:]](self.params[n_selection_params:]) self.ll = ll_topological + ll_weighted self.ll_binary = ll_topological jac_topological = jac_fun["Logit"](self.params[:n_selection_params]) jac_weighted = jac_fun[self.model[2:]](self.params[n_selection_params:]) # print('norm_top',np.linalg.norm(jac_topological,ord=np.inf),'norm_w',np.linalg.norm(jac_weighted,ord=np.inf)) self.jacobian = np.concatenate((jac_topological,jac_weighted)) self.norm = np.linalg.norm(self.jacobian,ord=np.inf) self.aic = nF.AIC(self.ll,len(self.params)) self.aic_binary = nF.AIC(self.ll_binary,len(self.params[:n_selection_params])) self.bic = nF.BIC(self.ll,len(self.params),len(self.adjacency)) elif model[:3] == "k-C": if len(imported_params) == 0: self.input_model = self.model self.input_model = self.input_model + "_directed" solution = sF.solver(model=self.input_model,Wij=self.adjacency,selection_variables=self.selection_variables,exogenous_variables=self.exogenous_variables, fixed_selection_params=self.fixed_selection_params,tol=tol,use_guess= use_guess, verbose=verbose, print_steps= print_steps, maxiter=maxiter ) self.params = solution else: self.params = imported_params n_countries = int(np.sqrt(len(self.adjacency))) ll_topological = ll_fun["DBCM"](self.params[:2*n_countries]) #lfC.ll_logit(self.params[:n_countries],self.adjacency,self.selection_variables,self.fixed_selection_params) ll_weighted = ll_fun[self.model[2:]](self.params[2*n_countries:]) self.ll = ll_topological + ll_weighted self.ll_binary = ll_topological jac_topological = jac_fun["DBCM"](self.params[:2*n_countries]) jac_weighted = jac_fun[self.model[2:]](self.params[2*n_countries:]) # print('norm_top',np.linalg.norm(jac_topological,ord=np.inf),'norm_w',np.linalg.norm(jac_weighted,ord=np.inf)) self.jacobian = np.concatenate((jac_topological,jac_weighted)) self.norm = np.linalg.norm(self.jacobian,ord=np.inf) self.aic = nF.AIC(self.ll,len(self.params)) self.aic_binary = nF.AIC(self.ll_binary,len(self.params[:2*n_countries])) self.bic = nF.BIC(self.ll,len(self.params),len(self.adjacency)) else: if len(imported_params) == 0: self.input_model = self.model if self.input_model in ["k-IExp","k-IGeom"]: self.input_model = self.input_model + "_directed" solution = sF.solver(model=self.input_model,Wij=self.adjacency,selection_variables=self.selection_variables,exogenous_variables=self.exogenous_variables, fixed_selection_params=self.fixed_selection_params,tol=tol,use_guess= use_guess, verbose=verbose, print_steps= print_steps, maxiter=maxiter ) self.params = solution else: self.params = imported_params self.ll = ll_fun[self.model](self.params) self.ll_binary = ll_fun_binary[self.model](self.params) self.jacobian = jac_fun[self.model](self.params) self.norm = np.linalg.norm(self.jacobian,ord=np.inf) self.aic = nF.AIC(self.ll,len(self.params)) self.aic_binary = nF.AIC(self.ll_binary,len(self.params)) self.bic = nF.BIC(self.ll,len(self.params),len(self.adjacency))
[docs] def gen_ensemble(self,n_ensemble = 1000): """Generate an ensemble of -n_ensemble- networks according to the selected model :param n_ensemble: Number of wanted Graph realizations :type n_ensemble: int :return: N_obs X n_ensemble numpy matrix that collects all the ensemble adjacency matrices :rtype: np.ndarray """ if self.model in self.continuous_models: w_mat_ensemble = eF.faster_ensemble_matrix_directed(params=self.params,Wij=self.adjacency, model=self.model, exogenous_variables=self.exogenous_variables, selection_variables=self.selection_variables, fixed_selection_params = self.fixed_selection_params, n_ensemble=n_ensemble) elif self.model in self.discrete_models: w_mat_ensemble = eF.discrete_ensemble_matrix_directed(self.params,self.adjacency,self.model,self.exogenous_variables, self.selection_variables,self.fixed_selection_params,n_ensemble) else: raise TypeError("model not implemented!") self.w_ensemble_matrix = w_mat_ensemble
[docs] def classification_measures(self,n_ensemble = 1000, percentiles = (2.5,97.5),stats=[]): """Computes Measures for the quality of classification, listed in self.implemented_classifier_statistics. To be used after .solve() :param n_ensemble: Wanted number of ensemble graphs, default is 1000. :type n_ensemble: int :param percentiles: Explicit the percentiles used for the construction of the confidence interval, default is (2.5,97.5) for a 95 CI. :type percentiles: tuple :param stats: numpy array or list of classifier statistics in string format. :type stats: list of strings :return: Average value, standard deviation, percentiles and ensemble distribution of * classifier statistic in the graph ensemble. :rtype: float, float, tuple, np.ndarray """ if len(stats) == 0: stats = self.implemented_classifier_statistics top_mat = nF.pij_matrix_directed(self.params,self.model,self.adjacency, self.selection_variables,self.exogenous_variables,self.fixed_selection_params) self.model_binary_adjacency = top_mat if "TPR" in stats: self.avg_TPR, self.std_TPR, self.percentiles_TPR, self.array_TPR = nF.TPR_ensemble(top_mat,self.binary_adjacency,n_ensemble,percentiles) if "SPC" in stats: self.avg_SPC, self.std_SPC, self.percentiles_SPC, self.array_SPC = nF.SPC_ensemble(top_mat,self.binary_adjacency,n_ensemble,percentiles) if "PPV" in stats: self.avg_PPV, self.std_PPV, self.percentiles_PPV, self.array_PPV = nF.PPV_ensemble(top_mat,self.binary_adjacency,n_ensemble,percentiles) if "ACC" in stats: self.avg_ACC, self.std_ACC, self.percentiles_ACC, self.array_ACC = nF.ACC_ensemble(top_mat,self.binary_adjacency,n_ensemble,percentiles) if "BACC" in stats: self.avg_BACC, self.std_BACC, self.percentiles_BACC, self.array_BACC = nF.BACC_ensemble(top_mat,self.binary_adjacency,n_ensemble,percentiles) if "F1_score" in stats: self.avg_F1_score, self.std_F1_score, self.percentiles_F1_score, self.array_F1_score = nF.F1_score_ensemble(top_mat,self.binary_adjacency,n_ensemble,percentiles)
[docs] def netstats_measures(self,percentiles = (2.5,97.5),stats=[]): """Computes available network statistics, To be used after .solve() and .gen_ensemble(). Available attributes are .avg_*, .std_*, .percentiles_* and .array_* where * stands for the wanted statistics, avg_* is namely the ensemble average, std_* is the ensemble standard deviation, percentiles_* is a Tuple made by (percentile[0],percentile[1]) of the ensemble distribution, array_* is the whole ensemble distribution. :param percentiles: Explicit the percentiles used for the construction of the confidence interval, default is (2.5,97.5) for a 95 CI. :type percentiles: Tuple :param stats: numpy array or list of available network statistics. The wanted stats must be in the list -.implemented_network_statistics- :type stats: list """ if len(stats) == 0: stats = self.implemented_network_statistics if "degree" in stats: self.avg_degree, self.std_degree, self.percentiles_degree, self.array_degree = nF.degree_ensemble(self.w_ensemble_matrix,percentiles) if "degree_in" in stats: self.avg_degree_in, self.std_degree_in, self.percentiles_degree_in, self.array_degree_in = nF.degree_in_ensemble(self.w_ensemble_matrix,percentiles) if "annd" in stats: self.avg_annd, self.std_annd, self.percentiles_annd, self.array_annd = nF.annd_ensemble(self.w_ensemble_matrix,percentiles) if "clust" in stats: self.avg_clust, self.std_clust, self.percentiles_clust, self.array_clust = nF.clust_ensemble(self.w_ensemble_matrix,percentiles) if "strength" in stats: self.avg_strength, self.std_strength, self.percentiles_strength, self.array_strength = nF.st_ensemble(self.w_ensemble_matrix,percentiles) if "strength_in" in stats: self.avg_strength, self.std_strength, self.percentiles_strength, self.array_strength = nF.st_in_ensemble(self.w_ensemble_matrix,percentiles) if "anns" in stats: self.avg_anns, self.std_anns, self.percentiles_anns, self.array_anns = nF.anns_ensemble(self.w_ensemble_matrix,percentiles) if "clust_w" in stats: self.avg_clust_w, self.std_clust_w, self.percentiles_clust_w, self.array_clust_w = nF.cw_ensemble(self.w_ensemble_matrix,percentiles)
[docs] def reproduction_accuracy_s(self,percentiles = (2.5,97.5),stats=[]): """Computes RA_s, the percentage of nodes for which the network statistics are compatible with the graph ensemble according to a percentile confidence interval. To be used after .solve() and .gen_ensemble(). :param percentiles: Explicit the percentiles used for the construction of the confidence interval, default is (2.5,97.5) for a 95 CI. :type percentiles: tuple :param stats: numpy array or list of network statistics for which it is possible to recover RA_s. The wanted stats must be in the list -.implemented_network_statistics- :type stats: list of strings :return: L-list where L is the number of statistics, the order follows the input stats array :rtype: list of float """ if len(stats) == 0: stats = List(self.implemented_network_statistics) else: stats = List(stats) self.RA_s = nF.ensemble_coverage(self.w_ensemble_matrix,self.adjacency,percentiles,stats)
[docs] def reproduction_accuracy_w(self,percentiles = (2.5,97.5)): """Computes RA_w, the percentage of couples for which the weights are compatible with the graph ensemble according to a percentile confidence interval. To be used after .solve() and .gen_ensemble(). :param percentiles: Explicit the percentiles used for the construction of the confidence interval, default is (2.5,97.5) for a 95 CI. :type percentiles: Tuple :return: A number (0,1) that explicits RA_w :rtype: np.ndarray """ self.RA_w = nF.weighted_coverage(self.w_ensemble_matrix,self.adjacency, percentiles)