Source code for graspy.simulations.simulations

# Copyright 2019 NeuroData (http://neurodata.io)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np

from ..utils import symmetrize, cartprod
import warnings


def sample_edges(P, directed=False, loops=False):
    """
    Gemerates a binary random graph based on the P matrix provided

    Each element in P represents the probability of a connection between 
    a vertex indexed by the row i and the column j. 

    Parameters
    ----------
    P: np.ndarray, shape (n_vertices, n_vertices)
        Matrix of probabilities (between 0 and 1) for a random graph
    directed: boolean, optional (default=False)
        If False, output adjacency matrix will be symmetric. Otherwise, output adjacency
        matrix will be asymmetric.
    loops: boolean, optional (default=False)
        If False, no edges will be sampled in the diagonal. Otherwise, edges
        are sampled in the diagonal.

    Returns
    -------
    A: ndarray (n_vertices, n_vertices)
        Binary adjacency matrix the same size as P representing a random
        graph

    References
    ----------
    .. [1] Sussman, D.L., Tang, M., Fishkind, D.E., Priebe, C.E.  "A
       Consistent Adjacency Spectral Embedding for Stochastic Blockmodel Graphs,"
       Journal of the American Statistical Association, Vol. 107(499), 2012
    """
    if type(P) is not np.ndarray:
        raise TypeError("P must be numpy.ndarray")
    if len(P.shape) != 2:
        raise ValueError("P must have dimension 2 (n_vertices, n_dimensions)")
    if P.shape[0] != P.shape[1]:
        raise ValueError("P must be a square matrix")
    if not directed:
        # can cut down on sampling by ~half
        triu_inds = np.triu_indices(P.shape[0])
        samples = np.random.binomial(1, P[triu_inds])
        A = np.zeros_like(P)
        A[triu_inds] = samples
        A = symmetrize(A)
    else:
        A = np.random.binomial(1, P)

    if loops:
        return A
    else:
        return A - np.diag(np.diag(A))


