Commit 95833089 authored by Marta Różańska

Merge branch 'tft_nbeats' into 'morphemic-rc1.5'

errors corrected

See merge request !143
parents abc5fa40 f6ee6f8a
@@ -11,5 +11,5 @@ INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
TIME_ZONE=Europe/Vienna
[loggers]
keys=root,retrain,main
[handlers]
keys=consoleHandler
[formatters]
keys=defaultFormatter
[logger_root]
handlers=consoleHandler
[logger_retrain]
handlers=consoleHandler
level=INFO
qualname=core
propagate=0
[logger_main]
handlers=consoleHandler
level=DEBUG
qualname=__main__
propagate=0
[handler_consoleHandler]
class=logging.StreamHandler
formatter=defaultFormatter
args=(sys.stdout,)
[formatter_defaultFormatter]
format=%(asctime)s %(levelname)s %(filename)s - %(message)s
\ No newline at end of file
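For reference, a fileConfig-style setup like the one above would be consumed roughly as below; the config filename and the log call are illustrative, only the section names (`root`, `retrain`, `main`) and `qualname=core` come from the file itself.

```python
# Minimal sketch of loading the logger/handler/formatter config above,
# assuming it is saved as "logging.conf" (path is illustrative).
import logging
import logging.config

logging.config.fileConfig("logging.conf", disable_existing_loggers=False)

retrain_log = logging.getLogger("core")  # matches qualname=core in [logger_retrain]
retrain_log.info("retraining started")   # INFO and above go to stdout
```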
@@ -6,7 +6,10 @@ from amq_message_python_library import * # python amq-message-python-library
import logging
import time
from datetime import datetime
import pytz
from pytz import timezone
from datetime import datetime
# from src.log import logger
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
@@ -17,9 +20,16 @@ METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
)
# logging.basicConfig(
# filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
# level=logging.INFO,
# datefmt="%Y-%m-%d %H:%M:%S",
# format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
# )
# logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
# import logging.config
def run_process(args):
@@ -47,8 +57,7 @@ class StartListener(stomp.ConnectionListener):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print(self.topic_name)
logging.debug(f" Body: {frame.body}")
logging.info(f" Body: {frame.body}")
message = json.loads(frame.body)
global publish_rate, all_metrics
@@ -83,12 +92,19 @@ class Msg(object):
def main():
logging.getLogger().setLevel(logging.DEBUG)
logging.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.info(
f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
log = logging.getLogger()
log.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
)
start_app_conn = morphemic.Connection(
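The timestamp handling introduced in main() boils down to overriding `logging.Formatter.converter` with a timezone-aware callable. A self-contained sketch of the same pattern, assuming pytz is installed; the TZ value and message are illustrative:

```python
# Self-contained sketch of the Formatter.converter override used in main().
import logging
from datetime import datetime
from pytz import timezone

TZ = "Europe/Vienna"
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
)
# Returning a localized struct_time makes every %(asctime)s render
# in the configured zone instead of server local time.
logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
logging.info("timestamps now rendered in Europe/Vienna local time")
```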
@@ -2,7 +2,7 @@ data:
csv_path: demo.csv
training:
bs: 8
max_epochs: 5
max_epochs: 8
loss: rmse
dataset:
tv_unknown_reals: []
@@ -22,3 +22,5 @@ prediction:
bs: 8
save_path:
models
dataloader_path:
dataloader
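The new `dataloader_path` key is read like the existing settings; a hedged sketch, mirroring the `yaml.load(...)` and lockfile lines that appear later in this diff:

```python
# Sketch of consuming the new dataloader_path key from model.yaml,
# mirroring the pattern used in src/model_predict.py below.
import yaml

with open("model.yaml") as fh:
    params = yaml.load(fh, Loader=yaml.FullLoader)

lockfile = params["dataloader_path"] + ".pickle"  # -> "dataloader.pickle"
```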
@@ -11,12 +11,9 @@ import pandas as pd
import logging
from datetime import datetime
from src.dataset_maker import CSVData
from pytz import timezone
import pytz
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
from datetime import datetime
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
@@ -92,13 +89,17 @@ def main():
dataset_preprocessor = CSVData(APP_NAME)
dataset_preprocessor.prepare_csv()
logging.debug("dataset downloaded")
logging.info(f"Dataset downloaded")
logging.info(
f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
# logging.info(
# f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
# )
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
logging.debug(
f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
logging.info(
f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dataset_preprocessor.prepare_csv()
@@ -113,8 +114,12 @@ def main():
while True:
start_time = int(time.time())
logging.debug("prediction")
logging.info(f"prediction loop started")
log.info(
f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
log.info(
f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dataset_preprocessor.prepare_csv()
global time_0
for metric in predicted_metrics:
@@ -140,30 +145,50 @@ def main():
predictions = prediction
if prediction_msgs:
logging.info(f"Sending predictions for {metric} metric")
logging.info(
f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dest = f"{PRED_TOPIC_PREF}.{metric}"
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in seconds'
)
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in seconds'
logging.info(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}'
)
logging.info(
f"Message: {prediction_msgs[metric]}, destination: {dest}"
f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
end_time = int(time.time())
print(f"sleeping {prediction_cycle - (end_time - start_time)} seconds")
time_0 = time_0 + prediction_cycle
time.sleep(prediction_cycle - (end_time - start_time))
time_to_wait = prediction_cycle - (end_time - start_time)
if time_to_wait < 0:
time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle)
logging.info(
f"Prediction time is too slow (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time_0 = time_0 + prediction_cycle
time.sleep(time_to_wait)
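The guard above replaces a potentially negative sleep with a value in (0, prediction_cycle], relying on Python's floor-mod returning a result with the divisor's sign. A worked example with illustrative numbers:

```python
# Worked example of the negative-sleep guard (values illustrative).
prediction_cycle = 30
time_to_wait = prediction_cycle - 47   # loop body took 47 s -> -17
if time_to_wait < 0:
    # -17 % 30 == 13 under Python's floor-mod, so the sleep becomes 17:
    # a positive value bounded by one prediction cycle.
    time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle)
assert time_to_wait == 17
```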
if __name__ == "__main__":
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
log = logging.getLogger()
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
@@ -178,7 +203,7 @@ if __name__ == "__main__":
predicted_metrics = set(msg["metrics"])
prediction_cycle = msg["prediction_horizon"]
logging.debug(f"Predicted metrics: {predicted_metrics}")
log.info(f"Predicted metrics: {predicted_metrics}")
number_of_forward_predictions = {
metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
} # default number of forward predictions
@@ -6,6 +6,9 @@ import time
from src.model_train import train
from amq_message_python_library import *
from src.dataset_maker import CSVData
import pytz
import time
from datetime import datetime
TOPIC_NAME = "training_models"
RETRAIN_CYCLE = 10 # minutes
@@ -14,12 +17,7 @@ AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
APP_NAME = os.environ.get("APP_NAME", "demo")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
print(os.listdir("./logs"), "files")
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
def main(predicted_metrics, prediction_horizon):
@@ -36,15 +34,25 @@ def main(predicted_metrics, prediction_horizon):
for metric in predicted_metrics:
retrain_msg = train(metric, prediction_horizon)
if retrain_msg:
logging.info(f"Training completed for {metric} metric")
logging.info(
f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
else:
print("Not enough data for model training, waiting ...")
logging.info("Not enough data for model training, waiting ...")
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
logging.info(
f"Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
end_time = int(time.time())
time.sleep(60 * RETRAIN_CYCLE - (end_time - start_time))
time_to_wait = 60 * RETRAIN_CYCLE - (end_time - start_time)
if time_to_wait < 0:
time_to_wait = 60 * RETRAIN_CYCLE - (time_to_wait % (60 * RETRAIN_CYCLE))
logging.info(
f"Waiting for the next training: {time_to_wait} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time.sleep(time_to_wait)
if __name__ == "__main__":
@@ -59,5 +67,7 @@ if __name__ == "__main__":
}
prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"]
predicted_metrics = set(metrics_info.keys())
logging.debug(f"Predicted metrics: {predicted_metrics}")
logging.info(
f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
main(predicted_metrics, prediction_horizon)
import logging
from pytz import timezone
from datetime import datetime
import os
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="BBB %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
logger = logging
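This module is presumably what the commented-out `# from src.log import logger` in the listener scripts hints at: importing it applies the BBB-formatted, timezone-aware config once, after which the alias can be used anywhere.

```python
# Sketch of consuming src/log.py elsewhere; matches the commented-out
# "from src.log import logger" hint in the listener scripts.
from src.log import logger

logger.info("routed through the shared timezone-aware logging config")
```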
from pytorch_forecasting.metrics import RMSE
from pytorch_forecasting.metrics import RMSE, MAE
import yaml
import pandas as pd
import numpy as np
@@ -10,6 +10,11 @@ from src.preprocess_dataset import Dataset
from pytorch_forecasting import NBeats
import scipy.stats as st
import logging
import pickle
import pytz
from datetime import datetime
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
"""Script for nbeats fusion transformer prediction"""
logging.basicConfig(
@@ -26,23 +31,44 @@ def predict(
prediction_hor=60,
timestamp=0,
):
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
params["dataset"]["prediction_length"] = prediction_length
params["dataset"]["context_length"] = prediction_length * 10
print(prediction_length, "prediction length")
model_path = os.path.join(params["save_path"], f"{target_column}.pth")
if not os.path.isfile(model_path): # no pretrained model, unable to predict
logging.info(
f"no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print("no pretrained model, unable to predict")
return (None, None)
data_path = os.path.join(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
dataset = pd.read_csv(data_path).tail(1000)
dataset = pd.read_csv(data_path)
# dataset[target_column] = range(dataset.shape[0])
if extra_data is not None:
dataset = pd.concat([dataset, extra_data], ignore_index=True)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
lockfile = params["dataloader_path"] + ".pickle"
lock = FileLock(lockfile + ".lock")
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
if os.path.isfile(lockfile):
with lock:
with open(lockfile, "rb") as handle:
ts_dataset = pickle.load(handle)
ts_dataset.prepare_dataset(dataset)
ts_dataset.dataset["split"] = "train"
# ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
pred_len = params["dataset"]["prediction_length"]
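The dataloader cache added here follows a lock-then-pickle pattern. A generic sketch of that pattern, assuming the `filelock` package; the function names are illustrative, not part of this commit:

```python
# Generic sketch of the lock-guarded pickle cache used for the
# prepared dataset; assumes the `filelock` package is installed.
import pickle
from filelock import FileLock

def save_cached(obj, path):
    with FileLock(path + ".lock"):   # serialize writers across processes
        with open(path, "wb") as fh:
            pickle.dump(obj, fh)

def load_cached(path):
    with FileLock(path + ".lock"):   # block while a writer holds the lock
        with open(path, "rb") as fh:
            return pickle.load(fh)
```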
@@ -52,8 +78,9 @@ def predict(
future_df["split"] = "future"
ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index()
# print(ts_dataset.dataset[target_column])
prediction_input = ts_dataset.inherited_dataset("val", "future")
prediction_input = ts_dataset.inherited_dataset("train", "future")
model = NBeats.from_dataset(
ts_dataset.ts_dataset,
@@ -65,13 +92,6 @@
lockfile = params["save_path"]
lock = FileLock(lockfile + ".lock")
model_path = os.path.join(params["save_path"], f"{target_column}.pth")
if not os.path.isfile(model_path): # no pretrained model, unable to predict
logging.info("no pretrained model, unable to predict")
print("no pretrained model, unable to predict")
return (None, None)
if os.path.isfile(lockfile):
with lock:
model.load_state_dict(torch.load(model_path))
@@ -82,7 +102,7 @@
predictions_with_dropout = []
model.train()
model.loss = RMSE()
model.loss = MAE()
with torch.no_grad():
for _ in range(20):
for x, _ in prediction_input:
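Keeping the model in train mode while disabling autograd, then repeating the forward pass 20 times, is the Monte Carlo dropout recipe. A minimal sketch for a plain `nn.Module` whose forward returns a tensor; the real code instead aggregates pytorch_forecasting outputs:

```python
# Minimal Monte Carlo dropout sketch (plain nn.Module assumed).
import torch

def mc_dropout_predict(model, batch, n_samples=20):
    model.train()                  # keep dropout layers active
    with torch.no_grad():          # but skip gradient bookkeeping
        samples = torch.stack([model(batch) for _ in range(n_samples)])
    return samples.mean(dim=0), samples.std(dim=0)  # point estimate + spread
```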
@@ -10,10 +10,7 @@ from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor
from pytorch_forecasting.metrics import QuantileLoss, MAE, RMSE, CrossEntropy
from pytorch_forecasting import NBeats
from src.preprocess_dataset import Dataset
logging.basicConfig(
filename=f"/logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
import pickle
"""Script for temporal fusion transformer training"""
@@ -37,7 +34,8 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
dataset = pd.read_csv(data_path).tail(1000)
dataset = pd.read_csv(data_path)
# dataset[target_column] = range(dataset.shape[0])
if dataset.shape[0] < 12 * prediction_length:
logging.info(
@@ -47,6 +45,14 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
lockfile = params["dataloader_path"] + ".pickle"
lock = FileLock(lockfile + ".lock")
with lock:
with open(lockfile, "wb") as handle:
pickle.dump(ts_dataset, handle)
print(f"train dataset saved: {lockfile}")
training = ts_dataset.ts_dataset
validation = ts_dataset.inherited_dataset(
"train", "val"
@@ -67,8 +73,9 @@
max_epochs=params["training"]["max_epochs"],
gpus=0,
gradient_clip_val=0.5,
callbacks=[lr_logger, early_stop_callback],
checkpoint_callback=False, # TODO: is pl checkpoint_callback thread safe?
# callbacks=[lr_logger, early_stop_callback],
# checkpoint_callback=False,
logger=None,
)
model = NBeats.from_dataset(
@@ -31,12 +31,20 @@ class Dataset(object):
self.classification = classification
self.context_length = context_length
self.prediction_length = prediction_length
self.prepare_dataset(dataset)
# self.dataset = self.cut_nan_start(dataset)
# self.fill_na()
# self.dataset = self.add_obligatory_columns(self.dataset)
# self.dataset = self.convert_formats(self.dataset)
# self.n = dataset.shape[0]
self.ts_dataset = self.create_time_series_dataset()
def prepare_dataset(self, dataset):
self.dataset = self.cut_nan_start(dataset)
self.fill_na()
self.dataset = self.add_obligatory_columns(self.dataset)
self.dataset = self.convert_formats(self.dataset)
self.n = dataset.shape[0]
self.ts_dataset = self.create_time_series_dataset()
def cut_nan_start(self, dataset):
first_not_nan_index = dataset[self.target_column][
@@ -64,7 +72,7 @@ class Dataset(object):
dataset["series"] = 0
dataset["split"] = "train"
n = dataset.shape[0]
dataset["split"][int(n * 0.8) :] = "val"
dataset["split"][int(n * 0.9) :] = "val"
dataset["time_idx"] = range(n) # TODO check time gaps
return dataset
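As an aside, the chained `dataset["split"][...] = "val"` indexing above triggers pandas' SettingWithCopyWarning; an equivalent `.loc` form avoids it:

```python
# Equivalent 90/10 split assignment via .loc, which avoids pandas'
# chained-assignment warning (dataset is assumed to be a DataFrame).
n = dataset.shape[0]
dataset.loc[dataset.index[int(n * 0.9):], "split"] = "val"
```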
@@ -109,3 +117,8 @@ class Dataset(object):
self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True
)
return inh_dataset
def get_from_dataset(self, dataset):
return TimeSeriesDataSet.from_dataset(
self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True
)
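A hypothetical use of the new `get_from_dataset` helper, rebuilding a `TimeSeriesDataSet` for fresh rows while reusing the fitted encoders; variable and column names are illustrative, not from this commit:

```python
# Hypothetical usage of get_from_dataset; names are illustrative.
ds = Dataset(history_df, target_column="cpu_usage")
new_ts = ds.get_from_dataset(new_rows_df)
loader = new_ts.to_dataloader(train=False, batch_size=8)
```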
@@ -6,7 +6,10 @@ from amq_message_python_library import * # python amq-message-python-library
import logging
import time
from datetime import datetime
import pytz
from pytz import timezone
from datetime import datetime
# from src.log import logger
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
@@ -17,9 +20,16 @@ METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
)
# logging.basicConfig(
# filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
# level=logging.INFO,
# datefmt="%Y-%m-%d %H:%M:%S",
# format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
# )
# logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
# import logging.config
def run_process(args):
@@ -47,8 +57,7 @@ class StartListener(stomp.ConnectionListener):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print(self.topic_name)
logging.debug(f" Body: {frame.body}")
logging.info(f" Body: {frame.body}")
message = json.loads(frame.body)
global publish_rate, all_metrics
@@ -83,12 +92,19 @@ class Msg(object):
def main():
logging.getLogger().setLevel(logging.DEBUG)
logging.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.info(
f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
log = logging.getLogger()
log.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
)
start_app_conn = morphemic.Connection(
@@ -2,7 +2,7 @@ data:
csv_path: demo.csv
training:
bs: 8
max_epochs: 1
max_epochs: 10
loss: quantile
dataset:
tv_unknown_reals: []
@@ -11,12 +11,9 @@ import pandas as pd
import logging
from datetime import datetime
from src.dataset_maker import CSVData
from pytz import timezone
import pytz
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
from datetime import datetime
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
@@ -92,13 +89,17 @@ def main():
dataset_preprocessor = CSVData(APP_NAME)
dataset_preprocessor.prepare_csv()
logging.debug("dataset downloaded")
logging.info(f"Dataset downloaded")
logging.info(
f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
# logging.info(
# f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
# )