"""
Information Gain-based Temporal Segmenter.
Information Gain Temporal Segmentation (_IGTS) is a method for segmenting
multivariate time series based off reducing the entropy in each segment.
The amount of entropy lost by the segmentations made is called the Information
Gain (IG). The aim is to find the segmentations that have the maximum information
gain for any number of segmentations.
"""
from collections.abc import Generator
from dataclasses import dataclass, field
import numpy as np
import numpy.typing as npt
import pandas as pd
from ..base import BaseSegmenter
from ..param_schema import Closed, Interval, ParamDef
__all__ = ["InformationGainDetector"]
__maintainer__ = []
def _augment_univariate(X: npt.ArrayLike) -> np.ndarray:
"""Augment a univariate series with its complement channel.
Applies the data transformation from Section 4.4 of the IGTS paper:
1. **Normalize** each channel so that values sum to 1 (Eq. 12).
2. **Append the complement** ``max(c_i) - c_i`` for every channel,
doubling the number of columns from *m* to *2m* (Eq. 13).
For a univariate input this produces a bivariate series where one
channel rises whenever the other falls, breaking the constant-entropy
degeneracy that prevents information gain from detecting change points
in single-channel data.
Parameters
----------
X : array_like, shape (n_timepoints, n_channels)
Input time series.
Returns
-------
X_aug : np.ndarray, shape (n_timepoints, 2 * n_channels)
Augmented time series.
References
----------
Sadri, Amin, Yongli Ren, and Flora D. Salim.
"Information gain-based metric for recognizing transitions in
human activities.", Pervasive and Mobile Computing, 38, 92-109, (2017).
"""
n_samples, n_channels = X.shape
X_aug = np.empty((n_samples, 2 * n_channels), dtype=np.float64)
for i in range(n_channels):
c = X[:, i].astype(np.float64)
c_min = c.min()
c_shifted = c - c_min
denom = c_shifted.sum()
if denom == 0.0:
c_norm = np.zeros_like(c)
else:
c_norm = c_shifted / denom
X_aug[:, i] = c_norm
X_aug[:, n_channels + i] = c_norm.max() - c_norm
return X_aug
@dataclass
class ChangePointResult:
k: int
score: float
change_points: list[int]
def entropy(X: npt.ArrayLike) -> float:
"""Shannons's entropy for time series.
As defined by equations (3) and (4) from [1]_.
Parameters
----------
X: array_like
Time series data as a 2D numpy array with sequence index along rows
and value series in columns.
Returns
-------
entropy: float
Computed entropy.
"""
if X.size == 0 or X.shape[0] == 0:
return 0.0
column_totals = np.sum(X, axis=0)
overall_total = float(np.sum(column_totals))
if overall_total <= 0.0:
shifted = X - np.min(X, axis=0, keepdims=True)
column_totals = np.sum(shifted, axis=0)
overall_total = float(np.sum(column_totals))
if overall_total <= 0.0:
return 0.0
X = shifted
with np.errstate(divide="ignore", invalid="ignore"):
p = np.divide(np.sum(X, axis=0), overall_total, where=overall_total != 0.0)
p = p[p > 0.0]
if p.size == 0:
return 0.0
return -np.sum(p * np.log(p))
def _entropy_from_sums(channel_sums: np.ndarray) -> float:
"""Compute Shannon entropy from precomputed channel sums.
This is the O(m) version using cumulative sums (Eq. 5-6 of [1]_).
Parameters
----------
channel_sums : np.ndarray, shape (n_channels,)
Sum of each channel over the segment.
Returns
-------
h : float
Shannon entropy of the segment.
"""
total = channel_sums.sum()
if total <= 0.0:
return 0.0
p = channel_sums / total
p = p[p > 0.0]
if p.size == 0:
return 0.0
return -float(np.sum(p * np.log(p)))
def generate_segments(X: npt.ArrayLike, change_points: list[int]) -> Generator:
"""Generate separate segments from time series based on change points.
Parameters
----------
X: array_like
Time series data as a 2D numpy array with sequence index along rows
and value series in columns.
change_points: list of int
Locations of change points as integer indexes. By convention change points
include the identity segmentation, i.e. first and last index + 1 values.
Yields
------
segment: npt.ArrayLike
A segments from the input time series between two consecutive change points
"""
for start, end in zip(change_points[:-1], change_points[1:], strict=True):
yield X[start:end, :]
def generate_segments_pandas(X: npt.ArrayLike, change_points: list) -> Generator:
"""Generate separate segments from time series based on change points.
Parameters
----------
X: array_like
Time series data as a 2D numpy array with sequence index along rows
and value series in columns.
change_points: list of int
Locations of change points as integer indexes. By convention change points
include the identity segmentation, i.e. first and last index + 1 values.
Yields
------
segment: npt.ArrayLike
A segments from the input time series between two consecutive change points
"""
for interval in pd.IntervalIndex.from_breaks(sorted(change_points), closed="both"):
yield X[interval.left : interval.right, :]
@dataclass
class _IGTS:
"""
Information Gain based Temporal Segmentation (GTS).
GTS is an unsupervised method for segmenting multivariate time series
into non-overlapping segments by locating change points that for which
the information gain is maximized.
Information gain (IG) is defined as the amount of entropy lost by the segmentation.
The aim is to find the segmentation that have the maximum information
gain for a specified number of segments.
GTS uses top-down search method to greedily find the next change point
location that creates the maximum information gain. Once this is found, it
repeats the process until it finds `k_max` splits of the time series.
Uses cumulative sums (Eq. 5-6 of the IGTS paper) so that segment channel sums are
computed in O(m) instead of O(mn).
.. note::
For univariate input the caller should augment the series with its
complement channel via :func:`_augment_univariate` before calling
:meth:`find_change_points`. The wrapper
:class:`InformationGainDetector` does this automatically.
Parameters
----------
k_max : int, default=10
Maximum number of change points to find. The number of segments is thus k+1.
step : int, default=5
Step size, or stride for selecting candidate locations of change points.
Fox example a `step=5` would produce candidates [0, 5, 10, ...]. Has the same
meaning as `step` in `range` function.
Notes
-----
Based on the work from Sadri et al. (2017).
- alt. py implementation: https://github.com/cruiseresearchgroup/IGTS-python
- MATLAB version: https://github.com/cruiseresearchgroup/IGTS-matlab
- paper available at:
https://www.sciencedirect.com/science/article/abs/pii/S1574119217300081
"""
# init attributes
k_max: int = 10
step: int = 5
# computed attributes
intermediate_results_: list = field(init=False, default_factory=list)
def identity(self, X: npt.ArrayLike) -> list[int]:
"""Return identity segmentation, i.e. terminal indexes of the data."""
return sorted([0, X.shape[0]])
def get_candidates(self, n_samples: int, change_points: list[int]) -> list[int]:
"""Generate candidate change points.
Also exclude existing change points.
Parameters
----------
n_samples: int
Length of the time series.
change_points: list of ints
Current set of change points, that will be used to exclude values
from candidates.
TODO: exclude points within a neighborhood of existing
change points with neighborhood radius
"""
return sorted(
set(range(0, n_samples, self.step)).difference(set(change_points))
)
@staticmethod
def information_gain_score(X: npt.ArrayLike, change_points: list[int]) -> float:
"""Calculate the information gain score.
The formula is based on equation (2) from [1]_.
Parameters
----------
X: array_like
Time series data as a 2D numpy array with sequence index along rows
and value series in columns.
change_points: list of ints
Locations of change points as integer indexes. By convention change points
include the identity segmentation, i.e. first and last index + 1 values.
Returns
-------
information_gain: float
Information gain score for the segmentation corresponding to the change
points.
"""
segment_entropies = [
seg.shape[0] * entropy(seg) for seg in generate_segments(X, change_points)
]
return entropy(X) - sum(segment_entropies) / X.shape[0]
@staticmethod
def _ig_from_cumsum(
cumsum: np.ndarray,
n_samples: int,
h_total: float,
change_points: list[int],
) -> float:
"""Compute information gain using precomputed cumulative sums.
Implements Eq. 2 with the O(m) segment-sum trick of Eq. 5-6.
Parameters
----------
cumsum : np.ndarray, shape (n_samples + 1, n_channels)
Cumulative sum with a leading row of zeros so that
``cumsum[t]`` = sum of X[0:t] along axis 0.
n_samples : int
Length of the time series.
h_total : float
Precomputed entropy of the whole series.
change_points : list of int
Sorted boundary indices including 0 and n_samples.
Returns
-------
ig : float
"""
weighted_h = 0.0
for start, end in zip(change_points[:-1], change_points[1:], strict=True):
seg_len = end - start
if seg_len <= 0:
continue
seg_sums = cumsum[end] - cumsum[start] # O(m)
weighted_h += seg_len * _entropy_from_sums(seg_sums)
return h_total - weighted_h / n_samples
def find_change_points(self, X: npt.ArrayLike) -> list[int]:
"""Find change points.
Using a top-down search method, iteratively identify at most
`k_max` change points that increase the information gain score
the most.
Uses precomputed cumulative sums (Eq. 5-6) so that each candidate
evaluation is O(k·m) instead of O(k·m·n).
Parameters
----------
X: array_like
Time series data as a 2D numpy array with sequence index along rows
and value series in columns.
Returns
-------
change_points: list of ints
Locations of change points as integer indexes. By convention change points
include the identity segmentation, i.e. first and last index + 1 values.
"""
n_samples, n_series = X.shape
self.intermediate_results_ = []
# Precompute cumulative sums — Eq. 5-6. cumsum[t] = sum(X[0:t]).
cumsum = np.zeros((n_samples + 1, n_series), dtype=np.float64)
np.cumsum(X, axis=0, out=cumsum[1:])
# Precompute total entropy once.
total_sums = cumsum[n_samples] # = cumsum[-1]
h_total = _entropy_from_sums(total_sums)
# by convention initialize with the identity segmentation
current_change_points = self.identity(X)
for k in range(self.k_max):
best_candidate = -1
ig_max = -1.0
# find a point which maximizes score
for candidate in self.get_candidates(n_samples, current_change_points):
try_change_points = sorted(set(current_change_points) | {candidate})
ig = self._ig_from_cumsum(cumsum, n_samples, h_total, try_change_points)
if ig > ig_max:
ig_max = ig
best_candidate = candidate
current_change_points.append(best_candidate)
current_change_points.sort()
self.intermediate_results_.append(
ChangePointResult(
k=k, score=ig_max, change_points=current_change_points.copy()
)
)
return current_change_points