Building an Anomaly Detection Model in Python

0 Comments
Share This
Share This

Machine Learning: Detecting Anomalies with Python

Anomaly detection is a great tool in helping engineers identify unusual patterns and deviations in system performance, which could indicate potential failures or inefficiencies.

By proactively detecting anomalies, engineers can build and maintain reliable systems, optimize resource management, and ensure operational efficiency.

This is the first in a two-part series exploring machine learning for anomaly detection. The accompanying post is titled: Machine Learning with a Vibration Sensor.

In this post, we’ll go through the steps of building an anomaly detection model in Python using tri-axial acceleration, orientation, and rotation data.

Topics covered include:


Preprocessing and Feature Extraction

The Raw Data

`lib.process.file`

import decimal
import idelib
import endaq
import pandas as pd


def metadata(doc: idelib.dataset.Dataset) -> pd.DataFrame:
    """
    Retrieve metadata about a Dataset object
    :param doc: an idelib.dataset.Dataset to extract metadata from
    :return: metadata to extract from Dataset object such as filename, sensor type, recording length, etc.
    """
    filename = doc.name

    t0, t1 = (t * decimal.Decimal('1e-6') for t in (doc.sessions[0].firstTime, doc.sessions[0].lastTime))
    recording_length = t1 - t0
    serial_number_id = str(doc.recorderInfo['RecorderSerial'])
    sensor_type = doc.recorderInfo['PartNumber']

    data = {
        "Name": filename,
        "Sensor Type": sensor_type,
        "Recording Length (seconds)": recording_length,
        "Serial Number": serial_number_id
    }
    return pd.DataFrame([data])


def sensor_info(doc) -> pd.DataFrame:
    """
    Retrieve sensor info about a Dataset object

    :param doc: an idelib.dataset.Dataset to extract sensor information from
    :return: a table containing information of each sensor present in the Dataset object
    """
    channel_table = endaq.ide.get_channel_table(doc).data
    return channel_table.drop(columns=['channel'])


def get_sensor_data(doc, sensor: tuple[str, str]) -> pd.DataFrame:
    """

    :param doc: an idelib.dataset.Dataset to extract sensor data from
    :param sensor: the sensor to select, as a tuple consisting of (name, type). See process.sensor_info to retrieve options.
    :return: a table containing the collected data from the sensor
    """
    channel_table = endaq.ide.get_channel_table(doc).data

    sensor_names = zip(channel_table['name'].values, channel_table['type'].values)

    found = False

    # check if sensor is present in recording
    for s in sensor_names:
        if s[0] == sensor[0] and s[1] == sensor[1]:
            found = True
            break

    if not found:
        raise ValueError(f"The sensor {sensor} not present in {doc.name}")

    sensor_data = channel_table.loc[(channel_table['name'] == sensor[0]) & (channel_table['type'] == sensor[1])].squeeze()

    return endaq.ide.to_pandas(sensor_data.channel, time_mode="datetime", tz="utc")
    
 `lib.transform.ts`
import numpy as np
import pandas as pd
def resample_ts(data: pd.DataFrame, rate: float):
    """
    Resample a dataframe to `rate`
    :param data: a pandas dataframe with timestamps as indices and observed values at each timestamp as columns
    :param rate: the rate to resample the dataframe to, in seconds
    :return: the resampled dataframe
    """
    rate_str = f'{rate}s'
    # resample the dataframe to specified rate
    resampled_df = data.resample(rate_str).mean()
    return resampled_df
def split_series(ts: np.ndarray, segment_length: int, pad=None) -> np.ndarray:
    """
    Split a time series into multiple time series of length `segment_length`.
    If last part of `ts` is shorter than
    `segment_length`, rest of series is padded with value `pad` (default is last value in series)
    :param ts: a np.ndarray of shape (n_timesteps, n_features)
    :param segment_length: the length of the shortened time series
    :return: a np.ndarray of shape (n_series, n_timesteps, n_features)
    """
    start = 0
    n = len(ts)
    data = []
    while start < n:
        # check if what's left is less than segment_length
        if n - start < segment_length:
            pad_value = ts[-1] if pad is None else pad
            last_ts = np.concatenate((ts[start:], [pad_value for _ in range(segment_length - (n - start))]))
            data.append(last_ts)
            break
        split_ts = ts[start: start + segment_length]
        data.append(split_ts)
        start += segment_length
    return np.array(data)   
    
    
    
    

The above contains several functions used for preprocessing.

The raw data comes in the form of IDE files. For this test set-up, we are using a simple structure (fan) as a representative example. We train on “good” fan data, then damage fan to see how good the model is at finding anomalies.

The data from the intact fan we will consider as normal. The files normal_1.ide, normal_2.ide, and normal_3.ide are 10-minute recordings from each of the fan’s speed settings: lowest (1), middle (2), highest (3).

I have written a more in-depth blog regarding the test: Machine Learning with a Vibration Sensor.

Feature Representation

The model will be trained on multivariate time series containing tri-axle acceleration data, orientation, and rotation. Since the sampling rates of the accelerometer (20.0 kHz) and gyroscope (0.2 kHz) are different for this sensor, we’ll need to resample the collected data from each sensor to the same frequency. For this example, we chose to resample the data to 100 Hz. The exact frequency chosen here is arbitrary, but reduces the dimensionality of the data without losing information that could significantly impact the performance of the model.

The input to the autoencoder is a matrix of shape (n_samples, n_time_steps, n_features), where each sample represents a time series consisting of n_timesteps=10 and n_features=10 (acceleration and rotation have 3 axes each, and orientation is measured in quaternions with 4 axes). As a result, each sample will represent a time series that spans 100 milliseconds.


Import Python Libraries

In a Juypter notebook, we first import several Python libraries we will use:

from typing import List, Tuple

