TEPデータに変化点検知アルゴリズムをいくつか適用してみた

初めまして。Customer Analytics Divisionの加藤と申します。普段の業務では、電力に関するデータの分析を行っています。

本記事では、変化点検知やスタンダードなアルゴリズムについていくつか説明したあと、Tennessee Eastman Process(TEP)のデータに各アルゴリズムを簡単に適用した結果を紹介します。

 

変化点検知とは

変化点検知とは、異常検知の一種です。異常検知は以下の3つに大別されます。

  • 外れ値検知:データの正常時の分布から大きく外れたデータを検知する
  • 変化点検知:データの傾向が変化した点を検知する
  • 異常部位検知:データの傾向が異常な区間を検知する

変化点検知の図

今回は変化点検知をテーマとして、いくつかのスタンダードなアルゴリズムをTEPデータに適用してみます。

 

今回使用するアルゴリズムの説明

ChangeFinder[1,2]

概要
・時系列モデルを構築し、それに基づき各実測値の尤度から異常スコアを算出します。

■アルゴリズム
 ・まず以下の3ステップで異常スコアを計算します。

第一段階学習
・SDAR(Sequentially Discounting AR model learning)アルゴリズムにより、各時点における時系列モデルを構築します。
・構築した時系列モデルを元に、次の実測値が得られる尤度を求めます。
・計算した尤度から外れ値スコア\(S(x_t)\)を算出します。
  \(S(x_t) = -\log P_{t-1}(x_t | x_1, x_2, \cdots, x_{t-1})\)

◆外れ値スコアの平滑化
・移動平均を計算することで、外れ値スコアを平滑化します。

第二段階学習
・平滑化した外れ値スコアに対して第一弾学習と同様の手順を適用し、得られた結果を異常スコアとします。

・異常スコアに対して閾値を別途設定し、それを上回るデータを変化点とします。

Ruptures (PELT)[3,4]

■概要
・Rupturesは単一のアルゴリズムではなく、いくつかのアルゴリズムとコスト関数を組み合わせて使うことのできるライブラリです。

・Pelt (Pruned Exact Linear Time)はRupturesで実装されているアルゴリズムで、動的計画法を用いて変化点を検出するアルゴリズムです。

・計算量が線形に抑えられているため、大規模なデータセットにも適しています。

■アルゴリズム
 ・計算量削減のため、以下の条件が成立する時点\(\tau\)を変化点候補(長さ\(n\)のデータの場合、変化点候補は最大で\(n-1\)個)から除外します(ただし\(0 \leq \tau < t\))。

\(Q_{\tau} + L(y_{\tau + 1:t}) + \beta > Q_t\)

・\(Q_{\tau}\):時刻\(\tau\)まで変化点を選択しデータをモデルにフィットさせた場合の累積コストの最小値
・\(L(y_{\tau + 1:t})\):時刻\(\tau+1\)から時刻\(t\)までのデータをモデリングするコスト
・\(\beta\)ペナルティ値(この値を大きくすると、変化点候補が少なくなる)
・\(Q_t\):時刻\(t\)まで変化点を選択しデータをモデルにフィットさせた場合の累積コストの最小値


・残った変化点候補に対して以下の式を適用し、変化点の数\(k\)
と位置\(\tau\)を最適化します。
\(\min_{k, \tau} \big[ \sum_{k=0}^K (L(y_{\tau_k + 1 : \tau_{k+1}} ) + \beta) \big] – \beta\)

・\(K\):変化点候補の数

特異スペクトル変換[5]

■概要
・データ全体を時刻\(t\)を中心とした部分時系列に区切り、訓練データの部分時系列に照らし、調査データの部分時系列の異常度を特異値を用いて表現します。


■アルゴリズム
・訓練データの部分時系列の特徴を表す規格化されたベクトルを\(\mathbf{\it{u}}^{(t)}\)、調査データ部分時系列の特徴を表す規格化されたベクトルを\(\mathbf{\it{q}}^{(t)}\)とすると、調査データの部分時系列の異常度\(a^{(t)}\)は以下のように表せます。
\(a^{(t)} = 1- {\mathbf{\it{u}}^{(t)}}^T \mathbf{\it{q}}^{(t)}\)

