Source code for tsseg.algorithms.hidalgo.detector

"""Hidalgo (Heterogeneous Intrinsic Dimensionality Algorithm) Segmentation."""

__maintainer__ = []
__all__ = ["HidalgoDetector"]


from functools import reduce

import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.utils.validation import check_random_state

from ..base import BaseSegmenter
from ..param_schema import (
    Closed,
    HasType,
    Interval,
    ParamDef,
)


[docs] class HidalgoDetector(BaseSegmenter): """Heteregeneous Intrinsic Dimensionality Algorithm (Hidalgo) model. Hidalgo is a robust approach in discriminating regions with different local intrinsic dimensionality (topological feature measuring complexity). Hidalgo offers unsupervised segmentation of high-dimensional data. Parameters ---------- metric : str, or callable, optional, default="euclidean" directly passed to sklearn KNearestNeighbors, must be str or callable that can be passed to KNearestNeighbors distance used in the nearest neighbors part of the algorithm K_states : int, optional, default=2 number of manifolds used in algorithm zeta : float, optional, default=0.8 "local homogeneity level" used in the algorithm, see equation (4) q : int, optional, default=3 number of points for local Z interaction, "local homogeneity range" see equation (4) n_iter : int, optional, default=1000 number of Gibbs sampling iterations n_replicas : int, optional, default=1 number of random starts to run Gibbs sampling burn_in : float, optional, default=0.9 percentage of Gibbs sampling iterations discarded, "burn-in fraction" fixed_Z : bool, optional, default=False estimate parameters with fixed z (joint posterior approximation via Gibbs) z = (z_1, ..., z_K) is a latent variable introduced, where z_i = k indicates point i belongs to manifold K use_Potts : bool, optional, default=True if using local interaction between z, see equation (4) estimate_zeta : bool, optional, default=False update zeta in the sampling sampling_rate: int, optional, default=10 rate at which to save samples for each n_iter a : np.ArrayLike, optional, default=None prior parameters of d, the dimensionality of manifold k b : np.ArrayLike, optional, default=None prior parameters of d, the dimensionality of manifold k c : np.ArrayLike, optional, default=None prior parameters of p, the probability that point belongs to manifold k f : np.ArrayLike, optional, default=None parameters of zeta seed : int, optional, default = 0 generate random numbers with seed References ---------- Allegra, Michele, et al. "Data segmentation based on the local intrinsic dimension." Scientific reports 10.1 (2020): 1-12. https://www.nature.com/articles/s41598-020-72222-0 Examples -------- >>> from aeon.segmentation import HidalgoSegmenter >>> import numpy as np >>> np.random.seed(123) >>> X = np.random.rand(10,3) >>> X[:6, 1:] += 10 >>> X[6:, 1:] = 0 >>> model = HidalgoSegmenter(K_states=2, burn_in=0.8, n_iter=100, seed=10) >>> seg = model.fit_predict(X, axis=0) >>> seg.tolist() [1, 1, 1, 1, 1, 1, 0, 0, 0, 0] """ _tags = { "capability:univariate": False, "capability:multivariate": True, "fit_is_empty": False, "returns_dense": False, "detector_type": "state_detection", "capability:unsupervised": False, "capability:semi_supervised": True, } _parameter_schema = { "metric": ParamDef( constraint=None, description="Distance metric (str or callable for sklearn NearestNeighbors).", ), "K_states": ParamDef( constraint=Interval(int, 1, None, Closed.LEFT), description="Number of manifolds / states.", ), "zeta": ParamDef( constraint=Interval(float, 0, 1, Closed.NEITHER), description="Local homogeneity level, in (0, 1).", ), "q": ParamDef( constraint=Interval(int, 1, None, Closed.LEFT), description="Number of points for local Z interaction.", ), "n_iter": ParamDef( constraint=Interval(int, 1, None, Closed.LEFT), description="Number of Gibbs sampling iterations.", ), "n_replicas": ParamDef( constraint=Interval(int, 1, None, Closed.LEFT), description="Number of random restarts for Gibbs sampling.", ), "burn_in": ParamDef( constraint=Interval(float, 0, 1, Closed.LEFT), description="Fraction of iterations discarded as burn-in, in [0, 1).", ), "fixed_Z": ParamDef( constraint=HasType((bool,)), description="Estimate params with fixed Z.", ), "use_Potts": ParamDef( constraint=HasType((bool,)), description="Use local Potts interaction between Z.", ), "estimate_zeta": ParamDef( constraint=HasType((bool,)), description="Update zeta during sampling.", ), "sampling_rate": ParamDef( constraint=Interval(int, 1, None, Closed.LEFT), description="Rate at which to save samples.", ), "a": ParamDef( constraint=None, description="Prior parameter for d (array or None).", nullable=True, ui_hidden=True, group="priors", ), "b": ParamDef( constraint=None, description="Prior parameter for d (array or None).", nullable=True, ui_hidden=True, group="priors", ), "c": ParamDef( constraint=None, description="Prior parameter for p (array or None).", nullable=True, ui_hidden=True, group="priors", ), "f": ParamDef( constraint=None, description="Parameter for zeta (array or None).", nullable=True, ui_hidden=True, group="priors", ), "seed": ParamDef( constraint=Interval(int, 0, None, Closed.LEFT), description="Random seed.", ), } def __init__( self, metric="euclidean", K_states=1, zeta=0.8, q=3, n_iter=1000, n_replicas=1, burn_in=0.9, fixed_Z=False, use_Potts=True, estimate_zeta=False, sampling_rate=10, a=None, b=None, c=None, f=None, seed=0, ): self.metric = metric self.K_states = K_states self.zeta = zeta self.q = q self.n_iter = n_iter self.burn_in = burn_in self.n_replicas = n_replicas self.fixed_Z = fixed_Z self.use_Potts = use_Potts self.estimate_zeta = estimate_zeta self.sampling_rate = sampling_rate self.a = a self.b = b self.c = c self.f = f self.seed = seed super().__init__(axis=0) def _get_neighbourhood_params(self, X): """ Neighbourhood information from input data X. Parameters ---------- X : np.ndarray 2D array of shape (n_timepoints, n_channels), where dim > 1 data to fit the algorithm to Returns ------- m : int Number of rows (timepoints) of X. mu : np.ndarray 1D np.ndarray of length m. parameter in Pereto distribution estimated by ``r2/r1`` Iin : 1D np.ndarray of length m * q encodes the q neighbour index values for point index i in 0:m-1 e.g. popint i=0 has neighbours 2, 4, 7 and point i=1 has neighbours 3, 9, 4 Iin = np.array([2, 4, 7, 3, 9, 4,...]) Iout : 1D np.ndarray of length m * q array of indices for which i is neighbour for i in 0:m-1 e.g. point i=0 is also neighbour of points 2, 4 and point i=1 is a neighbour of point 3 only Iout = np.array([2, 4, 3,...]) Iout_count : 1D np.ndarray of length m count of how many neighbours point i has for i in 0:m-1 e.g. Iout_count = np.array([2, 1, 1,...]) Iout_track : 1D np.ndarray of length m cumulative sum of Iout_count at i-1 for i in 1:m-1 e.g. Iout_track = np.array([0, 2, 3, 4,...]) """ q = self.q metric = self.metric m, _ = np.shape(X) nbrs = NearestNeighbors( n_neighbors=q + 1, algorithm="ball_tree", metric=metric ).fit(X) distances, Iin = nbrs.kneighbors(X) denom = distances[:, 1] # mu = np.divide(distances[:, 2], distances[:, 1]) # [Original implementation from aeon] # Guard against zero-distance neighbours which can appear on duplicated samples. with np.errstate(divide="ignore", invalid="ignore"): mu = np.divide( distances[:, 2], denom, out=np.ones_like(denom), where=denom > 0, ) nbrmat = np.zeros((m, m)) for n in range(q): nbrmat[Iin[:, 0], Iin[:, n + 1]] = 1 Iout_count = np.sum(nbrmat, axis=0).astype(int) Iout = np.where(nbrmat.T)[1].astype(int) Iout_track = np.cumsum(Iout_count) Iout_track = np.append(0, Iout_track[:-1]).astype(int) Iin = Iin[:, 1:] Iin = np.reshape(Iin, (m * q,)).astype(int) return m, mu, Iin, Iout, Iout_count, Iout_track def _update_zeta_prior(self, Z, N, Iin): """Update prior parameters for zeta.""" q = self.q f = self.f if f is None: f = np.ones(2) N_in = sum([Z[Iin[q * i + j]] == Z[i] for j in range(q) for i in range(N)]) f1 = np.empty(shape=2) f1[0] = f[0] + N_in f1[1] = f[1] + N * q - N_in return N_in, f1 def _initialise_params(self, N, mu, Iin, _rng): """ Initialise parameters used in algorithm. Outputs ---------- V : 1D np.ndarray of length K sum(log(mu_i)) for k in 0:K-1, when mu_i belongs to manifold k NN : 1D np.ndarray of length K count for k in 0:K-1, when data at index i belongs to manifold k a1 : 1D np.ndarray of length K prior parameters of d b1 : 1D np.ndarray of length K prior parameters of d c1 : 1D np.ndarray of length K prior parameters of p Z : 1D np.ndarray of length N segmentation based on manifold k f1 : 1D np.ndarray of length 2 parameters of zeta N_in : int parameters of zeta """ K = self.K_states a = self.a b = self.b c = self.c fixed_Z = self.fixed_Z if a is None: a = np.ones(K) if b is None: b = np.ones(K) if c is None: c = np.ones(K) if not fixed_Z: random_z = _rng.randint(0, K, N) Z = np.array(random_z, dtype=int) else: Z = np.zeros(N, dtype=int) V = [sum(np.log(mu[[Z[i] == k for i in range(N)]])) for k in range(K)] NN = [sum([Z[i] == k for i in range(N)]) for k in range(K)] a1 = a + NN b1 = b + V c1 = c + NN N_in, f1 = self._update_zeta_prior(Z, N, Iin) return (V, NN, a1, b1, c1, Z, f1, N_in) def _gibbs_sampling( self, N, mu, Iin, Iout, Iout_count, Iout_track, V, NN, a1, b1, c1, Z, f1, N_in, _rng, ): """ Gibbs sampling method to find joint posterior distribution of target variables. Notes ----- Target parameters are d, p, Z zeta must be computed for the probability distribution of the q-Neighbourhood matrix Parameters ---------- V : 1D np.ndarray of length K sum(log(mu_i)) for k in 0:K-1, when mu_i belongs to manifold k NN : 1D np.ndarray of length K count for k in 0:K-1, when data at index i belongs to manifold k a1 : 1D np.ndarray of length K prior parameters of d b1 : 1D np.ndarray of length K prior parameters of d c1 : 1D np.ndarray of length K prior parameters of p Z : 1D np.ndarray of length N segmentation based on manifold k f1 : 1D np.ndarray of length 2 parameters of zeta N_in : int parameters of zeta Returns ------- sampling : 2D np.ndarray of shape (n_iter, Npar), where Npar = N + 2 * K + 2 + 1 posterior samples of d, p, Z and likelihood samples, respectively. """ zeta = self.zeta q = self.q K = self.K_states n_iter = self.n_iter fixed_Z = self.fixed_Z use_Potts = self.use_Potts estimate_zeta = self.estimate_zeta sampling = np.empty(shape=0) pp = (K - 1) / K p = np.ones(shape=K) / K def sample_d(K, a1, b1, _rng): """Sample d, the dimension of manifold k.""" d = np.empty(shape=K) for k in range(K): stop = False while stop is False: r1 = _rng.random() * 200 # random sample for d[k] r2 = _rng.random() # random number for accepting rmax = (a1[k] - 1) / b1[k] if a1[k] - 1 > 0: assert rmax > 0 frac = np.exp( -b1[k] * (r1 - rmax) - (a1[k] - 1) * (np.log(rmax) - np.log(r1)) ) else: frac = np.exp(-b1[k] * r1) if frac > r2: stop = True d[k] = r1 return d def sample_p(K, p, pp, c1, _rng): """Sample p, the prior probability that a point belongs to manifold k.""" for k in range(K - 1): stop = False while stop is False: r1 = _rng.random() # random sample for p[k] r2 = _rng.random() # random number for accepting # --- FIX START: Use log-domain calculation to prevent overflow --- exp1 = c1[k] - 1 exp2 = c1[K - 1] - 1 if exp1 + exp2 <= 1e-9: # Effectively 0 stop = True else: rmax = exp1 / (exp1 + exp2) log_frac = 0.0 is_valid = True # Calculate log of first term: (r1 / rmax) ** exp1 if exp1 > 0: if r1 <= 0: is_valid = False else: # We assume rmax > 0 because exp1 > 0 and sum > 0 log_frac += exp1 * (np.log(r1) - np.log(rmax)) # Calculate log of second term: ((1 - r1) / (1 - rmax)) ** exp2 if is_valid and exp2 > 0: if r1 >= 1.0: is_valid = False # Should be rare with random() < 1.0 else: # We assume 1-rmax > 0 because if exp2 > 0, rmax < 1 log_frac += exp2 * (np.log(1 - r1) - np.log(1 - rmax)) if is_valid: # Compare log(frac) > log(r2) # r2 is in [0, 1). If r2=0, log(r2) is -inf, condition holds if r2 <= 0: stop = True elif log_frac > np.log(r2): stop = True # --- FIX END --- if stop: r1 = r1 * (1.0 - pp + p[k]) p[K - 1] += p[k] - r1 pp -= p[k] - r1 p[k] = r1 return (p, pp) def sample_zeta(N, K, zeta, use_Potts, estimate_zeta, q, NN, f1, it, _rng): """Sample zeta, sample probability neighbour of point from same manifold.""" stop = False maxval = -100000 if use_Potts and estimate_zeta: for zeta_candidates in range(10): zeta1 = 0.5 + 0.05 * zeta_candidates ZZ = [_partition_function(N, NN[k], zeta1, q) for k in range(K)] h = [NN[k] * np.log(ZZ[k]) for k in range(K)] val = ( (f1[0] - 1) * np.log(zeta1) + (f1[1] - 1) * np.log(1 - zeta1) - h ) if val > maxval: maxval = val while stop is False: r1 = _rng.random() # random sample for zeta r2 = _rng.random() # random number for accepting ZZ = [_partition_function(N, NN[k], r1, q) for k in range(K)] h = [NN[k] * np.log(ZZ[k]) for k in range(K)] val = (f1[0] - 1) * np.log(r1) + (f1[1] - 1) * np.log(1 - r1) - h frac = np.exp(val - maxval) if frac > r2: stop = True if it > 0: zeta = r1 return zeta def sample_Z( N, mu, Iin, Iout, Iout_count, Iout_track, Z, NN, a1, c1, V, b1, zeta, fixed_Z, q, _rng, ): """Sample z, latent variable indicating point i belongs to manifold k.""" if (abs(zeta - 1) < 1e-5) or fixed_Z: return Z, NN, a1, c1, V, b1 for i in range(N): stop = False gg = np.empty(shape=K) gmax = 0 for k1 in range(K): g = 0 if use_Potts: n_in = sum([Z[Iin[q * i + j]] == k1 for j in range(q)]) m_in = sum( [ Iout[Iout_track[i] + j] > -1 and Z[Iout[Iout_track[i] + j]] == k1 for j in range(Iout_count[i]) ] ) g = (n_in + m_in) * np.log(zeta / (1 - zeta)) - np.log( _partition_function(N, NN[k1], zeta, q) ) var = _partition_function( N, NN[k1] - 1, zeta, q ) / _partition_function(N, NN[k1], zeta, q) assert var > 0 g = g + np.log(var) * (NN[k1] - 1) if g > gmax: gmax = g gg[k1] = g gg = [np.exp(gg[k1] - gmax) for k1 in range(K)] prob = p * d * mu[i] ** (-(d + 1)) * gg prob /= prob.sum() while stop is False: r1 = int(np.floor(_rng.random() * K)) # random sample for Z r2 = _rng.random() # random number for accepting if prob[r1] > r2: stop = True # minus values NN[Z[i]] -= 1 a1[Z[i]] -= 1 c1[Z[i]] -= 1 V[Z[i]] -= np.log(mu[i]) b1[Z[i]] -= np.log(mu[i]) # change Z, add values Z[i] = r1 NN[Z[i]] += 1 a1[Z[i]] += 1 c1[Z[i]] += 1 V[Z[i]] += np.log(mu[i]) b1[Z[i]] += np.log(mu[i]) return Z, NN, a1, c1, V, b1 def sample_likelihood(N, mu, p, d, Z, N_in, zeta, NN): """Sample likelihood values of mu, and local inhomogeneity penalisation.""" lik0 = 0 for i in range(N): lik0 = ( lik0 + np.log(p[Z[i]]) + np.log(d[Z[i]]) - (d[Z[i]] + 1) * np.log(mu[i]) ) lik1 = lik0 + np.log(zeta / (1 - zeta)) * N_in for k1 in range(K): lik1 = lik1 - (NN[k1] * np.log(_partition_function(N, NN[k1], zeta, q))) return lik0, lik1 for it in range(n_iter): d = sample_d(K, a1, b1, _rng) sampling = np.append(sampling, d) (p, pp) = sample_p(K, p, pp, c1, _rng) sampling = np.append(sampling, p[: K - 1]) sampling = np.append(sampling, (1 - pp)) zeta = sample_zeta( N, K, zeta, use_Potts, estimate_zeta, q, NN, f1, it, _rng ) sampling = np.append(sampling, zeta) Z, NN, a1, c1, V, b1 = sample_Z( N, mu, Iin, Iout, Iout_count, Iout_track, Z, NN, a1, c1, V, b1, zeta, fixed_Z, q, _rng, ) sampling = np.append(sampling, Z) N_in, f1 = self._update_zeta_prior(Z, N, Iin) likelihood = sample_likelihood(N, mu, p, d, Z, N_in, zeta, NN) sampling = np.append(sampling, likelihood) return sampling def _fit(self, X, y=None): """Run the Hidalgo algorithm. Find parameter estimates as distributions in sampling. Iterate through n_replicas random starts and get posterior samples with best max likelihood. Notes ----- Writes to self _d : 1D np.ndarray of length K posterior mean of d, from posterior sample in gibbs_sampling _derr : 1D np.ndarray of length K posterior std of d, from posterior sample in gibbs_sampling _p : 1D np.ndarray of length K posterior mean of p, from posterior sample in gibbs_sampling _perr : 1D np.ndarray of length K posterior std of p, from posterior sample in gibbs_sampling _lik : float mean of likelihood, from sample in gibbs_sampling _likerr : float std of likelihood, from sample in gibbs_sampling _Pi : 2D np.ndarray of shape (K, N) probability of posterior of z_i = k, point i can be safely assigned to manifold k if Pi > 0.8 _Z : 1D np.ndarray of length N base-zero integer values corresponding to segment (manifold k) Parameters ---------- X : 2D np.ndarray of shape (N, dim), where dim > 1 data to fit the algorithm to Returns ------- self """ K = self.K_states n_replicas = self.n_replicas n_iter = self.n_iter sampling_rate = self.sampling_rate burn_in = self.burn_in seed = self.seed _rng = check_random_state(seed) N, mu, Iin, Iout, Iout_count, Iout_track = self._get_neighbourhood_params(X) V, NN, a1, b1, c1, Z, f1, N_in = self._initialise_params(N, mu, Iin, _rng) Npar = N + 2 * K + 2 + 1 bestsampling = None maxlik = -1e10 for _ in range(n_replicas): sampling = self._gibbs_sampling( N, mu, Iin, Iout, Iout_count, Iout_track, V, NN, a1, b1, c1, Z, f1, N_in, _rng, ) sampling = np.reshape(sampling, (n_iter, Npar)) idx = [ it for it in range(n_iter) if it % sampling_rate == 0 and it >= n_iter * burn_in ] if len(idx) == 0: continue sampling = sampling[idx,] likelihood = np.mean(sampling[:, -1], axis=0) if likelihood > maxlik: bestsampling = sampling maxlik = likelihood if bestsampling is None: raise ValueError( "No valid samples after burn-in and sampling_rate filtering. " f"Try reducing burn_in ({burn_in}) or " f"sampling_rate ({sampling_rate})." ) self._d = np.mean(bestsampling[:, :K], axis=0) self._derr = np.std(bestsampling[:, :K], axis=0) self._p = np.mean(bestsampling[:, K : 2 * K], axis=0) self._perr = np.std(bestsampling[:, K : 2 * K], axis=0) self._lik = np.mean(bestsampling[:, -1], axis=0) self._likerr = np.std(bestsampling[:, -1], axis=0) Pi = np.zeros((K, N)) for k in range(K): Pi[k, :] = np.sum(bestsampling[:, (2 * K) + 1 : 2 * K + N + 1] == k, axis=0) Pi = Pi / bestsampling.shape[0] self._Pi = Pi Z = np.argmax(Pi, axis=0) pZ = np.max(Pi, axis=0) Z[np.where(pZ < 0.8)] = -1 self._Z = Z return self def _predict(self, X, y=None): return self._Z @classmethod def _get_test_params(cls, parameter_set="default"): """Return testing parameter settings for the estimator. Parameters ---------- parameter_set : str, default="default" Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return `"default"` set. Reserved values for classifiers: "results_comparison" - used for identity testing in some classifiers should contain parameter settings comparable to "TSC bakeoff" Returns ------- params : dict or list of dict, default = {} Parameters to create testing instances of the class Each dict are parameters to construct an "interesting" test instance, i.e., `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance. """ return { "metric": "euclidean", "K_states": 1, "zeta": 0.8, "q": 3, "n_iter": 10, "n_replicas": 1, "burn_in": 0.5, "fixed_Z": False, "use_Potts": True, "estimate_zeta": False, "sampling_rate": 2, "a": None, "b": None, "c": None, "f": None, "seed": 1, }
def _binom(N: int | float, q: int | float): """Calculate the binomial coefficient. Parameters ---------- N : int, float number of fixed elements from which q is chosen q : int, float number of subset q elements chosen from N """ if q == 0: return 1.0 if N < 0: return 0.0 return reduce(lambda x, y: x * y, [(N - q1) / (q1 + 1) for q1 in range(q)]) def _partition_function(N, N1, zeta, q): """Partition function for Z. Parameters ---------- N : int, float number of rows of input data X N1 : int, float parameter value from NN[k] for k=0:K-1 zeta : float parameter value zeta q : int, float parameter value q """ return sum( [ _binom(N1 - 1, q1) * _binom(N - N1, q - q1) * zeta ** (q1) * (1 - zeta) ** (q - q1) for q1 in range(q + 1) ] )