import endaq 
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf 
from tensorflow.keras import layers 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 

# local modules 
from lib.process import file as process 
from lib.ts import transform 
import ml.autoencoder as ae



Then, we’ll define a utility function to help extract data from multiple sensors:

def multi_ts(file, sensors: List[Tuple[str, str]], rate: float) -> pd.DataFrame:
    """
    A helper to extract multivariate time series of 1/rate Hz from an IDE file 
    
    :param file: the file to extract data from
    :param sensors: the set of sensors to extract data from 
    :param rate: the rate, in seconds, to resample the data to 
    :return: a dataframe containing sensor data resampled to 1/rate
    """
    if len(sensors) == 0:
        raise ValueError("Please provide at least one sensor to extract data from")
    
    doc = endaq.ide.get_doc(file)
    dataframes = [] 
    
    for sensor in sensors:
        try:
            sensor_data = process.get_sensor_data(doc, sensor)
            
            # resample to 1/rate Hz
            data_resampled = transform.resample_ts(sensor_data, rate)
            dataframes.append(data_resampled)
        except ValueError:
            # ignore if sensor not present in recording
            continue 
            
    # concatenate dataframes along columns 
    return pd.concat(dataframes, axis=1)



Creating the Training Set

We then construct the training dataset:

# set parameters 
rate = 0.01 
segment_length = 10 
sensors_to_get = [
    ('X (40g)', 'Acceleration'),
    ('Y (40g)', 'Acceleration'),
    ('Z (40g)', 'Acceleration'),
    ('X', 'Rotation'),
    ('Y', 'Rotation'),
    ('Z', 'Rotation'),
    ('X', 'Quaternion'),
    ('Y', 'Quaternion'),
    ('Z', 'Quaternion'),
    ('W', 'Quaternion'),
    ('Internal Pressure', 'Pressure')

# get multivariate data from each of the normal files 
normal_1 = multi_ts("../data/recordings/normal_1.ide", sensors=sensors_to_get, rate=rate)
normal_2 = multi_ts("../data/recordings/normal_2.ide", sensors=sensors_to_get, rate=rate)
normal_3 = multi_ts("../data/recordings/normal_3.ide", sensors=sensors_to_get, rate=rate)

# some columns could include NaNs/missing values, so fill missing data with column mean
normal_1 = normal_1.fillna(normal_1.mean())
normal_2 = normal_2.fillna(normal_2.mean())
normal_3 = normal_3.fillna(normal_3.mean())

# now split each dataset into multiple time series 
X1 = transform.split_series(normal_1.to_numpy(), segment_length)
X2 = transform.split_series(normal_2.to_numpy(), segment_length)
X3 = transform.split_series(normal_3.to_numpy(), segment_length)

# series for training data 
# get multivariate data from each of the normal files 
normal_1 = multi_ts("../data/recordings/normal_1.ide", sensors=sensors_to_get, rate=rate)
normal_2 = multi_ts("../data/recordings/normal_2.ide", sensors=sensors_to_get, rate=rate)
normal_3 = multi_ts("../data/recordings/normal_3.ide", sensors=sensors_to_get, rate=rate)

# some columns could include NaNs/missing values, so fill missing data with column mean
normal_1 = normal_1.fillna(normal_1.mean())
normal_2 = normal_2.fillna(normal_2.mean())
normal_3 = normal_3.fillna(normal_3.mean())

# now split each dataset into multiple time series 
X1 = transform.split_series(normal_1.to_numpy(), segment_length)
X2 = transform.split_series(normal_2.to_numpy(), segment_length)
X3 = transform.split_series(normal_3.to_numpy(), segment_length)

# combine sets into one 
X = np.concatenate([X1, X2, X3], axis=0)

# create train and validation set 
X_train, X_val = train_test_split(X, test_size=0.2, random_state=42)

It is a common practice in machine learning to split a dataset into a train and validation set. Here, the validation set is a subset (20% of the entire size) of the normal dataset that the model will not encounter during training. We use the validation set to evaluate whether the trained model accurately identifies unseen data points that come from the same distribution of the training set as normal.

Normalizing the Data

It is often good practice to normalize the data before training the model, so that all feature values are on a common scale. Normalization also reduces the magnitude of the features, which benefits the algorithm when it calculates gradients during training in order to minimize the loss function.

def normalize(data):
    """
    Normalize the input data
    :param data: the multivariate time series to normalize, a matrix of shape (n_samples, n_timesteps, n_features)
    :return: the multivariate time series with features normalized
    """
    n_samples, n_timesteps, n_features = data.shape 
    
    # reshape to 2D for normalization 
    data_reshaped = data.reshape(-1, n_features)
    
    # create the scaler and transform data
    scaler = StandardScaler().fit(data_reshaped)
    data_normalized = scaler.transform(data_reshaped)
    
    # reshape data back to original shape
    return data_normalized.reshape(n_samples, n_timesteps, n_features)  

# normalize training and validation data
X_train = normalize(X_train)
X_val = normalize(X_val)




Training

Building the Model

We then proceed to build the autoencoder using TensorFlow, an open-source library for machine learning and artificial intelligence. We’ll first create an Autoencoder :

`ml.autoencoder`
import numpy as np 
import tensorflow as tf 
from tensorflow.keras import layers, models 

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models


class Autoencoder(models.Model):
    def __init__(self, input_shape, latent_dim, encoder, decoder):
        """
        A class for building autoencoders

        :param input_shape: the shape of the input
        :param latent_dim: the dimensions of the latent space
        :param encoder: the encoder model
        :param decoder: the decoder model
        """
        super(Autoencoder, self).__init__()
        self.latent_dim = latent_dim
        self.input_shape = input_shape

        self.encoder = encoder
        self.decoder = decoder

        self.threshold = None

    def call(self, x):
        """
        A method that passes the input through the autoencoder and returns the reconstructed input

        :param x: the input
        :return: the reconstructed input
        """
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)

        return decoded

    def reconstruction_errors(self, x, axis=0):
        """
        A method that calculates the reconstruction error for each sample in x
        :param x: the data to calculate reconstruction errors for
        :param x: the axis to calculate reconstruction error along
        :return: an N-dimensional array containing reconstruction errors for the samples
        """

        x_reconstructed = self.call(x)
        return np.mean((x - x_reconstructed) ** 2, axis=axis)

    def set_threshold(self, threshold):
        """
        A method to set the threshold for which the Autoencoder classifies a data point as an anomaly

        :param threshold: the reconstruction error threshold
        :return: the new threshold
        """

        self.threshold = threshold
        return self.threshold

    def classify(self, x, axis=0):
        """
        A method to classify points in x as normal or anomalies

        :param x: the data to classify
        :param axis: the axis to calculation reconstruction error along
        :return: returns N-dimensional array of 1s and 0s where 1 is an inlier and 0 is an outlier
        """

        if self.threshold is None:
            raise ValueError("Threshold is none. Set threshold first before classifying.")

        re_errors = self.reconstruction_errors(x, axis=axis)

        return (re_errors <= self.threshold).astype(int)
        
        

