Source code for tsseg.algorithms.igts.detector

"""
Information Gain-based Temporal Segmenter.

Information Gain Temporal Segmentation (_IGTS) is a method for segmenting
multivariate time series based off reducing the entropy in each segment.

The amount of entropy lost by the segmentations made is called the Information
Gain (IG). The aim is to find the segmentations that have the maximum information
gain for any number of segmentations.
"""

from collections.abc import Generator
from dataclasses import dataclass, field

import numpy as np
import numpy.typing as npt
import pandas as pd

from ..base import BaseSegmenter
from ..param_schema import Closed, Interval, ParamDef

__all__ = ["InformationGainDetector"]
__maintainer__ = []


def _augment_univariate(X: npt.ArrayLike) -> np.ndarray:
    """Augment a univariate series with its complement channel.

    Applies the data transformation from Section 4.4 of the IGTS paper:

    1. **Normalize** each channel so that values sum to 1 (Eq. 12).
    2. **Append the complement** ``max(c_i) - c_i`` for every channel,
       doubling the number of columns from *m* to *2m* (Eq. 13).

    For a univariate input this produces a bivariate series where one
    channel rises whenever the other falls, breaking the constant-entropy
    degeneracy that prevents information gain from detecting change points
    in single-channel data.

    Parameters
    ----------
    X : array_like, shape (n_timepoints, n_channels)
        Input time series.

    Returns
    -------
    X_aug : np.ndarray, shape (n_timepoints, 2 * n_channels)
        Augmented time series.

    References
    ----------
    Sadri, Amin, Yongli Ren, and Flora D. Salim.
    "Information gain-based metric for recognizing transitions in
    human activities.", Pervasive and Mobile Computing, 38, 92-109, (2017).
    """
    n_samples, n_channels = X.shape
    X_aug = np.empty((n_samples, 2 * n_channels), dtype=np.float64)

    for i in range(n_channels):
        c = X[:, i].astype(np.float64)
        c_min = c.min()
        c_shifted = c - c_min
        denom = c_shifted.sum()
        if denom == 0.0:
            c_norm = np.zeros_like(c)
        else:
            c_norm = c_shifted / denom
        X_aug[:, i] = c_norm
        X_aug[:, n_channels + i] = c_norm.max() - c_norm

    return X_aug


@dataclass
class ChangePointResult:
    k: int
    score: float
    change_points: list[int]


def entropy(X: npt.ArrayLike) -> float:
    """Shannons's entropy for time series.

    As defined by equations (3) and (4) from [1]_.

    Parameters
    ----------
    X: array_like
        Time series data as a 2D numpy array with sequence index along rows
        and value series in columns.

    Returns
    -------
    entropy: float
        Computed entropy.
    """
    if X.size == 0 or X.shape[0] == 0:
        return 0.0

    column_totals = np.sum(X, axis=0)
    overall_total = float(np.sum(column_totals))

    if overall_total <= 0.0:
        shifted = X - np.min(X, axis=0, keepdims=True)
        column_totals = np.sum(shifted, axis=0)
        overall_total = float(np.sum(column_totals))
        if overall_total <= 0.0:
            return 0.0
        X = shifted

    with np.errstate(divide="ignore", invalid="ignore"):
        p = np.divide(np.sum(X, axis=0), overall_total, where=overall_total != 0.0)
    p = p[p > 0.0]
    if p.size == 0:
        return 0.0
    return -np.sum(p * np.log(p))


def _entropy_from_sums(channel_sums: np.ndarray) -> float:
    """Compute Shannon entropy from precomputed channel sums.

    This is the O(m) version using cumulative sums (Eq. 5-6 of [1]_).

    Parameters
    ----------
    channel_sums : np.ndarray, shape (n_channels,)
        Sum of each channel over the segment.

    Returns
    -------
    h : float
        Shannon entropy of the segment.
    """
    total = channel_sums.sum()
    if total <= 0.0:
        return 0.0
    p = channel_sums / total
    p = p[p > 0.0]
    if p.size == 0:
        return 0.0
    return -float(np.sum(p * np.log(p)))


