Source code for composer.algorithms.factorize.factorize_core

# Copyright 2021 MosaicML. All Rights Reserved.

import dataclasses
from typing import Optional, Tuple, Union

import numpy as np
import torch
import torch.nn.functional as F


[docs]@dataclasses.dataclass class LowRankSolution: """Bundles tensors used by a factorized linear operator. The factorization always splits the operator into two smaller linear operators. The first takes in input of the original shape and embeds it in a lower-dimensional space. The second maps this lower-dimensional space to the original output space. Args: Wa: First linear operation in the factorized approximation. For a factorized linear operation, ``Wa`` is a matrix. For a factorized convolution, ``Wa`` matches the shape of the convolution's original weight parameter, except along the channel axis. Wb: Second linear operation in the factorized approximation. Shape is such that composing ``Wb`` with ``Wb`` yields an output of the same size as the original operation. bias: vector added to the output of the second linear operation rank: output dimensionality (channels or features) of the first linear operation, and input dimensionality of the second input operation. nmse: normalized mean squared error obtained during the optimization procedure used to derive ``Wa``, ``Wb``, and ``bias``. This is equal to the raw mean squared error between the factorized approximation's output and the original output, divided by the variance of the original output. A value of 0 means no error was introduced, and a value of 1 corresponds to capturing the output no better than chance. """ Wa: Optional[torch.Tensor] = None Wb: Optional[torch.Tensor] = None bias: Optional[torch.Tensor] = None rank: int = -1 nmse: float = 0
def _lstsq(A: torch.Tensor, B: torch.Tensor) -> torch.Tensor: if A.shape[0] != B.shape[0]: raise RuntimeError(f"A has different number of rows than B! A.shape = {A.shape}, B.shape = {B.shape}") if A.ndim != 2: raise RuntimeError("A is not a rank 2 tensor: has shape", A.shape) if B.ndim != 2: raise RuntimeError("B is not a rank 2 tensor: has shape", A.shape) # TODO more intelligence regarding choice of lstsq `driver` arg return torch.linalg.lstsq(A, B).solution def _nmse(Y: torch.Tensor, Y_hat: torch.Tensor) -> float: diffs = Y.detach() - Y_hat.detach() return float((diffs * diffs).mean() / Y.var()) def _svd_initialize(Wa: torch.Tensor, Wb: Optional[torch.Tensor], k: int) -> Tuple[torch.Tensor, torch.Tensor]: if Wb is None: W = Wa else: W = Wa @ Wb # TODO rank k randomized svd if k small enough U, s, Vt = torch.linalg.svd(W, full_matrices=False) Wa = U[:, :k] Wb = Vt[:k] # scale matrices equally for numerical "load-balancing" s_sqrt = torch.sqrt(s[:k]) # s is already a vector, not mat Wa *= s_sqrt Wb *= s_sqrt.reshape(-1, 1) return Wa, Wb
[docs]def factorize_matrix(X: torch.Tensor, Y: torch.Tensor, Wa: torch.Tensor, Wb: Optional[torch.Tensor] = None, bias: Optional[torch.Tensor] = None, rank: Union[int, float] = .25, n_iters: int = 3) -> LowRankSolution: """Approximates a matrix by factorizing it into a product of two smaller matrices. Given a matrix ``W`` of shape ``[D, M]``, a bias vector of length ``M``, and a target rank ``rank < D``, returns a solution ``(Wa, Wb, new_bias)`` of tensors of shapes ``[N, rank]``, ``[rank, D]``, and ``M``, respectively. These tensors are chosen so as to minimize: :math:`||` ``Y - (X @ Wa @ Wb + new_bias)`` :math:`||_F`, where ``Y = X @ W + bias``, ``@`` denotes matrix multiplication, ``new_bias`` broadcasts along the row dimension, and :math:`||\\cdot||_F` denotes the sum of squared elements. In the case that rows of ``X`` correspond to samples from some distribution, this amounts to minimizing the expected mean squared error in the output. The input matrix can either be a single matrix ``W`` or a pair of matrices ``(Wa, Wb)``. The latter case corresponds to using a matrix ``W = Wa @ Wb`` that has already been factorized, and is supported in order to facilitate progressively decreasing the rank of matrix. Args: X: input used to evaluate the quality of the approximation. Shape is ``[N, D]``, where ``N`` is often the number of input samples and ``D`` is the dimensionality of each sample. Y: output of applying the original matrix to ``X``. Must have shape ``[N, M]`` for some ``M``. Wa: either the matrix to be factorized, or the first of the two smaller matrices in the already-factorized representation of this matrix. Must be of shape ``[D, M]`` in the former case and shape ``[D, d]`` in the latter, for some ``d < D``. Wb: if present, ``Wa`` is interpreted as the first of two smaller matrices, and ``Wb`` is taken to be the second. Must be of shape ``[d, M]``. bias: a vector added to the output after performing the matrix product with X rank: number of columns in the latent representation of X n_iters: number of iterations used in the optimization process. Higher numbers yield lower mean squared error, though there are usually diminishing returns after a handful of iterations. Returns: solution: a :class:`~LowRankSolution` of rank ``rank`` that approximates the original matrix. """ X = X.detach() Y = Y.detach() Wa = Wa.detach() Wb = Wb.detach() if Wb is not None else None if rank < 1: # fraction of input dimensionality (or current rank, if smaller) rank = min(int(rank * X.shape[1]), Wa.shape[1]) k = int(rank) solution = LowRankSolution() original_bias = None if bias is not None: original_bias = bias.detach() Y = Y - original_bias solution.bias = original_bias # if requested latent rank is greater than or equal to either # input rank or output rank, factorization is counterproductive, so # return a single matrix if k >= X.shape[1] or k >= Y.shape[1]: Wa = _lstsq(X, Y) solution.Wa = Wa solution.rank = -1 return solution # if requested latent rank is greater than current latent rank, # skip the factorization if k >= Wa.shape[1]: solution.Wa = Wa solution.Wb = Wb solution.rank = -1 return solution Wa, Wb = _svd_initialize(Wa, Wb, k) Ya = _lstsq(X, Y) for _ in range(n_iters): # update Wb Xb = X @ Wa Yb = Y Wb = _lstsq(Xb, Yb) # update Wa # We need to solve (AXB = Y) <=> (AX = B.I @ Y) not (AX = BY). # Since X and Y are constants, we can precompute pinv(A) @ Y. # We then have: # pinv(A) @ A @ X @ B = pinv(A) @ Y # (A.T@A).I @ A.T @ A @ X @ B = pinv(A) @ Y # X @ B = pinv(A) @ Y # Y.T @ pinv(A).T = B.T @ X.T # then we just solve for X.T: # B.T @ X.T = Y.T @ pinv(A).T # also, note that pinv(A) @ Y = lstsq(A, Y); this makes sense; # means that targets for XB are the optimal coeffs mapping A to Y # also, observe that AXB = Y is using X and Y as variable to solve # for and targets, not the X and Y vars we have in this function Xa = Wb Wa_T = _lstsq(Xa.T, Ya.T) Wa = Wa_T.T solution.Wa = Wa solution.Wb = Wb solution.rank = k Y_hat = (X @ Wa) @ Wb bias = (Y - Y_hat).mean(dim=0) if original_bias is not None: bias += original_bias solution.bias = bias Y_hat += bias solution.nmse = _nmse(Y, Y_hat) return solution
def _activations_conv2d_to_mat(activations, kernel_size, padding=0, padding_mode='zeros', stride=1, dilation=1, groups=1): if np.max(stride) > 1: raise NotImplementedError(f"Stride != 1 not implemented; got {stride}") if np.max(dilation) > 1: raise NotImplementedError(f"Dilation != 1 not implemented; got {dilation}") if groups != 1: raise NotImplementedError(f"Groups != 1 not implemented; got {groups}") if np.max(padding) > 0 and padding_mode.lower() != 'zeros': if not isinstance(padding, list): padding = [padding] activations = F.pad(activations, pad=padding, mode=padding_mode) padding = 0 # always default to stride=1 to maximize amount of data we get here # TODO downsample in batch size dim or use stride > 1 if it looks like # materializing full matrix will OOM ret = F.unfold(activations, kernel_size=kernel_size, padding=padding) ret = ret.transpose(1, 2) # batch_sz, n_positions, fan_in return ret.reshape(-1, ret.shape[2]) # batch_sz * n_positions, fan_in def _weights_conv2d_to_mat(weights: torch.Tensor): return weights.reshape(weights.shape[0], -1).T # fan_in, out_channels def _mat_to_weights_conv2d(mat: Optional[torch.Tensor], kernel_size) -> Optional[torch.Tensor]: if mat is None: return None w = mat.T # fan_in, out_channels -> out_channels, fan_in # XXX(nchw) This might silently do the wrong thing with nhwc layout return w.reshape(w.shape[0], -1, *kernel_size)
[docs]def factorize_conv2d(X, Wa: torch.Tensor, Wb: Optional[torch.Tensor] = None, rank: Union[int, float] = .25, biasA: Optional[torch.Tensor] = None, biasB: Optional[torch.Tensor] = None, n_iters=3, **conv2d_kwargs) -> LowRankSolution: """Approximates a KxK convolution by factorizing it into a KxK convolution with fewer channels followed by a 1x1 convolution. Given a convolutional weight tensor ``W`` for a 2d convolution of shape ``[out_channels, in_channels, k_h, k_w]`` and a vector ``bias`` of length ``out_channels``, returns a triple ``(Wa, Wb, new_bias)`` of tensors with shapes ``[rank, in_channels, k_h, k_w]``, ``[out_channels, rank, 1, 1]``, and ``[out_channels]``, respectively. ``Wa``, ``Wb``, and ``new_bias`` are chosen so as to minimize: :math:`||` ``(W * X + bias) - (Wb * (Wa * X) + new_bias)`` :math:`||_F`, where :math:`*` denotes convolution, ``bias`` broadcasts along all non-channel dimensions, and :math:`||\\cdot||_F` denotes the sum of squared elements. Similar to :func:`~factorize_matrix`, this function allows passing in an already-factorized weight tensor in order to enable progressive factorization. In this case, the single tensor ``W`` is replaced with a similar ``(Wa, Wb)`` pair as the output, though not necessarily with the same rank. Args: X: a tensor of shape ``[N, in_channels, H, W]``, for some ``N``, ``H``, and ``W``. Wa: The first weight tensor to convolve with ``X``. If ``Wb`` is not provided, must be of shape ``[out_channels, in_channels, k_h, k_w]``. Otherwise, must be of shape ``[original_rank, in_channels, k_h, k_w]`` for some ``original_rank < min(in_channels, out_channels)``. Wb: The second weight tensor to convolve with the input. If provided, must be of shape ``[out_channels, original_rank, 1, 1]``. rank: number of channels in the latent representation of ``X`` biasA: optional vector of biases. If ``Wb`` is ``None``, must have length ``out_channels``. Otherwise must have length ``original_rank``. biasB: if provided, must have length ``out_channels``. n_iters: number of iterations used in the optimization process. Higher numbers yield lower mean squared error, though there are usually diminishing returns after a handful of iterations. **conv2d_kwargs: arguments such as ``padding``, ``stride``, ``dilation``, ``groups``, etc used in the original convolution. If these are not provided, the factorized tensors might not preserve the function computed by the original weight tensor as well. Note that not all combinations of arguments are supported. Returns: solution: A :class:`~composer.algorithms.factorize.LowRankSolution` of rank ``rank`` that approximates the original convolution operation Raises: RuntimeError: If ``biasB`` is provided but not ``Wb`` is not. NotImplementedError: if ``conv2d_kwargs['dilation'] != 1`` or ``conv2d_kwargs['groups'] != 1``. """ X = X.detach() Wa = Wa.detach() kernel_size = Wa.shape[2:] X_mat = _activations_conv2d_to_mat(X, kernel_size=kernel_size, **conv2d_kwargs) Wa = _weights_conv2d_to_mat(Wa) # NOTE: we compute outputs ourselves, instead of having an arg for them, # since 1) we ignore input stride, and 2) any other intermediate ops # or other discrepancies between user's actual settings and args they pass # would either cause errors or silently mess up the regression Y_mat = (X_mat @ Wa) if biasA is not None: biasA = biasA.detach() Y_mat += biasA if Wb is not None: Wb = Wb.detach() Wb = _weights_conv2d_to_mat(Wb) Y_mat = Y_mat @ Wb if biasB is not None: biasB = biasB.detach() Y_mat += biasB elif biasB is not None: # fail fast if user passes in inconsistent combination of args raise RuntimeError("Got biasB, but Wb=None; cannot apply bias") ret = factorize_matrix(X_mat, Y_mat, Wa, Wb, rank=rank, n_iters=n_iters) # now we need to convert from two matrices to one kxk conv kernel and one # 1x1 conv kernel. Here's why the 2nd has to be a 1x1: if it were instead # k'xk' for some k' > 1, we would either be doing k'^2 as much work # for fixed embedding size at each pixel, or we'd be need to have the # intermediate embeddings be 1/k'^2 as large. In the latter case, we'd # lose a lot of representational capacity. Also, the first op has to match # the kernel size of the original conv or the shapes don't work out. assert ret.Wa is not None ret.Wa = _mat_to_weights_conv2d(ret.Wa, kernel_size=kernel_size) ret.Wb = _mat_to_weights_conv2d(ret.Wb, kernel_size=(1, 1)) return ret