We then define the architecture for the model:

# define architecture for model 
encoder = tf.keras.Sequential([
    layers.Input(shape=(X_train.shape[1], X_train.shape[2])), 
    layers.LSTM(128, activation="softsign", return_sequences=True), 
    layers.LSTM(32, activation="softsign", return_sequences=False)
])

decoder = tf.keras.Sequential([
    layers.RepeatVector(X_train.shape[1]), 
    layers.LSTM(32, activation="softsign", return_sequences=True), 
    layers.LSTM(128, activation="softsign", return_sequences=True), 
    layers.TimeDistributed(layers.Dense(X_train.shape[2]))
])



Note that the architecture will depend on the application the model is being used for. When developing a model, we will often test different architectures in an effort to curate a more accurate model.

Model Architecture

Here, the encoder has 2 LSTM layers with the softsign activation function. The second LSTM layer is the latent layer, which has 32 nodes. The encoder compresses a sequence into a reduced context vector. The RepeatVector layer prepares the context vector to be the initial input into the decoder.

The decoder also has 2 LSTM layers activated with softsign and then a final TimeDistributed Dense layer, which is used to ensure the output sequence is of the same shape as the input sequence.

Adam Optimization

After defining the architecture, we can now instantiate the model:

# instaniate model and configuration
model = ae.Autoencoder(latent_dim=32, input_shape=(X_train.shape[1], X_train.shape[2]), encoder=encoder, decoder=decoder)
model.compile(optimizer='adam', loss='mse')



The compile method configures the model to use mean squared error as the objective function, and to use Adam optimization, an iterative algorithm that is often used to minimize the objective function when training neural networks.

Training the Model

Now, we train the model:

history = model.fit(X_train, X_train, epochs=300, validation_data=(X_val, X_val))



where the epochs parameter specifies the number of iterations to run the optimization algorithm for. On each training step, training and validation loss will be calculated and stored in the history object. Once the model has been trained, we can plot the losses as shown below:

# plot training and validation loss 
plt.plot(history.history['loss'], label="Train Loss")
plt.plot(history.history['val_loss'], label="Validation Loss")
plt.title(f"Loss vs Epoch")
plt.grid(True)
plt.legend()
plt.xlabel("Epoch")
plt.ylabel("Loss (MSE)")
plt.show()



Loss vs Epoch chart

Overall, for any good model, we should expect training and validation loss to decrease, signaling that the model is getting better at reconstructing the input (i.e. the model is “learning”).


Testing/Evaluation

Now, let’s see how the model performs when evaluated on data from the modified fans. First, we process the data from the files containing the test data.

Creating the Test Set

# get data from fan with blade broken 
broken_data = multi_ts("../data/recordings/broken.ide", sensors=sensors_to_get, rate=rate)

# impute columns 
broken_data = broken_data.fillna(broken_data.mean())

# split into multiple time series
X_broken = transform.split_series(broken_data.to_numpy(), segment_length)
X_broken = normalize(X_broken)

# get data from fan with mass attached
mass_1 = multi_ts("../data/recordings/mass_1.ide", sensors=sensors_to_get, rate=rate)
mass_2 = multi_ts("../data/recordings/mass_2.ide", sensors=sensors_to_get, rate=rate)
mass_3 = multi_ts("../data/recordings/mass_3.ide", sensors=sensors_to_get, rate=rate)

# some columns could include NaNs/missing values, so fill missing data with column mean
mass_1 = mass_1.fillna(mass_1.mean())
mass_2 = mass_2.fillna(mass_2.mean())
mass_3 = mass_3.fillna(mass_3.mean())

# now split each dataset into multiple time series 
X_mass_1 = transform.split_series(mass_1.to_numpy(), segment_length)
X_mass_2 = transform.split_series(mass_2.to_numpy(), segment_length)
X_mass_3 = transform.split_series(mass_3.to_numpy(), segment_length)

# combine sets into one and normalize
X_test_1 = np.concatenate([X_mass_1, X_mass_2, X_mass_3], axis=0)
X_test_1 = normalize(X_test_1)

# get data from fan in closet 
closet_data = multi_ts("../data/recordings/closet.ide", sensors_to_get, rate=rate)

# impute columns 
closet_data = closet_data.fillna(closet_data.mean())

# split into time series 
X_closet = transform.split_series(closet_data.to_numpy(), segment_length)
X_closet = normalize(X_closet)

# get data fan with brush obstructing blades
brush_data = multi_ts("../data/recordings/brush.ide", sensors_to_get, rate=rate)

