Source code for matdata.inc.ts_io

# -*- coding: utf-8 -*-
"""
MAT-Tools: Python Framework for Multiple Aspect Trajectory Data Mining

This application offers a tool to support the user in preprocessing multiple aspect trajectory data. It integrates into a unique framework for multiple aspect trajectories and, in general, for multidimensional sequence data mining methods.
Copyright (C) 2022, MIT license (this portion of code is subject to licensing from the source project distribution)

Created on Dec, 2023
Copyright (C) 2023, License GPL Version 3 or later (see LICENSE file)

Authors:
    - Tarlis Portela
    - sktime package (adapted)
"""
import os
import itertools
import textwrap

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score as acc

from tqdm.auto import tqdm

class TsFileParseException(Exception):
    """Should be raised when parsing a .ts file and the format is incorrect."""

    pass


class LongFormatDataParseException(Exception):
    """Should be raised when parsing a .csv file with long-formatted data and
    the format is incorrect."""

    pass

def load_from_tsfile_to_dataframe(
    full_file_path_and_name,
    return_separate_X_and_y=False,
    replace_missing_vals_with="?",
    opLabel='Processing TS',
):
    """Opens the .ts file at the given path and delegates to load_from_tsfile."""
    full_file = open(full_file_path_and_name, "r", encoding="utf-8")
    return load_from_tsfile(
        full_file, return_separate_X_and_y, replace_missing_vals_with, opLabel
    )

def load_from_tsfile(
    file,
    return_separate_X_and_y=False,
    replace_missing_vals_with="NaN",
    opLabel='Processing TS',
):
    """Loads data from a .ts file into a Pandas DataFrame.

    Parameters
    ----------
    file: file object
        An open .ts file (or any iterable of lines) to read.
    return_separate_X_and_y: bool
        true if X and Y values should be returned as a separate DataFrame (X)
        and a numpy array (y), false otherwise. This is only relevant for
        data that has class labels.
    replace_missing_vals_with: str
        The value that missing values in the text file should be replaced
        with prior to parsing.
    opLabel: str
        Label for the tqdm progress bar shown while parsing.

    Returns
    -------
    DataFrame, ndarray
        If return_separate_X_and_y then a tuple containing a DataFrame and a
        numpy array containing the relevant time-series and corresponding
        class values.
    DataFrame
        If not return_separate_X_and_y then a single DataFrame containing
        all time-series and (if relevant) a column "label" with the
        associated class values.
    """
    # Initialize flags and variables used when parsing the file
    metadata_started = False
    data_started = False

    has_problem_name_tag = False
    has_timestamps_tag = False
    has_univariate_tag = False
    has_class_labels_tag = False
    has_data_tag = False

    previous_timestamp_was_int = None
    prev_timestamp_was_timestamp = None
    num_dimensions = None
    is_first_case = True
    instance_list = []
    class_val_list = []
    line_num = 0

    # Fix use of parameter:
    replace_missing_vals_with = str(replace_missing_vals_with)

    # Parse the file
    if file:
        for line in tqdm(file, desc=opLabel):
            # Strip white space from start/end of line and change to
            # lowercase for use below
            line = line.strip().lower()

            # Empty lines are valid at any point in a file
            if line:
                # Check if this line contains metadata
                # Please note that even though metadata is stored in this
                # function it is not currently published externally
                if line.startswith("@problemname"):
                    # Check that the data has not started
                    if data_started:
                        raise TsFileParseException("metadata must come before data")
                    # Check that the associated value is valid
                    tokens = line.split(" ")
                    token_len = len(tokens)
                    if token_len == 1:
                        raise TsFileParseException(
                            "problemname tag requires an associated value"
                        )
                    # problem_name = line[len("@problemname") + 1:]
                    has_problem_name_tag = True
                    metadata_started = True
                elif line.startswith("@timestamps"):
                    # Check that the data has not started
                    if data_started:
                        raise TsFileParseException("metadata must come before data")
                    # Check that the associated value is valid
                    tokens = line.split(" ")
                    token_len = len(tokens)
                    if token_len != 2:
                        raise TsFileParseException(
                            "timestamps tag requires an associated Boolean value"
                        )
                    elif tokens[1] == "true":
                        timestamps = True
                    elif tokens[1] == "false":
                        timestamps = False
                    else:
                        raise TsFileParseException("invalid timestamps value")
                    has_timestamps_tag = True
                    metadata_started = True
                elif line.startswith("@univariate"):
                    # Check that the data has not started
                    if data_started:
                        raise TsFileParseException("metadata must come before data")
                    # Check that the associated value is valid
                    tokens = line.split(" ")
                    token_len = len(tokens)
                    if token_len != 2:
                        raise TsFileParseException(
                            "univariate tag requires an associated Boolean value"
                        )
                    elif tokens[1] == "true":
                        # univariate = True
                        pass
                    elif tokens[1] == "false":
                        # univariate = False
                        pass
                    else:
                        raise TsFileParseException("invalid univariate value")
                    has_univariate_tag = True
                    metadata_started = True
                elif line.startswith("@classlabel"):
                    # Check that the data has not started
                    if data_started:
                        raise TsFileParseException("metadata must come before data")
                    # Check that the associated value is valid
                    tokens = line.split(" ")
                    token_len = len(tokens)
                    if token_len == 1:
                        raise TsFileParseException(
                            "classlabel tag requires an associated Boolean value"
                        )
                    if tokens[1] == "true":
                        class_labels = True
                    elif tokens[1] == "false":
                        class_labels = False
                    else:
                        raise TsFileParseException("invalid classLabel value")
                    # Check if we have any associated class values
                    if token_len == 2 and class_labels:
                        raise TsFileParseException(
                            "if the classlabel tag is true then class values "
                            "must be supplied"
                        )
                    has_class_labels_tag = True
                    class_label_list = [token.strip() for token in tokens[2:]]
                    metadata_started = True
                # Check if this line contains the start of data
                elif line.startswith("@data"):
                    if line != "@data":
                        raise TsFileParseException(
                            "data tag should not have an associated value"
                        )
                    if data_started and not metadata_started:
                        raise TsFileParseException("metadata must come before data")
                    else:
                        has_data_tag = True
                        data_started = True
                # If the @data tag has been found then metadata has been
                # parsed and data can be loaded
                elif data_started:
                    # Check that a full set of metadata has been provided
                    if (
                        not has_problem_name_tag
                        or not has_timestamps_tag
                        or not has_univariate_tag
                        or not has_class_labels_tag
                        or not has_data_tag
                    ):
                        raise TsFileParseException(
                            "a full set of metadata has not been provided "
                            "before the data"
                        )

                    # Replace any missing values with the value specified
                    line = line.replace("?", replace_missing_vals_with)

                    # Check if we are dealing with data that has timestamps
                    if timestamps:
                        # We're dealing with timestamps so cannot just split
                        # line on ':' as timestamps may contain one
                        has_another_value = False
                        has_another_dimension = False

                        timestamp_for_dim = []
                        values_for_dimension = []

                        this_line_num_dim = 0
                        line_len = len(line)
                        char_num = 0

                        while char_num < line_len:
                            # Move through any spaces
                            while char_num < line_len and str.isspace(line[char_num]):
                                char_num += 1

                            # See if there is any more data to read in or if
                            # we should validate that read thus far
                            if char_num < line_len:
                                # See if we have an empty dimension (i.e. no
                                # values)
                                if line[char_num] == ":":
                                    if len(instance_list) < (this_line_num_dim + 1):
                                        instance_list.append([])
                                    instance_list[this_line_num_dim].append(
                                        pd.Series(dtype="object")
                                    )
                                    this_line_num_dim += 1

                                    has_another_value = False
                                    has_another_dimension = True

                                    timestamp_for_dim = []
                                    values_for_dimension = []

                                    char_num += 1
                                else:
                                    # Check if we have reached a class label
                                    if line[char_num] != "(" and class_labels:
                                        class_val = line[char_num:].strip()
                                        if class_val not in class_label_list:
                                            raise TsFileParseException(
                                                "the class value '"
                                                + class_val
                                                + "' on line "
                                                + str(line_num + 1)
                                                + " is not valid"
                                            )
                                        class_val_list.append(class_val)
                                        char_num = line_len

                                        has_another_value = False
                                        has_another_dimension = False

                                        timestamp_for_dim = []
                                        values_for_dimension = []
                                    else:
                                        # Read in the data contained within
                                        # the next tuple
                                        if line[char_num] != "(" and not class_labels:
                                            raise TsFileParseException(
                                                "dimension "
                                                + str(this_line_num_dim + 1)
                                                + " on line "
                                                + str(line_num + 1)
                                                + " does not start with a '('"
                                            )
                                        char_num += 1
                                        tuple_data = ""
                                        while (
                                            char_num < line_len
                                            and line[char_num] != ")"
                                        ):
                                            tuple_data += line[char_num]
                                            char_num += 1
                                        if (
                                            char_num >= line_len
                                            or line[char_num] != ")"
                                        ):
                                            raise TsFileParseException(
                                                "dimension "
                                                + str(this_line_num_dim + 1)
                                                + " on line "
                                                + str(line_num + 1)
                                                + " does not end with a ')'"
                                            )

                                        # Read in any spaces immediately
                                        # after the current tuple
                                        char_num += 1
                                        while char_num < line_len and str.isspace(
                                            line[char_num]
                                        ):
                                            char_num += 1

                                        # Check if there is another value or
                                        # dimension to process after this tuple
                                        if char_num >= line_len:
                                            has_another_value = False
                                            has_another_dimension = False
                                        elif line[char_num] == ",":
                                            has_another_value = True
                                            has_another_dimension = False
                                        elif line[char_num] == ":":
                                            has_another_value = False
                                            has_another_dimension = True
                                        char_num += 1

                                        # Get the numeric value for the
                                        # tuple by reading from the end of
                                        # the tuple data backwards to the
                                        # last comma
                                        last_comma_index = tuple_data.rfind(",")
                                        if last_comma_index == -1:
                                            raise TsFileParseException(
                                                "dimension "
                                                + str(this_line_num_dim + 1)
                                                + " on line "
                                                + str(line_num + 1)
                                                + " contains a tuple that has "
                                                "no comma inside of it"
                                            )
                                        try:
                                            value = tuple_data[last_comma_index + 1:]
                                            value = float(value)
                                        except ValueError:
                                            # By Tarlis: read an invalid
                                            # numeric value as missing
                                            # instead of raising an exception
                                            print(
                                                "dimension "
                                                + str(this_line_num_dim + 1)
                                                + " on line "
                                                + str(line_num + 1)
                                                + " contains a tuple that does "
                                                "not have a valid numeric "
                                                "value, read '"
                                                + str(value)
                                                + "' as missing."
                                            )
                                            value = replace_missing_vals_with

                                        # Check the type of timestamp that
                                        # we have
                                        timestamp = tuple_data[0:last_comma_index]
                                        try:
                                            timestamp = int(timestamp)
                                            timestamp_is_int = True
                                            timestamp_is_timestamp = False
                                        except ValueError:
                                            timestamp_is_int = False

                                        if not timestamp_is_int:
                                            try:
                                                timestamp = timestamp.strip()
                                                timestamp_is_timestamp = True
                                            except ValueError:
                                                timestamp_is_timestamp = False

                                        # Make sure that the timestamps in
                                        # the file (not just this dimension
                                        # or case) are consistent
                                        if (
                                            not timestamp_is_timestamp
                                            and not timestamp_is_int
                                        ):
                                            raise TsFileParseException(
                                                "dimension "
                                                + str(this_line_num_dim + 1)
                                                + " on line "
                                                + str(line_num + 1)
                                                + " contains a tuple that has "
                                                "an invalid timestamp '"
                                                + timestamp
                                                + "'"
                                            )
                                        if (
                                            previous_timestamp_was_int is not None
                                            and previous_timestamp_was_int
                                            and not timestamp_is_int
                                        ):
                                            raise TsFileParseException(
                                                "dimension "
                                                + str(this_line_num_dim + 1)
                                                + " on line "
                                                + str(line_num + 1)
                                                + " contains tuples where the "
                                                "timestamp format is inconsistent"
                                            )
                                        if (
                                            prev_timestamp_was_timestamp is not None
                                            and prev_timestamp_was_timestamp
                                            and not timestamp_is_timestamp
                                        ):
                                            raise TsFileParseException(
                                                "dimension "
                                                + str(this_line_num_dim + 1)
                                                + " on line "
                                                + str(line_num + 1)
                                                + " contains tuples where the "
                                                "timestamp format is inconsistent"
                                            )

                                        # Store the values
                                        timestamp_for_dim += [timestamp]
                                        values_for_dimension += [value]

                                        # If this was our first tuple then
                                        # we store the type of timestamp we
                                        # had
                                        if (
                                            prev_timestamp_was_timestamp is None
                                            and timestamp_is_timestamp
                                        ):
                                            prev_timestamp_was_timestamp = True
                                            previous_timestamp_was_int = False
                                        if (
                                            previous_timestamp_was_int is None
                                            and timestamp_is_int
                                        ):
                                            prev_timestamp_was_timestamp = False
                                            previous_timestamp_was_int = True

                                        # See if we should add the data for
                                        # this dimension
                                        if not has_another_value:
                                            if len(instance_list) < (
                                                this_line_num_dim + 1
                                            ):
                                                instance_list.append([])
                                            if timestamp_is_timestamp:
                                                timestamp_for_dim = pd.DatetimeIndex(
                                                    timestamp_for_dim
                                                )
                                            instance_list[this_line_num_dim].append(
                                                pd.Series(
                                                    index=timestamp_for_dim,
                                                    data=values_for_dimension,
                                                )
                                            )
                                            this_line_num_dim += 1

                                            timestamp_for_dim = []
                                            values_for_dimension = []
                            elif has_another_value:
                                raise TsFileParseException(
                                    "dimension "
                                    + str(this_line_num_dim + 1)
                                    + " on line "
                                    + str(line_num + 1)
                                    + " ends with a ',' that is not followed "
                                    "by another tuple"
                                )
                            elif has_another_dimension and class_labels:
                                raise TsFileParseException(
                                    "dimension "
                                    + str(this_line_num_dim + 1)
                                    + " on line "
                                    + str(line_num + 1)
                                    + " ends with a ':' while it should list "
                                    "a class value"
                                )
                            elif has_another_dimension and not class_labels:
                                if len(instance_list) < (this_line_num_dim + 1):
                                    instance_list.append([])
                                instance_list[this_line_num_dim].append(
                                    pd.Series(dtype=np.float32)
                                )
                                this_line_num_dim += 1
                                num_dimensions = this_line_num_dim

                            # If this is the 1st line of data we have seen
                            # then note the dimensions
                            if not has_another_value and not has_another_dimension:
                                if num_dimensions is None:
                                    num_dimensions = this_line_num_dim
                                if num_dimensions != this_line_num_dim:
                                    raise TsFileParseException(
                                        "line "
                                        + str(line_num + 1)
                                        + " does not have the same number of "
                                        "dimensions as the previous line of data"
                                    )

                        # Check that we are not expecting some more data,
                        # and if not, store that processed above
                        if has_another_value:
                            raise TsFileParseException(
                                "dimension "
                                + str(this_line_num_dim + 1)
                                + " on line "
                                + str(line_num + 1)
                                + " ends with a ',' that is not followed by "
                                "another tuple"
                            )
                        elif has_another_dimension and class_labels:
                            raise TsFileParseException(
                                "dimension "
                                + str(this_line_num_dim + 1)
                                + " on line "
                                + str(line_num + 1)
                                + " ends with a ':' while it should list a "
                                "class value"
                            )
                        elif has_another_dimension and not class_labels:
                            if len(instance_list) < (this_line_num_dim + 1):
                                instance_list.append([])
                            instance_list[this_line_num_dim].append(
                                pd.Series(dtype="object")
                            )
                            this_line_num_dim += 1
                            num_dimensions = this_line_num_dim

                        # If this is the 1st line of data we have seen then
                        # note the dimensions
                        if (
                            not has_another_value
                            and num_dimensions != this_line_num_dim
                        ):
                            raise TsFileParseException(
                                "line "
                                + str(line_num + 1)
                                + " does not have the same number of "
                                "dimensions as the previous line of data"
                            )

                        # Check if we should have class values, and if so
                        # that they are contained in those listed in the
                        # metadata
                        if class_labels and len(class_val_list) == 0:
                            raise TsFileParseException(
                                "the cases have no associated class values"
                            )
                    else:
                        dimensions = line.split(":")

                        # If first row then note the number of dimensions (
                        # that must be the same for all cases)
                        if is_first_case:
                            num_dimensions = len(dimensions)
                            if class_labels:
                                num_dimensions -= 1
                            for _dim in range(0, num_dimensions):
                                instance_list.append([])
                            is_first_case = False

                        # See how many dimensions the case whose data is
                        # represented in this line has
                        this_line_num_dim = len(dimensions)
                        if class_labels:
                            this_line_num_dim -= 1

                        # All dimensions should be included for all series,
                        # even if they are empty
                        if this_line_num_dim != num_dimensions:
                            raise TsFileParseException(
                                "inconsistent number of dimensions. Expecting "
                                + str(num_dimensions)
                                + " but have read "
                                + str(this_line_num_dim)
                            )

                        # Process the data for each dimension
                        for dim in range(0, num_dimensions):
                            dimension = dimensions[dim].strip()
                            if dimension:
                                data_series = dimension.split(",")

                                # By Tarlis: read invalid numeric values as
                                # missing instead of raising an exception
                                def process(dim, value):
                                    try:
                                        value = float(value)
                                    except ValueError:
                                        print(
                                            "dimension "
                                            + str(dim)
                                            + " contains a tuple that does "
                                            "not have a valid numeric value, "
                                            "read '" + str(value) + "' as missing."
                                        )
                                        value = replace_missing_vals_with
                                    return value

                                data_series = list(
                                    map(lambda i: process(dim, i), data_series)
                                )
                                instance_list[dim].append(pd.Series(data_series))
                            else:
                                instance_list[dim].append(pd.Series(dtype="object"))

                        if class_labels:
                            class_val_list.append(dimensions[num_dimensions].strip())

            line_num += 1

        file.close()

        # Check that the file was not empty
        if line_num:
            # Check that the file contained both metadata and data
            if metadata_started and not (
                has_problem_name_tag
                and has_timestamps_tag
                and has_univariate_tag
                and has_class_labels_tag
                and has_data_tag
            ):
                raise TsFileParseException("metadata incomplete")
            elif metadata_started and not data_started:
                raise TsFileParseException("file contained metadata but no data")
            elif metadata_started and data_started and len(instance_list) == 0:
                raise TsFileParseException("file contained metadata but no data")

            # Create a DataFrame from the data parsed above
            x_data = pd.DataFrame(dtype=np.float32)
            for x in range(len(instance_list[0])):
                x_data_aux = pd.DataFrame(dtype=np.float32)
                for dim in range(len(instance_list)):
                    x_data_aux = pd.concat(
                        [
                            x_data_aux,
                            pd.Series(instance_list[dim][x], name="dim" + str(dim)),
                        ],
                        axis=1,
                    )
                x_data_aux["tid"] = x
                if class_labels:
                    x_data_aux["label"] = class_val_list[x]
                x_data = pd.concat([x_data, x_data_aux])
            x_data.reset_index(drop=True, inplace=True)

            # Check if we should return any associated class labels separately
            if class_labels:
                if return_separate_X_and_y:
                    return x_data[:-1], np.asarray(class_val_list)
                else:
                    # data["class_vals"] = pd.Series(class_val_list)
                    return x_data
            else:
                return x_data
        else:
            raise TsFileParseException("empty file")
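
# Illustrative usage sketch (added for documentation; not part of the original
# module). "sample.ts" is a hypothetical path. The loaders above return one
# DataFrame row per observation, with columns "dim0".."dimN", a trajectory id
# "tid" and, when class labels are present, a "label" column.
def _example_load_ts():
    # combined frame, with a "label" column when class labels are present
    df = load_from_tsfile_to_dataframe("sample.ts")
    # separate X and y (only meaningful for labelled data)
    X, y = load_from_tsfile_to_dataframe("sample.ts", return_separate_X_and_y=True)
    return df, X, y
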
def load_from_arff_to_dataframe(
    full_file_path_and_name,
    has_class_labels=True,
    return_separate_X_and_y=True,
    replace_missing_vals_with="NaN",
):
    """Loads data from a .arff file into a Pandas DataFrame.

    Parameters
    ----------
    full_file_path_and_name: str
        The full pathname of the .arff file to read.
    has_class_labels: bool
        true if the data contains class labels (in the final attribute),
        false otherwise; see 'return_separate_X_and_y'.
    return_separate_X_and_y: bool
        true if X and Y values should be returned as a separate DataFrame (X)
        and a numpy array (y), false otherwise. This is only relevant for
        data with class labels.
    replace_missing_vals_with: str
        The value that missing values in the text file should be replaced
        with prior to parsing.

    Returns
    -------
    DataFrame, ndarray
        If return_separate_X_and_y then a tuple containing a DataFrame and a
        numpy array containing the relevant time-series and corresponding
        class values.
    DataFrame
        If not return_separate_X_and_y then a single DataFrame containing
        all time-series and (if relevant) a column "label" with the
        associated class values.
    """
    instance_list = []
    class_val_list = []

    data_started = False
    is_multi_variate = False
    is_first_case = True

    # Parse the file
    with open(full_file_path_and_name, "r") as f:
        for line in f:
            if line.strip():
                if (
                    is_multi_variate is False
                    and "@attribute" in line.lower()
                    and "relational" in line.lower()
                ):
                    is_multi_variate = True

                if "@data" in line.lower():
                    data_started = True
                    continue

                # if the @data tag has been found, the header information
                # has been cleared and now data can be loaded
                if data_started:
                    line = line.replace("?", replace_missing_vals_with)

                    if is_multi_variate:
                        if has_class_labels:
                            line, class_val = line.split("',")
                            class_val_list.append(class_val.strip())
                        dimensions = line.split("\\n")
                        dimensions[0] = dimensions[0].replace("'", "")

                        if is_first_case:
                            for _d in range(len(dimensions)):
                                instance_list.append([])
                            is_first_case = False

                        for dim in range(len(dimensions)):
                            instance_list[dim].append(
                                pd.Series(
                                    [float(i) for i in dimensions[dim].split(",")]
                                )
                            )
                    else:
                        if is_first_case:
                            instance_list.append([])
                            is_first_case = False

                        line_parts = line.split(",")
                        if has_class_labels:
                            instance_list[0].append(
                                pd.Series(
                                    [
                                        float(i)
                                        for i in line_parts[: len(line_parts) - 1]
                                    ]
                                )
                            )
                            class_val_list.append(line_parts[-1].strip())
                        else:
                            instance_list[0].append(
                                pd.Series(
                                    [float(i) for i in line_parts[: len(line_parts)]]
                                )
                            )

    x_data = pd.DataFrame(dtype=np.float32)
    for x in range(len(instance_list[0])):
        x_data_aux = pd.DataFrame(dtype=np.float32)
        for dim in range(len(instance_list)):
            x_data_aux["dim" + str(dim)] = pd.Series(instance_list[dim][x])
        x_data_aux["tid"] = x
        if has_class_labels:
            x_data_aux["label"] = class_val_list[x]
        x_data = pd.concat([x_data, x_data_aux])
    x_data.reset_index(drop=True, inplace=True)

    if has_class_labels:
        if return_separate_X_and_y:
            return x_data[:-1], np.asarray(class_val_list)
        # else:
        #     x_data["class_vals"] = pd.Series(class_val_list)
    return x_data
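
# Illustrative usage sketch ("sample.arff" is a hypothetical path). With the
# defaults, the ARFF loader returns X and y separately for labelled data.
def _example_load_arff():
    X, y = load_from_arff_to_dataframe("sample.arff")
    return X, y
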
def load_from_ucr_tsv_to_dataframe(
    full_file_path_and_name, return_separate_X_and_y=True
):
    """Loads data from a .tsv file into a Pandas DataFrame.

    Parameters
    ----------
    full_file_path_and_name: str
        The full pathname of the .tsv file to read.
    return_separate_X_and_y: bool
        true if X and Y values should be returned as a separate DataFrame (X)
        and a numpy array (y), false otherwise. This is only relevant for
        data with class labels.

    Returns
    -------
    DataFrame, ndarray
        If return_separate_X_and_y then a tuple containing a DataFrame and a
        numpy array containing the relevant time-series and corresponding
        class values.
    DataFrame
        If not return_separate_X_and_y then a single DataFrame containing
        all time-series and (if relevant) a column "class_val" with the
        associated class values.
    """
    df = pd.read_csv(full_file_path_and_name, sep="\t", header=None)
    y = df.pop(0).values
    df.columns -= 1
    X = pd.DataFrame()
    X["dim_0"] = [pd.Series(df.iloc[x, :]) for x in range(len(df))]
    if return_separate_X_and_y is True:
        return X, y
    X["class_val"] = y
    return X
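
# Illustrative usage sketch ("sample.tsv" is a hypothetical path). UCR .tsv
# files keep the class value in the first column; the loader pops it into y
# and stores each series in a nested "dim_0" column.
def _example_load_ucr_tsv():
    X, y = load_from_ucr_tsv_to_dataframe("sample.tsv")
    return X, y
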
# assumes data is in a long table format with the following structure:
#      | case_id | dim_id | reading_id | value
# ------------------------------------------------
#    0 |   int   |  int   |    int     | double
#    1 |   int   |  int   |    int     | double
#    2 |   int   |  int   |    int     | double
#    3 |   int   |  int   |    int     | double
def from_long_to_nested(long_dataframe):
    # get distinct dimension ids
    unique_dim_ids = long_dataframe.iloc[:, 1].unique()
    num_dims = len(unique_dim_ids)

    data_by_dim = []
    indices = []

    # get number of distinct cases (note: a case may have 1 or many dimensions)
    unique_case_ids = long_dataframe.iloc[:, 0].unique()
    # assume series are indexed from 0 to m-1 (can map to non-linear indices
    # later if needed)

    # init a list of size m for each d - to store the series data for m
    # cases over d dimensions
    # also, data may not be in order in long format so store index data for
    # aligning output later
    # (i.e. two stores required: one for reading id/timestamp and one for
    # value)
    for d in range(0, num_dims):
        data_by_dim.append([])
        indices.append([])
        for _c in range(0, len(unique_case_ids)):
            data_by_dim[d].append([])
            indices[d].append([])

    # go through every row in the dataframe
    for i in range(0, len(long_dataframe)):
        # extract the relevant data, catch cases where the dim id is not an
        # int as it must be the class
        # (positional access via .iloc avoids the deprecated integer-key
        # lookup on a label-indexed Series)
        row = long_dataframe.iloc[i]
        case_id = int(row.iloc[0])
        dim_id = int(row.iloc[1])
        reading_id = int(row.iloc[2])
        value = row.iloc[3]
        data_by_dim[dim_id][case_id].append(value)
        indices[dim_id][case_id].append(reading_id)

    x_data = {}
    for d in range(0, num_dims):
        key = "dim_" + str(d)
        dim_list = []
        for i in range(0, len(unique_case_ids)):
            temp = pd.Series(data_by_dim[d][i], indices[d][i])
            dim_list.append(temp)
        x_data[key] = pd.Series(dim_list)

    return pd.DataFrame(x_data)
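
# Minimal sketch of the long-to-nested conversion: two cases, one dimension,
# three readings each, following the column layout documented above. The
# result has one row per case and a "dim_0" column holding pd.Series objects.
def _example_from_long_to_nested():
    long_df = pd.DataFrame({
        "case_id":    [0, 0, 0, 1, 1, 1],
        "dim_id":     [0, 0, 0, 0, 0, 0],
        "reading_id": [0, 1, 2, 0, 1, 2],
        "value":      [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
    })
    return from_long_to_nested(long_df)
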
def load_from_long_to_dataframe(full_file_path_and_name, separator=","):
    """Loads data from a long format file into a Pandas DataFrame.

    Parameters
    ----------
    full_file_path_and_name: str
        The full pathname of the .csv file to read.
    separator: str
        The character that the csv uses as a delimiter.

    Returns
    -------
    DataFrame
        A dataframe with sktime-formatted data.
    """
    data = pd.read_csv(full_file_path_and_name, sep=separator, header=0)

    # ensure there are 4 columns in the long_format table
    if len(data.columns) != 4:
        raise LongFormatDataParseException("dataframe must contain 4 columns of data")

    # ensure that all columns contain the correct data types
    if (
        not data.iloc[:, 0].dtype == "int64"
        or not data.iloc[:, 1].dtype == "int64"
        or not data.iloc[:, 2].dtype == "int64"
        or not data.iloc[:, 3].dtype == "float64"
    ):
        raise LongFormatDataParseException(
            "one or more data columns contains data of an incorrect type"
        )

    data = from_long_to_nested(data)
    return data
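
# Illustrative round-trip sketch: write a synthetic long-format table (see
# generate_example_long_table below) to a temporary CSV, then load it back as
# a nested dataframe. tempfile is from the standard library.
def _example_load_from_long():
    import tempfile
    long_df = generate_example_long_table(num_cases=5, series_len=10, num_dims=2)
    csv_path = os.path.join(tempfile.mkdtemp(), "long_sample.csv")
    long_df.to_csv(csv_path, index=False)
    return load_from_long_to_dataframe(csv_path)
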
# left here for now, better elsewhere later perhaps
def generate_example_long_table(num_cases=50, series_len=20, num_dims=2):
    """Generates an example long-format table.

    Parameters
    ----------
    num_cases: int
        Number of cases.
    series_len: int
        Length of the series.
    num_dims: int
        Number of dimensions.

    Returns
    -------
    DataFrame
    """
    rows_per_case = series_len * num_dims
    total_rows = num_cases * series_len * num_dims

    # np.int was removed in NumPy 1.24; use the explicit np.int64 instead
    case_ids = np.empty(total_rows, dtype=np.int64)
    idxs = np.empty(total_rows, dtype=np.int64)
    dims = np.empty(total_rows, dtype=np.int64)
    vals = np.random.rand(total_rows)

    for i in range(total_rows):
        case_ids[i] = int(i / rows_per_case)
        rem = i % rows_per_case
        dims[i] = int(rem / series_len)
        idxs[i] = rem % series_len

    df = pd.DataFrame()
    df["case_id"] = pd.Series(case_ids)
    df["dim_id"] = pd.Series(dims)
    df["reading_id"] = pd.Series(idxs)
    df["value"] = pd.Series(vals)
    return df
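
# Illustrative sketch: generate a synthetic long table and nest it with
# from_long_to_nested (defined above); the row count is
# num_cases * series_len * num_dims by construction.
def _example_generate_long_table():
    long_df = generate_example_long_table(num_cases=3, series_len=4, num_dims=2)
    assert len(long_df) == 3 * 4 * 2
    return from_long_to_nested(long_df)
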
def write_results_to_uea_format(
    path,
    strategy_name,
    dataset_name,
    y_true,
    y_pred,
    split="TEST",
    resample_seed=0,
    y_proba=None,
    second_line="N/A",
):
    if len(y_true) != len(y_pred):
        raise IndexError(
            "The number of predicted class values is not the same as the "
            "number of actual class values"
        )

    try:
        os.makedirs(
            str(path)
            + "/"
            + str(strategy_name)
            + "/Predictions/"
            + str(dataset_name)
            + "/"
        )
    except os.error:
        pass  # raises os.error if path already exists

    if split == "TRAIN" or split == "train":
        train_or_test = "train"
    elif split == "TEST" or split == "test":
        train_or_test = "test"
    else:
        raise ValueError("Unknown 'split' value - should be TRAIN/train or TEST/test")

    file = open(
        str(path)
        + "/"
        + str(strategy_name)
        + "/Predictions/"
        + str(dataset_name)
        + "/"
        + str(train_or_test)
        + "Fold"
        + str(resample_seed)
        + ".csv",
        "w",
    )

    correct = acc(y_true, y_pred)

    # the first line of the output file is in the form of:
    # <classifierName>,<datasetName>,<train/test>
    file.write(
        str(strategy_name) + "," + str(dataset_name) + "," + str(train_or_test) + "\n"
    )

    # the second line of the output is free form and classifier-specific;
    # usually this will record info such as build time, parameter options
    # used, any constituent model names for ensembles, etc.
    file.write(str(second_line) + "\n")

    # the third line of the file is the accuracy (should be between 0 and 1
    # inclusive). If this is a train output file then it will be a training
    # estimate of the classifier on the training data only (e.g. 10-fold cv,
    # leave-one-out cv, etc.). If this is a test output file, it should be
    # the output of the estimator on the test data (likely trained on the
    # training data for a-priori parameter optimisation)
    file.write(str(correct) + "\n")

    # from line 4 onwards each line should include the actual and predicted
    # class labels (comma-separated). If present, for each case, the
    # probabilities of predicting every class value for this case should
    # also be appended to the line (a space is also included between the
    # predicted value and the predict_proba). E.g.:
    #
    # if predict_proba data IS provided for case i:
    #   actual_class_val[i], predicted_class_val[i],,prob_class_0[i],
    #   prob_class_1[i],...,prob_class_c[i]
    #
    # if predict_proba data IS NOT provided for case i:
    #   actual_class_val[i], predicted_class_val[i]
    for i in range(0, len(y_pred)):
        file.write(str(y_true[i]) + "," + str(y_pred[i]))
        if y_proba is not None:
            file.write(",")
            for j in y_proba[i]:
                file.write("," + str(j))
        # write the newline for every case, not only when probabilities are
        # provided (fixes the "TODO BUG" noted in the original source)
        file.write("\n")

    file.close()
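
# Illustrative usage sketch (the path "results" and the names "MyClassifier"
# and "MyDataset" are hypothetical). Produces
# results/MyClassifier/Predictions/MyDataset/testFold0.csv in the UEA layout
# described in the comments above.
def _example_write_results():
    y_true = [0, 1, 1, 0]
    y_pred = [0, 1, 0, 0]
    write_results_to_uea_format(
        "results", "MyClassifier", "MyDataset", y_true, y_pred,
        split="TEST", resample_seed=0, second_line="example run",
    )
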
def write_dataframe_to_tsfile(
    data,
    path,
    problem_name="sample_data",
    timestamp=False,
    univariate=True,
    class_label=None,
    class_value_list=None,
    equal_length=False,
    series_length=-1,
    missing_values="NaN",
    comment=None,
):
    """
    Output a dataset in dataframe format to a .ts file.

    Parameters
    ----------
    data: pandas dataframe
        The dataset in a dataframe to be written as a ts file, which must be
        of the structure specified in the documentation
        https://github.com/whackteachers/sktime/blob/master/examples/loading_data.ipynb

        index |   dim_0   |   dim_1   |    ...    |  dim_c-1
           0  | pd.Series | pd.Series | pd.Series | pd.Series
           1  | pd.Series | pd.Series | pd.Series | pd.Series
          ... |    ...    |    ...    |    ...    |    ...
           n  | pd.Series | pd.Series | pd.Series | pd.Series

    path: str
        The full path to output the ts file to.
    problem_name: str
        The problemName to print in the header of the ts file, also used as
        the name of the file.
    timestamp: {False, bool}, optional
        Indicate whether the data contains timestamps in the header.
    univariate: {True, bool}, optional
        Indicate whether the data is univariate or multivariate in the
        header. If univariate, only the first dimension will be written to
        file.
    class_label: {list, None}, optional
        Provide class labels to show the possible class values for
        classification problems in the header.
    class_value_list: {list/ndarray, []}, optional
        ndarray containing the class values for each case in classification
        problems.
    equal_length: {False, bool}, optional
        Indicate whether each series has equal length. Only written to file
        if true.
    series_length: {-1, int}, optional
        Indicate the series length if all series are of equal length. Only
        written to file if positive.
    missing_values: {NaN, str}, optional
        Representation for missing values, default is NaN.
    comment: {None, str}, optional
        Comment text to be inserted before the header in a block.

    Returns
    -------
    None

    Notes
    -----
    This version currently does not support writing timestamp data.

    References
    ----------
    The code for writing series data into file is adapted from
    https://stackoverflow.com/questions/37877708/
    how-to-turn-a-pandas-dataframe-row-into-a-comma-separated-string
    """
    if class_value_list is None:
        class_value_list = []
    # ensure data provided is a dataframe
    if not isinstance(data, pd.DataFrame):
        raise ValueError("Data provided must be a DataFrame")
    # ensure number of cases is same as the class value list
    if len(data.index) != len(class_value_list) and len(class_value_list) > 0:
        raise IndexError(
            "The number of cases is not the same as the number of given "
            "class values"
        )
    if equal_length and series_length == -1:
        raise ValueError(
            "Please specify the series length for equal length time series data."
        )

    # create path if it does not exist
    dirt = f"{str(path)}/{str(problem_name)}/"
    try:
        os.makedirs(dirt)
    except os.error:
        pass  # raises os.error if path already exists

    # create ts file in the path
    file = open(f"{dirt}{str(problem_name)}_transform.ts", "w")

    # write comment if any as a block at start of file
    if comment:
        file.write("\n# ".join(textwrap.wrap("# " + comment)))
        file.write("\n")

    # begin writing header information
    file.write(f"@problemName {problem_name}\n")
    file.write(f"@timeStamps {str(timestamp).lower()}\n")
    file.write(f"@univariate {str(univariate).lower()}\n")

    # write equal length or series length if provided
    if equal_length:
        file.write(f"@equalLength {str(equal_length).lower()}\n")
    if series_length > 0:
        file.write(f"@seriesLength {series_length}\n")

    # write class label line
    if class_label:
        space_separated_class_label = " ".join(str(label) for label in class_label)
        file.write(f"@classLabel true {space_separated_class_label}\n")
    else:
        # use the @classLabel tag here as well, so the header matches what
        # the .ts parser above expects (the original wrote "@class_label")
        file.write("@classLabel false\n")

    # begin writing the core data for each case, which are the series and
    # the class value list if there is any
    file.write("@data\n")
    for case, value in itertools.zip_longest(data.iterrows(), class_value_list):
        for dimension in case[1:]:  # start from the first dimension
            # split the series observation into separate tokens,
            # ignoring the header and index
            series = (
                dimension[0]
                .to_string(index=False, header=False, na_rep=missing_values)
                .split("\n")
            )
            # turn series into a comma-separated row
            series = ",".join(obsv for obsv in series)
            file.write(str(series))
            # continue with another dimension for multivariate case
            if not univariate:
                file.write(":")
        if value is not None:
            file.write(f":{value}")  # write the case value if any
        file.write("\n")  # open a new line
    file.close()
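
# Illustrative usage sketch ("out_dir" and "demo" are hypothetical names):
# write a two-case univariate dataset with class labels; the output file is
# out_dir/demo/demo_transform.ts.
def _example_write_tsfile():
    data = pd.DataFrame(
        {"dim_0": [pd.Series([1.0, 2.0, 3.0]), pd.Series([4.0, 5.0, 6.0])]}
    )
    write_dataframe_to_tsfile(
        data, "out_dir", problem_name="demo",
        class_label=["a", "b"], class_value_list=["a", "b"],
        equal_length=True, series_length=3,
    )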