・訓練データと調査データそれぞれの部分時系列からなる行列に対し特異値分解を行うことで、\(\mathbf{\it{u}}^{(t)}\)、\(\mathbf{\it{q}}^{(t)}\)を得ます(特徴ベクトルを複数選択した場合は行列を得ます)。

・ここから計算される\({\mathbf{\it{u}}^{(t)}}^T \mathbf{\it{q}}^{(t)}\)の行列2ノルム(\({\mathbf{\it{u}}^{(t)}}^T \mathbf{\it{q}}^{(t)}\)の最大特異値。最大固有値の平方根)は\(\mathbf{\it{u}}^{(t)}\)と\(\mathbf{\it{q}}^{(t)}\)の類似度のような値(仮に両者が全く同じベクトルであれば1、直交すれば0となる)のため、これを用いることで最終的に異常度\(a^{(t)}\)を求めることができます。
\(a^{(t)} = 1 – ({\mathbf{\it{u}}^{(t)}}^T \mathbf{\it{q}}^{(t)}の最大特異値)\)

・訓練データの部分時系列を固定し、調査データの部分時系列をずらしながら\(a^{(t)}\)を計算すると、各点に対して異常スコアを付与し閾値を用いて単一の異常点を特定する、といったことができます。

Hotelling\(T^2\)[5]

■概要
・対象の母集団が正規分布することを仮定し、各データの異常度を計算します。
・一般的には外れ値検知に用いられます(そのため、今回は参考として使用)。

■アルゴリズム
・以下の形で異常スコアを計算します。
・各データの異常スコアは次の式で計算されます。
\(a(x)=-\log \big( \frac{1}{\sqrt{2 \pi \sigma}} \exp\big( -\frac{(x-\mu)^2}{2 \sigma^2} \big) \big)\)
・\(\mu\):データの平均値
・\(\sigma\):データの標準偏差

定数等を省略し、以下のように簡略化して表します。
\(a(x)^\prime = \frac{(x-\mu)^2}{\sigma^2}\)

・異常スコアに対して異常と判定する閾値を別途設定し、今回はそれを上回った最初のデータ点を変化点とみなします。

TEPデータの説明

■概要

・TEPデータは、Eastman Chemical社により現実の化学プラントを参考に設計・開発されたプロセスシミュレータで、プロセス制御技術の研究や異常診断手法の検証などに用いられます。

・A、C、D、E の4つの化合物からG、Hの2つの化合物を得る反応における化合物の量などをシミュレーションしたデータとなっており、系中にA~H の8つの化合物が存在します。

Eastman Chemical社により現実の化学プラントを参考に設計・開発されたプロセスシミュレータ「TEPデータ」の図
J. J. DOWNSs, E. F. Vogel (1993)

■詳細
・1つのデータセットで11個の操作変数(XMV(1)~XMV(11))と41個の観測変数(XMEAS(1)~XMEAS(41))の計52個の変数が設定されており、それぞれが時系列データとなっています(例えば、XMV(1)は化合物Dの供給速度、XMEAS(1)は化合物Aの供給量)。
・各変数は960個のデータポイントから成り、サンプリング周期は3分間です。
・20種類の異常(化合物の比率を変える、反応炉の温度を変えるなど)がシミュレートされており、それぞれの異常に1つのデータセットが対応します(IDV(1)~IDV(20))。
・変化点は160ポイントの時点(観測開始から8時間後)です。
・より詳細には、こちらの論文[6]を参照ください。

■データ取得元
Papers with Code – TEP DatasetCPDE/TEP_data at master · YKatser/CPDE
・上記のtesting partを使用します。

異常の種類によっては変化が現れるのが不自然な変数もあるため、こちらのドキュメント[7]を参考にさせていただき、因果関係が妥当と考えられる異常の種類と変数の組み合わせとして、以下を選択しました。

■異常の種類
・IDV(10):C feed temperature (stream 4)

■変数
・XMEAS(1):A feed (stream 1)
・XMEAS(7):Reactor pressure
・XMEAS(13):Product separator pressure
・XMEAS(16):Stripper pressure
・XMEAS(18):Stripper temperature
・XMEAS(20):Compressor work
・XMV(3):A feed flow (stream 1)
・XMV(5):Compressor recycle valve
・XMV(9):Stripper steam valve