# impute columns 
brush_data = brush_data.fillna(brush_data.mean())

# split into time series
X_brush = transform.split_series(brush_data.to_numpy(), segment_length)
X_brush = normalize(X_brush)

Defined below is a function that produces some summary metrics for how the model performs on data X such as the average, standard deviation, of several quantiles of reconstruction errors that occur across samples. We should expect the values that come from the distribution of reconstruction errors for the modified fans to be considerably higher than the distribution of errors for the training and validation set.

Evaluating the Model

def summarize_metrics(model, X, decimal_places=5):
    """
    Produce a few summary statistics from the trained model 
    :param model: the trained model
    :param X: the data to calculate summary metrics for
    :param decimal_places: the number of decimal places to round output to 
    :return: a dict containing summary statistics from model evaluated on data
    """
    errs = model.reconstruction_errors(X, axis=(1, 2))

    qs = [0.05, 0.25, 0.5, 0.75, 0.95]
    quantiles = np.quantile(errs, q=qs)
    mse = np.mean((X - model.call(X)) ** 2)
    std = np.std(errs)

    data = {
        "MSE": f"{mse:.{decimal_places}f}",
        "STD RE": f"{std:.{decimal_places}f}",
        "Min RE": f"{np.min(errs):.{decimal_places}f}",
        "Max RE": f"{np.max(errs):.{decimal_places}f}"
    }

    for i, q in enumerate(quantiles):
        data[f"{qs[i]}-q"] = q

    return data



We can then produce a dataframe that displays these summary metrics for the train, validation, and test data:

s_training = summarize_metrics(model, X_train)
s_val = summarize_metrics(model, X_val)

s_test_1 = summarize_metrics(model, X_test_1)
s_test_2 = summarize_metrics(model, X_broken)
s_test_3 = summarize_metrics(model, X_closet)
s_test_4 = summarize_metrics(model, X_brush)

lst = [s_training, s_val, s_test_1, s_test_2, s_test_3, s_test_4]
indices = ["Train", "Validation", "Mass Attached", "Balde Broken", "Inside Closet", "Brush"]
summary_data = pd.DataFrame(lst, index=indices)



Reconstruction error quantities

The above plot shows the quantiles of reconstruction errors for the different datasets. As displayed above, the distribution of reconstruction errors for the training and validation set are similar, while those in the test sets for the modified fans are considerably different. Specifically, the reconstruction errors on the test sets are significantly larger than those from training and validation. The model therefore shows desired behavior, as it is reconstructing the time series from the normal fan better than how it reconstructs data from the test sets.

Classifying Anomalies

With the trained model, we can treat the reconstruction error on a particular sample as an anomaly score that we can use to categorize data. For instance, here we could set a threshold such that when the reconstruction error falls above the threshold, the data point (i.e. a series spanning 10 timesteps or 100 milliseconds) as an anomaly and those that fall below as normal. We could also create a more sophisticated system where we set multiple different thresholds, such that anomalies are identified in different levels.

If we consider the case of a simple threshold, we want to set a threshold such that the model classifies unseen normal data as normal frequently (let’s say above 95%) and anomalous data (e.g. that comes from the broken fan) as normal rarely (below 5%). The threshold can be chosen in different ways such as using trial-and-error or following mathematical properties of statistical distributions. For instance, if the reconstruction errors from the normal data followed the normal distribution, then taking the threshold to be 2 standard deviations above the average reconstruction error should result in 97.5% of data points from the intact fan being classified as normal.

Suppose here we take the threshold to be the 0.97-quantile (or 97th percentile) of reconstruction errors from the training set. So, we should expect the number of points classified as normal from a dataset with normal data to be about 97%. We’ll then use that threshold to classify data points as 1 (normal) or 0 (anomaly).

# set threshold
train_errors = model.reconstruction_errors(X_train, axis=(1, 2))
threshold = np.quantile(train_errors, q=0.97)
model.set_threshold(threshold)

# classify points and get proportion for each 
train_labels = model.classify(X_train, axis=(1, 2))
val_labels = model.classify(X_val, axis=(1,2))

# mass attached
test_1_labels = model.classify(X_test_1, axis=(1, 2))
# blade broken 
test_2_labels = model.classify(X_broken, axis=(1, 2))
# inside closet 
test_3_labels = model.classify(X_closet, axis=(1, 2))
# brush
test_4_labels = model.classify(X_brush, axis=(1, 2))

# create dataframe showing proportion classified as Normal
indices = ["Train", "Validation", "Mass Attached", "Blade Broken", "Inside Closet", "Brush"]
lst = [train_labels, val_labels, test_1_labels, test_2_labels, test_3_labels, test_4_labels]

data = [{"Proportion Classified as Normal": (labels == 1).mean()} for labels in lst]
table = pd.DataFrame(data, index=indices)
table



 

Proportion Classified as Normal

Train

0.969933

Validation

0.911690

Mass Attached

0.000000

Blade Broken

0.000000

Inside Closet

0.000000

Brush

0.000000

As shown above, the model performs relatively well at classifying normal data that it did not see during training (i.e. validation set) with 91.2% accuracy. On the other hand, the number of points it classifies as normal from the anomalous settings are all approximately 0.


Summary

By establishing a robust foundation for anomaly detection using Python, we have demonstrated the initial steps necessary for creating a machine-learning model tailored to identify irregularities in complex systems. In the 2nd post of this series (Machine Learning with a Vibration Sensor), we will delve deeper into the practical applications of these models, focusing on the development of a vibration sensor-based anomaly detection model to monitor equipment health and predict failures.

Download This Post (PDF)

 

Share This

Pete Scheidler

Pete has over 15 years of electrical engineering experience, mostly with signal processing and firmware.

Building an Anomaly Detection Model in Python

