Commit f7236e8d authored by Marta Różańska

Merge branch 'tft_nbeats' into 'morphemic-rc1.5'

time series prediction corrected, validation set changed etc.

See merge request !145
parents 95833089 13f17f0d
[loggers]
keys=root,retrain,main
[handlers]
keys=consoleHandler
[formatters]
keys=defaultFormatter
[logger_root]
handlers=consoleHandler
[logger_retrain]
handlers=consoleHandler
level=INFO
qualname=core
propagate=0
[logger_main]
handlers=consoleHandler
level=DEBUG
qualname=__main__
propagate=0
[handler_consoleHandler]
class=logging.StreamHandler
formatter=defaultFormatter
args=(sys.stdout,)
[formatter_defaultFormatter]
format=%(asctime)s %(levelname)s %(filename)s - %(message)s
\ No newline at end of file
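The config above wires three loggers (root, "retrain" via qualname=core, and __main__) to a single stdout handler. A minimal sketch of consuming such a file, assuming it is saved as logging_config.ini (the actual filename in the repository may differ):

import logging
import logging.config

# Load the ini-style config shown above; the path is an assumption for this sketch.
logging.config.fileConfig("logging_config.ini", disable_existing_loggers=False)

retrain_log = logging.getLogger("core")   # matches [logger_retrain] through qualname=core
main_log = logging.getLogger("__main__")  # matches [logger_main], DEBUG level
main_log.debug("debug output goes to stdout via consoleHandler")
retrain_log.info("retraining progress is logged at INFO level")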
@@ -129,7 +129,7 @@ def main():
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 120'
# + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 20'
# + "}"
# )
......
data:
csv_path: demo.csv
training:
bs: 8
max_epochs: 8
bs: 64
max_epochs: 20
loss: rmse
dataset:
tv_unknown_reals: []
......
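A minimal sketch of how a config like the one above is typically consumed, mirroring the yaml.load calls that appear later in this merge request (the filename model.yaml follows the default used in train() and predict()):

import yaml

with open("model.yaml") as f:
    params = yaml.load(f, Loader=yaml.FullLoader)

bs = params["training"]["bs"]                  # 64 after this change (previously 8)
max_epochs = params["training"]["max_epochs"]  # 20 after this change (previously 8)
loss_name = params["training"]["loss"]         # "rmse"
csv_path = params["data"]["csv_path"]          # "demo.csv"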
@@ -92,9 +92,6 @@ def main():
logging.info(
f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
# logging.info(
# f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
# )
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
@@ -114,10 +111,10 @@ def main():
while True:
start_time = int(time.time())
log.info(
logging.info(
f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
log.info(
logging.info(
f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dataset_preprocessor.prepare_csv()
@@ -125,22 +122,24 @@ def main():
for metric in predicted_metrics:
predictions = None
for i in range(number_of_forward_predictions[metric]):
print(int((i + 1) * prediction_points_horizon), "point idx")
prediction_msgs, prediction = predict(
metric,
(prediction_cycle * 1000) // msg["publish_rate"],
prediction_length,
extra_data=predictions,
m=i + 1,
prediction_hor=prediction_horizon,
timestamp=time_0 + (i + 1) * (prediction_horizon // 1000),
predicted_point_idx=int((i + 1) * prediction_points_horizon - 1),
)
if i == (number_of_forward_predictions[metric] - 1):
print(
f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
)
if predictions is not None:
predictions = pd.concat(
[predictions, prediction], ignore_index=True
)
# if predictions is not None:
# predictions = pd.concat(
# [predictions, prediction], ignore_index=True
# )
else:
predictions = prediction
@@ -188,7 +187,6 @@ if __name__ == "__main__":
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
log = logging.getLogger()
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
@@ -200,10 +198,16 @@ if __name__ == "__main__":
time_0 = msg["epoch_start"]
prediction_horizon = msg["prediction_horizon"] * 1000
prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"]
predicted_metrics = set(msg["metrics"])
prediction_cycle = msg["prediction_horizon"]
log.info(f"Predicted metrics: {predicted_metrics}")
prediction_length = (
msg["prediction_horizon"]
* 1000
// msg["publish_rate"]
* msg["number_of_forward_predictions"]
)
logging.info(f"Predicted metrics: {predicted_metrics}")
number_of_forward_predictions = {
metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
} # default number of forward predictions
......
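The hunk above converts the fields of the incoming message into milliseconds and data points. A worked example with illustrative values (not taken from a real deployment):

# Illustrative values only, to make the unit conversions explicit.
msg = {
    "prediction_horizon": 120,           # seconds between forward predictions
    "publish_rate": 10_000,              # milliseconds between published samples
    "number_of_forward_predictions": 8,
}

prediction_horizon = msg["prediction_horizon"] * 1000                                # 120000 ms
prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"]   # 12.0 points
prediction_length = (
    msg["prediction_horizon"] * 1000
    // msg["publish_rate"]
    * msg["number_of_forward_predictions"]
)                                                                                    # 12 * 8 = 96 points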
@@ -65,7 +65,12 @@ if __name__ == "__main__":
}
for m in msg["all_metrics"]
}
prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"]
prediction_horizon = (
msg["prediction_horizon"]
* 1000
// msg["publish_rate"]
* msg["number_of_forward_predictions"]
)
predicted_metrics = set(metrics_info.keys())
logging.info(
f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
......
@@ -30,14 +30,13 @@ def predict(
m=1,
prediction_hor=60,
timestamp=0,
predicted_point_idx=0,
):
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
params["dataset"]["prediction_length"] = prediction_length
params["dataset"]["context_length"] = prediction_length * 10
print(prediction_length, "prediction length")
params["dataset"]["context_length"] = prediction_length * 12
model_path = os.path.join(params["save_path"], f"{target_column}.pth")
@@ -52,35 +51,49 @@ def predict(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
if not os.path.isfile(data_path):
return (None, None)
dataset = pd.read_csv(data_path)
# dataset[target_column] = range(dataset.shape[0])
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
if new_ts_dataset.dropped_recent_series: # series with recent data was too short
logging.info(
f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print("Not enough fresh data, unable to predict TIME:")
return (None, None)
dataset = new_ts_dataset.dataset
if extra_data is not None:
dataset = pd.concat([dataset, extra_data], ignore_index=True)
dataset = pd.concat([dataset, extra_data[dataset.columns]], ignore_index=True)
lockfile = params["dataloader_path"] + ".pickle"
lock = FileLock(lockfile + ".lock")
if os.path.isfile(lockfile):
with lock:
with open(lockfile, "rb") as handle:
ts_dataset = pickle.load(handle)
ts_dataset.prepare_dataset(dataset)
ts_dataset.dataset["split"] = "train"
# ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
with lock:
with open(lockfile, "rb") as handle:
ts_dataset = pickle.load(handle)
ts_dataset.dataset = dataset
ts_dataset.check_gap()
ts_dataset.dataset["split"] = "train"
print("dataset downloaded from checkpoint")
pred_len = params["dataset"]["prediction_length"]
future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]]
future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len))
future_df["time_idx"] = future_time_idx
future_df = dataset.tail(pred_len).copy()
future_df[target_column] = 0
future_df = pd.concat(
[dataset.tail(params["dataset"]["context_length"]), future_df]
).reset_index()
last_series_length = new_ts_dataset.dataset[
new_ts_dataset.dataset["series"] == new_ts_dataset.dataset["series"].max()
].shape[0]
future_df["time_idx"] = range(
last_series_length - params["dataset"]["context_length"],
last_series_length + pred_len,
)
future_df["split"] = "future"
ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index()
# print(ts_dataset.dataset[target_column])
prediction_input = ts_dataset.inherited_dataset("train", "future")
future_df["series"] = str(new_ts_dataset.dataset.series.max())
model = NBeats.from_dataset(
ts_dataset.ts_dataset,
@@ -92,13 +105,13 @@ def predict(
lockfile = params["save_path"]
lock = FileLock(lockfile + ".lock")
if os.path.isfile(lockfile):
with lock:
model.load_state_dict(torch.load(model_path))
with lock:
model.load_state_dict(torch.load(model_path))
prediction_input = ts_dataset.get_from_dataset(future_df)
prediction_input = prediction_input.to_dataloader(train=False)
prediction = model.predict(prediction_input, mode="raw")["prediction"]
prediction = model.predict(future_df)
predictions_with_dropout = []
model.train()
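The lock handling above (and the matching save path in the training script) follows a simple pattern for sharing model weights between the training and prediction processes. A self-contained sketch of that pattern; model.pth is a placeholder path:

import torch
from filelock import FileLock

model = torch.nn.Linear(4, 1)    # stand-in for the NBeats model in this MR
model_path = "model.pth"         # placeholder; the MR derives it from save_path and the metric name
lock = FileLock(model_path + ".lock")

# writer side (the training process)
with lock:
    torch.save(model.state_dict(), model_path)

# reader side (the prediction process)
with lock:
    state_dict = torch.load(model_path)
model.load_state_dict(state_dict)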
@@ -107,12 +120,11 @@ def predict(
for _ in range(20):
for x, _ in prediction_input:
# make prediction
out = model(x) # raw output is dictionary
out = torch.flatten(model.transform_output(out))[-1]
out = model(x)
out = torch.flatten(model.transform_output(out))[predicted_point_idx]
out = out.item()
predictions_with_dropout.append(out)
# print(model.to_prediction(model.forward(first[0])), "TRANSFORMED")
conf_intervals = st.t.interval(
alpha=0.95,
df=len(predictions_with_dropout) - 1,
@@ -120,7 +132,9 @@ def predict(
scale=st.sem(predictions_with_dropout),
)
predicted_values = list(conf_intervals) + [torch.flatten(prediction)[-1].item()]
predicted_values = list(conf_intervals) + [
torch.flatten(prediction)[predicted_point_idx].item()
]
predicted_values.sort() # ensure that predictions and confidence intervals are in correct order
msg = {
@@ -139,8 +153,11 @@ def predict(
"provider": "TODO",
}
}
logging.debug(f"prediction msg: {msg}")
logging.info(
f"prediction msg: {msg} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
future_df = future_df.tail(pred_len)
future_df["split"] = "val"
future_df[target_column] = torch.flatten(torch.mean(prediction, axis=0)).numpy()
future_df[target_column] = prediction[0]
return (msg, future_df)
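The changes above keep dropout active at inference time (model.train()), repeat the forward pass to collect a spread of point predictions, and wrap a Student-t confidence interval around them with scipy.stats. A self-contained sketch of that idea with synthetic samples (in the MR they come from repeated NBeats forward passes):

import numpy as np
import scipy.stats as st

rng = np.random.default_rng(0)
predictions_with_dropout = rng.normal(loc=42.0, scale=1.5, size=20).tolist()

# alpha= mirrors the MR; newer SciPy releases name this keyword confidence=.
conf_intervals = st.t.interval(
    alpha=0.95,
    df=len(predictions_with_dropout) - 1,
    loc=np.mean(predictions_with_dropout),
    scale=st.sem(predictions_with_dropout),
)

point_prediction = 42.3  # stands in for the deterministic model.predict() value
predicted_values = sorted(list(conf_intervals) + [point_prediction])
print(predicted_values)  # sorted so bounds and prediction come out in ascending order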
@@ -11,9 +11,12 @@ from pytorch_forecasting.metrics import QuantileLoss, MAE, RMSE, CrossEntropy
from pytorch_forecasting import NBeats
from src.preprocess_dataset import Dataset
import pickle
import pytz
from datetime import datetime
"""Script for temporal fusion transformer training"""
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
LOSSES_DICT = {
"mae": MAE(),
@@ -28,18 +31,20 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
params["dataset"]["prediction_length"] = prediction_length
params["dataset"]["context_length"] = prediction_length * 10
params["dataset"]["context_length"] = prediction_length * 12
data_path = os.path.join(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
if not os.path.isfile(data_path):
return None
dataset = pd.read_csv(data_path)
# dataset[target_column] = range(dataset.shape[0])
if dataset.shape[0] < 12 * prediction_length:
if dataset.shape[0] < 14 * prediction_length:
logging.info(
f"dataset len: {dataset.shape[0]}, minimum points required: {12 * prediction_length}"
f"dataset len: {dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
return None
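The minimum-size check above is consistent with the window sizes set elsewhere in this MR: context_length is 12 * prediction_length, and the per-series filter in preprocess_dataset keeps only series longer than prediction_length * 2 + context_length, i.e. the same 14 * prediction_length. A small arithmetic sketch with an illustrative prediction_length:

# Illustrative arithmetic for the dataset-size check above.
prediction_length = 96                   # e.g. 12 points per horizon * 8 forward predictions
context_length = prediction_length * 12  # as set in train() and predict() in this MR

minimum_points = 14 * prediction_length  # 1344 rows for this example
assert minimum_points == context_length + 2 * prediction_length  # 12x + 2x = 14x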
@@ -51,12 +56,16 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
with lock:
with open(lockfile, "wb") as handle:
pickle.dump(ts_dataset, handle)
print(f"train dataset saved: {lockfile}")
logging.info(
f"train dataset saved: {lockfile} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
training = ts_dataset.ts_dataset
validation = ts_dataset.inherited_dataset(
"train", "val"
) # only train and val splits will be used
validation = ts_dataset.get_from_dataset(
ts_dataset.dataset[
ts_dataset.dataset["series"] == ts_dataset.dataset["series"].max()
].tail(14 * prediction_length)
)
bs = params["training"]["bs"]
@@ -66,7 +75,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
val_dataloader = validation.to_dataloader(train=False, batch_size=bs, num_workers=6)
early_stop_callback = EarlyStopping(
monitor="val_loss", min_delta=1e-5, patience=8, verbose=False, mode="min"
monitor="train_loss", min_delta=1e-5, patience=8, verbose=False, mode="min"
)
lr_logger = LearningRateMonitor()
trainer = pl.Trainer(
@@ -74,16 +83,17 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
gpus=0,
gradient_clip_val=0.5,
# callbacks=[lr_logger, early_stop_callback],
# checkpoint_callback=False,
checkpoint_callback=False,
logger=None,
)
model = NBeats.from_dataset(
training,
dropout=0.1,
loss=LOSSES_DICT[params["training"]["loss"]],
learning_rate=4e-3,
log_interval=-1,
reduce_on_plateau_patience=5,
weight_decay=1e-2,
widths=[32, 512],
backcast_loss_ratio=1.0,
)
model_path = os.path.join(params["save_path"], f"{target_column}.pth")
@@ -112,5 +122,4 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
"forecasting_method": os.environ.get("METHOD", "nbetas"),
"timestamp": int(time.time()),
}
print(msg)
return msg
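An illustrative call to train() as defined above; the argument values are made up for the example:

msg = train(
    target_column="memory",    # one of the metrics announced in the start message
    prediction_length=96,      # points to predict, see the horizon arithmetic earlier
    yaml_file="model.yaml",
)
if msg is not None:            # train() returns None when there is not enough data
    print(msg["forecasting_method"], msg["timestamp"])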
@@ -2,6 +2,7 @@ import pandas as pd
from pytorch_forecasting import TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
import numpy as np
import logging
pd.options.mode.chained_assignment = None
@@ -14,15 +15,18 @@ class Dataset(object):
self,
dataset,
target_column="value",
tv_unknown_reals="[]",
known_reals="[]",
tv_unknown_cat="[]",
static_reals="[]",
tv_unknown_reals=[],
known_reals=[],
tv_unknown_cat=[],
static_reals=[],
classification=0,
context_length=40,
prediction_length=5,
):
self.max_missing_values = (
20 # max consecutive missing values allowed per series
)
self.target_column = target_column
self.tv_unknown_cat = tv_unknown_cat
self.known_reals = known_reals
@@ -31,30 +35,24 @@ class Dataset(object):
self.classification = classification
self.context_length = context_length
self.prediction_length = prediction_length
self.prepare_dataset(dataset)
self.dataset = dataset
self.check_gap()
# self.dataset = self.cut_nan_start(dataset)
# self.fill_na()
# self.dataset = self.add_obligatory_columns(self.dataset)
# self.dataset = self.convert_formats(self.dataset)
# self.n = dataset.shape[0]
self.ts_dataset = self.create_time_series_dataset()
def prepare_dataset(self, dataset):
self.dataset = self.cut_nan_start(dataset)
self.fill_na()
self.dataset = self.add_obligatory_columns(self.dataset)
self.dataset = self.convert_formats(self.dataset)
self.n = dataset.shape[0]
self.ts_dataset = self.create_time_series_dataset()
def cut_nan_start(self, dataset):
first_not_nan_index = dataset[self.target_column][
dataset[self.target_column] != "None"
].index[0]
dataset.index = range(dataset.shape[0])
first_not_nan_index = dataset[self.target_column].first_valid_index()
return dataset[dataset.index > first_not_nan_index]
def fill_na(self):
self.dataset = self.dataset.replace("None", np.nan)
self.dataset = self.dataset.ffill(axis="rows")
def fill_na(self, dataset):
dataset = dataset.replace("None", np.nan)
dataset = dataset.ffill(axis="rows")
return dataset
def convert_formats(self, dataset):
if not self.classification:
@@ -72,7 +70,7 @@ class Dataset(object):
dataset["series"] = 0
dataset["split"] = "train"
n = dataset.shape[0]
dataset["split"][int(n * 0.9) :] = "val"
dataset["split"][int(n * 0.8) :] = "val"
dataset["time_idx"] = range(n) # TODO check time gaps
return dataset
@@ -100,10 +98,48 @@ class Dataset(object):
max_prediction_length=self.prediction_length,
min_prediction_length=self.prediction_length,
add_relative_time_idx=False,
allow_missings=True,
allow_missings=False,
)
return ts_dataset
def check_gap(self):
max_gap = self.dataset["time"].diff().abs().max()
logging.info(f"Max time gap in series {max_gap}")
print(f"Max time gap in series {max_gap}")
series_freq = self.dataset["time"].diff().value_counts().index.values[0]
logging.info(f"Detected series with {series_freq} frequency")
print(f"Detected series with {series_freq} frequency")
# check series length
series = np.split(
self.dataset,
*np.where(
self.dataset["time"].diff().abs().fillna(0).astype(int)
>= np.abs(self.max_missing_values * series_freq)
),
)
logging.info(f"{len(series)} series found")
print(f"{len(series)} series found")
preprocessed_series = []
for i, s in enumerate(series):
s = self.fill_na(s)
s = self.cut_nan_start(s)
s = self.add_obligatory_columns(s)
s = self.convert_formats(s)
s["split"] = "train"
if s.shape[0] > self.prediction_length * 2 + self.context_length:
s["series"] = i
preprocessed_series.append(s)
logging.info(f"{len(preprocessed_series)} long enough series found")
print(f"{len(preprocessed_series)} long enough series found")
# logging.info(f"")
self.dataset = pd.concat(preprocessed_series)
if self.dataset["series"].max() != len(series) - 1:
self.dropped_recent_series = True
else:
self.dropped_recent_series = False
self.dataset.index = range(self.dataset.shape[0])
def inherited_dataset(self, split1, split2):
df1 = (
self.dataset[lambda x: x.split == split1]
@@ -122,3 +158,119 @@ class Dataset(object):
return TimeSeriesDataSet.from_dataset(
self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True
)
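The new check_gap() method above splits the raw dataframe into separate series wherever the time gap exceeds max_missing_values times the detected publishing frequency, then keeps only the series that are long enough. A toy, self-contained illustration of the same np.split / np.where trick (synthetic data and a smaller threshold than the MR's 20):

import numpy as np
import pandas as pd

df = pd.DataFrame({"time": [0, 10, 20, 30, 200, 210, 220, 500, 510]})
series_freq = df["time"].diff().value_counts().index.values[0]  # most common step: 10
max_missing_values = 3                                          # the MR uses 20

series = np.split(
    df,
    *np.where(
        df["time"].diff().abs().fillna(0).astype(int)
        >= abs(max_missing_values * series_freq)
    ),
)
print(len(series))               # 3 separate series
print([len(s) for s in series])  # [4, 3, 2]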
# class Dataset(object):
# def __init__(
# self,
# dataset,
# target_column="value",
# tv_unknown_reals="[]",
# known_reals="[]",
# tv_unknown_cat="[]",
# static_reals="[]",
# classification=0,
# context_length=40,
# prediction_length=5,
# ):
# self.target_column = target_column
# self.tv_unknown_cat = tv_unknown_cat
# self.known_reals = known_reals
# self.tv_unknown_reals = tv_unknown_reals
# self.static_reals = static_reals
# self.classification = classification
# self.context_length = context_length
# self.prediction_length = prediction_length
# self.prepare_dataset(dataset)
# # self.dataset = self.cut_nan_start(dataset)
# # self.fill_na()
# # self.dataset = self.add_obligatory_columns(self.dataset)
# # self.dataset = self.convert_formats(self.dataset)
# # self.n = dataset.shape[0]
# self.ts_dataset = self.create_time_series_dataset()
# def prepare_dataset(self, dataset):
# self.dataset = self.cut_nan_start(dataset)
# self.fill_na()
# self.dataset = self.add_obligatory_columns(self.dataset)
# self.dataset = self.convert_formats(self.dataset)
# self.n = dataset.shape[0]
# def cut_nan_start(self, dataset):
# first_not_nan_index = dataset[self.target_column][
# dataset[self.target_column] != "None"
# ].index[0]
# return dataset[dataset.index > first_not_nan_index]
# def fill_na(self):
# self.dataset = self.dataset.replace("None", np.nan)
# self.dataset = self.dataset.ffill(axis="rows")
# def convert_formats(self, dataset):
# if not self.classification:
# dataset[self.target_column] = dataset[self.target_column].astype(float)
# else:
# dataset[self.target_column] = dataset[self.target_column].astype(int)
# for name in self.tv_unknown_cat:
# dataset[name] = dataset[name].astype(str)
# dataset["series"] = dataset["series"].astype(str)
# return dataset
# def add_obligatory_columns(self, dataset):
# dataset["series"] = 0
# dataset["split"] = "train"
# n = dataset.shape[0]
# dataset["split"][int(n * 0.9) :] = "val"
# dataset["time_idx"] = range(n) # TODO check time gaps
# return dataset
# def create_time_series_dataset(self):
# if not self.classification:
# self.time_varying_unknown_reals = [
# self.target_column
# ] + self.tv_unknown_reals
# self.time_varying_unknown_categoricals = self.tv_unknown_cat
# else:
# self.time_varying_unknown_reals = self.tv_unknown_reals
# self.time_varying_unknown_categoricals = [
# self.target_column
# ] + self.tv_unknown_cat
# ts_dataset = TimeSeriesDataSet(
# self.dataset[lambda x: x.split == "train"],
# time_idx="time_idx",
# target=self.target_column,
# categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)},
# group_ids=["series"],
# time_varying_unknown_reals=[self.target