各アルゴリズムの実装

まずは必要なライブラリをインポートします。

import pandas as pd
import polars as pl
import numpy as np
import math
import scipy.stats as stats
import changefinder
import ruptures as rpt
import matplotlib.pyplot as plt
import glob
from pathlib import Path
import os

スケーリング、ノイズ除去を目的とした移動平均化を実行する関数や、上述のアルゴリズムを実行するための関数を作成します。

・アルゴリズム共通で行う処理およびアルゴリズム個別の前処理

def scale(data):
    """スケーリングを実行する関数

    Args:
        data (array-like): スケーリング対象の配列

    Returns:
        array-like: 最大値1、最小値0へスケーリングした配列
    """
    return (data - np.min(data)) / (np.max(data) - np.min(data))


def calc_mean_var(data, train_points):
    """Hotelling T^2 法に用いる平均と分散を出力する関数

    Args:
        data (array-like): スケーリング対象のデータ
        train_points (int): Hotelling T^2法の異常判定基準を作成するために用いるデータの範囲

    Returns:
        tuple: Hotelling T^2法の異常判定基準に用いる平均および分散
            mean (float): 平均
            var (float): 分散
    """
    mean = np.mean(data[:train_points])
    var = 0
    for x in data[:train_points]:
        var += (x - mean) ** 2
    var /= train_points
    return mean, var


def valid_convolve(xx, size):
    """補正を施した移動平均

    Args:
        xx (array-like): 移動平均を施す対象の配列
        size (int): 移動平均の窓長

    Returns:
        array-like: 対象配列の窓長sizeでの移動平均
    """
    b = np.ones(size) / size
    xx_mean = np.convolve(xx, b, mode="same")

    n_conv = math.ceil(size / 2)

    # 補正部分
    xx_mean[0] *= size / n_conv
    for i in range(1, n_conv):
        xx_mean[i] *= size / (i + n_conv)
        xx_mean[-i] *= size / (i + n_conv - (size % 2))
    # size%2は奇数偶数での違いに対応するため

    return xx_mean


def normalize(data, normalize_range=5):
    """移動平均を計算する関数

    Args:
        data (array-like): 移動平均を施す対象の配列
        normalize_range (int, optional): 移動平均窓長。実際に計算する窓長の半分の値を指定する. デフォルトは5。

    Returns:
        array-like: 移動平均を施した後の配列。サイズは元の配列と同じ。
    """
    win_len = 2 * normalize_range
    _data = np.array(data, dtype=np.float)
    return valid_convolve(_data, win_len)


def extract_matrix(data, start, end, win):
    """特異スペクトル変換に用いる履歴行列およびテスト行列作成用の関数

    Args:
        start (int): 行列に組み込むデータ点のうち、最初の列の先頭にくるものの位置(1x1要素)
        end (int): 行列に組み込むデータ点のうち、最後の列の先頭にくるものの位置(1x(end-start+1)要素)
        win (int): 作成する行列の行数。Window幅

    Returns:
        array-like: (win)x(end-start+1)行列
    """
    row = win  # 行数の設定
    column = end - start + 1  # 列数の設定
    matrix = np.empty((row, column))  # 空の行列を作成
    T = len(data)
    i = 0
    for t in range(start, end + 1):
        if T < t + row:
            return None
        else:
            matrix[:, i] = data[
                t : t + row
            ]  # dataのstartの位置からrows分だけ抽出しmatrixの列とする、を要素をずらしながら繰り返し
            i += 1
    return matrix

・ChangeFinderの実行関数

def exe_change_finder(data):
    """Change Finderを実行し、異常スコア差分のリストを返す関数

    Args:
        data (array-like): 一次元時系列データ

    Returns:
        array-like: 異常スコアのリスト
    """
    anomaly_scores = []
    for i in data:
        score = cf.update(i)
        anomaly_scores.append(score)
    return anomaly_scores

・特異スペクトル変換の実行関数