Posted by Pete Scheidler on Sep 4, 2024 11:43:08 AM
Pete Scheidler
Find me on:

Machine Learning: Detecting Anomalies with Python

Anomaly detection is a great tool in helping engineers identify unusual patterns and deviations in system performance, which could indicate potential failures or inefficiencies.

By proactively detecting anomalies, engineers can build and maintain reliable systems, optimize resource management, and ensure operational efficiency.

This is the first in a two-part series exploring machine learning for anomaly detection. The accompanying post is titled: Machine Learning with a Vibration Sensor.

In this post, we’ll go through the steps of building an anomaly detection model in Python using tri-axial acceleration, orientation, and rotation data.

Topics covered include:


Preprocessing and Feature Extraction

The Raw Data

`lib.process.file`

import decimal
import idelib
import endaq
import pandas as pd


def metadata(doc: idelib.dataset.Dataset) -> pd.DataFrame:
    """
    Retrieve metadata about a Dataset object
    :param doc: an idelib.dataset.Dataset to extract metadata from
    :return: metadata to extract from Dataset object such as filename, sensor type, recording length, etc.
    """
    filename = doc.name

    t0, t1 = (t * decimal.Decimal('1e-6') for t in (doc.sessions[0].firstTime, doc.sessions[0].lastTime))
    recording_length = t1 - t0
    serial_number_id = str(doc.recorderInfo['RecorderSerial'])
    sensor_type = doc.recorderInfo['PartNumber']

    data = {
        "Name": filename,
        "Sensor Type": sensor_type,
        "Recording Length (seconds)": recording_length,
        "Serial Number": serial_number_id
    }
    return pd.DataFrame([data])


def sensor_info(doc) -> pd.DataFrame:
    """
    Retrieve sensor info about a Dataset object

    :param doc: an idelib.dataset.Dataset to extract sensor information from
    :return: a table containing information of each sensor present in the Dataset object
    """
    channel_table = endaq.ide.get_channel_table(doc).data
    return channel_table.drop(columns=['channel'])


def get_sensor_data(doc, sensor: tuple[str, str]) -> pd.DataFrame:
    """

    :param doc: an idelib.dataset.Dataset to extract sensor data from
    :param sensor: the sensor to select, as a tuple consisting of (name, type). See process.sensor_info to retrieve options.
    :return: a table containing the collected data from the sensor
    """
    channel_table = endaq.ide.get_channel_table(doc).data

    sensor_names = zip(channel_table['name'].values, channel_table['type'].values)

    found = False

    # check if sensor is present in recording
    for s in sensor_names:
        if s[0] == sensor[0] and s[1] == sensor[1]:
            found = True
            break

    if not found:
        raise ValueError(f"The sensor {sensor} not present in {doc.name}")

    sensor_data = channel_table.loc[(channel_table['name'] == sensor[0]) & (channel_table['type'] == sensor[1])].squeeze()

    return endaq.ide.to_pandas(sensor_data.channel, time_mode="datetime", tz="utc")
    
 `lib.transform.ts`
import numpy as np
import pandas as pd
def resample_ts(data: pd.DataFrame, rate: float):
    """
    Resample a dataframe to `rate`
    :param data: a pandas dataframe with timestamps as indices and observed values at each timestamp as columns
    :param rate: the rate to resample the dataframe to, in seconds
    :return: the resampled dataframe
    """
    rate_str = f'{rate}s'
    # resample the dataframe to specified rate
    resampled_df = data.resample(rate_str).mean()
    return resampled_df
def split_series(ts: np.ndarray, segment_length: int, pad=None) -> np.ndarray:
    """
    Split a time series into multiple time series of length `segment_length`.
    If last part of `ts` is shorter than
    `segment_length`, rest of series is padded with value `pad` (default is last value in series)
    :param ts: a np.ndarray of shape (n_timesteps, n_features)
    :param segment_length: the length of the shortened time series
    :return: a np.ndarray of shape (n_series, n_timesteps, n_features)
    """
    start = 0
    n = len(ts)
    data = []
    while start < n:
        # check if what's left is less than segment_length
        if n - start < segment_length:
            pad_value = ts[-1] if pad is None else pad
            last_ts = np.concatenate((ts[start:], [pad_value for _ in range(segment_length - (n - start))]))
            data.append(last_ts)
            break
        split_ts = ts[start: start + segment_length]
        data.append(split_ts)
        start += segment_length
    return np.array(data)   
    
    
    
    

The above contains several functions used for preprocessing.

The raw data comes in the form of IDE files. For this test set-up, we are using a simple structure (fan) as a representative example. We train on “good” fan data, then damage fan to see how good the model is at finding anomalies.

The data from the intact fan we will consider as normal. The files normal_1.ide, normal_2.ide, and normal_3.ide are 10-minute recordings from each of the fan’s speed settings: lowest (1), middle (2), highest (3).

I have written a more in-depth blog regarding the test: Machine Learning with a Vibration Sensor.

Feature Representation

The model will be trained on multivariate time series containing tri-axle acceleration data, orientation, and rotation. Since the sampling rates of the accelerometer (20.0 kHz) and gyroscope (0.2 kHz) are different for this sensor, we’ll need to resample the collected data from each sensor to the same frequency. For this example, we chose to resample the data to 100 Hz. The exact frequency chosen here is arbitrary, but reduces the dimensionality of the data without losing information that could significantly impact the performance of the model.

The input to the autoencoder is a matrix of shape (n_samples, n_time_steps, n_features), where each sample represents a time series consisting of n_timesteps=10 and n_features=10 (acceleration and rotation have 3 axes each, and orientation is measured in quaternions with 4 axes). As a result, each sample will represent a time series that spans 100 milliseconds.


Import Python Libraries

In a Juypter notebook, we first import several Python libraries we will use:

