# -*- coding: utf-8 -*-
'''
MAT-Tools: Python Framework for Multiple Aspect Trajectory Data Mining
The present package offers a tool to support the user in the task of data analysis of multiple aspect trajectories. It integrates, into a unique framework, methods for multiple aspect trajectories and, in general, for multidimensional sequence data mining.
Created on Dec, 2021
Copyright (C) 2022, MIT license (this portion of code is subject to licensing from source project distribution)
Authors:
    - Tarlis Portela
    - Original source:
        - Nicksson C. A. de Freitas,
        - Ticiana L. Coelho da Silva,
        - Jose António Fernandes de Macêdo,
        - Leopoldo Melo Junior,
        - Matheus Gomes Cordeiro
    - Adapted from: https://github.com/nickssonfreitas/ICAART2021
'''
# --------------------------------------------------------------------------------
import time
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import itertools
# --------------------------------------------------------------------------------
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from tensorflow.keras.layers import Dense, LSTM, GRU, Bidirectional, Concatenate, Add, Average, Embedding, Dropout, Input
from tensorflow.keras.initializers import he_normal, he_uniform
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from tensorflow.keras.optimizers import RMSprop, Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.regularizers import l1
from tensorflow.keras import backend as K
# --------------------------------------------------------------------------------
from matclassification.methods._lib.datahandler import prepareTrajectories
from matclassification.methods.core import THSClassifier
class DeepeST(THSClassifier):
"""
DeepeST: (Deep Learning for Sub-Trajectory classification)
The `DeepeST` class is a deep learning model for trajectory-based classification,
which extends the `THSClassifier`. It uses RNN-based architectures, such as
LSTM and BiLSTM, to handle spatial-temporal data.
Parameters
----------
rnn : list, default=['bilstm', 'lstm']
Types of recurrent neural networks to use ('bilstm' or 'lstm').
units : list, default=[100, 200, 300, 400, 500]
List of number of units for the recurrent layers.
merge_type : list, default=['concat']
How to merge embedding layers. Options: 'concat', 'add', 'avg'.
dropout_before_rnn : list, default=[0, 0.5]
Dropout rates applied before the recurrent layers.
dropout_after_rnn : list, default=[0.5]
Dropout rates applied after the recurrent layers.
embedding_size : list, default=[50, 100, 200, 300, 400]
Sizes for the embedding layers.
batch_size : list, default=[64]
Batch sizes for training the model.
epochs : list, default=[1000]
Number of epochs to train the model.
patience : list, default=[20]
Patience for early stopping based on monitored metric.
monitor : list, default=['val_acc']
Metric to monitor for early stopping.
optimizer : list, default=['ada']
Optimizer for training ('ada' for Adam, 'rmsprop' for RMSProp).
learning_rate : list, default=[0.001]
Learning rate for the optimizer.
loss : list, default=['CCE']
Loss function to use ('CCE' for categorical cross-entropy).
loss_parameters : list, default=[{}]
Additional parameters for the loss function.
    y_one_hot_encodding : bool, default=True
        Whether to one-hot encode the target labels.
save_results : bool, default=False
Whether to save results after execution.
n_jobs : int, default=-1
Number of parallel jobs for computations.
verbose : int, default=0
Verbosity level for output (0: silent, 1: progress).
random_state : int, default=42
Random seed for reproducibility.
filterwarnings : str, default='ignore'
Filter warnings during execution.

    Methods
    -------
xy(train, test, tid_col='tid', class_col='label', space_geohash=False, geo_precision=30, validate=False)
Prepares the trajectory data for training and testing, returning features and labels.
prepare_input(train, test, tid_col='tid', class_col='label', space_geohash=False, geo_precision=30, validate=False)
Prepares input features and configurations from the data for model training.
create(config)
Constructs the deep learning model using the given configuration.
fit(X_train, y_train, X_val, y_val, config=None)
Trains the model on the training data with validation using the specified configuration.
predict(X_test, y_test)
Generates predictions on the test data and computes performance metrics.
clear()
Resets the model and clears the Keras session to free memory.
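
    Examples
    --------
    A minimal usage sketch, assuming the hyperparameter-search workflow
    inherited from `THSClassifier` (`prepare_input`, then `train`/`test`);
    `train_df` and `test_df` stand for hypothetical pandas DataFrames of
    trajectory points with `tid` and `label` columns:

    >>> model = DeepeST(rnn=['lstm'], units=[100], epochs=[10])
    >>> model.prepare_input(train_df, test_df)
    >>> model.train()   # grid search over the configured hyperparameters
    >>> model.test()    # evaluate the best configuration on the test split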
"""
def __init__(self,
## GRID SEARCH PARAMETERS
rnn = ['bilstm', 'lstm'],
units = [100, 200, 300, 400, 500],
merge_type = ['concat'],
dropout_before_rnn=[0, 0.5],
dropout_after_rnn=[0.5],
embedding_size = [50, 100, 200, 300, 400],
batch_size = [64],
epochs = [1000],
patience = [20],
monitor = ['val_acc'],
optimizer = ['ada'],
learning_rate = [0.001],
loss = ['CCE'],
                 loss_parameters = [{}], # TODO: unfix; it's fixed for now, but if you add parameters, change all configs.
y_one_hot_encodding = True,
save_results=False,
n_jobs=-1,
verbose=0,
random_state=42,
filterwarnings='ignore'):
super().__init__('DeepeST', save_results=save_results, n_jobs=n_jobs, verbose=verbose, random_state=random_state, filterwarnings=filterwarnings)
self.add_config(rnn=rnn,
units=units,
merge_type=merge_type,
dropout_before_rnn=dropout_before_rnn,
dropout_after_rnn=dropout_after_rnn,
embedding_size=embedding_size,
batch_size=batch_size,
epochs=epochs,
patience=patience,
monitor=monitor,
optimizer=optimizer,
learning_rate=learning_rate,
loss=loss,
loss_parameters=loss_parameters,
y_one_hot_encodding=y_one_hot_encodding)
# Moved to prepare_input:
# self.grid = list(itertools.product())
self.model = None
def xy(self,
train, test,
tid_col='tid',
class_col='label',
space_geohash=False, # True: Geohash, False: indexgrid
geo_precision=30, # Geohash: precision OR IndexGrid: meters
validate=False):
        """
        Prepare the trajectory train/test data for the network.

        Returns
        -------
        X, y, features, num_classes, space, dic_parameters
        """
return prepareTrajectories(train.copy(), test.copy(),
tid_col=tid_col,
class_col=class_col,
# space_geohash, True: Geohash, False: indexgrid
space_geohash=space_geohash,
# Geohash: precision OR IndexGrid: meters
geo_precision=geo_precision,
features_encoding=True,
                                   y_one_hot_encodding=self.config['y_one_hot_encodding'],
split_test_validation=validate,
data_preparation=2,
verbose=self.isverbose)
def create(self, config):
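        """
        Build the DeepeST network from a grid configuration tuple:
        one Input + Embedding branch per trajectory attribute, merged
        ('concat', 'add' or 'avg'), then dropout, an LSTM or Bidirectional
        LSTM layer, dropout again, and a softmax output over the classes.
        """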
        vocab_size = self.config['vocab_size']
        max_lenght = self.config['max_lenght']
        num_classes = self.config['num_classes'] # Tarlis
        col_name = list(vocab_size.keys())

        # Grid configuration tuple, positions 0-5:
        rnn = config[0]
        rnn_units = config[1]
        merge_type = config[2]
        dropout_before_rnn = config[3]
        dropout_after_rnn = config[4]
        embedding_size = config[5]
input_model = []
embedding_layers = []
hidden_input = []
hidden_dropout = []
        # A scalar embedding size is broadcast to every attribute
        if not isinstance(embedding_size, dict):
            embedding_default = embedding_size
            embedding_size = dict(zip(col_name, np.full(len(col_name), embedding_default)))

        assert set(vocab_size) == set(embedding_size), "ERR: embedding_size keys differ from vocab_size keys"
        assert len(embedding_size) > 0, "ERR: embedding size was not defined"
# Initializing Neural Network
# Building Input and Embedding Layers
        for c in tqdm(col_name):
            i_model = Input(shape=(max_lenght,),
                            name='Input_{}'.format(c))
            e_output_ = Embedding(input_dim=vocab_size[c],
                                  output_dim=embedding_size[c],
                                  name='Embedding_{}'.format(c),
                                  input_length=max_lenght)(i_model)
            input_model.append(i_model)
            embedding_layers.append(e_output_)
# MERGE Layer
if len(embedding_layers) == 1:
hidden_input = embedding_layers[0]
elif merge_type == 'add':
hidden_input = Add()(embedding_layers)
elif merge_type == 'avg':
hidden_input = Average()(embedding_layers)
else:
hidden_input = Concatenate(axis=2)(embedding_layers)
# DROPOUT before RNN
hidden_dropout = Dropout(dropout_before_rnn)(hidden_input)
# Recurrent Neural Network Layer
# https://www.quora.com/What-is-the-meaning-of-%E2%80%9CThe-number-of-units-in-the-LSTM-cell
if rnn == 'bilstm':
rnn_cell = Bidirectional(LSTM(units=rnn_units, recurrent_regularizer=l1(0.02)))(hidden_dropout)
else:
rnn_cell = LSTM(units=rnn_units, recurrent_regularizer=l1(0.02))(hidden_dropout)
rnn_dropout = Dropout(dropout_after_rnn)(rnn_cell)
#https://keras.io/initializers/#randomnormal
output_model = Dense(num_classes,
kernel_initializer=he_uniform(),
activation='softmax')(rnn_dropout)
        # The loss (categorical vs. sparse categorical cross-entropy) is chosen later in fit()
return Model(inputs=input_model, outputs=output_model)
def fit(self,
X_train,
y_train,
X_val,
y_val,
config=None):
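        """
        Compile and train the model on (X_train, y_train), validating on
        (X_val, y_val). `config` is a grid tuple in the order:
        (rnn, units, merge_type, dropout_before_rnn, dropout_after_rnn,
        embedding_size, batch_size, epochs, patience, monitor, optimizer,
        learning_rate, loss, loss_parameters); defaults to self.best_config.
        """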
if not config:
config = self.best_config
if not self.model:
self.model = self.create(config)
        # Grid configuration tuple, positions 6-13:
        batch_size = config[6]
        epochs = config[7]
        patience = config[8]
        monitor = config[9]
        optimizer = config[10]
        learning_rate = config[11]
        loss = config[12]
        loss_parameters = config[13]

        # Fixed (non-grid) training options
        min_delta = 0
        verbose = 0
        baseline = None # By Tarlis
        mode = 'auto'
        new_metrics = None
        modelname = ''
        log_dir = None
        assert (y_train.ndim == 1) | (y_train.ndim == 2), "ERR: y_train dimension is incorrect"
        assert (y_val.ndim == 1) | (y_val.ndim == 2), "ERR: y_val dimension is incorrect"
        assert (y_train.ndim == y_val.ndim), "ERR: y_train and y_val have different dimensions"

        # Infer the label encoding from the target shape: 2-D targets are one-hot
        y_one_hot_encodding = (y_train.ndim == 2)

        if y_one_hot_encodding:
            loss = 'categorical_crossentropy'
            my_metrics = ['acc', 'top_k_categorical_accuracy']
        else:
            loss = 'sparse_categorical_crossentropy'
            my_metrics = ['acc', 'sparse_top_k_categorical_accuracy']
# Tarlis: removed the top_k metric in cases of less than 5 classes
if self.config['num_classes'] < 5:
my_metrics = ['acc']
if new_metrics is not None:
my_metrics = new_metrics + my_metrics
        if optimizer == 'ada':
            # 'ada' selects the Adam optimizer
            optimizer = Adam(learning_rate=learning_rate)
        else:
            # any other value falls back to RMSprop
            optimizer = RMSprop(learning_rate=learning_rate)
# Compiling DeepeST Model
self.model.compile(optimizer=optimizer, loss=loss, metrics=my_metrics)
early_stop = EarlyStopping(monitor=monitor,
min_delta=min_delta,
patience=patience,
verbose=verbose, # without print
mode=mode,
baseline=baseline,
restore_best_weights=True)
        # Callbacks: early stopping only (no checkpointing)
        my_callbacks = [early_stop]
# Starting training
return self.model.fit(X_train, y_train,
epochs=epochs,
callbacks=my_callbacks,
validation_data=(X_val, y_val),
verbose=1,
shuffle=True,
use_multiprocessing=True,
batch_size=batch_size)
def predict(self,
X_test,
y_test):
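        """
        Predict class probabilities for X_test, store the true and predicted
        labels (decoded via self.le when available), and return the scoring
        summary together with the probability matrix.
        """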
        assert (y_test.ndim == 1) | (y_test.ndim == 2), "ERR: y_test dimension is incorrect"
        # 2-D targets are one-hot encoded
        y_one_hot_encodding = (y_test.ndim == 2)

        y_pred_prob = np.array(self.model.predict(X_test))

        if y_one_hot_encodding:
            # Build one-hot predictions: a 1 at the argmax of each row
            max_idx = np.argmax(y_pred_prob, axis=1)
            y_pred_true = np.zeros(y_pred_prob.shape)
            for row, col in enumerate(max_idx):
                y_pred_true[row][col] = 1
        else:
            y_pred_true = y_pred_prob.argmax(axis=1)

        # Score against integer class indices for either encoding
        y_true_idx = np.argmax(y_test, axis=1) if y_one_hot_encodding else y_test
        self._summary = self.score(y_true_idx, y_pred_prob)
self.y_test_true = y_test
self.y_test_pred = y_pred_true
if self.le:
self.y_test_true = self.le.inverse_transform(self.y_test_true).reshape(1, -1)[0]
self.y_test_pred = self.le.inverse_transform(self.y_test_pred).reshape(1, -1)[0]
return self._summary, y_pred_prob
def clear(self):
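        """Reset the model and clear the Keras session to free memory."""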
super().clear()
K.clear_session()