Machine Learning: Detecting Anomalies with Python
Anomaly detection is a valuable tool for helping engineers identify unusual patterns and deviations in system performance that could indicate potential failures or inefficiencies.
By proactively detecting anomalies, engineers can build and maintain reliable systems, optimize resource management, and ensure operational efficiency.
This is the first in a two-part series exploring machine learning for anomaly detection. The accompanying post is titled: Machine Learning with a Vibration Sensor.
In this post, we’ll go through the steps of building an anomaly detection model in Python using tri-axial acceleration, orientation, and rotation data.
Topics covered include:
- Preprocessing and Feature Extraction
- The Raw Data
- Feature Representation
- Import Python Libraries
- Creating the Training Set
- Normalizing the Data
- Training
- Building the Model
- Model Architecture
- Adam Optimization
- Training the Model
- Testing/Evaluation
- Creating the Test Set
- Evaluating the Model
- Classifying Anomalies
- Summary
Preprocessing and Feature Extraction
The Raw Data
`lib.process.file`

```python
import decimal

import idelib
import endaq
import pandas as pd


def metadata(doc: idelib.dataset.Dataset) -> pd.DataFrame:
    """
    Retrieve metadata about a Dataset object

    :param doc: an idelib.dataset.Dataset to extract metadata from
    :return: metadata to extract from Dataset object such as filename,
        sensor type, recording length, etc.
    """
    filename = doc.name
    t0, t1 = (t * decimal.Decimal('1e-6') for t in (doc.sessions[0].firstTime, doc.sessions[0].lastTime))
    recording_length = t1 - t0
    serial_number_id = str(doc.recorderInfo['RecorderSerial'])
    sensor_type = doc.recorderInfo['PartNumber']

    data = {
        "Name": filename,
        "Sensor Type": sensor_type,
        "Recording Length (seconds)": recording_length,
        "Serial Number": serial_number_id
    }
    return pd.DataFrame([data])


def sensor_info(doc) -> pd.DataFrame:
    """
    Retrieve sensor info about a Dataset object

    :param doc: an idelib.dataset.Dataset to extract sensor information from
    :return: a table containing information of each sensor present in the Dataset object
    """
    channel_table = endaq.ide.get_channel_table(doc).data
    return channel_table.drop(columns=['channel'])


def get_sensor_data(doc, sensor: tuple[str, str]) -> pd.DataFrame:
    """
    :param doc: an idelib.dataset.Dataset to extract sensor data from
    :param sensor: the sensor to select, as a tuple consisting of (name, type).
        See process.sensor_info to retrieve options.
    :return: a table containing the collected data from the sensor
    """
    channel_table = endaq.ide.get_channel_table(doc).data
    sensor_names = zip(channel_table['name'].values, channel_table['type'].values)

    # check if sensor is present in recording
    found = False
    for s in sensor_names:
        if s[0] == sensor[0] and s[1] == sensor[1]:
            found = True
            break
    if not found:
        raise ValueError(f"The sensor {sensor} is not present in {doc.name}")

    sensor_data = channel_table.loc[(channel_table['name'] == sensor[0]) &
                                    (channel_table['type'] == sensor[1])].squeeze()
    return endaq.ide.to_pandas(sensor_data.channel, time_mode="datetime", tz="utc")
```

`lib.transform.ts`

```python
import numpy as np
import pandas as pd


def resample_ts(data: pd.DataFrame, rate: float):
    """
    Resample a dataframe to `rate`

    :param data: a pandas dataframe with timestamps as indices and observed
        values at each timestamp as columns
    :param rate: the rate to resample the dataframe to, in seconds
    :return: the resampled dataframe
    """
    rate_str = f'{rate}s'
    # resample the dataframe to specified rate
    resampled_df = data.resample(rate_str).mean()
    return resampled_df


def split_series(ts: np.ndarray, segment_length: int, pad=None) -> np.ndarray:
    """
    Split a time series into multiple time series of length `segment_length`.
    If the last part of `ts` is shorter than `segment_length`, the rest of the
    series is padded with the value `pad` (default is the last value in the series)

    :param ts: a np.ndarray of shape (n_timesteps, n_features)
    :param segment_length: the length of the shortened time series
    :return: a np.ndarray of shape (n_series, n_timesteps, n_features)
    """
    start = 0
    n = len(ts)
    data = []
    while start < n:
        # check if what's left is less than segment_length
        if n - start < segment_length:
            pad_value = ts[-1] if pad is None else pad
            last_ts = np.concatenate((ts[start:], [pad_value for _ in range(segment_length - (n - start))]))
            data.append(last_ts)
            break
        split_ts = ts[start: start + segment_length]
        data.append(split_ts)
        start += segment_length
    return np.array(data)
```
The modules above contain several functions used for preprocessing.
The raw data comes in the form of IDE files. For this test set-up, we are using a simple structure (a fan) as a representative example. We train on “good” fan data, then damage the fan to see how well the model finds anomalies.
We will treat the data from the intact fan as normal. The files `normal_1.ide`, `normal_2.ide`, and `normal_3.ide` are 10-minute recordings from each of the fan’s speed settings: lowest (1), middle (2), and highest (3).
I have written a more in-depth blog regarding the test: Machine Learning with a Vibration Sensor.
Feature Representation
The model will be trained on multivariate time series containing tri-axial acceleration, orientation, and rotation data. Since the sampling rates of the accelerometer (20.0 kHz) and gyroscope (0.2 kHz) differ for this sensor, we’ll need to resample the data collected from each sensor to a common frequency. For this example, we chose to resample the data to 100 Hz. The exact frequency is somewhat arbitrary, but it reduces the dimensionality of the data without losing information that could significantly impact the performance of the model.
The input to the autoencoder is a matrix of shape `(n_samples, n_timesteps, n_features)`, where each sample represents a time series with `n_timesteps=10` and `n_features=10` (acceleration and rotation have 3 axes each, and orientation is measured in quaternions with 4 components). As a result, each sample represents a time series that spans 100 milliseconds.
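As a quick, hypothetical sanity check of these shapes (assuming the `split_series` helper from `lib.transform.ts` above is in scope), a 100 Hz recording with 10 features maps to samples of shape `(10, 10)`:

```python
import numpy as np

# hypothetical sanity check: 60 seconds of 100 Hz data with 10 features per timestep
fake_recording = np.random.randn(60 * 100, 10)             # shape (6000, 10)
samples = split_series(fake_recording, segment_length=10)  # helper from lib.transform.ts
print(samples.shape)  # (600, 10, 10) -- each sample spans 10 * 10 ms = 100 ms
```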
Import Python Libraries
In a Jupyter notebook, we first import several Python libraries we will use:
```python
from typing import List, Tuple

import endaq
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# local modules
from lib.process import file as process
from lib.ts import transform
import ml.autoencoder as ae
```
Then, we’ll define a utility function to help extract data from multiple sensors:
```python
def multi_ts(file, sensors: List[Tuple[str, str]], rate: float) -> pd.DataFrame:
    """
    A helper to extract multivariate time series of 1/rate Hz from an IDE file

    :param file: the file to extract data from
    :param sensors: the set of sensors to extract data from
    :param rate: the rate, in seconds, to resample the data to
    :return: a dataframe containing sensor data resampled to 1/rate
    """
    if len(sensors) == 0:
        raise ValueError("Please provide at least one sensor to extract data from")

    doc = endaq.ide.get_doc(file)
    dataframes = []
    for sensor in sensors:
        try:
            sensor_data = process.get_sensor_data(doc, sensor)
            # resample to 1/rate Hz
            data_resampled = transform.resample_ts(sensor_data, rate)
            dataframes.append(data_resampled)
        except ValueError:
            # ignore if sensor not present in recording
            continue

    # concatenate dataframes along columns
    return pd.concat(dataframes, axis=1)
```
Creating the Training Set
We then construct the training dataset:
```python
# set parameters
rate = 0.01
segment_length = 10
sensors_to_get = [
    ('X (40g)', 'Acceleration'),
    ('Y (40g)', 'Acceleration'),
    ('Z (40g)', 'Acceleration'),
    ('X', 'Rotation'),
    ('Y', 'Rotation'),
    ('Z', 'Rotation'),
    ('X', 'Quaternion'),
    ('Y', 'Quaternion'),
    ('Z', 'Quaternion'),
    ('W', 'Quaternion'),
    ('Internal Pressure', 'Pressure')
]

# get multivariate data from each of the normal files
normal_1 = multi_ts("../data/recordings/normal_1.ide", sensors=sensors_to_get, rate=rate)
normal_2 = multi_ts("../data/recordings/normal_2.ide", sensors=sensors_to_get, rate=rate)
normal_3 = multi_ts("../data/recordings/normal_3.ide", sensors=sensors_to_get, rate=rate)

# some columns could include NaNs/missing values, so fill missing data with column mean
normal_1 = normal_1.fillna(normal_1.mean())
normal_2 = normal_2.fillna(normal_2.mean())
normal_3 = normal_3.fillna(normal_3.mean())

# now split each dataset into multiple time series
X1 = transform.split_series(normal_1.to_numpy(), segment_length)
X2 = transform.split_series(normal_2.to_numpy(), segment_length)
X3 = transform.split_series(normal_3.to_numpy(), segment_length)

# combine sets into one
X = np.concatenate([X1, X2, X3], axis=0)

# create train and validation set
X_train, X_val = train_test_split(X, test_size=0.2, random_state=42)
```
It is common practice in machine learning to split a dataset into a train set and a validation set. Here, the validation set is a subset (20% of the entire dataset) of the normal data that the model will not encounter during training. We use the validation set to evaluate whether the trained model accurately identifies unseen data points from the same distribution as the training set as normal.
Normalizing the Data
It is often good practice to normalize the data before training the model, so that all feature values are on a common scale. Normalization also keeps feature magnitudes small, which helps the optimization algorithm compute well-behaved gradients when minimizing the loss function during training.
```python
def normalize(data):
    """
    Normalize the input data

    :param data: the multivariate time series to normalize, a matrix of shape
        (n_samples, n_timesteps, n_features)
    :return: the multivariate time series with features normalized
    """
    n_samples, n_timesteps, n_features = data.shape

    # reshape to 2D for normalization
    data_reshaped = data.reshape(-1, n_features)

    # create the scaler and transform data
    scaler = StandardScaler().fit(data_reshaped)
    data_normalized = scaler.transform(data_reshaped)

    # reshape data back to original shape
    return data_normalized.reshape(n_samples, n_timesteps, n_features)


# normalize training and validation data
X_train = normalize(X_train)
X_val = normalize(X_val)
```
Training
Building the Model
The `ml.autoencoder` module below defines the autoencoder class we will use:
`ml.autoencoder`

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models


class Autoencoder(models.Model):

    def __init__(self, input_shape, latent_dim, encoder, decoder):
        """
        A class for building autoencoders

        :param input_shape: the shape of the input
        :param latent_dim: the dimensions of the latent space
        :param encoder: the encoder model
        :param decoder: the decoder model
        """
        super(Autoencoder, self).__init__()
        self.latent_dim = latent_dim
        self.input_shape = input_shape
        self.encoder = encoder
        self.decoder = decoder
        self.threshold = None

    def call(self, x):
        """
        Pass the input through the autoencoder and return the reconstructed input

        :param x: the input
        :return: the reconstructed input
        """
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

    def reconstruction_errors(self, x, axis=0):
        """
        Calculate the reconstruction error for each sample in x

        :param x: the data to calculate reconstruction errors for
        :param axis: the axis to calculate reconstruction error along
        :return: an N-dimensional array containing reconstruction errors for the samples
        """
        x_reconstructed = self.call(x)
        return np.mean((x - x_reconstructed) ** 2, axis=axis)

    def set_threshold(self, threshold):
        """
        Set the reconstruction error threshold above which the Autoencoder
        classifies a data point as an anomaly

        :param threshold: the reconstruction error threshold
        :return: the new threshold
        """
        self.threshold = threshold
        return self.threshold

    def classify(self, x, axis=0):
        """
        Classify points in x as normal or anomalies

        :param x: the data to classify
        :param axis: the axis to calculate reconstruction error along
        :return: an N-dimensional array of 1s and 0s, where 1 is an inlier and 0 is an outlier
        """
        if self.threshold is None:
            raise ValueError("Threshold is None. Set threshold first before classifying.")
        re_errors = self.reconstruction_errors(x, axis=axis)
        return (re_errors <= self.threshold).astype(int)
```
We then define the architecture for the model:
```python
# define architecture for model
encoder = tf.keras.Sequential([
    layers.Input(shape=(X_train.shape[1], X_train.shape[2])),
    layers.LSTM(128, activation="softsign", return_sequences=True),
    layers.LSTM(32, activation="softsign", return_sequences=False)
])

decoder = tf.keras.Sequential([
    layers.RepeatVector(X_train.shape[1]),
    layers.LSTM(32, activation="softsign", return_sequences=True),
    layers.LSTM(128, activation="softsign", return_sequences=True),
    layers.TimeDistributed(layers.Dense(X_train.shape[2]))
])
```
Note that the architecture will depend on the application the model is being used for. When developing a model, we often test different architectures in search of a more accurate model.
Model Architecture
Here, the encoder has 2 LSTM layers with the `softsign` activation function. The second LSTM layer is the latent layer, which has 32 nodes. The encoder compresses a sequence into a reduced context vector. The RepeatVector layer prepares the context vector to be the initial input to the decoder.
The decoder also has 2 LSTM layers activated with `softsign`, followed by a final TimeDistributed Dense layer, which ensures the output sequence has the same shape as the input sequence.
Adam Optimization
After defining the architecture, we can now instantiate the model:
```python
# instantiate model and configuration
model = ae.Autoencoder(latent_dim=32,
                       input_shape=(X_train.shape[1], X_train.shape[2]),
                       encoder=encoder,
                       decoder=decoder)
model.compile(optimizer='adam', loss='mse')
```
The `compile` method configures the model to use mean squared error as the objective function and Adam optimization, an iterative algorithm that is often used to minimize the objective function when training neural networks.
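For intuition, Adam maintains running averages of the gradients and their squares and uses them to scale each parameter update. A minimal NumPy sketch of a single Adam step with the usual default hyperparameters is shown below; this is only an illustration of the algorithm, not how Keras implements it internally:

```python
import numpy as np

def adam_step(theta, grad, m, v, t, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-7):
    """Apply one Adam update to parameters `theta` given gradient `grad` at step `t` (t >= 1)."""
    m = beta1 * m + (1 - beta1) * grad         # running average of gradients (momentum term)
    v = beta2 * v + (1 - beta2) * grad ** 2    # running average of squared gradients
    m_hat = m / (1 - beta1 ** t)               # bias-corrected estimates
    v_hat = v / (1 - beta2 ** t)
    theta = theta - lr * m_hat / (np.sqrt(v_hat) + eps)
    return theta, m, v
```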
Training the Model
Now, we train the model:
```python
history = model.fit(X_train, X_train, epochs=300, validation_data=(X_val, X_val))
```
where the `epochs` parameter specifies the number of iterations to run the optimization algorithm for. On each training step, training and validation loss will be calculated and stored in the `history` object. Once the model has been trained, we can plot the losses as shown below:
```python
# plot training and validation loss
plt.plot(history.history['loss'], label="Train Loss")
plt.plot(history.history['val_loss'], label="Validation Loss")
plt.title("Loss vs Epoch")
plt.grid(True)
plt.legend()
plt.xlabel("Epoch")
plt.ylabel("Loss (MSE)")
plt.show()
```
Overall, for any good model, we should expect training and validation loss to decrease, signaling that the model is getting better at reconstructing the input (i.e. the model is “learning”).
Testing/Evaluation
Now, let’s see how the model performs when evaluated on data from the modified fans. First, we process the data from the files containing the test data.
Creating the Test Set
```python
# get data from fan with blade broken
broken_data = multi_ts("../data/recordings/broken.ide", sensors=sensors_to_get, rate=rate)
# impute columns
broken_data = broken_data.fillna(broken_data.mean())
# split into multiple time series
X_broken = transform.split_series(broken_data.to_numpy(), segment_length)
X_broken = normalize(X_broken)

# get data from fan with mass attached
mass_1 = multi_ts("../data/recordings/mass_1.ide", sensors=sensors_to_get, rate=rate)
mass_2 = multi_ts("../data/recordings/mass_2.ide", sensors=sensors_to_get, rate=rate)
mass_3 = multi_ts("../data/recordings/mass_3.ide", sensors=sensors_to_get, rate=rate)

# some columns could include NaNs/missing values, so fill missing data with column mean
mass_1 = mass_1.fillna(mass_1.mean())
mass_2 = mass_2.fillna(mass_2.mean())
mass_3 = mass_3.fillna(mass_3.mean())

# now split each dataset into multiple time series
X_mass_1 = transform.split_series(mass_1.to_numpy(), segment_length)
X_mass_2 = transform.split_series(mass_2.to_numpy(), segment_length)
X_mass_3 = transform.split_series(mass_3.to_numpy(), segment_length)

# combine sets into one and normalize
X_test_1 = np.concatenate([X_mass_1, X_mass_2, X_mass_3], axis=0)
X_test_1 = normalize(X_test_1)

# get data from fan in closet
closet_data = multi_ts("../data/recordings/closet.ide", sensors_to_get, rate=rate)
# impute columns
closet_data = closet_data.fillna(closet_data.mean())
# split into time series
X_closet = transform.split_series(closet_data.to_numpy(), segment_length)
X_closet = normalize(X_closet)

# get data from fan with brush obstructing blades
brush_data = multi_ts("../data/recordings/brush.ide", sensors_to_get, rate=rate)
# impute columns
brush_data = brush_data.fillna(brush_data.mean())
# split into time series
X_brush = transform.split_series(brush_data.to_numpy(), segment_length)
X_brush = normalize(X_brush)
```
Defined below is a function that produces summary metrics describing how the model performs on data `X`, such as the average, standard deviation, and several quantiles of the reconstruction errors across samples. We should expect the distribution of reconstruction errors for the modified fans to sit considerably higher than the distribution of errors for the training and validation sets.
Evaluating the Model
```python
def summarize_metrics(model, X, decimal_places=5):
    """
    Produce a few summary statistics from the trained model

    :param model: the trained model
    :param X: the data to calculate summary metrics for
    :param decimal_places: the number of decimal places to round output to
    :return: a dict containing summary statistics from model evaluated on data
    """
    errs = model.reconstruction_errors(X, axis=(1, 2))
    qs = [0.05, 0.25, 0.5, 0.75, 0.95]
    quantiles = np.quantile(errs, q=qs)
    mse = np.mean((X - model.call(X)) ** 2)
    std = np.std(errs)

    data = {
        "MSE": f"{mse:.{decimal_places}f}",
        "STD RE": f"{std:.{decimal_places}f}",
        "Min RE": f"{np.min(errs):.{decimal_places}f}",
        "Max RE": f"{np.max(errs):.{decimal_places}f}"
    }
    for i, q in enumerate(quantiles):
        data[f"{qs[i]}-q"] = q
    return data
```
We can then produce a dataframe that displays these summary metrics for the train, validation, and test data:
```python
s_training = summarize_metrics(model, X_train)
s_val = summarize_metrics(model, X_val)
s_test_1 = summarize_metrics(model, X_test_1)
s_test_2 = summarize_metrics(model, X_broken)
s_test_3 = summarize_metrics(model, X_closet)
s_test_4 = summarize_metrics(model, X_brush)

lst = [s_training, s_val, s_test_1, s_test_2, s_test_3, s_test_4]
indices = ["Train", "Validation", "Mass Attached", "Blade Broken", "Inside Closet", "Brush"]
summary_data = pd.DataFrame(lst, index=indices)
```
The summary table above shows the quantiles of reconstruction errors for the different datasets. The distributions of reconstruction errors for the training and validation sets are similar, while those for the test sets from the modified fans are considerably different. Specifically, the reconstruction errors on the test sets are significantly larger than those from training and validation. The model therefore shows the desired behavior: it reconstructs the time series from the normal fan better than it reconstructs data from the test sets.
Classifying Anomalies
With the trained model, we can treat the reconstruction error on a particular sample as an anomaly score that we can use to categorize data. For instance, we could set a threshold such that when the reconstruction error falls above the threshold, the data point (i.e. a series spanning 10 timesteps, or 100 milliseconds) is classified as an anomaly, and points that fall below it are classified as normal. We could also create a more sophisticated system with multiple thresholds, so that anomalies are flagged at different severity levels.
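As a hypothetical sketch of the multi-threshold idea (not something implemented in this post), two quantile-based thresholds computed from the training errors could map each sample's reconstruction error to a severity level:

```python
# hypothetical sketch: map reconstruction errors to severity levels
train_errors = model.reconstruction_errors(X_train, axis=(1, 2))
warn_threshold = np.quantile(train_errors, q=0.97)
alert_threshold = np.quantile(train_errors, q=0.999)

def severity(errors):
    """Return 0 (normal), 1 (warning), or 2 (alert) for each sample's reconstruction error."""
    levels = np.zeros(len(errors), dtype=int)
    levels[errors > warn_threshold] = 1
    levels[errors > alert_threshold] = 2
    return levels
```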
If we consider the case of a simple threshold, we want to set a threshold such that the model classifies unseen normal data as normal frequently (say, above 95% of the time) and anomalous data (e.g. data from the broken fan) as normal rarely (below 5% of the time). The threshold can be chosen in different ways, such as by trial and error or by following mathematical properties of statistical distributions. For instance, if the reconstruction errors from the normal data followed a normal distribution, then taking the threshold to be 2 standard deviations above the average reconstruction error should result in approximately 97.5% of data points from the intact fan being classified as normal.
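For example, the mean-plus-two-standard-deviations rule described above could be computed like this (a sketch under the Gaussian assumption, not the approach used below):

```python
# hypothetical statistical threshold: assumes training reconstruction errors are roughly Gaussian
train_errors = model.reconstruction_errors(X_train, axis=(1, 2))
stat_threshold = train_errors.mean() + 2 * train_errors.std()
```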
Suppose here we take the threshold to be the 0.97-quantile (97th percentile) of reconstruction errors from the training set. We should then expect about 97% of points from a dataset of normal data to be classified as normal. We'll then use that threshold to classify data points as 1 (normal) or 0 (anomaly).
```python
# set threshold
train_errors = model.reconstruction_errors(X_train, axis=(1, 2))
threshold = np.quantile(train_errors, q=0.97)
model.set_threshold(threshold)

# classify points and get proportion for each
train_labels = model.classify(X_train, axis=(1, 2))
val_labels = model.classify(X_val, axis=(1, 2))
# mass attached
test_1_labels = model.classify(X_test_1, axis=(1, 2))
# blade broken
test_2_labels = model.classify(X_broken, axis=(1, 2))
# inside closet
test_3_labels = model.classify(X_closet, axis=(1, 2))
# brush
test_4_labels = model.classify(X_brush, axis=(1, 2))

# create dataframe showing proportion classified as Normal
indices = ["Train", "Validation", "Mass Attached", "Blade Broken", "Inside Closet", "Brush"]
lst = [train_labels, val_labels, test_1_labels, test_2_labels, test_3_labels, test_4_labels]
data = [{"Proportion Classified as Normal": (labels == 1).mean()} for labels in lst]
table = pd.DataFrame(data, index=indices)
table
```
| | Proportion Classified as Normal |
|---|---|
| Train | 0.969933 |
| Validation | 0.911690 |
| Mass Attached | 0.000000 |
| Blade Broken | 0.000000 |
| Inside Closet | 0.000000 |
| Brush | 0.000000 |
As shown above, the model performs relatively well at classifying normal data it did not see during training (the validation set), labeling 91.2% of it as normal. On the other hand, the proportion of points it classifies as normal in each of the anomalous settings is approximately 0.
Summary
By establishing a robust foundation for anomaly detection using Python, we have demonstrated the initial steps necessary for creating a machine-learning model tailored to identify irregularities in complex systems. In the second post of this series (Machine Learning with a Vibration Sensor), we will delve deeper into the practical applications of these models, focusing on the development of a vibration-sensor-based anomaly detection model to monitor equipment health and predict failures.