from typing import List, Tuple

import endaq 
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf 
from tensorflow.keras import layers 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 

# local modules 
from lib.process import file as process 
from lib.ts import transform 
import ml.autoencoder as ae



Then, we’ll define a utility function to help extract data from multiple sensors:

def multi_ts(file, sensors: List[Tuple[str, str]], rate: float) -> pd.DataFrame:
    """
    A helper to extract multivariate time series of 1/rate Hz from an IDE file 
    
    :param file: the file to extract data from
    :param sensors: the set of sensors to extract data from 
    :param rate: the rate, in seconds, to resample the data to 
    :return: a dataframe containing sensor data resampled to 1/rate
    """
    if len(sensors) == 0:
        raise ValueError("Please provide at least one sensor to extract data from")
    
    doc = endaq.ide.get_doc(file)
    dataframes = [] 
    
    for sensor in sensors:
        try:
            sensor_data = process.get_sensor_data(doc, sensor)
            
            # resample to 1/rate Hz
            data_resampled = transform.resample_ts(sensor_data, rate)
            dataframes.append(data_resampled)
        except ValueError:
            # ignore if sensor not present in recording
            continue 
            
    # concatenate dataframes along columns 
    return pd.concat(dataframes, axis=1)



Creating the Training Set

We then construct the training dataset:

# set parameters 
rate = 0.01 
segment_length = 10 
sensors_to_get = [
    ('X (40g)', 'Acceleration'),
    ('Y (40g)', 'Acceleration'),
    ('Z (40g)', 'Acceleration'),
    ('X', 'Rotation'),
    ('Y', 'Rotation'),
    ('Z', 'Rotation'),
    ('X', 'Quaternion'),
    ('Y', 'Quaternion'),
    ('Z', 'Quaternion'),
    ('W', 'Quaternion'),
    ('Internal Pressure', 'Pressure')

# get multivariate data from each of the normal files 
normal_1 = multi_ts("../data/recordings/normal_1.ide", sensors=sensors_to_get, rate=rate)
normal_2 = multi_ts("../data/recordings/normal_2.ide", sensors=sensors_to_get, rate=rate)
normal_3 = multi_ts("../data/recordings/normal_3.ide", sensors=sensors_to_get, rate=rate)

# some columns could include NaNs/missing values, so fill missing data with column mean
normal_1 = normal_1.fillna(normal_1.mean())
normal_2 = normal_2.fillna(normal_2.mean())
normal_3 = normal_3.fillna(normal_3.mean())

# now split each dataset into multiple time series 
X1 = transform.split_series(normal_1.to_numpy(), segment_length)
X2 = transform.split_series(normal_2.to_numpy(), segment_length)
X3 = transform.split_series(normal_3.to_numpy(), segment_length)

# series for training data 
# get multivariate data from each of the normal files 
normal_1 = multi_ts("../data/recordings/normal_1.ide", sensors=sensors_to_get, rate=rate)
normal_2 = multi_ts("../data/recordings/normal_2.ide", sensors=sensors_to_get, rate=rate)
normal_3 = multi_ts("../data/recordings/normal_3.ide", sensors=sensors_to_get, rate=rate)

# some columns could include NaNs/missing values, so fill missing data with column mean
normal_1 = normal_1.fillna(normal_1.mean())
normal_2 = normal_2.fillna(normal_2.mean())
normal_3 = normal_3.fillna(normal_3.mean())

# now split each dataset into multiple time series 
X1 = transform.split_series(normal_1.to_numpy(), segment_length)
X2 = transform.split_series(normal_2.to_numpy(), segment_length)
X3 = transform.split_series(normal_3.to_numpy(), segment_length)

# combine sets into one 
X = np.concatenate([X1, X2, X3], axis=0)

# create train and validation set 
X_train, X_val = train_test_split(X, test_size=0.2, random_state=42)

It is a common practice in machine learning to split a dataset into a train and validation set. Here, the validation set is a subset (20% of the entire size) of the normal dataset that the model will not encounter during training. We use the validation set to evaluate whether the trained model accurately identifies unseen data points that come from the same distribution of the training set as normal.

Normalizing the Data

It is often good practice to normalize the data before training the model, so that all feature values are on a common scale. Normalization also reduces the magnitude of the features, which benefits the algorithm when it calculates gradients during training in order to minimize the loss function.

def normalize(data):
    """
    Normalize the input data
    :param data: the multivariate time series to normalize, a matrix of shape (n_samples, n_timesteps, n_features)
    :return: the multivariate time series with features normalized
    """
    n_samples, n_timesteps, n_features = data.shape 
    
    # reshape to 2D for normalization 
    data_reshaped = data.reshape(-1, n_features)
    
    # create the scaler and transform data
    scaler = StandardScaler().fit(data_reshaped)
    data_normalized = scaler.transform(data_reshaped)
    
    # reshape data back to original shape
    return data_normalized.reshape(n_samples, n_timesteps, n_features)  

# normalize training and validation data
X_train = normalize(X_train)
X_val = normalize(X_val)




Training

Building the Model

We then proceed to build the autoencoder using TensorFlow, an open-source library for machine learning and artificial intelligence. We’ll first create an Autoencoder :

`ml.autoencoder`
import numpy as np 
import tensorflow as tf 
from tensorflow.keras import layers, models 

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models


class Autoencoder(models.Model):
    def __init__(self, input_shape, latent_dim, encoder, decoder):
        """
        A class for building autoencoders

        :param input_shape: the shape of the input
        :param latent_dim: the dimensions of the latent space
        :param encoder: the encoder model
        :param decoder: the decoder model
        """
        super(Autoencoder, self).__init__()
        self.latent_dim = latent_dim
        self.input_shape = input_shape

        self.encoder = encoder
        self.decoder = decoder

        self.threshold = None

    def call(self, x):
        """
        A method that passes the input through the autoencoder and returns the reconstructed input

        :param x: the input
        :return: the reconstructed input
        """
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)

        return decoded

    def reconstruction_errors(self, x, axis=0):
        """
        A method that calculates the reconstruction error for each sample in x
        :param x: the data to calculate reconstruction errors for
        :param x: the axis to calculate reconstruction error along
        :return: an N-dimensional array containing reconstruction errors for the samples
        """

        x_reconstructed = self.call(x)
        return np.mean((x - x_reconstructed) ** 2, axis=axis)

    def set_threshold(self, threshold):
        """
        A method to set the threshold for which the Autoencoder classifies a data point as an anomaly

        :param threshold: the reconstruction error threshold
        :return: the new threshold
        """

        self.threshold = threshold
        return self.threshold

    def classify(self, x, axis=0):
        """
        A method to classify points in x as normal or anomalies

        :param x: the data to classify
        :param axis: the axis to calculation reconstruction error along
        :return: returns N-dimensional array of 1s and 0s where 1 is an inlier and 0 is an outlier
        """

        if self.threshold is None:
            raise ValueError("Threshold is none. Set threshold first before classifying.")

        re_errors = self.reconstruction_errors(x, axis=axis)

        return (re_errors <= self.threshold).astype(int)
        
        

