Commit 3aef9ba4 authored by Anna Warno's avatar Anna Warno
Browse files

tft processes names added

parent 067ca54e
Pipeline #16756 passed with stage
in 1 minute and 23 seconds
......@@ -3,17 +3,20 @@ import stomp
import json
from amq_message_python_library import * # python amq-message-python-library
import logging
import time
from datetime import datetime
from pytz import timezone
from datetime import datetime
import time
import setproctitle
# from src.log import logger
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "nbeats")
METHOD = os.environ.get("METHOD", "model")
START_TOPIC = f"start_forecasting.{METHOD}"
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
......@@ -75,8 +78,9 @@ class Msg(object):
def main():
setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
filename=f"logs/{os.environ.get('METHOD', 'model')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
......@@ -121,6 +125,7 @@ def main():
# StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
time.sleep(60)
pass
......
......@@ -126,6 +126,7 @@ def main():
single_prediction_points_length=prediction_points_horizons[metric],
prediction_hor=prediction_horizon,
timestamp=time_0,
publish_rate=metrics_info[metric]["publish_rate"],
)
if prediction_msgs:
......@@ -138,7 +139,16 @@ def main():
start_conn.send_to_topic(dest, message[metric])
influxdb_conn.send_to_influxdb(metric, message)
current_time = int(time.time())
logging.info(
f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print(
f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
end_time = int(time.time())
time_0 = time_0 + prediction_cycle
time_to_wait = time_0 - end_time
if time_to_wait < 0:
......
......@@ -5,5 +5,4 @@ filelock==3.0.12
pyyaml
influxdb
python-slugify
setproctitle
......@@ -94,10 +94,10 @@ class Dataset(object):
last_timestamp_database = self.dataset[self.time_column].values[-1]
current_time = int(time.time())
print(
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database / 1000}"
)
logging.info(
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database / 1000}"
)
def check_gap(self):
......
import sys
sys.path.append(".")
import pytest
from src.model_train import train
import pandas as pd
import numpy as np
import random
@pytest.fixture
def df_1():
df = pd.DataFrame({"ems_time": [], "metric_0": []})
return df
@pytest.fixture
def df_2():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
df["metric_0"] = np.nan
return df
@pytest.fixture
def df_3():
df = pd.DataFrame()
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.nan
return df
@pytest.fixture
def df_4():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:3
]
]
for i in range(5):
df[f"metric_{i}"] = 1
return df
@pytest.fixture
def df_5():
df = pd.DataFrame()
df["ems_time"] = np.array(range(0, 3)) * 1e9
for i in range(5):
df[f"metric_{i}"] = 1
return df
@pytest.fixture
def df_6():
df = pd.DataFrame()
df["ems_time"] = np.array(range(1, 1001)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
return df
@pytest.fixture
def df_7():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
np.random.randint(0, df.shape[0] - 1, 990),
f"metric_{i}",
] = np.nan
df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None")
print(df)
return df
@pytest.fixture
def df_8():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
np.random.randint(0, df.shape[0] - 1, 990),
f"metric_{i}",
] = np.nan
df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf)
return df
@pytest.fixture
def df_9():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
np.random.randint(0, df.shape[0] - 1, 990),
f"metric_{i}",
] = np.nan
return df
@pytest.fixture
def df_10():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
list(range(20, 300)),
f"metric_{i}",
] = np.inf
return df
@pytest.fixture
def df_11():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
if i % 2 == 0:
df.loc[
list(range(20, 300)),
f"metric_{i}",
] = np.nan
return df
@pytest.fixture
def df_12():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:6000
]
]
for i in range(5):
df[f"metric_{i}"] = [
np.nan if i % 2 == 0 else random.random() for i in range(6000)
]
return df
@pytest.fixture
def df(request):
return request.getfixturevalue(request.param)
@pytest.fixture
def metric():
return "metric_0"
@pytest.fixture
def prediction_length():
return 60
@pytest.mark.parametrize(
"df,metric,prediction_length",
[
("df_1", metric, prediction_length),
("df_2", metric, prediction_length),
("df_3", metric, prediction_length),
("df_4", metric, prediction_length),
("df_5", metric, prediction_length),
("df_6", metric, prediction_length),
("df_7", metric, prediction_length),
("df_8", metric, prediction_length),
("df_9", metric, prediction_length),
("df_10", metric, prediction_length),
("df_11", metric, prediction_length),
("df_12", metric, prediction_length),
],
indirect=True,
)
def test_predict(df, metric, prediction_length):
df.to_csv("demo.csv")
output = train(metric, prediction_length)
print(output)
if output:
print("True")
assert True
......@@ -7,7 +7,8 @@ import logging
import time
from datetime import datetime
from pytz import timezone
from datetime import datetime
import time
import setproctitle
# from src.log import logger
......@@ -81,8 +82,9 @@ class Msg(object):
def main():
setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
filename=f"logs/{os.environ.get('METHOD', 'model')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
......@@ -127,6 +129,7 @@ def main():
# StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
time.sleep(60)
pass
......
......@@ -15,6 +15,7 @@ from pytz import timezone
import pytz
from datetime import datetime
import random
import setproctitle
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
......@@ -139,6 +140,9 @@ def main():
print(
f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
)
logging.info(
f"time difference in seconds between last preiction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
)
else:
predictions = prediction
......@@ -178,6 +182,7 @@ def main():
if __name__ == "__main__":
setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}_predict")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
)
......
......@@ -6,5 +6,4 @@ filelock==3.0.12
influxdb
python-slugify
torchmetrics==0.5.0
setproctitle
......@@ -10,9 +10,10 @@ import pytz
import time
from pytz import timezone
from datetime import datetime
import setproctitle
TOPIC_NAME = "training_models"
RETRAIN_CYCLE = 2 # minutes
RETRAIN_CYCLE = 10 # minutes
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
......@@ -61,6 +62,7 @@ def main(predicted_metrics, prediction_horizons):
if __name__ == "__main__":
setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}_retrain")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
)
......
......@@ -114,7 +114,11 @@ def predict(
lock = FileLock(lockfile + ".lock")
with lock:
model.load_state_dict(torch.load(model_path))
if os.path.isfile(model_path):
model.load_state_dict(torch.load(model_path))
logging.info("Model corrupted unable to predict")
else:
return (None, None)
prediction_input = ts_dataset.get_from_dataset(future_df)
prediction_input = prediction_input.to_dataloader(train=False)
......
......@@ -62,7 +62,6 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
publish_rate=publish_rate,
)
print(ts_dataset.dataset, "DATASEEEET")
if ts_dataset.dataset.shape[0] < 1:
logging.info(
f"METRIC: {target_column} Preprocessed dataset len: {ts_dataset.dataset.shape[0]}, minimum points required: {(params['context_length_ratio'] + 2) * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
......@@ -124,8 +123,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
if os.path.isfile(lockfile):
print("downloading weigths")
with lock:
model.load_state_dict(torch.load(model_path))
if os.path.isfile(model_path):
with lock:
model.load_state_dict(torch.load(model_path))
trainer.fit(
model,
......
import sys
sys.path.append(".")
import pytest
from src.model_train import train
import pandas as pd
import numpy as np
import random
@pytest.fixture
def df_1():
df = pd.DataFrame({"ems_time": [], "metric_0": []})
return df
@pytest.fixture
def df_2():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
df["metric_0"] = np.nan
return df
@pytest.fixture
def df_3():
df = pd.DataFrame()
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.nan
return df
@pytest.fixture
def df_4():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:3
]
]
for i in range(5):
df[f"metric_{i}"] = 1
return df
@pytest.fixture
def df_5():
df = pd.DataFrame()
df["ems_time"] = np.array(range(0, 3)) * 1e9
for i in range(5):
df[f"metric_{i}"] = 1
return df
@pytest.fixture
def df_6():
df = pd.DataFrame()
df["ems_time"] = np.array(range(1, 1001)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
return df
@pytest.fixture
def df_7():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
np.random.randint(0, df.shape[0] - 1, 990),
f"metric_{i}",
] = np.nan
df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None")
print(df)
return df
@pytest.fixture
def df_8():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
np.random.randint(0, df.shape[0] - 1, 990),
f"metric_{i}",
] = np.nan
df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf)
return df
@pytest.fixture
def df_9():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
np.random.randint(0, df.shape[0] - 1, 990),
f"metric_{i}",
] = np.nan
return df
@pytest.fixture
def df_10():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
list(range(20, 300)),
f"metric_{i}",
] = np.inf
return df
@pytest.fixture
def df_11():
df = pd.DataFrame()
df["ems_time"] = [
int(x)
for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
:1000
]
]
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
if i % 2 == 0:
df.loc[
list(range(20, 300)),
f"metric_{i}",
] = np.nan
return df