def exe_sst(data, win, basis_vector, cols, lag):
    """特異スペクトル変換を実行する関数

    Args:
        win (int): 履歴行列およびテスト行列の行数。Window幅
        basis_vectors (int): 抽出する主要な特異ベクトルの本数
        cols (int): 履歴行列とテスト行列の列数
        lag (int): 履歴行列とテスト行列の間のラグ

    Returns:
        array-like: 異常スコアのリスト
    """
    T = len(data)
    # テスト行列に用いるデータの範囲
    start_cal = cols + win
    end_cal = T - lag + 1
    # 異常度の計算
    change_scores = np.zeros(len(data))
    # 訓練行列の作成(訓練行列を固定する場合)
    start_tra = 0
    end_tra = start_tra + cols - 1
    tra_matrix = extract_matrix(data, start_tra, end_tra, win)
    for t in range(start_cal, end_cal - win + 1):
        # 調査行列の作成
        start_test = (
            start_tra + lag + t
        )  # 最大値は lag + (T - lag + 1) - win = T + 1 - win (例えばT=1600, win=3なら1598)
        end_test = (
            end_tra + lag + t
        )  # 最大値は cols - 1 + lag + (T - lag + 1) - win  + 1 = T + cols - win (例えばT=1600, win=3, cols=2なら1600)
        test_matrix = extract_matrix(data, start_test, end_test, win)
        if test_matrix is not None:
            # 特異値分解
            U_tra, _, _ = np.linalg.svd(tra_matrix, full_matrices=False)
            U_test, _, _ = np.linalg.svd(test_matrix, full_matrices=False)
            U_tra_m = U_tra[:, :basis_vector]
            U_test_m = U_test[:, :basis_vector]
            # 行列の積
            s = np.linalg.svd(np.dot(U_tra_m.T, U_test_m), full_matrices=False, compute_uv=False)
            # 最大特異値を用いた異常スコア計算
            change_scores[t] = 1 - s[0]
        else:
            change_scores[t] = 0.0
    return change_scores

・Hotellingの実行関数

def exe_hotelling(data, train_points):
    """Hoteling T^2 法に則り、異常スコアを計算する関数

    Args:
        data (array-like): 一次元時系列データ
        train_points (int): 異常判定を開始する点

    Returns:
        array-like: 異常スコアのリスト
    """
    anomaly_scores = []
    mean, var = calc_mean_var(data, train_points)
    for x in data[train_points:]:
        anomaly_score = (x - mean) ** 2 / var
        anomaly_scores.append(anomaly_score)
    return anomaly_scores

各アルゴリズムに入力するパラメータなどを設定します。

# パス設定
input_dir = Path(r"[データ格納先フォルダパス]")
# 使用対象のデータ
test_num_list = [10]
col_list = ["XMEAS(1)", "XMEAS(7)", "XMEAS(13)", "XMEAS(16)", "XMEAS(18)", "XMEAS(20)", "XMV(3)", "XMV(5)", "XMV(9)"]
changepoint = 160
TRAIN_POINTS = 100 # 平常とみなす範囲
NORMALISE = 5 # 移動平均窓長
THRESHOLD = 0.5 # ChangeFinder、特異スペクトル変換、Hotelling用の閾値

# 各モデル用パラメータ
# ChangeFinder
param_change_finder = {
    "R": 0.08,  # 忘却率
    "ORDER": 1,  # 時系列モデルの次数
    "SMOOTH": 5,  # ChangeFinderの中で行われる平滑化の窓長
}
# Ruptures
param_rpt = {"PEN": 5}  # ペナルティ値
# 特異スペクトル変換
param_sst = {
    "WIN": 50,  # 訓練行列および調査行列の行数
    "BASIS_VECTOR": 10,  # 特異ベクトルの本数
    "COLS": 10,  # 訓練行列および調査行列の列数
    "LAG": 10,  # 訓練行列と調査行列の間のラグ
}

ひとまず異常データをグラフで可視化します。

file_list = glob.glob(str(input_dir) + "/" + "*_te.dat") # testing partを使用
master_col_list = []
for i in range(1, 42):
    master_col_list.append("XMEAS({})".format(i))
for i in range(1, 12):
    master_col_list.append("XMV({})".format(i))
test = {}
for i, j in enumerate(file_list[0:], start=0):
    test[i] = pd.read_table(j, sep="\s+", names=master_col_list)