We then define the architecture for the model:

# define architecture for model 
encoder = tf.keras.Sequential([
    layers.Input(shape=(X_train.shape[1], X_train.shape[2])), 
    layers.LSTM(128, activation="softsign", return_sequences=True), 
    layers.LSTM(32, activation="softsign", return_sequences=False)
])

decoder = tf.keras.Sequential([
    layers.RepeatVector(X_train.shape[1]), 
    layers.LSTM(32, activation="softsign", return_sequences=True), 
    layers.LSTM(128, activation="softsign", return_sequences=True), 
    layers.TimeDistributed(layers.Dense(X_train.shape[2]))
])



Note that the architecture will depend on the application the model is being used for. When developing a model, we will often test different architectures in an effort to curate a more accurate model.

Model Architecture

Here, the encoder has 2 LSTM layers with the softsign activation function. The second LSTM layer is the latent layer, which has 32 nodes. The encoder compresses a sequence into a reduced context vector. The RepeatVector layer prepares the context vector to be the initial input into the decoder.

The decoder also has 2 LSTM layers activated with softsign and then a final TimeDistributed Dense layer, which is used to ensure the output sequence is of the same shape as the input sequence.

Adam Optimization

After defining the architecture, we can now instantiate the model:

# instaniate model and configuration
model = ae.Autoencoder(latent_dim=32, input_shape=(X_train.shape[1], X_train.shape[2]), encoder=encoder, decoder=decoder)
model.compile(optimizer='adam', loss='mse')



The compile method configures the model to use mean squared error as the objective function, and to use Adam optimization, an iterative algorithm that is often used to minimize the objective function when training neural networks.

Training the Model

Now, we train the model:

history = model.fit(X_train, X_train, epochs=300, validation_data=(X_val, X_val))



where the epochs parameter specifies the number of iterations to run the optimization algorithm for. On each training step, training and validation loss will be calculated and stored in the history object. Once the model has been trained, we can plot the losses as shown below:

# plot training and validation loss 
plt.plot(history.history['loss'], label="Train Loss")
plt.plot(history.history['val_loss'], label="Validation Loss")
plt.title(f"Loss vs Epoch")
plt.grid(True)
plt.legend()
plt.xlabel("Epoch")
plt.ylabel("Loss (MSE)")
plt.show()



Loss vs Epoch chart

Overall, for any good model, we should expect training and validation loss to decrease, signaling that the model is getting better at reconstructing the input (i.e. the model is “learning”).


Testing/Evaluation

Now, let’s see how the model performs when evaluated on data from the modified fans. First, we process the data from the files containing the test data.

Creating the Test Set

# get data from fan with blade broken 
broken_data = multi_ts("../data/recordings/broken.ide", sensors=sensors_to_get, rate=rate)

# impute columns 
broken_data = broken_data.fillna(broken_data.mean())

# split into multiple time series
X_broken = transform.split_series(broken_data.to_numpy(), segment_length)
X_broken = normalize(X_broken)

# get data from fan with mass attached
mass_1 = multi_ts("../data/recordings/mass_1.ide", sensors=sensors_to_get, rate=rate)
mass_2 = multi_ts("../data/recordings/mass_2.ide", sensors=sensors_to_get, rate=rate)
mass_3 = multi_ts("../data/recordings/mass_3.ide", sensors=sensors_to_get, rate=rate)

# some columns could include NaNs/missing values, so fill missing data with column mean
mass_1 = mass_1.fillna(mass_1.mean())
mass_2 = mass_2.fillna(mass_2.mean())
mass_3 = mass_3.fillna(mass_3.mean())

# now split each dataset into multiple time series 
X_mass_1 = transform.split_series(mass_1.to_numpy(), segment_length)
X_mass_2 = transform.split_series(mass_2.to_numpy(), segment_length)
X_mass_3 = transform.split_series(mass_3.to_numpy(), segment_length)

# combine sets into one and normalize
X_test_1 = np.concatenate([X_mass_1, X_mass_2, X_mass_3], axis=0)
X_test_1 = normalize(X_test_1)

# get data from fan in closet 
closet_data = multi_ts("../data/recordings/closet.ide", sensors_to_get, rate=rate)

# impute columns 
closet_data = closet_data.fillna(closet_data.mean())

# split into time series 
X_closet = transform.split_series(closet_data.to_numpy(), segment_length)
X_closet = normalize(X_closet)

