Commit e6ff94b0 authored by mriedl's avatar mriedl

Merge remote-tracking branch 'remotes/origin/morphemic-rc1.5' into predictionOrchestrator

parents d9a27e26 7ac51b5c
Pipeline #16225 passed with stage in 1 minute and 2 seconds
@@ -17,6 +17,8 @@ variables:
SLO_SEVERITY_CALCULATOR_CLI: "mvn --batch-mode -f morphemic-slo-severity-calculator/pom.xml"
PREDICTON_ORCHESTRATOR_CLI: "mvn --batch-mode -N -f prediction_orchestrator/pom.xml"
DOCKER_FORECAST_ESHYBRID_DOCKER_NAME: 'morphemic_forecasting_eshybird'
cache:
paths:
- maven_repo/
@@ -44,21 +46,29 @@ build:scheduling-abstraction-layer:
script:
- $SCHEDULING_ABSTRACTION_LAYER_CLI clean install
artifacts:
expire_in: 1 week
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/org/activeeon/scheduling-abstraction-layer/
build:amq-message-java-library:
stage: deployLibrary
image: $MAVEN_IMAGE
only:
- master
- morphemic-rc1.5
script:
- $AMQ_MESSAGE_JAVA_LIBRARY_CLI clean install
artifacts:
expire_in: 1 week
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/gr/ntua/imu/morphemic/amq-message-java-library/
build:prediction_orchestrator:
stage: build
image: $MAVEN_IMAGE
only:
- master
- morphemic-rc1.5
script:
- $PREDICTON_ORCHESTRATOR_CLI -Pwithout-docker clean install
artifacts:
@@ -68,9 +78,13 @@ build:prediction_orchestrator:
build:slo-severity-calculator:
stage: build
image: $MAVEN_IMAGE
only:
- master
- morphemic-rc1.5
script:
- $SLO_SEVERITY_CALCULATOR_CLI -Dtest=!UnboundedMonitoringAttributeTests,!ConnectivityTests clean install
artifacts:
expire_in: 1 week
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/gr/ntua/imu/morphemic/SLOSeverityCalculator/
@@ -80,7 +94,6 @@ deploy:performance-model:
only:
- master
- morphemic-rc1.5
- proactive-dev
services:
- $DOCKER_DIND_SERVICE
script:
@@ -152,7 +165,6 @@ deploy:prediction_orchestrator:
only:
- master
- morphemic-rc1.5
- predictionOrchestrator
services:
- $DOCKER_DIND_SERVICE
dependencies:
@@ -212,3 +224,34 @@ deploy:tft:
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag tft:latest $CI_REGISTRY_IMAGE/tft:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/tft:$CI_COMMIT_BRANCH
deploy:arima:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- docker build -t arima -f ./deployment/arima/Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag arima:latest $CI_REGISTRY_IMAGE/arima:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/arima:$CI_COMMIT_BRANCH
deploy:morphemic-forecasting-eshybrid:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- cd morphemic-forecasting-eshybrid
- docker build -t $DOCKER_FORECAST_ESHYBRID_DOCKER_NAME .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag $DOCKER_FORECAST_ESHYBRID_DOCKER_NAME:latest $CI_REGISTRY_IMAGE/$DOCKER_FORECAST_ESHYBRID_DOCKER_NAME:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/$DOCKER_FORECAST_ESHYBRID_DOCKER_NAME:$CI_COMMIT_BRANCH
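The new deploy jobs above all follow one pattern: build the image, log in to the GitLab registry with the K8S_SECRET_DOCKER_* credentials, tag with the branch name, push. For illustration only, this minimal Python sketch assembles the reference that deploy:arima pushes; the fallback values are placeholders, not the project's real registry:

import os

# CI_REGISTRY_IMAGE and CI_COMMIT_BRANCH are provided by GitLab CI at job time.
registry_image = os.environ.get("CI_REGISTRY_IMAGE", "registry.example.com/group/project")
branch = os.environ.get("CI_COMMIT_BRANCH", "morphemic-rc1.5")
print(f"{registry_image}/arima:{branch}")  # e.g. registry.example.com/group/project/arima:morphemic-rc1.5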
@@ -336,12 +336,12 @@ public class BrokerClient {
}
// ------------------------------------------------------------------------
/*
public synchronized void openConnection() throws JMSException {
checkProperties();
openConnection(properties.getBrokerUrl(), null, null);
}
*/
public synchronized void openConnection(String connectionString) throws JMSException {
openConnection(connectionString, null, null);
}
......
docker run -t --env-file=env --network=host stomp_app
import os
from multiprocessing import Pool
import stomp
import json
from amq_message_python_library import * # python amq-message-python-library
import logging
import time
from datetime import datetime
import pytz
from pytz import timezone
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
@@ -17,10 +17,6 @@ METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
)
def run_process(args):
print("running")
@@ -28,10 +24,7 @@ def run_process(args):
def start_training(metrics_to_predict):
processes = (("retrain.py", metrics_to_predict), ("predict.py", metrics_to_predict))
pool = Pool(processes=2)
pool.map(run_process, processes)
run_process(("predict.py", metrics_to_predict))
class StartListener(stomp.ConnectionListener):
@@ -47,8 +40,7 @@ class StartListener(stomp.ConnectionListener):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print(self.topic_name)
logging.debug(f" Body: {frame.body}")
logging.info(f" Body: {frame.body}")
message = json.loads(frame.body)
global publish_rate, all_metrics
@@ -83,12 +75,19 @@ class Msg(object):
def main():
logging.getLogger().setLevel(logging.DEBUG)
logging.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.info(
f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
log = logging.getLogger()
log.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
)
start_app_conn = morphemic.Connection(
@@ -109,11 +108,11 @@ def main():
)
# msg1 = Msg()
# msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}]'
# msg1.body = '[{"metric": "value", "level": 3, "publish_rate": 30000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 120'
# + f'"metrics": ["value"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + "}"
# )
......
@@ -11,12 +11,9 @@ import pandas as pd
import logging
from datetime import datetime
from src.dataset_maker import CSVData
from pytz import timezone
import pytz
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
@@ -92,13 +89,14 @@ def main():
dataset_preprocessor = CSVData(APP_NAME)
dataset_preprocessor.prepare_csv()
logging.debug("dataset downloaded")
logging.info(f"Dataset downloaded")
logging.info(
f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
logging.debug(
f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
logging.info(
f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dataset_preprocessor.prepare_csv()
@@ -113,57 +111,57 @@ def main():
while True:
start_time = int(time.time())
logging.debug("prediction")
logging.info(f"prediction loop started")
logging.info(
f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
logging.info(
f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dataset_preprocessor.prepare_csv()
global time_0
for metric in predicted_metrics:
predictions = None
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, prediction = predict(
metric,
(prediction_cycle * 1000) // msg["publish_rate"],
extra_data=predictions,
m=i + 1,
prediction_hor=prediction_horizon,
timestamp=time_0 + (i + 1) * (prediction_horizon // 1000),
)
if i == (number_of_forward_predictions[metric] - 1):
print(
f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
)
if predictions is not None:
predictions = pd.concat(
[predictions, prediction], ignore_index=True
)
else:
predictions = prediction
prediction_msgs = predict(
metric,
prediction_length,
prediction_hor=prediction_horizon,
timestamp=time_0,
)
if prediction_msgs:
logging.info(f"Sending predictions for {metric} metric")
logging.info(
f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dest = f"{PRED_TOPIC_PREF}.{metric}"
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds'
)
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds'
)
if prediction_msgs:
dest = f"{PRED_TOPIC_PREF}.{metric}"
for message in prediction_msgs:
logging.info(
f"Message: {prediction_msgs[metric]}, destination: {dest}"
f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
start_conn.send_to_topic(dest, message[metric])
influxdb_conn.send_to_influxdb(metric, message)
end_time = int(time.time())
print(f"sleeping {prediction_cycle - (end_time - start_time)} seconds")
time_0 = time_0 + prediction_cycle
time.sleep(prediction_cycle - (end_time - start_time))
time_to_wait = prediction_cycle - (end_time - start_time)
if time_to_wait < 0:
time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle)
logging.info(
f"Prediction time is too slow (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time_0 = time_0 + prediction_cycle
time.sleep(time_to_wait)
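The overrun handling above replaces an unconditional time.sleep(prediction_cycle - (end_time - start_time)), which raises ValueError when a slow iteration makes the argument negative. A standalone sketch of the same arithmetic, with illustrative numbers:

def catch_up_wait(prediction_cycle: int, elapsed: int) -> int:
    """Seconds to sleep after a loop iteration that took `elapsed` seconds."""
    time_to_wait = prediction_cycle - elapsed
    if time_to_wait < 0:
        # Python's % of a negative operand is non-negative, so the result
        # lands in (0, prediction_cycle]; e.g. cycle=120, elapsed=130 -> 10.
        time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle)
    return time_to_wait

assert catch_up_wait(120, 50) == 70
assert catch_up_wait(120, 130) == 10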
if __name__ == "__main__":
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'arima')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
@@ -175,10 +173,16 @@ if __name__ == "__main__":
time_0 = msg["epoch_start"]
prediction_horizon = msg["prediction_horizon"] * 1000
prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"]
predicted_metrics = set(msg["metrics"])
prediction_cycle = msg["prediction_horizon"]
logging.debug(f"Predicted metrics: {predicted_metrics}")
prediction_length = (
msg["prediction_horizon"]
* 1000
// msg["publish_rate"]
* msg["number_of_forward_predictions"]
)
logging.info(f"Predicted metrics: {predicted_metrics}")
number_of_forward_predictions = {
metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
} # default number of forward predictions
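A worked example of the prediction_length arithmetic above, using illustrative values only (a 30 s horizon, one sample every 10 s, eight horizons ahead; none of these numbers come from the repository):

msg = {"prediction_horizon": 30, "publish_rate": 10_000, "number_of_forward_predictions": 8}
points_per_horizon = msg["prediction_horizon"] * 1000 // msg["publish_rate"]  # 30_000 ms / 10_000 ms = 3
prediction_length = points_per_horizon * msg["number_of_forward_predictions"]  # 3 * 8 = 24 points
assert prediction_length == 24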
......
@@ -6,6 +6,9 @@ import time
from src.model_train import train
from amq_message_python_library import *
from src.dataset_maker import CSVData
import pytz
import time
from datetime import datetime
TOPIC_NAME = "training_models"
RETRAIN_CYCLE = 10 # minutes
@@ -14,13 +17,12 @@ AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
APP_NAME = os.environ.get("APP_NAME", "demo")
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
print(os.listdir("./"), "files")
def main(predicted_metrics, prediction_horizon):
start_conn = morphemic.Connection(
@@ -36,15 +38,25 @@ def main(predicted_metrics, prediction_horizon):
for metric in predicted_metrics:
retrain_msg = train(metric, prediction_horizon)
if retrain_msg:
logging.info(f"Training completed for {metric} metric")
logging.info(
f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
else:
print("Not enough data for model training, waiting ...")
logging.info("Not enough data for model training, waiting ...")
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
logging.info(
f"Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
end_time = int(time.time())
time.sleep(60 * RETRAIN_CYCLE - (end_time - start_time))
time_to_wait = 60 * RETRAIN_CYCLE - (end_time - start_time)
if time_to_wait < 0:
time_to_wait = 60 * RETRAIN_CYCLE - (time_to_wait % (60 * RETRAIN_CYCLE))
logging.info(
f"Waiting for the next training: {time_to_wait} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time.sleep(time_to_wait)
if __name__ == "__main__":
@@ -57,7 +69,14 @@ if __name__ == "__main__":
}
for m in msg["all_metrics"]
}
prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"]
prediction_horizon = (
msg["prediction_horizon"]
* 1000
// msg["publish_rate"]
* msg["number_of_forward_predictions"]
)
predicted_metrics = set(metrics_info.keys())
logging.debug(f"Predicted metrics: {predicted_metrics}")
logging.info(
f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
main(predicted_metrics, prediction_horizon)
#!/bin/sh
python3 morphemic-datasetmaker/setup.py install
# rm -r morphemic-datasetmaker
# python3 main.py
# mv amq-message-python-library amq_message_python_library
@@ -8,70 +8,85 @@ from filelock import FileLock
from src.preprocess_dataset import Dataset
import time
import logging
import pytz
from datetime import datetime
pd.options.mode.chained_assignment = None
"""Script for temporal fusion transformer prediction"""
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
def predict(
target_column,
prediction_length,
yaml_file="model.yaml",
extra_data=None,
m=1,
prediction_hor=60,
timestamp=0,
):
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
params["dataset"]["prediction_length"] = prediction_length
params["dataset"]["context_length"] = prediction_length * 10
params["dataset"]["context_length"] = prediction_length * 5
data_path = os.path.join(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
dataset = pd.read_csv(data_path).head(500)
if not os.path.isfile(data_path):
logging.info("Dataset not found")
return None
dataset = pd.read_csv(data_path)
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
if new_ts_dataset.dropped_recent_series: # series with recent data was too short
logging.info(
f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print("Not enough fresh data, unable to predict TIME:")
return None
if extra_data is not None:
dataset = pd.concat([dataset, extra_data], ignore_index=True)
dataset = pd.read_csv(data_path)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
ts_dataset = ts_dataset.dataset[
ts_dataset.dataset.series == ts_dataset.dataset.series.max()
]
pred_len = params["dataset"]["prediction_length"]
best_model_aic = None
sarima = sm.tsa.statespace.SARIMAX(ts_dataset.dataset[target_column])
model = sarima.fit()
predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]]
for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0)]:
sarima = sm.tsa.statespace.SARIMAX(ts_dataset[target_column], order=order)
model = sarima.fit()
if best_model_aic:
if model.aic < best_model_aic:
predictions = model.get_forecast(
pred_len, return_conf_int=True, alpha=0.05
)
best_model_aic = model.aic
else:
predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
best_model_aic = model.aic
future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len))
future_df["time_idx"] = future_time_idx
future_df["split"] = "future"
msgs = []
ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index(
drop=True
)
msg = {
target_column: {
"metricValue": predictions.predicted_mean.values[-1],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions.conf_int(alpha=0.05).iloc[-1].values[0],
predictions.conf_int(alpha=0.05).iloc[-1].values[1],
], # quantiles difference
"predictionTime": timestamp,
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
for i in range(pred_len):
msg = {
target_column: {
"metricValue": predictions.predicted_mean.values[i],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions.conf_int(alpha=0.05).iloc[i].values[0],
predictions.conf_int(alpha=0.05).iloc[i].values[1],
], # quantiles difference
"predictionTime": timestamp + (i + 1) * (prediction_hor // 1000),
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
}
}
}
logging.debug(f"prediction msg: {msg}")
future_df["split"] = "val"
future_df[target_column] = predictions.predicted_mean.values
return (msg, future_df)
msgs.append(msg)
return msgs
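A hypothetical call to the rewritten predict() above; the metric name and numbers are illustrative, and the function returns None when the dataset file is missing or the freshest series is too short:

import time

msgs = predict(
    "memory",               # target_column, illustrative
    prediction_length=24,   # points to forecast
    prediction_hor=60_000,  # horizon in ms; divided by 1000 inside
    timestamp=int(time.time()),
)
if msgs is not None:
    for m in msgs:
        print(m["memory"]["predictionTime"], m["memory"]["metricValue"])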
import yaml
import pandas as pd
from filelock import FileLock
from src.preprocess_dataset import Dataset
"""Script for temporal fusion transformer training"""
import time
import os
def train(target_column, prediction_length, yaml_file="model.yaml"):
msg = None
msg = {
"metrics": [target_column],
"forecasting_method": os.environ.get("METHOD", "tft"),
"timestamp": int(time.time()),
}
return msg
import pandas as pd
import numpy as np
import logging
pd.options.mode.chained_assignment = None
@@ -12,15 +13,18 @@ class Dataset(object):
self,
dataset,
target_column="value",
tv_unknown_reals="[]",
known_reals="[]",
tv_unknown_cat="[]",
static_reals="[]",
tv_unknown_reals=[],
known_reals=[],
tv_unknown_cat=[],
static_reals=[],
classification=0,
context_length=40,
prediction_length=5,
):
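The new signature swaps the old string defaults ("[]") for real list literals. One caveat: Python evaluates default values once, at definition time, so a mutable default is shared by every call; that is harmless while the lists are only read, but the conventional None idiom avoids the trap. A sketch of that idiom (hypothetical, not the repo's code):

def __init__(self, dataset, target_column="value",
             tv_unknown_reals=None, known_reals=None,
             tv_unknown_cat=None, static_reals=None,
             classification=0, context_length=40, prediction_length=5):
    # Allocate fresh lists per call; a shared [] default would be reused
    # across Dataset instances if it were ever mutated.
    self.tv_unknown_reals = tv_unknown_reals if tv_unknown_reals is not None else []
    self.known_reals = known_reals if known_reals is not None else []
    self.tv_unknown_cat = tv_unknown_cat if tv_unknown_cat is not None else []
    self.static_reals = static_reals if static_reals is not None else []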