import multiprocessing
import numpy as np
import pandas as pd
from typing import List, Optional
from .embedding_matrix import format_embedding_with_overlapping_windows, create_univariate_embedding
from .pattern_mining import mine_patterns_univariate
from .pattern_filter import filter_maximum_variance, filter_jaccard_similarity_in_embedding, filter_pbad_embedding, filter_jaccard_similarity
from .PatternBasedEmbedder import PatternBasedEmbedder
from .PatternBasedEmbedding import PatternBasedEmbedding
[docs]
class FrequentPatternMiningEmbedder(PatternBasedEmbedder):
"""
Mines frequent sequential patterns from the time series data to
construct a pattern-based embedding. This process happens in a
few different steps.
First, the time series is preprocessed by extracting fixed-size
subsequences at multiple resolution from each attribute of the
time series. These subsequences are consequently discretized
through SAX. By employing multiple resolutions we enable to
capture short-term and long-term behavior.
Second, patterns are mined in every resolution and for each
attribute independently. The patterns are then pruned using
Jaccard similarity and the relative support to obtain a set
of more interesting patterns.
Third, The patterns are converted to an embedding matrix by
computing at which interval the pattern and replacing those
values in the matrix by the relative support of that pattern.
If multiple subsequences overlap and therefore cover the same
observations, the average value of each subsequence is taken.
Parameters
----------
window_sizes : Optional[List[int]], default=None
The different window sizes or resolutions to use for extracting
subsequences from the time series, from which patterns will be
mined.
stride : int, default = 1
The amount with which the sliding windows will shift to extract
fixed-size subsequences.
word_size : int, default=16
The word size for SAX discretization. This is the number of
discrete symbols to maintain in the discretized word through
PAA.
alphabet_size : int, default=3
The alphabet size for SAX discretization. This is th number
of discrete symbols used to create words.
binning_method : str, default=``'gloabl'``
How to compute the discrete symbols with SAX. The possible
options are:
- ``'global'``: Consider all values in the time series
and discretize them jointly;
- ``'local'``: Discretize the values of each subsequence
independently;
- ``'k_means'``: Use K-Means clustering to cluster the
values in the time series and assign a dicrete label
to each cluster.
top_k_patterns : int, default=25
The number of patterns with maximum relative support to mine
in each resolution.
min_pattern_size : int, default=3
The minimum length of a pattern before it should actually be
considered relevant.
max_pattern_size : int, default=10,
The maximum length of a pattern to consider it.
duration : float, default=1.2
The constraint on the relative duration of a pattern before
it covers a window. Specifically, for a pattern of length ``L``,
there may be at most ``np.floor(L * duration)`` gaps. The value
should therefore be larger than 1.
do_mdl : bool, default=False
Whether to prune the mined patterns based on the MDL-principle.
min_r_support : float, default=0.0
The minimum relative support of a pattern before it should be
considered relevant.
max_r_support : float, default=1.0
The maximum relative support a pattern may have for it to still
be interesting.
jaccard_similarity_threshold : float, default=0.9
The threshold on the Jaccard similarity for two patterns to be
considered similar.
nb_largest_variance : int, default=50
The number of patterns with largest variance to maintain.
do_pca : bool, default=False
Whether PCA should be performed to maintain the number of linear
combinations of patterns with largest variance.
n_jobs : int, default=1
The number of jobs that are allowed to run in parallel.
"""
def __init__(self,
window_sizes: List[int] = None,
stride: int = 1,
word_size: int = 16,
alphabet_size: int = 3,
binning_method: str = 'global',
top_k_patterns: int = 25,
min_pattern_size: int = 3,
max_pattern_size: int = 10,
duration: float = 1.2,
do_mdl: bool = False,
min_r_support: float = 0.0,
max_r_support: float = 1.0,
jaccard_similarity_threshold: float = 0.9,
nb_largest_variance: int = 50,
do_pca: bool = False,
n_jobs: int = 1):
self.window_sizes: Optional[List[int]] = window_sizes
self.stride: int = stride
self.word_size: int = word_size
self.alphabet_size: int = alphabet_size
self.binning_method: str = binning_method
self.top_k_patterns: int = top_k_patterns
self.min_pattern_size: int = min_pattern_size
self.max_pattern_size: int = max_pattern_size
self.duration: float = duration
self.do_mdl: bool = do_mdl
self.min_r_support: float = min_r_support
self.max_r_support: float = max_r_support
self.jaccard_similarity_threshold: float = jaccard_similarity_threshold
self.nb_largest_variance: int = nb_largest_variance
self.do_pca: bool = do_pca
self.n_jobs: int = n_jobs
[docs]
def fit(self, time_series: np.ndarray, y=None) -> 'FrequentPatternMiningEmbedder':
"""
Fitting and transforming a time series using the :py:class:`FrequentPatternMiningEmbedder`
is closely tight together, and therefore the :py:func:`~FrequentPatternMiningEmbedder.fit()`
method should not be used directly. Instead, use the :py:func:`~FrequentPatternMiningEmbedder.fit_transform()`
method.
Raises
------
exception : Exception
Upon calling this method.
"""
raise Exception('The fit and transform steps of the `FrequentPatternMiner` are closely tied together.'
'Instead of using `.fit()` and `.transform()` independently, use the `.fit_transform()` method!')
def _create_attribute_resolution_embedding(self, data: pd.DataFrame, attribute_id, interval):
"""
Create an embedding for the given data with the given resolution
:param data: The (univariate) time series, a pandas DataFrame with the columns 'average_value' and 'time'
:param attribute_id: The ID of the given univariate time series in a multivariate time series (only for
administrative aspects and bookkeeping)
:param interval: The interval length or window size that the subsequences should have
:return:
"""
# Mine the patterns in the time series
patterns, pbad_embedding, _, _ = mine_patterns_univariate(
data=data,
files_prefix='temp/' + str(attribute_id) + '_' + str(interval) + '/',
interval=interval,
stride=self.stride,
nb_symbols=self.word_size,
nb_bins=self.alphabet_size,
binning_method=self.binning_method,
top_k_patterns=self.top_k_patterns,
min_pattern_size=self.min_pattern_size,
max_pattern_size=self.max_pattern_size,
pattern_duration=self.duration,
do_mdl=self.do_mdl
)
if patterns.shape[0] == 0: # Return if no patterns have been found
return attribute_id, np.ndarray([]), patterns
patterns['interval'] = interval
# Filter the patterns in this specific resolution
nb_original_patterns = patterns.shape[0]
if 0.0 < self.min_r_support < self.max_r_support:
patterns = patterns.loc[patterns['rsupport'] >= self.min_r_support]
if self.min_r_support < self.max_r_support <= 1.0:
patterns = patterns.loc[patterns['rsupport'] <= self.max_r_support]
patterns = filter_jaccard_similarity(patterns, self.jaccard_similarity_threshold)
# Format pbad embedding to remove filtered patterns
if nb_original_patterns > patterns.shape[0]:
pbad_embedding = filter_pbad_embedding(patterns, pbad_embedding)
# Create the embedding for the resolution
new_embedding = create_univariate_embedding(pbad_embedding, patterns.shape[0])
new_embedding = format_embedding_with_overlapping_windows(new_embedding, interval, self.stride)
# Reset index of patterns, thus pattern i corresponds to row i in the embedding
patterns.reset_index(drop=True, inplace=True)
return attribute_id, new_embedding, patterns