def generate_segments(X: npt.ArrayLike, change_points: list[int]) -> Generator:
    """Generate separate segments from time series based on change points.

    Parameters
    ----------
    X: array_like
        Time series data as a 2D numpy array with sequence index along rows
        and value series in columns.

    change_points: list of int
        Locations of change points as integer indexes. By convention change points
        include the identity segmentation, i.e. first and last index + 1 values.

    Yields
    ------
    segment: npt.ArrayLike
        A segments from the input time series between two consecutive change points
    """
    for start, end in zip(change_points[:-1], change_points[1:], strict=True):
        yield X[start:end, :]


def generate_segments_pandas(X: npt.ArrayLike, change_points: list) -> Generator:
    """Generate separate segments from time series based on change points.

    Parameters
    ----------
    X: array_like
        Time series data as a 2D numpy array with sequence index along rows
        and value series in columns.

    change_points: list of int
        Locations of change points as integer indexes. By convention change points
        include the identity segmentation, i.e. first and last index + 1 values.

    Yields
    ------
    segment: npt.ArrayLike
        A segments from the input time series between two consecutive change points
    """
    for interval in pd.IntervalIndex.from_breaks(sorted(change_points), closed="both"):
        yield X[interval.left : interval.right, :]


@dataclass
class _IGTS:
    """
    Information Gain based Temporal Segmentation (GTS).

    GTS is an unsupervised method for segmenting multivariate time series
    into non-overlapping segments by locating change points that for which
    the information gain is maximized.

    Information gain (IG) is defined as the amount of entropy lost by the segmentation.
    The aim is to find the segmentation that have the maximum information
    gain for a specified number of segments.

    GTS uses top-down search method to greedily find the next change point
    location that creates the maximum information gain. Once this is found, it
    repeats the process until it finds `k_max` splits of the time series.

    Uses cumulative sums (Eq. 5-6 of the IGTS paper) so that segment channel sums are
    computed in O(m) instead of O(mn).

    .. note::

       For univariate input the caller should augment the series with its
       complement channel via :func:`_augment_univariate` before calling
       :meth:`find_change_points`.  The wrapper
       :class:`InformationGainDetector` does this automatically.

    Parameters
    ----------
    k_max : int, default=10
        Maximum number of change points to find. The number of segments is thus k+1.
    step : int, default=5
        Step size, or stride for selecting candidate locations of change points.
        Fox example a `step=5` would produce candidates [0, 5, 10, ...]. Has the same
        meaning as `step` in `range` function.

    Notes
    -----
    Based on the work from Sadri et al. (2017).

    - alt. py implementation: https://github.com/cruiseresearchgroup/IGTS-python
    - MATLAB version: https://github.com/cruiseresearchgroup/IGTS-matlab
    - paper available at:
      https://www.sciencedirect.com/science/article/abs/pii/S1574119217300081
    """

    # init attributes
    k_max: int = 10
    step: int = 5

    # computed attributes
    intermediate_results_: list = field(init=False, default_factory=list)

    def identity(self, X: npt.ArrayLike) -> list[int]:
        """Return identity segmentation, i.e. terminal indexes of the data."""
        return sorted([0, X.shape[0]])

    def get_candidates(self, n_samples: int, change_points: list[int]) -> list[int]:
        """Generate candidate change points.

        Also exclude existing change points.

        Parameters
        ----------
        n_samples: int
            Length of the time series.
        change_points: list of ints
            Current set of change points, that will be used to exclude values
            from candidates.

        TODO: exclude points within a neighborhood of existing
        change points with neighborhood radius
        """
        return sorted(
            set(range(0, n_samples, self.step)).difference(set(change_points))
        )

    @staticmethod
    def information_gain_score(X: npt.ArrayLike, change_points: list[int]) -> float:
        """Calculate the information gain score.

        The formula is based on equation (2) from [1]_.

        Parameters
        ----------
        X: array_like
            Time series data as a 2D numpy array with sequence index along rows
            and value series in columns.

        change_points: list of ints
            Locations of change points as integer indexes. By convention change points
            include the identity segmentation, i.e. first and last index + 1 values.

        Returns
        -------
        information_gain: float
            Information gain score for the segmentation corresponding to the change
            points.
        """
        segment_entropies = [
            seg.shape[0] * entropy(seg) for seg in generate_segments(X, change_points)
        ]
        return entropy(X) - sum(segment_entropies) / X.shape[0]

    @staticmethod
    def _ig_from_cumsum(
        cumsum: np.ndarray,
        n_samples: int,
        h_total: float,
        change_points: list[int],
    ) -> float:
        """Compute information gain using precomputed cumulative sums.

        Implements Eq. 2 with the O(m) segment-sum trick of Eq. 5-6.

        Parameters
        ----------
        cumsum : np.ndarray, shape (n_samples + 1, n_channels)
            Cumulative sum with a leading row of zeros so that
            ``cumsum[t]`` = sum of X[0:t] along axis 0.
        n_samples : int
            Length of the time series.
        h_total : float
            Precomputed entropy of the whole series.
        change_points : list of int
            Sorted boundary indices including 0 and n_samples.

        Returns
        -------
        ig : float
        """
        weighted_h = 0.0
        for start, end in zip(change_points[:-1], change_points[1:], strict=True):
            seg_len = end - start
            if seg_len <= 0:
                continue
            seg_sums = cumsum[end] - cumsum[start]  # O(m)
            weighted_h += seg_len * _entropy_from_sums(seg_sums)
        return h_total - weighted_h / n_samples

    def find_change_points(self, X: npt.ArrayLike) -> list[int]:
        """Find change points.

        Using a top-down search method, iteratively identify at most
        `k_max` change points that increase the information gain score
        the most.

        Uses precomputed cumulative sums (Eq. 5-6) so that each candidate
        evaluation is O(k·m) instead of O(k·m·n).

        Parameters
        ----------
        X: array_like
            Time series data as a 2D numpy array with sequence index along rows
            and value series in columns.

        Returns
        -------
        change_points: list of ints
            Locations of change points as integer indexes. By convention change points
            include the identity segmentation, i.e. first and last index + 1 values.
        """
        n_samples, n_series = X.shape
        self.intermediate_results_ = []

        # Precompute cumulative sums — Eq. 5-6.  cumsum[t] = sum(X[0:t]).
        cumsum = np.zeros((n_samples + 1, n_series), dtype=np.float64)
        np.cumsum(X, axis=0, out=cumsum[1:])

        # Precompute total entropy once.
        total_sums = cumsum[n_samples]  # = cumsum[-1]
        h_total = _entropy_from_sums(total_sums)

        # by convention initialize with the identity segmentation
        current_change_points = self.identity(X)

        for k in range(self.k_max):
            best_candidate = -1
            ig_max = -1.0
            # find a point which maximizes score
            for candidate in self.get_candidates(n_samples, current_change_points):
                try_change_points = sorted(set(current_change_points) | {candidate})
                ig = self._ig_from_cumsum(cumsum, n_samples, h_total, try_change_points)
                if ig > ig_max:
                    ig_max = ig
                    best_candidate = candidate

            current_change_points.append(best_candidate)
            current_change_points.sort()
            self.intermediate_results_.append(
                ChangePointResult(
                    k=k, score=ig_max, change_points=current_change_points.copy()
                )
            )

        return current_change_points


