Source code for graspologic.align.seedless_procrustes

# Copyright (c) Microsoft Corporation and contributors.
# Licensed under the MIT License.

import ot
import numpy as np
from sklearn.utils import check_array

from .base import BaseAlign
from .sign_flips import SignFlips
from .orthogonal_procrustes import OrthogonalProcrustes


[docs]class SeedlessProcrustes(BaseAlign): """ Matches two datasets using an orthogonal matrix. Unlike :class:`~graspologic.align.OrthogonalProcrustes`, this does not require a matching between entries. It can even be used in the settings where the two datasets do not have the same number of entries. In graph setting, it is used to align the embeddings of two different graphs, when it requires some simultaneous inference task and no 1-1 matching between the vertices of the two graphs can be established, for example, inside of the test for the equivalence of the latent distributions (see: :class:`~graspologic.inference.LatentDistributionTest`). Parameters ---------- optimal_transport_lambda : float (default=0.1), optional Regularization term of the Sinkhorn optimal transport algorithm. optimal_transport_eps : float (default=0.01), optional Tolerance parameter for the each Sinkhorn optimal transport algorithm. I.e. tolerance for each "E-step". optimal_transport_num_reps : int (default=1000), optional Number of repetitions in each iteration of the iterative optimal transport problem. I.e. maximum number of repetitions in each "E-step". iterative_num_reps : int (default=100), optional Number of reps in each iteration of the iterative optimal transport problem. I.e. maxumum number of total iterations the whole "EM" algorithm. init : string, {'2d' (default), 'sign_flips', 'custom'}, optional - '2d' Uses :math:`2^d` different restarts, where :math:`d` is the dimension of the datasets. In particular, tries all matrices that are simultaneously diagonal and orthogonal. In other words, these are diagonal matrices with all entries on the diagonal being either +1 or -1. This is motivated by the fact that spectral graph embeddings have two types of orthogonal non-identifiability, one of which is captured by the orthogonal diagonal matrices. The final result is picked based on the final values of the objective function. For more on this, see [2]_. - 'sign_flips' Initial alignment done by making the median value in each dimension have the same sign. The motivation is similar to that in '2d', except this is a heuristic that can save time, but can sometimes yield suboptimal results. - 'custom' Expects either an initial guess for `self.Q_` or an initial guess for `self.P_`, but not both. See `initial_Q` and `initial_P`, respectively. If neither is provided, initializes `initial_Q` to an identity with an appropriate number of dimensions. initial_Q : np.ndarray, shape (d, d) or None, optional (default=None) An initial guess for the alignment matrix, `self.Q_`, if such exists. Only one of `initial_Q`, `initial_P` can be provided at the same time, and only if `init` argument is set to 'custom'. If None, and `initial_P` is also None - initializes `initial_Q` to identity matrix. Must be an orthogonal matrix, if provided. initial_P : np.ndarray, shape (n, m) or None, optional (default=None) Initial guess for the optimal transport matrix, `self.P_`, if such exists. Only one of `initial_Q`, `initial_P` can be provided at the same time, and only if `init` argument is set to 'custom'. If None, and `initial_Q` is also None - initializes `initial_Q` to identity matrix. Must be a soft assignment matrix if provided (rows sum up to 1/n, cols sum up to 1/m.) Attributes ---------- Q_ : array, size (d, d) Final orthogonal matrix, used to modify `X`. P_ : array, size (n, m) where n and m are the sizes of two datasets Final matrix of optimal transports, represent soft matching weights from points in one dataset to the other, normalized such that all rows sum to 1/n and all columns sum to 1/m. score_ : float Final value of the objective function: :math:`|| X Q - P Y ||_F` Lower means the datasets have been matched together better. selected_initial_Q_ : array, size (d, d) Initial orthogonal matrix which was used as the initialization. If `init` was set to `2d` or `sign_flips`, then it is the adaptively selected matrix. If `init` was set to custom, and `initial_Q` was provided, then equal to that. If it was not provided, but `initial_P` was, then it is the matrix after the first procrustes performed. If neither was provided, then it is the identity matrix. References ---------- .. [1] Agterberg, J. # TODO Cite the Seedless Procrustes preprint whenever available. .. [2] Agterberg, J., Tang, M., Priebe., C. E. (2020). "On Two Distinct Sources of Nonidentifiability in Latent Position Random Graph Models" arXiv:2003.14250 Notes ----- In essence, the goal of this procedure is to simultaneously obtain a, not necessarily 1-to-1, correspondence between the vertices of the two data sets, and an orthogonal alignment between two datasets. If the two datasets are represented with matrices :math:`X \in M_{n, d}` and :math:`Y \in M_{m, d}`, then the correspondence is a matrix :math:`P \in M_{n, m}` that is soft assignment matrix (that is, its rows sum to :math:`1/n`, and columns sum to :math:`1/m`) and the orthogonal alignment is an orthogonal matrix :math:`Q \in M_{d, d}` (an orthogonal matrix is any matrix that satisfies :math:`Q^T Q = Q Q^T = I`). The global objective function is :math:`|| X Q - P Y ||_F`. Note that both :math:`X` and :math:`PY` are matrices in :math:`M_{n, d}`. Thus, if one knew :math:`P`, it would be simple to obtain an estimate for :math:`Q`, using the regular orthogonal procrustes. On the other hand, if :math:`Q` was known, then :math:`XQ` and :math:`Y` could be thought of distributions over a finite number of masses, each with weight :math:`1/n` or :math:`1/m`, respectively. These distributions could be "matched" via solving an optimal transport problem. However, both :math:`Q` and :math:`P` are simultaneously unknown here. So the algorithm performs a sequence of alternating steps, obtaining iteratively improving estimates of :math:`Q` and :math:`P`, similarly to an expectation-maximization (EM) procedure. It is not known whether this procedure is formally an EM, but the analogy can be drawn as follows: after obtaining an initial guess of of :math:`\hat{Q}_{0}`, obtaining an assignment matrix :math:`\hat{P}_{i+1} | \hat{Q}_{i}` ("E-step") is done by solving an optimal transport problem via Sinkhorn algorithm, whereas obtaining an orthogonal alignment matrix :math:`Q_{i+1} | P_{i}` ("M-step") is done via regular orthogonal procurstes. These alternating steps are performed until `iterative_num_reps` is reached. For more on how the initial guess can be performed, see `init`. """ def __init__( self, optimal_transport_lambda=0.1, optimal_transport_eps=0.01, optimal_transport_num_reps=1000, iterative_num_reps=100, init="2d", initial_Q=None, initial_P=None, ): # check optimal_transport_lambda argument if type(optimal_transport_lambda) is not float: msg = "Optimal_transport_lambda must be a float, not {}".format( type(optimal_transport_lambda) ) raise TypeError(msg) if optimal_transport_lambda < 0: msg = "{} is an invalud value of the optimal_transport_lambda, must be non-negative".format( optimal_transport_lambda ) raise ValueError(msg) # check optimal_transport_lambda argument if type(optimal_transport_eps) is not float: msg = "Optimal_transport_eps must be a float, not {}".format( type(optimal_transport_eps) ) raise TypeError(msg) if optimal_transport_eps <= 0: msg = "{} is an invalid value of the optimal transport eps, must be postitive".format( optimal_transport_eps ) raise ValueError(msg) # check optimal_transport_num_reps argument if type(optimal_transport_num_reps) is not int: msg = "Optimal_transport_num_reps must be a int, not {}".format( type(optimal_transport_num_reps) ) raise TypeError(msg) if optimal_transport_num_reps < 1: msg = "{} is an invalid number of repetitions, must be non-negative".format( iterative_num_reps ) raise ValueError(msg) # check iterative_num_reps argument if type(iterative_num_reps) is not int: msg = "Iterative_num_reps must be a int, not {}".format( type(iterative_num_reps) ) raise TypeError(msg) if iterative_num_reps < 0: msg = "{} is an invalid number of repetitions, must be non-negative".format( iterative_num_reps ) raise ValueError(msg) # check init argument if type(init) is not str: msg = "Init must be a str, not {}".format(type(init)) raise TypeError(msg) inits_supported = ["2d", "sign_flips", "custom"] if init not in inits_supported: msg = "Supported inits are {}".format(inits_supported) raise ValueError(msg) # check that initial_Q and intial_P aren't provided when shouldn't be if initial_Q is not None and init != "custom": msg = "Initial_Q can only be provided if init is set to custom" raise ValueError(msg) if initial_P is not None and init != "custom": msg = "Initial_P can only be provided if init is set to custom" raise ValueError(msg) if initial_Q is not None and initial_P is not None: msg = "Initial_Q and initial_P cannot be provided simultaneously" raise ValueError(msg) # check initial_Q argument if initial_Q is not None: if not isinstance(initial_Q, np.ndarray): msg = f"Initial_Q must be np.ndarray or None, not {type(initial_Q)}" raise TypeError(msg) initial_Q = check_array(initial_Q, copy=True) if initial_Q.shape[0] != initial_Q.shape[1]: msg = "Initial_Q must be a square orthogonal matrix" raise ValueError(msg) if not np.allclose(initial_Q.T @ initial_Q, np.eye(initial_Q.shape[0])): msg = "Initial_Q must be a square orthogonal matrix" raise ValueError(msg) # check initial_P argument if initial_P is not None: if not isinstance(initial_P, np.ndarray): msg = f"Initial_P must be np.ndarray or None, not {type(initial_P)}" raise TypeError(msg) initial_P = check_array(initial_P, copy=True) n, m = initial_P.shape if not ( np.allclose(initial_P.sum(axis=0), np.ones(m) / m) and np.allclose(initial_P.sum(axis=1), np.ones(n) / n) ): msg = ( "Initial_P must be a soft assignment matrix " "(rows add up to (1/number of cols) " "and columns add up to (1/number of rows))" ) raise ValueError(msg) super().__init__() self.optimal_transport_eps = optimal_transport_eps self.optimal_transport_num_reps = optimal_transport_num_reps self.optimal_transport_lambda = optimal_transport_lambda self.iterative_num_reps = iterative_num_reps self.init = init self.initial_Q = initial_Q self.initial_P = initial_P def _optimal_transport(self, X, Y, Q): # "E step" of the SeedlessProcrustes. n, d = X.shape m, _ = Y.shape # initialize probability mass arrays & the cost matrix ; run sinkhorn probability_mass_X = np.ones(n) / n probability_mass_Y = np.ones(m) / m cost_matrix = ( np.linalg.norm((X @ Q).reshape(n, 1, d) - Y.reshape(1, m, d), axis=2) ** 2 ) P = ot.sinkhorn( a=probability_mass_X, b=probability_mass_Y, M=cost_matrix, reg=self.optimal_transport_lambda, numItermax=self.optimal_transport_eps, stopThr=self.optimal_transport_eps, ) return P def _procrustes(self, X, Y, P): # "M step" of the SeedlessProcurstes. aligner = OrthogonalProcrustes() Q = aligner.fit(X, P @ Y).Q_ return Q def _iterative_ot(self, X, Y, Q): # this P is not used. it is set to default in case numreps=0 P = np.ones((X.shape[0], Y.shape[0])) / (X.shape[0] * Y.shape[0]) for i in range(self.iterative_num_reps): P = self._optimal_transport(X, Y, Q) Q = self._procrustes(X, Y, P) return P, Q def _compute_objective(self, X, Y, Q=None, P=None): if Q is None: Q = self.Q_ if P is None: P = self.P_ return np.linalg.norm(X @ Q - P @ Y, ord="fro")
[docs] def fit(self, X, Y): """ Uses the two datasets to learn the matrix `self.Q_` that aligns the first dataset with the second. Parameters ---------- X : np.ndarray, shape (n, d) Dataset to be mapped to `Y`, must have same number of dimensions (axis 1) as `Y`. Y : np.ndarray, shape (m, d) Target dataset, must have same number of dimensions (axis 1) as `X`. Returns ------- self : returns an instance of self """ X, Y = self._check_datasets(X, Y) n, d = X.shape m, _ = Y.shape if self.init == "2d": P_matrices = np.zeros((2 ** d, n, m)) Q_matrices = np.zeros((2 ** d, d, d)) objectives = np.zeros(2 ** d) # try 2^d different initializations for i in range(2 ** d): initial_Q = _sign_flip_matrix_from_int(i, d) P_matrices[i], Q_matrices[i] = P, Q = self._iterative_ot( X, Y, initial_Q ) objectives[i] = self._compute_objective(X, Y, Q, P) # pick the best one, using the objective function value best = np.argmin(objectives) self.selected_initial_Q_ = _sign_flip_matrix_from_int(best, d) self.P_, self.Q_ = P_matrices[best], Q_matrices[best] elif self.init == "sign_flips": aligner = SignFlips() self.selected_initial_Q_ = aligner.fit(X, Y).Q_ self.P_, self.Q_ = self._iterative_ot(X, Y, self.selected_initial_Q_) else: # determine initial Q if "custom" if self.initial_Q is not None: self.selected_initial_Q_ = self.initial_Q elif self.initial_P is not None: # use initial P, if provided self.selected_initial_Q_ = self._procrustes(X, Y, self.initial_P) else: # set to initial Q to identity if neither Q nor P provided self.selected_initial_Q_ = np.eye(d) self.P_, self.Q_ = self._iterative_ot(X, Y, self.selected_initial_Q_) self.score_ = self._compute_objective(X, Y) return self
def _sign_flip_matrix_from_int(val_int, d): val_bin = bin(val_int)[2:] val_bin = "0" * (d - len(val_bin)) + val_bin return np.diag(np.array([(float(i) - 0.5) * -2 for i in val_bin]))