# -*- coding: utf-8 -*-
"""
MAT-Tools: Python Framework for Multiple Aspect Trajectory Data Mining
The present application offers a tool to support the user in the preprocessing of multiple aspect trajectory data. It is part of a unified framework of data mining methods for multiple aspect trajectories and, more generally, for multidimensional sequence data.
Copyright (C) 2022, MIT license (this portion of the code is subject to the license of the source project distribution)
Created on Dec, 2023
Copyright (C) 2023, License GPL Version 3 or later (see LICENSE file)
Authors:
- Tarlis Portela
- sktime package (adapted)
"""
import os
import itertools
import textwrap
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score as acc
from tqdm.auto import tqdm
class TsFileParseException(Exception):
    """
    Should be raised when parsing a .ts file and the format is incorrect.
    """

    pass


class LongFormatDataParseException(Exception):
    """
    Should be raised when parsing a .csv file with long-formatted data and
    the format is incorrect. (Raised by load_from_long_to_dataframe below.)
    """

    pass
def load_from_tsfile_to_dataframe(full_file_path_and_name, return_separate_X_and_y=False, replace_missing_vals_with="?", opLabel='Processing TS'):
    full_file = open(full_file_path_and_name, "r", encoding="utf-8")
    return load_from_tsfile(full_file, return_separate_X_and_y, replace_missing_vals_with, opLabel)
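# A minimal usage sketch (the file name is hypothetical; it assumes a
# sktime-style .ts file whose header declares class labels):
#
#     X, y = load_from_tsfile_to_dataframe(
#         "Dataset_TRAIN.ts", return_separate_X_and_y=True
#     )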
def load_from_tsfile(file, return_separate_X_and_y=False, replace_missing_vals_with="NaN", opLabel='Processing TS'):
"""Loads data from a .ts file into a Pandas DataFrame.
Parameters
----------
    file: file object
        An open handle to the .ts file to read.
    return_separate_X_and_y: bool
        true if X and y values should be returned as a separate DataFrame (X)
        and a numpy array (y), false otherwise.
        This is only relevant for data that has class labels.
    replace_missing_vals_with: str
        The value that missing values in the text file should be replaced
        with prior to parsing.
    Returns
    -------
    DataFrame, ndarray
        If return_separate_X_and_y then a tuple containing a DataFrame and a
        numpy array containing the relevant time-series and corresponding
        class values.
    DataFrame
        If not return_separate_X_and_y then a single DataFrame containing
        all time-series and (if relevant) a column "label" with the
        associated class values.
"""
# Initialize flags and variables used when parsing the file
metadata_started = False
data_started = False
has_problem_name_tag = False
has_timestamps_tag = False
has_univariate_tag = False
has_class_labels_tag = False
has_data_tag = False
previous_timestamp_was_int = None
prev_timestamp_was_timestamp = None
num_dimensions = None
is_first_case = True
instance_list = []
class_val_list = []
line_num = 0
# Fix use of parameter:
replace_missing_vals_with = str(replace_missing_vals_with)
# Parse the file
# print(full_file_path_and_name)
# with full_file as file:
if file:
# for line in file:
for line in tqdm(file, desc=opLabel):
# def readLine(line):
# global metadata_started, data_started, has_problem_name_tag, has_timestamps_tag, has_univariate_tag
# global has_class_labels_tag, has_data_tag, previous_timestamp_was_int, prev_timestamp_was_timestamp
# global num_dimensions, is_first_case, instance_list, class_val_list, line_num
# Strip white space from start/end of line and change to
# lowercase for use below
line = line.strip().lower()
# Empty lines are valid at any point in a file
if line:
# Check if this line contains metadata
# Please note that even though metadata is stored in this
# function it is not currently published externally
if line.startswith("@problemname"):
# Check that the data has not started
if data_started:
raise TsFileParseException("metadata must come before data")
# Check that the associated value is valid
tokens = line.split(" ")
token_len = len(tokens)
if token_len == 1:
raise TsFileParseException(
"problemname tag requires an associated value"
)
# problem_name = line[len("@problemname") + 1:]
has_problem_name_tag = True
metadata_started = True
elif line.startswith("@timestamps"):
# Check that the data has not started
if data_started:
raise TsFileParseException("metadata must come before data")
# Check that the associated value is valid
tokens = line.split(" ")
token_len = len(tokens)
if token_len != 2:
raise TsFileParseException(
"timestamps tag requires an associated Boolean " "value"
)
elif tokens[1] == "true":
timestamps = True
elif tokens[1] == "false":
timestamps = False
else:
raise TsFileParseException("invalid timestamps value")
has_timestamps_tag = True
metadata_started = True
elif line.startswith("@univariate"):
# Check that the data has not started
if data_started:
raise TsFileParseException("metadata must come before data")
# Check that the associated value is valid
tokens = line.split(" ")
token_len = len(tokens)
if token_len != 2:
raise TsFileParseException(
"univariate tag requires an associated Boolean " "value"
)
elif tokens[1] == "true":
# univariate = True
pass
elif tokens[1] == "false":
# univariate = False
pass
else:
raise TsFileParseException("invalid univariate value")
has_univariate_tag = True
metadata_started = True
elif line.startswith("@classlabel"):
# Check that the data has not started
if data_started:
raise TsFileParseException("metadata must come before data")
# Check that the associated value is valid
tokens = line.split(" ")
token_len = len(tokens)
if token_len == 1:
raise TsFileParseException(
"classlabel tag requires an associated Boolean " "value"
)
if tokens[1] == "true":
class_labels = True
elif tokens[1] == "false":
class_labels = False
else:
raise TsFileParseException("invalid classLabel value")
# Check if we have any associated class values
if token_len == 2 and class_labels:
raise TsFileParseException(
"if the classlabel tag is true then class values "
"must be supplied"
)
has_class_labels_tag = True
class_label_list = [token.strip() for token in tokens[2:]]
metadata_started = True
# Check if this line contains the start of data
elif line.startswith("@data"):
if line != "@data":
raise TsFileParseException(
"data tag should not have an associated value"
)
if data_started and not metadata_started:
raise TsFileParseException("metadata must come before data")
else:
has_data_tag = True
data_started = True
                # If the '@data' tag has been found then metadata has been
                # parsed and data can be loaded
elif data_started:
# Check that a full set of metadata has been provided
if (
not has_problem_name_tag
or not has_timestamps_tag
or not has_univariate_tag
or not has_class_labels_tag
or not has_data_tag
):
raise TsFileParseException(
"a full set of metadata has not been provided "
"before the data"
)
# Replace any missing values with the value specified
line = line.replace("?", replace_missing_vals_with)
                    # Check if we are dealing with data that has timestamps
if timestamps:
# We're dealing with timestamps so cannot just split
# line on ':' as timestamps may contain one
has_another_value = False
has_another_dimension = False
timestamp_for_dim = []
values_for_dimension = []
this_line_num_dim = 0
line_len = len(line)
char_num = 0
while char_num < line_len:
# Move through any spaces
while char_num < line_len and str.isspace(line[char_num]):
char_num += 1
                            # See if there is any more data to read in or if
                            # we should validate what has been read thus far
if char_num < line_len:
# See if we have an empty dimension (i.e. no
# values)
if line[char_num] == ":":
if len(instance_list) < (this_line_num_dim + 1):
instance_list.append([])
instance_list[this_line_num_dim].append(
pd.Series(dtype="object")
)
this_line_num_dim += 1
has_another_value = False
has_another_dimension = True
timestamp_for_dim = []
values_for_dimension = []
char_num += 1
else:
# Check if we have reached a class label
if line[char_num] != "(" and class_labels:
class_val = line[char_num:].strip()
if class_val not in class_label_list:
raise TsFileParseException(
"the class value '"
+ class_val
+ "' on line "
+ str(line_num + 1)
+ " is not "
"valid"
)
class_val_list.append(class_val)
char_num = line_len
has_another_value = False
has_another_dimension = False
timestamp_for_dim = []
values_for_dimension = []
else:
# Read in the data contained within
# the next tuple
if line[char_num] != "(" and not class_labels:
raise TsFileParseException(
"dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " does "
"not "
"start "
"with a "
"'('"
)
char_num += 1
tuple_data = ""
while (
char_num < line_len
and line[char_num] != ")"
):
tuple_data += line[char_num]
char_num += 1
if (
char_num >= line_len
or line[char_num] != ")"
):
raise TsFileParseException(
"dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " does "
"not end"
" with a "
"')'"
)
# Read in any spaces immediately
# after the current tuple
char_num += 1
while char_num < line_len and str.isspace(
line[char_num]
):
char_num += 1
# Check if there is another value or
# dimension to process after this tuple
if char_num >= line_len:
has_another_value = False
has_another_dimension = False
elif line[char_num] == ",":
has_another_value = True
has_another_dimension = False
elif line[char_num] == ":":
has_another_value = False
has_another_dimension = True
char_num += 1
# Get the numeric value for the
# tuple by reading from the end of
# the tuple data backwards to the
# last comma
last_comma_index = tuple_data.rfind(",")
if last_comma_index == -1:
raise TsFileParseException(
"dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " contains a tuple that has "
"no comma inside of it"
)
try:
value = tuple_data[last_comma_index + 1 :]
value = float(value)
except ValueError:
# By Tarlis:
print("dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " contains a tuple that does "
"not have a valid numeric "
"value, read '"+str(value)+"' as missing.")
value = replace_missing_vals_with
#raise TsFileParseException(
# "dimension "
# + str(this_line_num_dim + 1)
# + " on line "
# + str(line_num + 1)
# + " contains a tuple that does "
# "not have a valid numeric "
# "value"
#)
# Check the type of timestamp that
# we have
timestamp = tuple_data[0:last_comma_index]
try:
timestamp = int(timestamp)
timestamp_is_int = True
timestamp_is_timestamp = False
except ValueError:
timestamp_is_int = False
if not timestamp_is_int:
try:
timestamp = timestamp.strip()
timestamp_is_timestamp = True
except ValueError:
timestamp_is_timestamp = False
# Make sure that the timestamps in
# the file (not just this dimension
# or case) are consistent
if (
not timestamp_is_timestamp
and not timestamp_is_int
):
raise TsFileParseException(
"dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " contains a tuple that "
"has an invalid timestamp '"
+ timestamp
+ "'"
)
if (
previous_timestamp_was_int is not None
and previous_timestamp_was_int
and not timestamp_is_int
):
raise TsFileParseException(
"dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " contains tuples where the "
"timestamp format is "
"inconsistent"
)
if (
prev_timestamp_was_timestamp is not None
and prev_timestamp_was_timestamp
and not timestamp_is_timestamp
):
raise TsFileParseException(
"dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " contains tuples where the "
"timestamp format is "
"inconsistent"
)
# Store the values
timestamp_for_dim += [timestamp]
values_for_dimension += [value]
# If this was our first tuple then
# we store the type of timestamp we
# had
if (
prev_timestamp_was_timestamp is None
and timestamp_is_timestamp
):
prev_timestamp_was_timestamp = True
previous_timestamp_was_int = False
if (
previous_timestamp_was_int is None
and timestamp_is_int
):
prev_timestamp_was_timestamp = False
previous_timestamp_was_int = True
# See if we should add the data for
# this dimension
if not has_another_value:
if len(instance_list) < (
this_line_num_dim + 1
):
instance_list.append([])
if timestamp_is_timestamp:
timestamp_for_dim = pd.DatetimeIndex(
timestamp_for_dim
)
instance_list[this_line_num_dim].append(
pd.Series(
index=timestamp_for_dim,
data=values_for_dimension,
)
)
this_line_num_dim += 1
timestamp_for_dim = []
values_for_dimension = []
elif has_another_value:
raise TsFileParseException(
"dimension " + str(this_line_num_dim + 1) + " on "
"line "
+ str(line_num + 1)
+ " ends with a ',' that "
"is not followed by "
"another tuple"
)
elif has_another_dimension and class_labels:
raise TsFileParseException(
"dimension " + str(this_line_num_dim + 1) + " on "
"line "
+ str(line_num + 1)
+ " ends with a ':' while "
"it should list a class "
"value"
)
elif has_another_dimension and not class_labels:
if len(instance_list) < (this_line_num_dim + 1):
instance_list.append([])
instance_list[this_line_num_dim].append(
pd.Series(dtype=np.float32)
)
this_line_num_dim += 1
num_dimensions = this_line_num_dim
# If this is the 1st line of data we have seen
# then note the dimensions
if not has_another_value and not has_another_dimension:
if num_dimensions is None:
num_dimensions = this_line_num_dim
if num_dimensions != this_line_num_dim:
raise TsFileParseException(
"line "
+ str(line_num + 1)
+ " does not have the "
"same number of "
"dimensions as the "
"previous line of "
"data"
)
                        # Check that we are not expecting more data and, if
                        # not, store the data processed above
if has_another_value:
raise TsFileParseException(
"dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " ends with a ',' that is "
"not followed by another "
"tuple"
)
elif has_another_dimension and class_labels:
raise TsFileParseException(
"dimension "
+ str(this_line_num_dim + 1)
+ " on line "
+ str(line_num + 1)
+ " ends with a ':' while it "
"should list a class value"
)
elif has_another_dimension and not class_labels:
if len(instance_list) < (this_line_num_dim + 1):
instance_list.append([])
instance_list[this_line_num_dim].append(
pd.Series(dtype="object")
)
this_line_num_dim += 1
num_dimensions = this_line_num_dim
# If this is the 1st line of data we have seen then
# note the dimensions
if (
not has_another_value
and num_dimensions != this_line_num_dim
):
raise TsFileParseException(
"line " + str(line_num + 1) + " does not have the same "
"number of dimensions as the "
"previous line of data"
)
# Check if we should have class values, and if so
# that they are contained in those listed in the
# metadata
if class_labels and len(class_val_list) == 0:
raise TsFileParseException(
"the cases have no associated class values"
)
else:
dimensions = line.split(":")
# If first row then note the number of dimensions (
# that must be the same for all cases)
if is_first_case:
num_dimensions = len(dimensions)
if class_labels:
num_dimensions -= 1
for _dim in range(0, num_dimensions):
instance_list.append([])
is_first_case = False
                        # See how many dimensions the case whose data is
                        # represented in this line has
this_line_num_dim = len(dimensions)
if class_labels:
this_line_num_dim -= 1
# All dimensions should be included for all series,
# even if they are empty
if this_line_num_dim != num_dimensions:
raise TsFileParseException(
"inconsistent number of dimensions. "
"Expecting "
+ str(num_dimensions)
+ " but have read "
+ str(this_line_num_dim)
)
# Process the data for each dimension
for dim in range(0, num_dimensions):
dimension = dimensions[dim].strip()
if dimension:
data_series = dimension.split(",")
#data_series = [float(i) for i in data_series]
                                def process(dim, value):
                                    try:
                                        value = float(value)
                                    except ValueError:
                                        # By Tarlis:
                                        print(
                                            "dimension "
                                            + str(dim)
                                            + " contains a value that is not"
                                            " a valid number; read '"
                                            + str(value)
                                            + "' as missing."
                                        )
                                        value = replace_missing_vals_with
                                    return value
data_series = list(map(lambda i: process(dim, i), data_series))
instance_list[dim].append(pd.Series(data_series))
else:
instance_list[dim].append(pd.Series(dtype="object"))
if class_labels:
class_val_list.append(dimensions[num_dimensions].strip())
line_num += 1
# list(map(lambda line: readLine(line), tqdm(file, desc=opLabel)))
file.close()
# Check that the file was not empty
if line_num:
# Check that the file contained both metadata and data
if metadata_started and not (
has_problem_name_tag
and has_timestamps_tag
and has_univariate_tag
and has_class_labels_tag
and has_data_tag
):
raise TsFileParseException("metadata incomplete")
elif metadata_started and not data_started:
raise TsFileParseException("file contained metadata but no data")
elif metadata_started and data_started and len(instance_list) == 0:
raise TsFileParseException("file contained metadata but no data")
# Create a DataFrame from the data parsed above
x_data = pd.DataFrame(dtype=np.float32)
for x in range(len(instance_list[0])):
x_data_aux = pd.DataFrame(dtype=np.float32)
for dim in range(len(instance_list)):
# print(dim, x)
# x_data_aux["dim" + str(dim)] = pd.Series(instance_list[dim][x])
x_data_aux = pd.concat([x_data_aux, pd.Series(instance_list[dim][x], name="dim" + str(dim))], axis=1)
x_data_aux["tid"] = x
if class_labels:
x_data_aux["label"] = class_val_list[x]
x_data = pd.concat([x_data, x_data_aux])
x_data.reset_index(drop=True, inplace=True)
# for dim in range(0, num_dimensions):
# data["dim_" + str(dim)] = instance_list[dim]
# Check if we should return any associated class labels separately
if class_labels:
if return_separate_X_and_y:
                # drop the label column when returning X and y separately
                return x_data.drop(columns=["label"]), np.asarray(class_val_list)
else:
# data["class_vals"] = pd.Series(class_val_list)
return x_data
else:
return x_data
else:
raise TsFileParseException("empty file")
def load_from_arff_to_dataframe(
full_file_path_and_name,
has_class_labels=True,
return_separate_X_and_y=True,
replace_missing_vals_with="NaN",
):
"""Loads data from a .ts file into a Pandas DataFrame.
Parameters
----------
full_file_path_and_name: str
The full pathname of the .ts file to read.
has_class_labels: bool
true then line contains separated strings and class value contains
list of separated strings, check for 'return_separate_X_and_y'
false otherwise.
return_separate_X_and_y: bool
true then X and Y values should be returned as separate Data Frames (
X) and a numpy array (y), false otherwise.
This is only relevant for data.
replace_missing_vals_with: str
The value that missing values in the text file should be replaced
with prior to parsing.
Returns
-------
DataFrame, ndarray
If return_separate_X_and_y then a tuple containing a DataFrame and a
numpy array containing the relevant time-series and corresponding
class values.
    DataFrame
        If not return_separate_X_and_y then a single DataFrame containing
        all time-series and (if relevant) a column "label" with the
        associated class values.
"""
instance_list = []
class_val_list = []
data_started = False
is_multi_variate = False
is_first_case = True
# Parse the file
# print(full_file_path_and_name)
with open(full_file_path_and_name, "r") as f:
for line in f:
if line.strip():
if (
is_multi_variate is False
and "@attribute" in line.lower()
and "relational" in line.lower()
):
is_multi_variate = True
if "@data" in line.lower():
data_started = True
continue
                # if the '@data' tag has been found, the header information
                # has been cleared and now data can be loaded
if data_started:
line = line.replace("?", replace_missing_vals_with)
if is_multi_variate:
if has_class_labels:
line, class_val = line.split("',")
class_val_list.append(class_val.strip())
dimensions = line.split("\\n")
dimensions[0] = dimensions[0].replace("'", "")
if is_first_case:
for _d in range(len(dimensions)):
instance_list.append([])
is_first_case = False
for dim in range(len(dimensions)):
instance_list[dim].append(
pd.Series(
[float(i) for i in dimensions[dim].split(",")]
)
)
else:
if is_first_case:
instance_list.append([])
is_first_case = False
line_parts = line.split(",")
if has_class_labels:
instance_list[0].append(
pd.Series(
[
float(i)
for i in line_parts[: len(line_parts) - 1]
]
)
)
class_val_list.append(line_parts[-1].strip())
else:
instance_list[0].append(
pd.Series(
[float(i) for i in line_parts[: len(line_parts)]]
)
)
x_data = pd.DataFrame(dtype=np.float32)
for x in range(len(instance_list[0])):
x_data_aux = pd.DataFrame(dtype=np.float32)
for dim in range(len(instance_list)):
# print(dim, x)
x_data_aux["dim" + str(dim)] = pd.Series(instance_list[dim][x])
x_data_aux["tid"] = x
if has_class_labels:
x_data_aux["label"] = class_val_list[x]
x_data = pd.concat([x_data, x_data_aux])
x_data.reset_index(drop=True, inplace=True)
# for dim in range(len(instance_list)):
# x_data["dim_" + str(dim)] = instance_list[dim]
if has_class_labels:
if return_separate_X_and_y:
            # drop the label column when returning X and y separately
            return x_data.drop(columns=["label"]), np.asarray(class_val_list)
# else:
# x_data["class_vals"] = pd.Series(class_val_list)
return x_data
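# Usage sketch ("Dataset_TRAIN.arff" is a hypothetical path; the default
# returns X and y separately when the file has class labels):
#
#     X, y = load_from_arff_to_dataframe("Dataset_TRAIN.arff")
#     df = load_from_arff_to_dataframe(
#         "Dataset_TRAIN.arff", return_separate_X_and_y=False
#     )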
def load_from_ucr_tsv_to_dataframe(
full_file_path_and_name, return_separate_X_and_y=True
):
"""Loads data from a .tsv file into a Pandas DataFrame.
Parameters
----------
full_file_path_and_name: str
The full pathname of the .tsv file to read.
    return_separate_X_and_y: bool
        true if X and y values should be returned as a separate DataFrame (X)
        and a numpy array (y), false otherwise.
        This is only relevant for data that has class labels.
Returns
-------
DataFrame, ndarray
If return_separate_X_and_y then a tuple containing a DataFrame and a
numpy array containing the relevant time-series and corresponding
class values.
    DataFrame
        If not return_separate_X_and_y then a single DataFrame containing
        all time-series and a column "class_val" with the associated class
        values.
"""
df = pd.read_csv(full_file_path_and_name, sep="\t", header=None)
y = df.pop(0).values
df.columns -= 1
X = pd.DataFrame()
X["dim_0"] = [pd.Series(df.iloc[x, :]) for x in range(len(df))]
if return_separate_X_and_y is True:
return X, y
X["class_val"] = y
return X
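# Usage sketch ("Dataset_TRAIN.tsv" is a hypothetical path; UCR archive
# files are tab-separated with the class value in the first column):
#
#     X, y = load_from_ucr_tsv_to_dataframe("Dataset_TRAIN.tsv")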
# assumes data is in a long table format with the following structure:
#      | case_id | dim_id | reading_id | value
# -----+---------+--------+------------+--------
#    0 |   int   |  int   |    int     | double
#    1 |   int   |  int   |    int     | double
#    2 |   int   |  int   |    int     | double
#    3 |   int   |  int   |    int     | double
def from_long_to_nested(long_dataframe):
# get distinct dimension ids
unique_dim_ids = long_dataframe.iloc[:, 1].unique()
num_dims = len(unique_dim_ids)
data_by_dim = []
indices = []
# get number of distinct cases (note: a case may have 1 or many dimensions)
unique_case_ids = long_dataframe.iloc[:, 0].unique()
# assume series are indexed from 0 to m-1 (can map to non-linear indices
# later if needed)
# init a list of size m for each d - to store the series data for m
# cases over d dimensions
# also, data may not be in order in long format so store index data for
# aligning output later
# (i.e. two stores required: one for reading id/timestamp and one for
# value)
for d in range(0, num_dims):
data_by_dim.append([])
indices.append([])
for _c in range(0, len(unique_case_ids)):
data_by_dim[d].append([])
indices[d].append([])
# go through every row in the dataframe
for i in range(0, len(long_dataframe)):
        # extract the relevant data; ids are cast to int, so a non-numeric
        # dim id (e.g. a class value) would raise here
        row = long_dataframe.iloc[i]
        case_id = int(row.iloc[0])
        dim_id = int(row.iloc[1])
        reading_id = int(row.iloc[2])
        value = row.iloc[3]
        data_by_dim[dim_id][case_id].append(value)
        indices[dim_id][case_id].append(reading_id)
x_data = {}
for d in range(0, num_dims):
key = "dim_" + str(d)
dim_list = []
for i in range(0, len(unique_case_ids)):
temp = pd.Series(data_by_dim[d][i], indices[d][i])
dim_list.append(temp)
x_data[key] = pd.Series(dim_list)
return pd.DataFrame(x_data)
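# Usage sketch: convert a long-format table into the nested format; here the
# input comes from generate_example_long_table (defined below):
#
#     long_df = generate_example_long_table(num_cases=3, series_len=4, num_dims=2)
#     nested = from_long_to_nested(long_df)  # one "dim_d" column per dimension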
def load_from_long_to_dataframe(full_file_path_and_name, separator=","):
"""Loads data from a long format file into a Pandas DataFrame.
Parameters
----------
full_file_path_and_name: str
The full pathname of the .csv file to read.
separator: str
The character that the csv uses as a delimiter
Returns
-------
DataFrame
A dataframe with sktime-formatted data
"""
data = pd.read_csv(full_file_path_and_name, sep=separator, header=0)
# ensure there are 4 columns in the long_format table
if len(data.columns) != 4:
raise LongFormatDataParseException("dataframe must contain 4 columns of data")
# ensure that all columns contain the correct data types
if (
not data.iloc[:, 0].dtype == "int64"
or not data.iloc[:, 1].dtype == "int64"
or not data.iloc[:, 2].dtype == "int64"
or not data.iloc[:, 3].dtype == "float64"
):
raise LongFormatDataParseException(
"one or more data columns contains data of an incorrect type"
)
data = from_long_to_nested(data)
return data
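# Usage sketch ("data_long.csv" is a hypothetical path; the csv must contain
# exactly the four columns described above, with integer ids and float values):
#
#     nested = load_from_long_to_dataframe("data_long.csv", separator=",")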
# left here for now, better elsewhere later perhaps
def generate_example_long_table(num_cases=50, series_len=20, num_dims=2):
"""Generates example from long table format file.
Parameters
----------
num_cases: int
Number of cases.
series_len: int
Length of the series.
num_dims: int
Number of dimensions.
Returns
-------
DataFrame
"""
rows_per_case = series_len * num_dims
total_rows = num_cases * series_len * num_dims
    case_ids = np.empty(total_rows, dtype=int)
    idxs = np.empty(total_rows, dtype=int)
    dims = np.empty(total_rows, dtype=int)
vals = np.random.rand(total_rows)
for i in range(total_rows):
case_ids[i] = int(i / rows_per_case)
rem = i % rows_per_case
dims[i] = int(rem / series_len)
idxs[i] = rem % series_len
df = pd.DataFrame()
df["case_id"] = pd.Series(case_ids)
df["dim_id"] = pd.Series(dims)
df["reading_id"] = pd.Series(idxs)
df["value"] = pd.Series(vals)
return df
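# Usage sketch: a small random table for quick testing; columns are
# case_id, dim_id, reading_id and value:
#
#     df = generate_example_long_table(num_cases=2, series_len=5, num_dims=2)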
def write_dataframe_to_tsfile(
data,
path,
problem_name="sample_data",
timestamp=False,
univariate=True,
class_label=None,
class_value_list=None,
equal_length=False,
series_length=-1,
missing_values="NaN",
comment=None,
):
"""
Output a dataset in dataframe format to .ts file
Parameters
----------
data: pandas dataframe
the dataset in a dataframe to be written as a ts file
which must be of the structure specified in the documentation
https://github.com/whackteachers/sktime/blob/master/examples/loading_data.ipynb
index | dim_0 | dim_1 | ... | dim_c-1
0 | pd.Series | pd.Series | pd.Series | pd.Series
1 | pd.Series | pd.Series | pd.Series | pd.Series
... | ... | ... | ... | ...
n | pd.Series | pd.Series | pd.Series | pd.Series
path: str
The full path to output the ts file
problem_name: str
The problemName to print in the header of the ts file
and also the name of the file.
timestamp: {False, bool}, optional
Indicate whether the data contains timestamps in the header.
univariate: {True, bool}, optional
Indicate whether the data is univariate or multivariate in the header.
If univariate, only the first dimension will be written to file
class_label: {list, None}, optional
Provide class label to show the possible class values
for classification problems in the header.
class_value_list: {list/ndarray, []}, optional
ndarray containing the class values for each case in classification problems
    equal_length: {False, bool}, optional
        Indicate whether each series is of equal length. It is only written
        to the file if true.
    series_length: {-1, int}, optional
        Indicate the series length if the series are of equal length.
        It is only written to the file if equal_length is true.
missing_values: {NaN, str}, optional
Representation for missing value, default is NaN.
comment: {None, str}, optional
Comment text to be inserted before the header in a block.
Returns
-------
None
Notes
-----
This version currently does not support writing timestamp data.
References
----------
The code for writing series data into file is adopted from
https://stackoverflow.com/questions/37877708/
how-to-turn-a-pandas-dataframe-row-into-a-comma-separated-string
"""
if class_value_list is None:
class_value_list = []
# ensure data provided is a dataframe
if not isinstance(data, pd.DataFrame):
raise ValueError("Data provided must be a DataFrame")
# ensure number of cases is same as the class value list
if len(data.index) != len(class_value_list) and len(class_value_list) > 0:
raise IndexError(
"The number of cases is not the same as the number of given " "class values"
)
if equal_length and series_length == -1:
raise ValueError(
"Please specify the series length for equal length time series data."
)
    # create the path if it does not exist
    dirt = f"{str(path)}/{str(problem_name)}/"
    os.makedirs(dirt, exist_ok=True)
# create ts file in the path
file = open(f"{dirt}{str(problem_name)}_transform.ts", "w")
# write comment if any as a block at start of file
if comment:
file.write("\n# ".join(textwrap.wrap("# " + comment)))
file.write("\n")
# begin writing header information
file.write(f"@problemName {problem_name}\n")
file.write(f"@timeStamps {str(timestamp).lower()}\n")
file.write(f"@univariate {str(univariate).lower()}\n")
# write equal length or series length if provided
if equal_length:
file.write(f"@equalLength {str(equal_length).lower()}\n")
if series_length > 0:
file.write(f"@seriesLength {series_length}\n")
# write class label line
if class_label:
space_separated_class_label = " ".join(str(label) for label in class_label)
file.write(f"@classLabel true {space_separated_class_label}\n")
else:
file.write("@class_label false\n")
# begin writing the core data for each case
# which are the series and the class value list if there is any
file.write("@data\n")
for case, value in itertools.zip_longest(data.iterrows(), class_value_list):
        for dimension in case[1:]:  # skip case[0], which is the row index
# split the series observation into separate token
# ignoring the header and index
            series = (
                dimension.iloc[0]
                .to_string(index=False, header=False, na_rep=missing_values)
                .split("\n")
            )
# turn series into comma-separated row
series = ",".join(obsv for obsv in series)
file.write(str(series))
# continue with another dimension for multivariate case
if not univariate:
file.write(":")
if value is not None:
file.write(f":{value}") # write the case value if any
file.write("\n") # open a new line
file.close()
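# Usage sketch (hypothetical output path and labels; `nested` must follow the
# nested DataFrame layout shown in the docstring above):
#
#     nested = from_long_to_nested(generate_example_long_table())
#     write_dataframe_to_tsfile(
#         nested, "./output", problem_name="example",
#         class_label=["a", "b"], class_value_list=["a"] * len(nested),
#         equal_length=True, series_length=20,
#     )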