[docs]def er_np(n, p, directed=False, loops=False, wt=1, wtargs=None, dc=None, dc_kws={}): r""" Samples a Erdos Renyi (n, p) graph with specified edge probability. Erdos Renyi (n, p) graph is a simple graph with n vertices and a probability p of edges being connected. Parameters ---------- n: int Number of vertices p: float Probability of an edge existing between two vertices, between 0 and 1. directed: boolean, optional (default=False) If False, output adjacency matrix will be symmetric. Otherwise, output adjacency matrix will be asymmetric. loops: boolean, optional (default=False) If False, no edges will be sampled in the diagonal. Otherwise, edges are sampled in the diagonal. wt: object, optional (default=1) Weight function for each of the edges, taking only a size argument. This weight function will be randomly assigned for selected edges. If 1, graph produced is binary. wtargs: dictionary, optional (default=None) Optional arguments for parameters that can be passed to weight function ``wt``. dc: function or array-like, shape (n_vertices) `dc` is used to generate a degree-corrected Erdos Renyi Model in which each node in the graph has a parameter to specify its expected degree relative to other nodes. - function: should generate a non-negative number to be used as a degree correction to create a heterogenous degree distribution. A weight will be generated for each vertex, normalized so that the sum of weights is 1. - array-like of scalars, shape (n_vertices): The weights should sum to 1; otherwise, they will be normalized and a warning will be thrown. The scalar associated with each vertex is the node's relative expected degree. dc_kws: dictionary Ignored if `dc` is none or array of scalar. If `dc` is a function, `dc_kws` corresponds to its named arguments. If not specified, in either case all functions will assume their default parameters. Returns ------- A : ndarray, shape (n, n) Sampled adjacency matrix """ if isinstance(dc, (list, np.ndarray)) and all(callable(f) for f in dc): raise TypeError("dc is not of type function or array-like of scalars") if not np.issubdtype(type(n), np.integer): raise TypeError("n is not of type int.") if not np.issubdtype(type(p), np.floating): raise TypeError("p is not of type float.") if type(loops) is not bool: raise TypeError("loops is not of type bool.") if type(directed) is not bool: raise TypeError("directed is not of type bool.") n_sbm = np.array([n]) p_sbm = np.array([[p]]) g = sbm(n_sbm, p_sbm, directed, loops, wt, wtargs, dc, dc_kws) return g
[docs]def er_nm(n, m, directed=False, loops=False, wt=1, wtargs=None): r""" Samples an Erdos Renyi (n, m) graph with specified number of edges. Erdos Renyi (n, m) graph is a simple graph with n vertices and exactly m number of total edges. Parameters ---------- n: int Number of vertices m: int Number of edges, a value between 1 and :math:`n^2`. directed: boolean, optional (default=False) If False, output adjacency matrix will be symmetric. Otherwise, output adjacency matrix will be asymmetric. loops: boolean, optional (default=False) If False, no edges will be sampled in the diagonal. Otherwise, edges are sampled in the diagonal. wt: object, optional (default=1) Weight function for each of the edges, taking only a size argument. This weight function will be randomly assigned for selected edges. If 1, graph produced is binary. wtargs: dictionary, optional (default=None) Optional arguments for parameters that can be passed to weight function ``wt``. Returns ------- A: ndarray, shape (n, n) Sampled adjacency matrix Examples -------- >>> n = 100 >>> m = 20 >>> wt = np.random.uniform >>> wtargs = dict(low=1, high=2) >>> A = weighted_er_nm(n, m, wt=wt, wtargs=wtargs) """ if not np.issubdtype(type(m), np.integer): raise TypeError("m is not of type int.") elif m <= 0: msg = "m must be > 0." raise ValueError(msg) if not np.issubdtype(type(n), np.integer): raise TypeError("n is not of type int.") elif n <= 0: msg = "n must be > 0." raise ValueError(msg) if type(directed) is not bool: raise TypeError("directed is not of type bool.") if type(loops) is not bool: raise TypeError("loops is not of type bool.") # check weight function if not np.issubdtype(type(wt), np.integer): if not callable(wt): raise TypeError("You have not passed a function for wt.") # compute max number of edges to sample if loops: if directed: max_edges = n ** 2 msg = "n^2" else: max_edges = n * (n + 1) // 2 msg = "n(n+1)/2" else: if directed: max_edges = n * (n - 1) msg = "n(n-1)" else: max_edges = n * (n - 1) // 2 msg = "n(n-1)/2" if m > max_edges: msg = "You have passed a number of edges, {}, exceeding {}, {}." msg = msg.format(m, msg, max_edges) raise ValueError(msg) A = np.zeros((n, n)) # check if directedness is desired if directed: if loops: # use all of the indices idx = np.where(np.logical_not(A)) else: # use only the off-diagonal indices idx = np.where(~np.eye(n, dtype=bool)) else: # use upper-triangle indices, and ignore diagonal according # to loops argument idx = np.triu_indices(n, k=int(loops is False)) # get idx in 1d coordinates by ravelling triu = np.ravel_multi_index(idx, A.shape) # choose M of them triu = np.random.choice(triu, size=m, replace=False) # unravel back triu = np.unravel_index(triu, A.shape) # check weight function if not np.issubdtype(type(wt), np.number): wt = wt(size=m, **wtargs) A[triu] = wt if not directed: A = symmetrize(A) return A
[docs]def sbm(n, p, directed=False, loops=False, wt=1, wtargs=None, dc=None, dc_kws={}): """ Samples a graph from the stochastic block model (SBM). SBM produces a graph with specified communities, in which each community can have different sizes and edge probabilities. Parameters ---------- n: list of int, shape (n_communities) Number of vertices in each community. Communities are assigned n[0], n[1], ... p: array-like, shape (n_communities, n_communities) Probability of an edge between each of the communities, where p[i, j] indicates the probability of a connection between edges in communities [i, j]. 0 < p[i, j] < 1 for all i, j. directed: boolean, optional (default=False) If False, output adjacency matrix will be symmetric. Otherwise, output adjacency matrix will be asymmetric. loops: boolean, optional (default=False) If False, no edges will be sampled in the diagonal. Otherwise, edges are sampled in the diagonal. wt: object or array-like, shape (n_communities, n_communities) if Wt is an object, a weight function to use globally over the sbm for assigning weights. 1 indicates to produce a binary graph. If Wt is an array-like, a weight function for each of the edge communities. Wt[i, j] corresponds to the weight function between communities i and j. If the entry is a function, should accept an argument for size. An entry of Wt[i, j] = 1 will produce a binary subgraph over the i, j community. wtargs: dictionary or array-like, shape (n_communities, n_communities) if Wt is an object, Wtargs corresponds to the trailing arguments to pass to the weight function. If Wt is an array-like, Wtargs[i, j] corresponds to trailing arguments to pass to Wt[i, j]. dc: function or array-like, shape (n_vertices) or (n_communities), optional `dc` is used to generate a degree-corrected stochastic block model [1] in which each node in the graph has a parameter to specify its expected degree relative to other nodes within its community. - function: should generate a non-negative number to be used as a degree correction to create a heterogenous degree distribution. A weight will be generated for each vertex, normalized so that the sum of weights in each block is 1. - array-like of functions, shape (n_communities): Each function will generate the degree distribution for its respective community. - array-like of scalars, shape (n_vertices): The weights in each block should sum to 1; otherwise, they will be normalized and a warning will be thrown. The scalar associated with each vertex is the node's relative expected degree within its community. dc_kws: dictionary or array-like, shape (n_communities), optional Ignored if `dc` is none or array of scalar. If `dc` is a function, `dc_kws` corresponds to its named arguments. If `dc` is an array-like of functions, `dc_kws` should be an array-like, shape (n_communities), of dictionary. Each dictionary is the named arguments for the corresponding function for that community. If not specified, in either case all functions will assume their default parameters. References ---------- .. [1] Tai Qin and Karl Rohe. "Regularized spectral clustering under the Degree-Corrected Stochastic Blockmodel," Advances in Neural Information Processing Systems 26, 2013 Returns ------- A: ndarray, shape (sum(n), sum(n)) Sampled adjacency matrix """ # Check n if not isinstance(n, (list, np.ndarray)): msg = "n must be a list or np.array, not {}.".format(type(n)) raise TypeError(msg) else: n = np.array(n) if not np.issubdtype(n.dtype, np.integer): msg = "There are non-integer elements in n" raise ValueError(msg) # Check p if not isinstance(p, (list, np.ndarray)): msg = "p must be a list or np.array, not {}.".format(type(p)) raise TypeError(msg) else: p = np.array(p) if not np.issubdtype(p.dtype, np.number): msg = "There are non-numeric elements in p" raise ValueError(msg) elif p.shape != (n.size, n.size): msg = "p is must have shape len(n) x len(n), not {}".format(p.shape) raise ValueError(msg) elif np.any(p < 0) or np.any(p > 1): msg = "Values in p must be in between 0 and 1." raise ValueError(msg) # Check wt and wtargs if not np.issubdtype(type(wt), np.number) and not callable(wt): if not isinstance(wt, (list, np.ndarray)): msg = "wt must be a numeric, list, or np.array, not {}".format(type(wt)) raise TypeError(msg) if not isinstance(wtargs, (list, np.ndarray)): msg = "wtargs must be a numeric, list, or np.array, not {}".format( type(wtargs) ) raise TypeError(msg) wt = np.array(wt, dtype=object) wtargs = np.array(wtargs, dtype=object) # if not number, check dimensions if wt.shape != (n.size, n.size): msg = "wt must have size len(n) x len(n), not {}".format(wt.shape) raise ValueError(msg) if wtargs.shape != (n.size, n.size): msg = "wtargs must have size len(n) x len(n), not {}".format(wtargs.shape) raise ValueError(msg) # check if each element is a function for element in wt.ravel(): if not callable(element): msg = "{} is not a callable function.".format(element) raise TypeError(msg) else: wt = np.full(p.shape, wt, dtype=object) wtargs = np.full(p.shape, wtargs, dtype=object) # Check directed if not directed: if np.any(p != p.T): raise ValueError("Specified undirected, but P is directed.") if np.any(wt != wt.T): raise ValueError("Specified undirected, but Wt is directed.") if np.any(wtargs != wtargs.T): raise ValueError("Specified undirected, but Wtargs is directed.") K = len(n) # the number of communities counter = 0 # get a list of community indices cmties = [] for i in range(0, K): cmties.append(range(counter, counter + n[i])) counter += n[i] # Check degree-corrected input parameters if callable(dc): # Check that the paramters are a dict if not isinstance(dc_kws, dict): msg = "dc_kws must be of type dict not{}".format(type(dc_kws)) raise TypeError(msg) # Create the probability matrix for each vertex dcProbs = np.array([dc(**dc_kws) for _ in range(0, sum(n))], dtype="float") for indices in cmties: dcProbs[indices] /= sum(dcProbs[indices]) elif isinstance(dc, (list, np.ndarray)) and np.issubdtype( np.array(dc).dtype, np.number ): dcProbs = np.array(dc, dtype=float) # Check size and element types if not np.issubdtype(dcProbs.dtype, np.number): msg = "There are non-numeric elements in dc, {}".format(dcProbs.dtype) raise ValueError(msg) elif dcProbs.shape != (sum(n),): msg = "dc must have size equal to the number of" msg += " vertices {0}, not {1}".format(sum(n), dcProbs.shape) raise ValueError(msg) elif np.any(dcProbs < 0): msg = "Values in dc cannot be negative." raise ValueError(msg) # Check that probabilities sum to 1 in each block for i in range(0, K): if not np.isclose(sum(dcProbs[cmties[i]]), 1, atol=1.0e-8): msg = "Block {} probabilities should sum to 1, normalizing...".format(i) warnings.warn(msg, UserWarning) dcProbs[cmties[i]] /= sum(dcProbs[cmties[i]]) elif isinstance(dc, (list, np.ndarray)) and all(callable(f) for f in dc): dcFuncs = np.array(dc) if dcFuncs.shape != (len(n),): msg = "dc must have size equal to the number of blocks {0}, not {1}".format( len(n), dcFuncs.shape ) raise ValueError(msg) # Check that the parameters type, length, and type if not isinstance(dc_kws, (list, np.ndarray)): # Allows for nonspecification of default parameters for all functions if dc_kws == {}: dc_kws = [{} for _ in range(0, len(n))] else: msg = "dc_kws must be of type list or np.ndarray, not {}".format( type(dc_kws) ) raise TypeError(msg) elif not len(dc_kws) == len(n): msg = "dc_kws must have size equal to" msg += " the number of blocks {0}, not {1}".format(len(n), len(dc_kws)) raise ValueError(msg) elif not all(type(kw) == dict for kw in dc_kws): msg = "dc_kws elements must all be of type dict" raise TypeError(msg) # Create the probability matrix for each vertex dcProbs = np.array( [ dcFunc(**kws) for dcFunc, kws, size in zip(dcFuncs, dc_kws, n) for _ in range(0, size) ], dtype="float", ) # dcProbs = dcProbs.astype(float) for indices in cmties: dcProbs[indices] /= sum(dcProbs[indices]) # dcProbs[indices] = dcProbs / dcProbs[indices].sum() elif dc is not None: msg = "dc must be a function or a list or np.array of numbers or callable" msg += " functions, not {}".format(type(dc)) raise ValueError(msg) # End Checks, begin simulation A = np.zeros((sum(n), sum(n))) for i in range(0, K): if directed: jrange = range(0, K) else: jrange = range(i, K) for j in jrange: block_wt = wt[i, j] block_wtargs = wtargs[i, j] block_p = p[i, j] # identify submatrix for community i, j # cartesian product to identify edges for community i,j pair cprod = cartprod(cmties[i], cmties[j]) # get idx in 1d coordinates by ravelling triu = np.ravel_multi_index((cprod[:, 0], cprod[:, 1]), A.shape) pchoice = np.random.uniform(size=len(triu)) if dc is not None: # (v1,v2) connected with probability p*k_i*k_j*dcP[v1]*dcP[v2] num_edges = sum(pchoice < block_p) edge_dist = dcProbs[cprod[:, 0]] * dcProbs[cprod[:, 1]] # If n_edges greater than support of dc distribution, pick fewer edges if num_edges > sum(edge_dist > 0): msg = "More edges sampled than nonzero pairwise dc entries." msg += " Picking fewer edges" warnings.warn(msg, UserWarning) num_edges = sum(edge_dist > 0) triu = np.random.choice( triu, size=num_edges, replace=False, p=edge_dist ) else: # connected with probability p triu = triu[pchoice < block_p] if type(block_wt) is not int: block_wt = block_wt(size=len(triu), **block_wtargs) triu = np.unravel_index(triu, A.shape) A[triu] = block_wt if not loops: A = A - np.diag(np.diag(A)) if not directed: A = symmetrize(A) return A
[docs]def rdpg(X, Y=None, rescale=True, directed=False, loops=True, wt=1, wtargs=None): r""" Samples a random graph based on the latent positions in X (and optionally in Y) If only X :math:`\in\mathbb{R}^{n\times d}` is given, the P matrix is calculated as :math:`P = XX^T`. If X, Y :math:`\in\mathbb{R}^{n\times d}` is given, then :math:`P = XY^T`. These operations correspond to the dot products between a set of latent positions, so each row in X or Y represents the latent positions in :math:`\mathbb{R}^{d}` for a single vertex in the random graph Note that this function may also rescale or clip the resulting P matrix to get probabilities between 0 and 1, or remove loops. A binary random graph is then sampled from the P matrix described by X (and possibly Y). Parameters ---------- X: np.ndarray, shape (n_vertices, n_dimensions) latent position from which to generate a P matrix if Y is given, interpreted as the left latent position Y: np.ndarray, shape (n_vertices, n_dimensions) or None, optional right latent position from which to generate a P matrix rescale: boolean, optional (default=True) when rescale is True, will subtract the minimum value in P (if it is below 0) and divide by the maximum (if it is above 1) to ensure that P has entries between 0 and 1. If False, elements of P outside of [0, 1] will be clipped directed: boolean, optional (default=False) If False, output adjacency matrix will be symmetric. Otherwise, output adjacency matrix will be asymmetric. loops: boolean, optional (default=True) If False, no edges will be sampled in the diagonal. Diagonal elements in P matrix are removed prior to rescaling (see above) which may affect behavior. Otherwise, edges are sampled in the diagonal. wt: object, optional (default=1) Weight function for each of the edges, taking only a size argument. This weight function will be randomly assigned for selected edges. If 1, graph produced is binary. wtargs: dictionary, optional (default=None) Optional arguments for parameters that can be passed to weight function ``wt``. Returns ------- A: ndarray (n_vertices, n_vertices) A matrix representing the probabilities of connections between vertices in a random graph based on their latent positions References ---------- .. [1] Sussman, D.L., Tang, M., Fishkind, D.E., Priebe, C.E. "A Consistent Adjacency Spectral Embedding for Stochastic Blockmodel Graphs," Journal of the American Statistical Association, Vol. 107(499), 2012 """ P = p_from_latent(X, Y, rescale=rescale, loops=loops) A = sample_edges(P, directed=directed, loops=loops) # check weight function if (not np.issubdtype(type(wt), np.integer)) and ( not np.issubdtype(type(wt), np.floating) ): if not callable(wt): raise TypeError("You have not passed a function for wt.") if not np.issubdtype(type(wt), np.number): wts = wt(size=(np.count_nonzero(A)), **wtargs) A[A > 0] = wts else: A *= wt return A
def p_from_latent(X, Y=None, rescale=True, loops=True): r""" Gemerates a matrix of connection probabilities for a random graph based on a set of latent positions If only X is given, the P matrix is calculated as :math:`P = XX^T` If X and Y is given, then :math:`P = XY^T` These operations correspond to the dot products between a set of latent positions, so each row in X or Y represents the latent positions in :math:`\mathbb{R}^{num-columns}` for a single vertex in the random graph Note that this function may also rescale or clip the resulting P matrix to get probabilities between 0 and 1, or remove loops Parameters ---------- X: np.ndarray, shape (n_vertices, n_dimensions) latent position from which to generate a P matrix if Y is given, interpreted as the left latent position Y: np.ndarray, shape (n_vertices, n_dimensions) or None, optional right latent position from which to generate a P matrix rescale: boolean, optional (default=True) when rescale is True, will subtract the minimum value in P (if it is below 0) and divide by the maximum (if it is above 1) to ensure that P has entries between 0 and 1. If False, elements of P outside of [0, 1] will be clipped loops: boolean, optional (default=True) whether to allow elements on the diagonal (corresponding to self connections in a graph) in the returned P matrix. If loops is False, these elements are removed prior to rescaling (see above) which may affect behavior Returns ------- P: ndarray (n_vertices, n_vertices) A matrix representing the probabilities of connections between vertices in a random graph based on their latent positions References ---------- .. [1] Sussman, D.L., Tang, M., Fishkind, D.E., Priebe, C.E. "A Consistent Adjacency Spectral Embedding for Stochastic Blockmodel Graphs," Journal of the American Statistical Association, Vol. 107(499), 2012 """ if Y is None: Y = X if type(X) is not np.ndarray or type(Y) is not np.ndarray: raise TypeError("Latent positions must be numpy.ndarray") if X.ndim != 2 or Y.ndim != 2: raise ValueError( "Latent positions must have dimension 2 (n_vertices, n_dimensions)" ) if X.shape != Y.shape: raise ValueError("Dimensions of latent positions X and Y must be the same") P = X @ Y.T # should this be before or after the rescaling, could give diff answers if not loops: P = P - np.diag(np.diag(P)) if rescale: if P.min() < 0: P = P - P.min() if P.max() > 1: P = P / P.max() else: P[P < 0] = 0 P[P > 1] = 1 return P