# Source code for graspy.simulations.simulations

```
import numpy as np
from ..utils import symmetrize
def cartprod(*arrays):
N = len(arrays)
return np.transpose(
np.meshgrid(*arrays, indexing="ij"), np.roll(np.arange(N + 1), -1)
).reshape(-1, N)
def sample_edges(P, directed=False, loops=False):
"""
Gemerates a binary random graph based on the P matrix provided
Each element in P represents the probability of a connection between
a vertex indexed by the row i and the column j.
Parameters
----------
P: np.ndarray (num_vertices, num_vertices)
Matrix of probabilities (between 0 and 1) for a random graph
directed: boolean (default False)
Whether to force symmetry upon the resulting graph by only
sampling from the upper triangle of P and then reflecting the
sampled values accross the diagonal
loops: boolean
Returns
-------
A: np.ndarray (num_vertices, num_vertices)
Binary adjacency matrix the same size as P representing a random
graph
References
----------
.. [1] Sussman, D.L., Tang, M., Fishkind, D.E., Priebe, C.E. "A
Consistent Adjacency Spectral Embedding for Stochastic Blockmodel Graphs,"
Journal of the American Statistical Association, Vol. 107(499), 2012
"""
if type(P) is not np.ndarray:
raise TypeError("P must be numpy.ndarray")
if len(P.shape) != 2:
raise ValueError("P must have dimension 2 (n_vertices, n_dimensions)")
if P.shape[0] != P.shape[1]:
raise ValueError("P must be a square matrix")
if not directed:
# can cut down on sampling by ~half
triu_inds = np.triu_indices(P.shape[0])
samples = np.random.binomial(1, P[triu_inds])
A = np.zeros_like(P)
A[triu_inds] = samples
A = symmetrize(A)
else:
A = np.random.binomial(1, P)
if loops:
return A
else:
return A - np.diag(np.diag(A))
[docs]def er_np(n, p, directed=False, loops=False, wt=1, wtargs=None):
r"""
Samples a Erdos Renyi (n, p) graph with specified edge probability.
Erdos Renyi (n, p) graph is a simple graph with n vertices and a probability
p of edges being connected.
Parameters
----------
n: int
the number of vertices
p: float
the probability of an edge existing between two vertices,
between 0 and 1.
directed: boolean optional, default = False
Whether to create a directed graph or not.
loops: boolean optional, default = False
Whether to include self-loops or not.
wt: object
a weight function for each of the edges, taking
only a size argument. This weight function will
be randomly assigned for selected edges. If 1,
graph produced is binary.
wtargs: dictionary
optional arguments for parameters that can be passed
to weight function wt.
Returns
-------
A : array-like, shape (n, n)
Sampled adjacency matrix
"""
if not np.issubdtype(type(p), np.floating):
raise TypeError("p is not of type float.")
elif p < 0:
msg = "You have passed a probability, {}, less than 0."
msg = msg.format(float(p))
raise ValueError(msg)
elif p > 1:
msg = "You have passed a probability, {}, greater than 1."
msg = msg.format(float(p))
raise ValueError(msg)
if not np.issubdtype(type(n), np.integer):
raise TypeError("n is not of type int.")
elif n <= 0:
msg = "n must be > 0."
raise ValueError(msg)
if type(directed) is not bool:
raise TypeError("directed is not of type bool.")
if type(loops) is not bool:
raise TypeError("loops is not of type bool.")
# check weight function
if not np.issubdtype(type(wt), np.number):
if not callable(wt):
raise TypeError("You have not passed a function for wt.")
probs = np.ones((n, n)) * p
A = sample_edges(probs, directed, loops)
if not np.issubdtype(type(wt), np.number):
weights = wt(size=int(A.sum()), **wtargs)
A[A == 1] = weights
else:
A *= wt
if not directed:
A = symmetrize(A)
return A
[docs]def er_nm(n, m, directed=False, loops=False, wt=1, wtargs=None):
r"""
Samples a weighted Erdos Renyi (n, m) graph with specified number of edges.
Erdos Renyi (n, m) graph is a simple graph with n vertices and exactly m
number of total edges.
Parameters
----------
n: int
Number of vertices
m: int
Number of edges, a value between 1 and :math:`n^2`.
directed: boolean optional, default = False
Whether to create a directed graph or not.
loops: boolean optional, default = False
Whether to include self-loops or not.
wt: object
a weight function for each of the edges, taking
only a size argument. This weight function will
be randomly assigned for selected edges. If 1,
graph produced is binary.
wtargs: dictionary
optional arguments for parameters that can be passed
to weight function wt.
Returns
-------
A: array-like, shape (n, n)
Sampled adjacency matrix
Examples
--------
>>> n = 100
>>> m = 20
>>> wt = np.random.uniform
>>> wtargs = dict(low=1, high=2)
>>> A = weighted_er_nm(n, m, wt=wt, wtargs=wtargs)
"""
if not np.issubdtype(type(m), np.integer):
raise TypeError("m is not of type int.")
elif m <= 0:
msg = "m must be > 0."
raise ValueError(msg)
if not np.issubdtype(type(n), np.integer):
raise TypeError("n is not of type int.")
elif n <= 0:
msg = "n must be > 0."
raise ValueError(msg)
if type(directed) is not bool:
raise TypeError("directed is not of type bool.")
if type(loops) is not bool:
raise TypeError("loops is not of type bool.")
# check weight function
if not np.issubdtype(type(wt), np.integer):
if not callable(wt):
raise TypeError("You have not passed a function for wt.")
# compute max number of edges to sample
if loops:
if directed:
max_edges = n ** 2
msg = "n^2"
else:
max_edges = n * (n + 1) // 2
msg = "n(n+1)/2"
else:
if directed:
max_edges = n * (n - 1)
msg = "n(n-1)"
else:
max_edges = n * (n - 1) // 2
msg = "n(n-1)/2"
if m > max_edges:
msg = "You have passed a number of edges, {}, exceeding {}, {}."
msg = msg.format(m, msg, max_edges)
raise ValueError(msg)
A = np.zeros((n, n))
# check if directedness is desired
if directed:
if loops:
# use all of the indices
idx = np.where(np.logical_not(A))
else:
# use only the off-diagonal indices
idx = np.where(~np.eye(n, dtype=bool))
else:
# use upper-triangle indices, and ignore diagonal according
# to loops argument
idx = np.triu_indices(n, k=int(loops == False))
# get idx in 1d coordinates by ravelling
triu = np.ravel_multi_index(idx, dims=A.shape)
# choose M of them
triu = np.random.choice(triu, size=m, replace=False)
# unravel back
triu = np.unravel_index(triu, dims=A.shape)
# check weight function
if not np.issubdtype(type(wt), np.number):
wt = wt(size=m, **wtargs)
A[triu] = wt
if not directed:
A = symmetrize(A)
return A
[docs]def sbm(n, p, directed=False, loops=False, wt=1, wtargs=None):
"""
n: list of int, shape (n_communities)
the number of vertices in each community. Communities
are assigned n[0], n[1], ...
p: array-like, shape (n_communities, n_communities)
the probability of an edge between each of the communities,
where p[i, j] indicates the probability of a connection
between edges in communities [i, j]. 0 < p[i, j] < 1
for all i, j.
directed: boolean
whether or not the graph will be directed.
loops: boolean
whether to allow self-loops for vertices.
wt: object or array-like, shape (n_communities, n_communities)
if Wt is an object, a weight function to use globally over
the sbm for assigning weights. 1 indicates to produce a binary
graph. If Wt is an array-like, a weight function for each of
the edge communities. Wt[i, j] corresponds to the weight function
between communities i and j. If the entry is a function, should
accept an argument for size. An entry of Wt[i, j] = 1 will produce a
binary subgraph over the i, j community.
wtargs: dictionary or array-like, shape (n_communities, n_communities)
if Wt is an object, Wtargs corresponds to the trailing arguments
to pass to the weight function. If Wt is an array-like, Wtargs[i, j]
corresponds to trailing arguments to pass to Wt[i, j].
"""
# Check n
if not isinstance(n, (list, np.ndarray)):
msg = "n must be a list or np.array, not {}.".format(type(n))
raise TypeError(msg)
else:
n = np.array(n)
if not np.issubdtype(n.dtype, np.integer):
msg = "There are non-integer elements in n"
raise ValueError(msg)
# Check p
if not isinstance(p, (list, np.ndarray)):
msg = "p must be a list or np.array, not {}.".format(type(p))
raise TypeError(msg)
else:
p = np.array(p)
if not np.issubdtype(p.dtype, np.number):
msg = "There are non-numeric elements in p"
raise ValueError(msg)
elif p.shape != (n.size, n.size):
msg = "p is must have shape len(n) x len(n), not {}".format(p.shape)
raise ValueError(msg)
elif np.any(p < 0) or np.any(p > 1):
msg = "Values in p must be in between 0 and 1."
raise ValueError(msg)
# Check wt and wtargs
if not np.issubdtype(type(wt), np.number) and not callable(wt):
if not isinstance(wt, (list, np.ndarray)):
msg = "wt must be a numeric, list, or np.array, not{}".format(type(wt))
raise TypeError(msg)
if not isinstance(wtargs, (list, np.ndarray)):
msg = "wtargs must be a numeric, list, or np.array, not{}".format(
type(wtargs)
)
raise TypeError(msg)
wt = np.array(wt, dtype=object)
wtargs = np.array(wtargs, dtype=object)
# if not number, check dimensions
if wt.shape != (n.size, n.size):
msg = "wt must have size len(n) x len(n), not {}".format(wt.shape)
raise ValueError(msg)
if wtargs.shape != (n.size, n.size):
msg = "wtargs must have size len(n) x len(n), not {}".format(wtargs.shape)
raise ValueError(msg)
# check if each element is a function
for element in wt.ravel():
if not callable(element):
msg = "{} is not a callable function.".format(element)
raise TypeError(msg)
else:
wt = np.full(p.shape, wt, dtype=object)
wtargs = np.full(p.shape, wtargs, dtype=object)
# Check directed
if not directed:
if np.any(p != p.T):
raise ValueError("Specified undirected, but P is directed.")
if np.any(wt != wt.T):
raise ValueError("Specified undirected, but Wt is directed.")
if np.any(wtargs != wtargs.T):
raise ValueError("Specified undirected, but Wtargs is directed.")
K = len(n) # the number of communities
counter = 0
# get a list of community indices
cmties = []
for i in range(0, K):
cmties.append(range(counter, counter + n[i]))
counter += n[i]
A = np.zeros((sum(n), sum(n)))
for i in range(0, K):
if directed:
jrange = range(0, K)
else:
jrange = range(i, K)
for j in jrange:
block_wt = wt[i, j]
block_wtargs = wtargs[i, j]
block_p = p[i, j]
# identify submatrix for community i, j
# cartesian product to identify edges for community i,j pair
cprod = cartprod(cmties[i], cmties[j])
# get idx in 1d coordinates by ravelling
triu = np.ravel_multi_index((cprod[:, 0], cprod[:, 1]), dims=A.shape)
pchoice = np.random.uniform(size=len(triu))
# connected with probability p
triu = triu[pchoice < block_p]
if type(block_wt) is not int:
block_wt = block_wt(size=len(triu), **block_wtargs)
triu = np.unravel_index(triu, dims=A.shape)
A[triu] = block_wt
if not loops:
A = A - np.diag(np.diag(A))
if not directed:
A = symmetrize(A)
return A
[docs]def rdpg(X, Y=None, rescale=True, directed=False, loops=True, wt=1, wtargs=None):
"""
Samples a random graph based on the latent positions in X (and
optionally in Y)
If only X is given, the P matrix is calculated as :math:`P = XX^T`
If X and Y is given, then :math:`P = XY^T`
These operations correspond to the dot products between a set of latent
positions, so each row in X or Y represents the latent positions in
:math:`\R^{num_columns}` for a single vertex in the random graph
Note that this function may also rescale or clip the resulting P
matrix to get probabilities between 0 and 1, or remove loops.
A binary random graph is then sampled from the P matrix described
by X (and possibly Y)
Parameters
----------
X: np.ndarray (2 dimensions, same shape as Y if given)
latent position from which to generate a P matrix
if Y is given, interpreted as the left latent position
Y: np.ndarray (2 dimensions, same shape as X)
right latent position from which to generate a P matrix
rescale: boolean (default True)
when rescale is True, will subtract the minimum value in
P (if it is below 0) and divide by the maximum (if it is
above 1) to ensure that P has entries between 0 and 1. If
False, elements of P outside of [0, 1] will be clipped
directed: boolean (default False)
Whether to force symmetry upon the resulting graph by only
sampling from the upper triangle of P and then reflecting the
sampled values accross the diagonal
loops: boolean (default True)
whether to allow elements on the diagonal (corresponding
to self connections in a graph) in the returned P matrix.
If loops is False, these elements are removed prior to
rescaling (see above) which may affect behavior
wt: object
a weight function for each of the edges, taking
only a size argument. This weight function will
be randomly assigned for selected edges. If 1,
graph produced is binary.
wtargs: dictionary
optional arguments for parameters that can be passed
to weight function wt.
Returns
-------
P: np.ndarray (X.shape[0], X.shape[0])
A matrix representing the probabilities of connections between
vertices in a random graph based on their latent positions
References
----------
.. [1] Sussman, D.L., Tang, M., Fishkind, D.E., Priebe, C.E. "A
Consistent Adjacency Spectral Embedding for Stochastic Blockmodel Graphs,"
Journal of the American Statistical Association, Vol. 107(499), 2012
"""
P = p_from_latent(X, Y, rescale=rescale, loops=loops)
A = sample_edges(P, directed=directed, loops=loops)
# check weight function
if (not np.issubdtype(type(wt), np.integer)) and (
not np.issubdtyp(type(wt), np.floating)
):
if not callable(wt):
raise TypeError("You have not passed a function for wt.")
if not np.issubdtype(type(wt), np.number):
wts = wt(size=(np.count_nonzero(A)), **wtargs)
A[A > 0] = wts
else:
A *= wt
return A
def p_from_latent(X, Y=None, rescale=True, loops=True):
"""
Gemerates a matrix of connection probabilities for a random graph
based on a set of latent positions
If only X is given, the P matrix is calculated as :math:`P = XX^T`
If X and Y is given, then :math:`P = XY^T`
These operations correspond to the dot products between a set of latent
positions, so each row in X or Y represents the latent positions in
:math:`\R^{num_columns}` for a single vertex in the random graph
Note that this function may also rescale or clip the resulting P
matrix to get probabilities between 0 and 1, or remove loops
Parameters
----------
X: np.ndarray (2 dimensions, same shape as Y if given)
latent position from which to generate a P matrix
if Y is given, interpreted as the left latent position
Y: np.ndarray (2 dimensions, same shape as X)
right latent position from which to generate a P matrix
rescale: boolean (default True)
when rescale is True, will subtract the minimum value in
P (if it is below 0) and divide by the maximum (if it is
above 1) to ensure that P has entries between 0 and 1. If
False, elements of P outside of [0, 1] will be clipped
loops: boolean (default True)
whether to allow elements on the diagonal (corresponding
to self connections in a graph) in the returned P matrix.
If loops is False, these elements are removed prior to
rescaling (see above) which may affect behavior
Returns
-------
P: np.ndarray (X.shape[0], X.shape[0])
A matrix representing the probabilities of connections between
vertices in a random graph based on their latent positions
References
----------
.. [1] Sussman, D.L., Tang, M., Fishkind, D.E., Priebe, C.E. "A
Consistent Adjacency Spectral Embedding for Stochastic Blockmodel Graphs,"
Journal of the American Statistical Association, Vol. 107(499), 2012
"""
if Y is None:
Y = X
if type(X) is not np.ndarray or type(Y) is not np.ndarray:
raise TypeError("Latent positions must be numpy.ndarray")
if X.ndim != 2 or Y.ndim != 2:
raise ValueError(
"Latent positions must have dimension 2 (n_vertices, n_dimensions)"
)
if X.shape != Y.shape:
raise ValueError("Dimensions of latent positions X and Y must be the same")
P = X @ Y.T
# should this be before or after the rescaling, could give diff answers
if not loops:
P = P - np.diag(np.diag(P))
if rescale:
if P.min() < 0:
P = P - P.min()
if P.max() > 1:
P = P / P.max()
else:
P[P < 0] = 0
P[P > 1] = 1
return P
```