Source code for tsseg.algorithms.autoplait.detector

"""
This module provides an aeon-compatible wrapper for the AutoPlait algorithm.
"""
import numpy as np
from ..base import BaseSegmenter
from .autoplait_c import AutoPlait
from ..param_schema import (
    Closed,
    Interval,
    ParamDef,
)

class AutoPlaitDetector(BaseSegmenter):
    """Wrapper for the AutoPlait state detection algorithm.

    Segmentation itself is delegated to the ``AutoPlait`` runner imported
    from ``autoplait_c``. This class turns the runner's change points and
    per-segment labels into one integer state label per time point.

    Parameters
    ----------
    n_cps : int or None, default=None
        Number of change points (at least 1 when given). It is forwarded
        unchanged to the runner at prediction time. Fitting fails when both
        ``n_cps`` and ``y`` are ``None``.

    References
    ----------
    .. [1] Y. Matsubara, Y. Sakurai, and C. Faloutsos, "AutoPlait: automatic
       mining of co-evolving time sequences," SIGMOD, 2014,
       doi: 10.1145/2588555.2588556.
    .. [2] Online Semantic Segmentation project page,
       https://sites.google.com/site/onlinesemanticsegmentation/
       (reference implementation).
    """

    _tags = {
        "capability:univariate": True,
        "capability:multivariate": True,
        "fit_is_empty": False,
        "returns_dense": False,
        "detector_type": "state_detection",
        "capability:unsupervised": False,
        "capability:semi_supervised": True,
    }

    _parameter_schema = {
        "n_cps": ParamDef(
            constraint=Interval(int, 1, None, Closed.LEFT),
            description="Number of change points (required if y not provided).",
            nullable=True,
        ),
    }

    def __init__(self, n_cps=None):
        # axis=0: the first axis of the input is treated as time.
        super().__init__(axis=0)
        self.n_cps = n_cps

    def _fit(self, X, y):
        """Check that a segmentation hint is available; learn nothing.

        Parameters
        ----------
        X : array-like
            Training series. Not inspected here.
        y : array-like or None
            Optional annotation. Only its presence is checked.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If both ``y`` and ``self.n_cps`` are ``None``.

        Notes
        -----
        NOTE(review): ``n_cps`` is not derived from ``y``. When only ``y`` is
        supplied, ``_predict`` still forwards ``self.n_cps`` (``None``) to the
        runner. Confirm whether that is intended.
        """
        has_no_hint = y is None and self.n_cps is None
        if has_no_hint:
            raise ValueError(
                f"{self.__class__.__name__} requires `n_cps` or `y` to be provided. "
                "It is not capable of pure unsupervised learning without hints."
            )
        # self.n_cps is deliberately left exactly as given to __init__.
        return self

    def _predict(self, X, y=None):
        """Segment the series and return one state label per time point.

        Parameters
        ----------
        X : array-like
            Series to segment. The first axis is treated as time: a 1-D input
            becomes a single column, a 2-D input is used as-is, and any
            higher-dimensional input is flattened to ``(X.shape[0], -1)``.
        y : None
            Not used, for API compatibility.

        Returns
        -------
        np.ndarray
            Integer state labels of shape ``(X.shape[0],)``.
        """
        series_2d = self._as_time_major_2d(X)

        runner = AutoPlait()
        change_points, segment_labels = runner._run_autoplait(
            series_2d, self.n_cps
        )

        return self._to_dense_labels(
            change_points, segment_labels, series_2d.shape[0]
        )

    @staticmethod
    def _as_time_major_2d(X):
        """Return ``X`` as a 2-D array whose rows are time points."""
        series = np.asarray(X)
        if series.ndim == 1:
            return series[:, np.newaxis]
        if series.ndim == 2:
            return series
        # Collapse every trailing axis into a single feature axis.
        return series.reshape(series.shape[0], -1)

    @staticmethod
    def _to_dense_labels(change_points, segment_labels, n_samples):
        """Expand change points and per-segment labels to per-sample labels."""
        cps = np.asarray(change_points, dtype=int)
        seg_labels = np.asarray(segment_labels, dtype=int)

        if seg_labels.size == 0:
            if cps.size == 0:
                # Nothing was reported: the whole series is state 0.
                return np.zeros(n_samples, dtype=int)
            # Change points without labels: number the segments 0, 1, 2, ...
            seg_labels = np.arange(cps.size + 1, dtype=int)

        if cps.size > 0:
            cps = np.sort(cps)
            # Only change points strictly inside the series split it.
            strictly_inside = (cps > 0) & (cps < n_samples)
            cps = cps[strictly_inside]

        # Reconcile the label count with the segment count: repeat the last
        # label when there are too few, drop the surplus when too many.
        n_segments = cps.size + 1
        shortfall = n_segments - seg_labels.size
        if shortfall > 0:
            seg_labels = np.pad(
                seg_labels,
                (0, shortfall),
                mode="constant",
                constant_values=seg_labels[-1],
            )
        elif shortfall < 0:
            seg_labels = seg_labels[:n_segments]

        dense = np.empty(n_samples, dtype=int)
        segment_ends = [*cps.tolist(), n_samples]
        cursor = 0
        for state, raw_end in zip(seg_labels, segment_ends):
            # Clamp so a slice never runs backwards or past the end.
            stop = min(max(raw_end, cursor), n_samples)
            dense[cursor:stop] = state
            cursor = stop

        # Fill any remaining tail with the last label.
        if cursor < n_samples:
            dense[cursor:] = seg_labels[-1]
        return dense