for k in range(1, len(file_list) - 1):
    save_path = save_dir / f"anomal_{k}.png"
    anomal_df = test[k]

    fig = plt.figure(figsize=(20, 20))
    fig_title = f"Anomal_{k}"
    fig.suptitle(fig_title)
    for i in range(len(master_col_list)):
        plt.subplot(math.ceil(len(master_col_list) / 4), 4, i + 1)
        plt.plot(range(len(anomal_df)), anomal_df[master_col_list[i]], label=master_col_list[i])
        plt.axvline(changepoint, color="red", linestyle="--", label="changepoint")
        plt.xlabel("Time")
        plt.ylabel(master_col_list[i])
        plt.legend()
    plt.tight_layout()

例えば、今回使用するデータセットIDV(10)のすべての変数を可視化したものが以下です。

青の実線は各変数の時系列推移、赤の点線は変化点を示しています。

ノイズの影響が大きい変数や、大局的な変動がわかりやすい変数など、見え方が様々な変数が格納されていることがわかります。
データセットIDV(10)のすべての変数を可視化したグラフ図

サンプルデータに対する各アルゴリズムの適用結果

前述の通りに指定したデータに対して各アルゴリズムを適用します。

error_df = pd.DataFrame(
    {
        "col_name": col_list,
        "ChangeFinder": [[] for i in range(len(col_list))],
        "Ruptures": [[] for i in range(len(col_list))],
        "特異スペクトル変換": [[] for i in range(len(col_list))],
        "Hotelling": [[] for i in range(len(col_list))],
    }
)
method_list = error_df.columns[1:]

cp_list = []
for i in range(len(test_num_list)):
    for j in range(len(col_list)):
        target_data = normalize(test[test_num_list[i]][col_list[j]].to_list(), NORMALISE)
        # Hotellingの実行
        score_hotelling = scale(exe_hotelling(target_data, TRAIN_POINTS))
        cp_hotelling = [i for i in range(len(score_hotelling)) if score_hotelling[i] > THRESHOLD][0] + TRAIN_POINTS
        cp_list.append(cp_hotelling)
        # ChangeFinderの実行
        np.random.seed(1)
        cf = changefinder.ChangeFinder(
            r=param_change_finder["R"], order=param_change_finder["ORDER"], smooth=param_change_finder["SMOOTH"]
        )
        score_cf = scale(exe_change_finder(target_data)[TRAIN_POINTS:])
        cp_cf = [i for i in range(len(score_cf)) if score_cf[i] > THRESHOLD][0] + TRAIN_POINTS
        cp_list.append(cp_cf)
        # 特異スペクトル変換(SST)の実行
        score_sst = scale(
            exe_sst(
                target_data,
                win=param_sst["WIN"],
                basis_vector=param_sst["BASIS_VECTOR"],
                cols=param_sst["COLS"],
                lag=param_sst["LAG"],
            )[TRAIN_POINTS:]
        )
        cp_sst = [i for i in range(len(score_sst)) if score_sst[i] > THRESHOLD][0] + TRAIN_POINTS
        cp_list.append(cp_sst)
        # Rupturesの実行
        algo = rpt.Pelt(model="rbf").fit(np.array(target_data))
        result = algo.predict(pen=param_rpt["PEN"])
        cp_rpt = result[0]
        cp_list.append(cp_rpt)
        # データフレームへの格納
        for method, cp in zip(method_list, cp_list):
            error_df.at[j, method].append(np.abs(cp - changepoint))
        cp_list.clear()

 

今回は複数のアルゴリズムを適用していますが、ノイズ除去の程度が結果に与える影響はアルゴリズムごとに異なると予想されるので、移動平均の窓長を0、10、50、200と変えて適用した結果を簡単に見ていきます。

 

正解の変化点である160に対し、予測した変化点がどれだけ離れていたかの差(絶対誤差)を指標としています。

ただし、前提として以下にご留意ください。

・今回、各アルゴリズムのパラメータはある程度の精度となるように調整しましたが、最適化は行っていません。

・ChangeFinder、特異ベクトル変換、Hotellingにおいて、異常スコアに適用する閾値は0.5(0以上1以下にスケーリングした異常スコアに適用)としています。予測した変化点がどれだけ離れていたかの差、絶対誤差の平均図

平均的にはChangeFinderの性能が最も安定しており、移動平均窓長が0の場合が最も高精度となりました。