# get data fan with brush obstructing blades
brush_data = multi_ts("../data/recordings/brush.ide", sensors_to_get, rate=rate)

# impute columns 
brush_data = brush_data.fillna(brush_data.mean())

# split into time series
X_brush = transform.split_series(brush_data.to_numpy(), segment_length)
X_brush = normalize(X_brush)

Defined below is a function that produces some summary metrics for how the model performs on data X such as the average, standard deviation, of several quantiles of reconstruction errors that occur across samples. We should expect the values that come from the distribution of reconstruction errors for the modified fans to be considerably higher than the distribution of errors for the training and validation set.

Evaluating the Model

def summarize_metrics(model, X, decimal_places=5):
    """
    Produce a few summary statistics from the trained model 
    :param model: the trained model
    :param X: the data to calculate summary metrics for
    :param decimal_places: the number of decimal places to round output to 
    :return: a dict containing summary statistics from model evaluated on data
    """
    errs = model.reconstruction_errors(X, axis=(1, 2))

    qs = [0.05, 0.25, 0.5, 0.75, 0.95]
    quantiles = np.quantile(errs, q=qs)
    mse = np.mean((X - model.call(X)) ** 2)
    std = np.std(errs)

    data = {
        "MSE": f"{mse:.{decimal_places}f}",
        "STD RE": f"{std:.{decimal_places}f}",
        "Min RE": f"{np.min(errs):.{decimal_places}f}",
        "Max RE": f"{np.max(errs):.{decimal_places}f}"
    }

    for i, q in enumerate(quantiles):
        data[f"{qs[i]}-q"] = q

    return data



We can then produce a dataframe that displays these summary metrics for the train, validation, and test data:

s_training = summarize_metrics(model, X_train)
s_val = summarize_metrics(model, X_val)

s_test_1 = summarize_metrics(model, X_test_1)
s_test_2 = summarize_metrics(model, X_broken)
s_test_3 = summarize_metrics(model, X_closet)
s_test_4 = summarize_metrics(model, X_brush)

lst = [s_training, s_val, s_test_1, s_test_2, s_test_3, s_test_4]
indices = ["Train", "Validation", "Mass Attached", "Balde Broken", "Inside Closet", "Brush"]
summary_data = pd.DataFrame(lst, index=indices)



Reconstruction error quantities

The above plot shows the quantiles of reconstruction errors for the different datasets. As displayed above, the distribution of reconstruction errors for the training and validation set are similar, while those in the test sets for the modified fans are considerably different. Specifically, the reconstruction errors on the test sets are significantly larger than those from training and validation. The model therefore shows desired behavior, as it is reconstructing the time series from the normal fan better than how it reconstructs data from the test sets.

Classifying Anomalies

With the trained model, we can treat the reconstruction error on a particular sample as an anomaly score that we can use to categorize data. For instance, here we could set a threshold such that when the reconstruction error falls above the threshold, the data point (i.e. a series spanning 10 timesteps or 100 milliseconds) as an anomaly and those that fall below as normal. We could also create a more sophisticated system where we set multiple different thresholds, such that anomalies are identified in different levels.

If we consider the case of a simple threshold, we want to set a threshold such that the model classifies unseen normal data as normal frequently (let’s say above 95%) and anomalous data (e.g. that comes from the broken fan) as normal rarely (below 5%). The threshold can be chosen in different ways such as using trial-and-error or following mathematical properties of statistical distributions. For instance, if the reconstruction errors from the normal data followed the normal distribution, then taking the threshold to be 2 standard deviations above the average reconstruction error should result in 97.5% of data points from the intact fan being classified as normal.

Suppose here we take the threshold to be the 0.97-quantile (or 97th percentile) of reconstruction errors from the training set. So, we should expect the number of points classified as normal from a dataset with normal data to be about 97%. We’ll then use that threshold to classify data points as 1 (normal) or 0 (anomaly).

# set threshold
train_errors = model.reconstruction_errors(X_train, axis=(1, 2))
threshold = np.quantile(train_errors, q=0.97)
model.set_threshold(threshold)

# classify points and get proportion for each 
train_labels = model.classify(X_train, axis=(1, 2))
val_labels = model.classify(X_val, axis=(1,2))

# mass attached
test_1_labels = model.classify(X_test_1, axis=(1, 2))
# blade broken 
test_2_labels = model.classify(X_broken, axis=(1, 2))
# inside closet 
test_3_labels = model.classify(X_closet, axis=(1, 2))
# brush
test_4_labels = model.classify(X_brush, axis=(1, 2))

# create dataframe showing proportion classified as Normal
indices = ["Train", "Validation", "Mass Attached", "Blade Broken", "Inside Closet", "Brush"]
lst = [train_labels, val_labels, test_1_labels, test_2_labels, test_3_labels, test_4_labels]

data = [{"Proportion Classified as Normal": (labels == 1).mean()} for labels in lst]
table = pd.DataFrame(data, index=indices)
table



 

Proportion Classified as Normal

Train

0.969933

Validation

0.911690

Mass Attached

0.000000

Blade Broken

0.000000

Inside Closet

0.000000

Brush

0.000000

As shown above, the model performs relatively well at classifying normal data that it did not see during training (i.e. validation set) with 91.2% accuracy. On the other hand, the number of points it classifies as normal from the anomalous settings are all approximately 0.


Summary

By establishing a robust foundation for anomaly detection using Python, we have demonstrated the initial steps necessary for creating a machine-learning model tailored to identify irregularities in complex systems. In the 2nd post of this series (Machine Learning with a Vibration Sensor), we will delve deeper into the practical applications of these models, focusing on the development of a vibration sensor-based anomaly detection model to monitor equipment health and predict failures.

Download This Post (PDF)

 

Topics: vibration analysis, Python, machine learning

By signing up you are agreeing to our Privacy Policy