[docs] class InformationGainDetector(BaseSegmenter): """Information Gain based Temporal Segmentation (GTS) Estimator. GTS is an unsupervised method for segmenting time series into non-overlapping segments by locating change points that maximise the information gain. Information gain (IG) is defined as the amount of entropy lost by the segmentation. The aim is to find the segmentation that has the maximum information gain for a specified number of segments. GTS uses a top-down search to greedily find the next change point that creates the maximum information gain. Once found, the process repeats until ``k_max`` splits have been made. For **univariate** input the series is automatically augmented with its normalised complement channel (Eq. 12-13 of [1]_) so that entropy can vary across segments. Parameters ---------- k_max: int, default=10 Maximum number of change points to find. The number of segments is thus k+1. step: : int, default=5 Step size, or stride for selecting candidate locations of change points. Fox example a ``step=5`` would produce candidates [0, 5, 10, ...]. Has the same meaning as ``step`` in ``range`` function. Attributes ---------- change_points_: list of change points as integer indexes. By convention change points include the identity segmentation, i.e. first and last index + 1 values. intermediate_results_: list of ``ChangePointResult`` Intermediate segmentation results for each k value, where k=1, 2, ..., k_max Notes ----- Based on the work from [1]_. - alt. py implementation: https://github.com/cruiseresearchgroup/IGTS-python - MATLAB version: https://github.com/cruiseresearchgroup/IGTS-matlab - paper available at: References ---------- .. [1] Sadri, Amin, Yongli Ren, and Flora D. Salim. "Information gain-based metric for recognizing transitions in human activities.", Pervasive and Mobile Computing, 38, 92-109, (2017). https://www.sciencedirect.com/science/article/abs/pii/S1574119217300081 Examples -------- >>> from aeon.testing.data_generation import make_example_dataframe_series >>> from sklearn.preprocessing import MinMaxScaler >>> from aeon.segmentation import InformationGainSegmenter >>> X = make_example_dataframe_series(n_channels=2, random_state=10) >>> X_scaled = MinMaxScaler(feature_range=(0, 1)).fit_transform(X) >>> igts = InformationGainSegmenter(k_max=3, step=2) >>> y = igts.fit_predict(X_scaled, axis=0) """ _tags = { "capability:univariate": True, "capability:multivariate": True, "returns_dense": True, "fit_is_empty": False, "detector_type": "change_point_detection", "capability:unsupervised": True, "capability:semi_supervised": True, } _parameter_schema = { "k_max": ParamDef( constraint=Interval(int, 0, None, Closed.LEFT), description="Maximum number of change points to find.", ), "step": ParamDef( constraint=Interval(int, 1, None, Closed.LEFT), description="Stride for candidate change point locations.", ), } def __init__( self, k_max: int = 10, step: int = 5, ): self.k_max = k_max self.step = step self._igts = _IGTS( k_max=k_max, step=step, ) self.n_segments = k_max + 1 super().__init__(axis=0) def _fit(self, X, y=None): """Fit the IGTS model.""" return self def _predict(self, X, y=None) -> np.ndarray: """Perform segmentation. Parameters ---------- X: np.ndarray 2D time series shape (n_timepoints, n_channels). Returns ------- y_pred : np.ndarray 1D array with predicted segmentation of the same size as X. The numerical values represent distinct segment labels for each of the data points. """ # k_max=0 → no change points requested (e.g. guided mode on a # single-segment signal). Return empty prediction immediately. if self.k_max == 0: self.intermediate_results_ = [] return np.array([], dtype=int) # Apply normalise + complement augmentation (Section 4.4, Eq. 12-13) # to all data — required for univariate and handles positive # correlation in multivariate data. X_work = _augment_univariate(X) change_points_ = self._igts.find_change_points(X_work) self.intermediate_results_ = self._igts.intermediate_results_ # Return sparse change point indices (exclude boundary 0 and N). cps = np.array(sorted(set(change_points_[1:-1])), dtype=int) return cps def __repr__(self) -> str: """Return a string representation of the estimator.""" return self._igts.__repr__() @classmethod def _get_test_params(cls, parameter_set="default"): """Return testing parameter settings for the estimator. Parameters ---------- parameter_set : str, default="default" Name of the set of test parameters to return, for use in tests. If no special parameters are defined for a value, will return `"default"` set. Returns ------- params : dict or list of dict """ return {"k_max": 2, "step": 1}