一方、ChangeFinderだけでなくRuptures(PELT)やHotellingでも同様の傾向が見られますが、窓長を0から10に増やすと精度が低下しますが、窓長をさらに増やして50とすると精度が向上しました。

一般的に、ノイズを削除すると誤った箇所を変化点として予測するリスクが抑えられる一方、変化点が曖昧となるリスクが高まるというトレードオフ関係にあると考えられるので、上記のように窓長と精度が一貫した傾向になっていないと考えられます。

窓長0、200の時のChangeFinderの予測点と異常スコアをグラフで見てみます(見やすさのため、各変数と異常スコアはスケーリングして縦に並べています)。

窓長0、200の時のChangeFinderの予測点と異常スコアをグラフ
ChangeFinder:移動平均窓長0
200の時のChangeFinderの予測点と異常スコアをグラフ
ChangeFinder:移動平均窓長200

移動平均窓長を0から200にしたことで全体的に変化点が曖昧になり、XMEAS(7)、(13)、(16)などは直感的にも変化点と思えない位置が正解点となっています。

また、XMEAS(1)、XMV(3)のようなノイズの影響が大きいデータに対して、移動平均窓長が0の状態でも正解点に比較的近い位置で変化を予測できており、異常スコアに適用する閾値を調整すればより近い位置での予測ができると見込まれます。

一方で、例えば移動平均窓長0でのXMEAS(20)の異常スコアを見ると、閾値の調整では正解に近づけることが難しいと思われます。

次に、平均的に最も精度が高かった移動平均窓長50での各アルゴリズムの結果を見てみます。

XMEAS(18)のような、変化点に向かって安定的に推移し変化点においても変化が小さいデータに対し、ChangeFinderとRuptures(PELT)は特異スペクトル変換およびHotellingと比べると敏感に対応できていることがわかります。

また、XMEAS(1)、XMV(3)のような変化点が勾配の中間にあり尚且つ変化点以前の値と同程度の場合、基準値からの乖離度が異常スコアと直結するHotellingでは異常スコアが小さくなっていますが、特異スペクトル変換では正解点でも大きな異常スコアとなっており、データの値の大きさ以外の要素が考慮されていることが読み取れます。

ChangeFinderの図
ChangeFinder
Ruptures(PELT)の図
Ruptures(PELT)
特異スペクトル変換の図
特異スペクトル変換
Hotellingの図
Hotelling

終わりに

今回は、TEPデータに対し4つのアルゴリズムを適用した結果を簡単に見てみました。

ChangeFinderが最も高精度という結果ではありましたが、当然のことながら各種パラメータのチューニングによってどのアルゴリズムが最高精度となるかは変わる可能性があります。

また、入力データの前処理や異常スコアに対する後処理など、他にも精度改善に寄与し得る工夫はありますので、実際にこれらのアルゴリズムを試される際は是非ご検討ください。

参考文献

・[1] Yamanishi, K. and Takeuchi, J.: A Unifying Approach to Detecting Outliers and Change-Points from Non stationary Data, in Proceedings of the Eighth ACM SIGKDD International Conference on Data Mining and Knowledge Discovery, ACM Press (2002) 676-681.
https://doi.org/10.1145/775047.775148

・[2] 山西健司 (2009). データマイニングによる異常検知, 共立出版.

・[3] C. Truong, L. Oudre, N. Vayatis.: Selective review of offline change point detection methods. Signal Processing, 167:107299, (2020).
https://doi.org/10.1016/j.sigpro.2019.107299

・[4] R. Killick, P. Fearnhead, Optimal Detection of Changepoints With a Linear Computational Cost, Journal of the American Statistical Association 107 (500) (2012) 1590-1598.
https://doi.org/10.1080/01621459.2012.737745

・[5] 井出剛・杉山将 (2015). 異常検知と変化検知, 講談社.

・[6] J. J. Downs and E. F. Vogel: A plant-wide industrial process control problem, Computers & Chemical Engineering 17 (3) (1993) 245-255.
https://doi.org/10.1016/0098-1354(93)80018-I

・[7] Uchida, Y. and Fujiwara, K.: Process Fault Diagnosis Method Based on MSPC and LiNGAM and its Application to Tennessee Eastman Process, IFAC-PapersOnLine 55 (2) (2022) 384-389.
https://doi.org/10.1016/j.ifacol.2022.04.224