# -*- coding: utf-8 -*-
'''
MAT-Tools: Python Framework for Multiple Aspect Trajectory Data Mining
The present package offers a tool to support the user in the task of data analysis of multiple aspect trajectories. It integrates, into a unique framework, methods for multiple aspect trajectories and, in general, for multidimensional sequence data mining.
Copyright (C) 2022, MIT license (this portion of code is subject to licensing from source project distribution)
Created on Dec, 2021
Copyright (C) 2022, License GPL Version 3 or superior (see LICENSE file)
Authors:
- Tarlis Portela
'''
# --------------------------------------------------------------------------------
from matclassification.methods.core import *
# --------------------------------------------------------------------------------
# Hyperparameter Optimization Classifier - For Trajectory input data
class THSClassifier(HSClassifier):
"""
A hyperparameter optimization classifier for trajectory data, leveraging similarity measures and
support for geospatial data encoding (Geohash or IndexGrid).
#TODO Geohash and IndexGrid encoding and testing
Check: help(AbstractClassifier) and help(HSClassifier)
Parameters:
-----------
name : str
Name of the classifier model.
save_results : bool, optional (default=False)
Whether to save the results of the classification.
n_jobs : int, optional (default=-1)
The number of parallel jobs to run for computation. -1 means using all processors.
verbose : bool, optional (default=False)
Verbosity mode. If True, enables detailed output.
random_state : int, optional (default=42)
Random seed used for reproducibility.
filterwarnings : str, optional (default='ignore')
Warning filter setting to control output warnings.
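
    Example:
    --------
    A minimal usage sketch (illustrative; assumes a concrete subclass that
    implements create(), and that the inherited data preparation step has
    populated X_train, y_train, and related attributes):

        model = MyTHSSubclass()           # hypothetical subclass
        model.prepare_input(train, test)  # assumed inherited helper
        model.train()                     # grid search over self.grid
        results, y_pred = model.test()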
"""
def __init__(self,
name='NN',
save_results=False,
n_jobs=-1,
verbose=False,
random_state=42,
filterwarnings='ignore'):
        super().__init__(name=name, save_results=save_results, n_jobs=n_jobs,
                         verbose=verbose, random_state=random_state,
                         filterwarnings=filterwarnings)
    def message(self, pbar, text):
        # Show progress details on the tqdm bar only in verbose mode.
        if self.isverbose:
            pbar.set_postfix_str(text)
    def read_report(self, filename, prefix=''):
        # Load a previously saved report and recover the hyperparameter
        # values encoded in the file name as extra columns p0, p1, ...
        marksplit = '-'
        validation_report = pd.read_csv(filename)
        filename = filename.split(prefix+self.name.lower()+'-')[-1]
        filename = filename.split('.csv')[0]
        for i, y in enumerate(filename.split(marksplit)):
            validation_report['p'+str(i)] = y
        return validation_report
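
    # Illustration (hypothetical file name): for a model named 'NN', reading
    # 'nn-100-relu.csv' adds columns p0='100' and p1='relu' to the report.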
def xy(self,
train, test,
tid_col='tid',
class_col='label',
           space_geohash=False, # True: Geohash, False: IndexGrid
           geo_precision=30,    # Geohash: precision OR IndexGrid: meters
validate=False):
        # Returns: X, y, features, num_classes, space, dic_parameters
        return prepareTrajectories(train.copy(), test.copy(),
                                   tid_col=tid_col,
                                   class_col=class_col,
                                   space_geohash=space_geohash, # True: Geohash, False: IndexGrid
                                   geo_precision=geo_precision, # Geohash: precision OR IndexGrid: meters
                                   features_encoding=True,
                                   y_one_hot_encodding=False,
                                   split_test_validation=validate,
                                   data_preparation=1,
                                   verbose=self.isverbose)
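
    # Sketch of a direct call (assuming pandas DataFrames `train` and `test`
    # with 'tid' and 'label' columns):
    #
    #   X, y, features, num_classes, space, dic_parameters = \
    #       model.xy(train, test, space_geohash=False, geo_precision=30)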
def fit(self,
X_train,
y_train,
X_val,
y_val,
config=None):
        # Fall back to the best configuration found by train() when no
        # explicit config is given; build the model lazily if needed.
        if not config:
            config = self.best_config
        if not self.model:
            self.model = self.create(config)
return self.model.fit(X_train,
y_train,
X_val,
y_val)
def predict(self,
X_test,
y_test):
        y_pred_prob = self.model.predict(X_test)
        # A 2-D output is a class-probability matrix; decode it with argmax.
        # A 1-D output is taken as already-decoded class labels.
        if y_pred_prob.ndim == 1:
            y_pred = y_pred_prob
        else:
            y_pred = argmax(y_pred_prob, axis=1)
        self.y_test_true = y_test
        self.y_test_pred = y_pred
        # Map encoded labels back to their original values, if an encoder is set.
        if self.le:
            self.y_test_true = self.le.inverse_transform(self.y_test_true)
            self.y_test_pred = self.le.inverse_transform(self.y_test_pred)
        self._summary = self.score(y_test, y_pred_prob)
        return self._summary, y_pred_prob
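
    # Sketch (illustrative): given a fitted model,
    #
    #   summary, y_pred_prob = model.predict(X_test, y_test)
    #
    # returns the score summary and the raw probabilities; the decoded hard
    # labels are kept in model.y_test_pred.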
def train(self, dir_validation='.'):
self.start_time = datetime.now()
X_train = self.X_train
y_train = self.y_train
if self.validate:
X_val = self.X_val
y_val = self.y_val
else:
X_val = self.X_test
y_val = self.y_test
        if self.isverbose:
            print('['+self.name+':] Training hyperparameter model')
        # Hyperparameter search reports (one per configuration):
        data = []
        # Track the best result as [accuracy, config]:
        self.best_config = [-1, []]
if self.isverbose:
pbar = tqdm(self.grid, desc='['+self.name+':] Model Training')
else:
pbar = self.grid
for config in pbar:
# concat config:
params = '-'.join([str(y) for y in config])
filename = os.path.join(dir_validation, self.name.lower()+'-'+params+'.csv')
if os.path.exists(filename):
self.message(pbar, 'Skip ---> {}'.format(filename))
data.append(self.read_report(filename))
else:
                self.message(pbar, 'Training Config - '+params)
self.model = self.create(config)
self.fit(X_train, y_train, X_val, y_val, config)
validation_report, y_pred = self.predict(X_val, y_val)
if self.save_results:
validation_report.to_csv(filename, index=False)
                # Append the configuration values as extra columns p0, p1, ...
                for att, val in zip(['p'+str(y) for y in range(len(config))], config):
                    validation_report[att] = [val]
                data.append(validation_report)
                # Keep the model with the highest validation accuracy:
                acc = validation_report.iloc[0]['accuracy']
                if acc > self.best_config[0]:
                    self.best_config = [acc, config]
                    self.best_model = self.model
# TODO Save model results ??
self.clear()
        # Keep only the winning configuration (drop the stored accuracy):
        self.best_config = self.best_config[1]
self.report = pd.concat(data)
self.report.reset_index(drop=True, inplace=True)
self.report.sort_values('accuracy', ascending=False, inplace=True)
self.model = self.best_model
return self.report
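
    # Sketch of inspecting the search results (illustrative):
    #
    #   report = model.train(dir_validation='./val')
    #   print(report.head())      # configurations sorted by accuracy, best first
    #   print(model.best_config)  # the winning hyperparameter configuration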
def test(self,
rounds=1,
dir_evaluation='.'):
X_train = self.X_train
y_train = self.y_train
if self.validate:
X_val = self.X_val
y_val = self.y_val
else:
X_val = self.X_test
y_val = self.y_test
X_test = self.X_test
y_test = self.y_test
params = '-'.join([str(y) for y in self.best_config])
filename = os.path.join(dir_evaluation, 'eval_'+self.name.lower()+'-'+params+'.csv')
if os.path.exists(filename):
if self.isverbose:
                print('['+self.name+':] Model previously built.')
return self.read_report(filename, prefix='eval_')
else:
if self.isverbose:
                print('['+self.name+':] Creating a model for the test set')
print('['+self.name+':] Evaluation Config - '+params)
pbar = tqdm(range(rounds), desc="Model Testing")
else:
pbar = list(range(rounds))
random_state = self.config['random_state']
            evaluate_report = []
            # Re-train the best configuration once per round, varying the seed:
            for e in pbar:
re = (random_state+e)
self.config['random_state'] = re
                self.message(pbar, 'Round {} of {} (random {})'.format(e+1, rounds, re))
self.model = self.create(self.best_config)
self.fit(X_train, y_train, X_val, y_val, self.best_config)
eval_report, y_pred = self.predict(X_test, y_test)
eval_report['clstime'] = self.duration()
evaluate_report.append(eval_report)
            # Restore the original random seed:
            self.config['random_state'] = random_state
self.test_report = pd.concat(evaluate_report)
self.test_report.reset_index(drop=True, inplace=True)
if self.isverbose:
print('['+self.name+':] Processing time: {} milliseconds. Done.'.format(self.duration()))
return self.test_report, y_pred
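
    # Sketch of a full evaluation (illustrative):
    #
    #   results, y_pred = model.test(rounds=3, dir_evaluation='./eval')
    #   print(results[['accuracy', 'clstime']].mean())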