Commit b058c02a authored by Anna Warno

milliseconds -> seconds

parent 59f73c72
AMQ_HOSTNAME=localhost
AMQ_USER=admin
AMQ_PASSWORD=admin
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_PORT=61613
AMQ_HOST=147.102.17.76
AMQ_PORT_BROKER=61610
APP_NAME=demo
METHOD=nbeats
DATA_PATH=./
@@ -7,6 +7,8 @@ import logging
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "tft")
START_TOPIC = f"start_forecasting.{METHOD}"
@@ -74,11 +76,14 @@ class Msg(object):
def main():
logging.getLogger().setLevel(logging.DEBUG)
print("start")
print()
start_app_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_app_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_app_conn.connect()
start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_APP_TOPIC}", "1", ack="auto")
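Both connections above are now built from the same four environment-driven parameters. A small helper along the lines of the sketch below (hypothetical, not part of this commit; it only assumes that morphemic.Connection accepts the host and port keyword arguments shown in the diff) would avoid repeating the lookups in each service:

import os

import morphemic  # assumed to be importable the same way as in the modules touched here


def connection_from_env():
    # Read the broker settings once, with the same defaults as the code above.
    user = os.environ.get("AMQ_USER", "admin")
    password = os.environ.get("AMQ_PASSWORD", "admin")
    host = os.environ.get("AMQ_HOST", "localhost")
    port = os.environ.get("AMQ_PORT_BROKER", "61613")
    conn = morphemic.Connection(user, password, host=host, port=port)
    conn.connect()
    return conn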
@@ -88,15 +93,15 @@ def main():
start_app_conn.conn.set_listener(
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
# print("start")
# msg1 = Msg()
# msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60}]'
# msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60000}]'
# msg2 = Msg()
# msg2.body = """{
# "metrics": ["cpu_usage"],
# "timestamp": 1623057648907,
# "epoch_start": 1623057698298,
# "number_of_forward_predictions": 5,
# "timestamp": 0,
# "epoch_start": 0,
# "number_of_forward_predictions": 8,
# "prediction_horizon": 120}"""
# StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
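For reference, the sample messages above follow the new unit convention of this commit: publish_rate stays in milliseconds (60000), while timestamp, epoch_start and prediction_horizon are interpreted as seconds. A minimal sketch of reading the start_forecasting payload under that assumption (hypothetical snippet, not part of the commit):

import json

body = """{
    "metrics": ["cpu_usage"],
    "timestamp": 0,
    "epoch_start": 0,
    "number_of_forward_predictions": 8,
    "prediction_horizon": 120}"""

start = json.loads(body)
# Under the new convention these are plain Unix seconds, not milliseconds;
# with epoch_start = 0 the max(0, ...) guard used later clamps the initial wait to zero.
print("epoch starts at:", start["epoch_start"])                 # seconds
print("horizon between points:", start["prediction_horizon"])   # seconds
print("points per cycle:", start["number_of_forward_predictions"])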
@@ -3,7 +3,7 @@ data:
training:
bs: 8
max_epochs: 1
loss: mae
loss: quantile
dataset:
tv_unknown_reals: []
known_reals: []
@@ -12,6 +12,12 @@ dataset:
context_length: 10
prediction_length: 5
classification: 0
model:
learning_rate: 0.05
hidden_size: 32
attention_head_size: 1
hidden_continuous_size: 16
output_size: 7
prediction:
bs: 8
save_path:
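The new model block lists the usual TemporalFusionTransformer hyper-parameters, and output_size: 7 matches the seven default quantiles of a quantile loss, which is presumably why loss was switched from mae to quantile above. A hedged sketch of how such a config is typically consumed with pytorch-forecasting (the exact wiring in this repository may differ):

from pytorch_forecasting import TemporalFusionTransformer
from pytorch_forecasting.metrics import QuantileLoss


def build_tft(training_dataset):
    # training_dataset is assumed to be a pytorch_forecasting.TimeSeriesDataSet
    # built from the dataset section of the yaml above.
    return TemporalFusionTransformer.from_dataset(
        training_dataset,
        learning_rate=0.05,
        hidden_size=32,
        attention_head_size=1,
        hidden_continuous_size=16,
        output_size=7,        # one output per quantile
        loss=QuantileLoss(),  # 7 quantiles by default, matching output_size: 7
    )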
@@ -22,6 +22,8 @@ PREDICTION_CYCLE = 1 # minutes
APP_NAME = os.environ.get("APP_NAME", "demo")
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
class CustomListener(stomp.ConnectionListener):
@@ -65,10 +67,14 @@ class CustomListener(stomp.ConnectionListener):
def main():
logging.debug(f"metrics to predict {predicted_metrics}")
start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_conn.connect()
stop_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
stop_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
stop_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_TOPIC}", "1", ack="auto")
@@ -86,39 +92,46 @@ def main():
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
logging.debug(
f"waiting {(msg['epoch_start'] - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
)
time.sleep(
max(0, (msg["epoch_start"] - int(time.time()) * 1000 - prediction_cycle * 1000))
max(
0,
(
msg["epoch_start"] * 1000
- int(time.time()) * 1000
- prediction_cycle * 1000
),
)
// 1000
)
tl = Timeloop()
dataset_preprocessor.prepare_csv()
for metric in predicted_metrics:
predictions = None
global time_0
time_0 = time_0 + prediction_cycle * 1000
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, prediction = predict(
metric,
(prediction_cycle * 1000) // msg["publish_rate"],
extra_data=predictions,
m=i + 1,
prediction_hor=prediction_horizon,
timestamp=time_0,
)
if predictions is not None:
predictions = pd.concat([predictions, prediction], ignore_index=True)
else:
predictions = prediction
if prediction_msgs:
dest = f"{PRED_TOPIC_PREF}.{metric}"
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
# dataset_preprocessor.prepare_csv()
# for metric in predicted_metrics:
# predictions = None
# # global time_0
# # time_0 = time_0 + prediction_cycle
# for i in range(number_of_forward_predictions[metric]):
# prediction_msgs, prediction = predict(
# metric,
# (prediction_cycle * 1000) // msg["publish_rate"],
# extra_data=predictions,
# m=i + 1,
# prediction_hor=prediction_horizon,
# timestamp=time_0,
# )
# if predictions is not None:
# predictions = pd.concat([predictions, prediction], ignore_index=True)
# else:
# predictions = prediction
# if prediction_msgs:
# dest = f"{PRED_TOPIC_PREF}.{metric}"
# start_conn.send_to_topic(dest, prediction_msgs[metric])
# influxdb_conn.send_to_influxdb(metric, prediction_msgs)
@tl.job(interval=timedelta(seconds=prediction_cycle))
def metric_predict():
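The reworked wait above treats msg["epoch_start"] as seconds, converts all three terms to milliseconds and only then truncates back to seconds with // 1000. A small worked example with hypothetical values (prediction_cycle is taken to be in seconds, consistent with the timedelta(seconds=prediction_cycle) job interval):

import time

epoch_start = int(time.time()) + 300   # example: forecasting epoch begins in 5 minutes
prediction_cycle = 120                 # example cycle length in seconds

wait_seconds = max(
    0,
    epoch_start * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000,
) // 1000
# Equivalent to max(0, epoch_start - now - prediction_cycle): roughly 180 s here,
# i.e. wake up one prediction cycle before the epoch starts.
print(wait_seconds)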
@@ -127,7 +140,20 @@ def main():
for metric in predicted_metrics:
predictions = None
global time_0
time_0 = time_0 + prediction_cycle * 1000
time_1 = time_0
print("time 00000000 ", time_0)
# time_0 = time_0 + prediction_cycle
time_0 = time_0 + int(
(
(int(time.time()) - time_0)
/ (prediction_cycle // number_of_forward_predictions[metric])
)
) * (prediction_cycle // number_of_forward_predictions[metric])
print("time 00000000 ", int(time.time()) - time_1)
# print(f"time_0 difference seconds {int(time.time()) - time_0}")
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, prediction = predict(
metric,
@@ -135,8 +161,12 @@ def main():
extra_data=predictions,
m=i + 1,
prediction_hor=prediction_horizon,
timestamp=time_0,
timestamp=time_0 + (i + 1) * prediction_horizon,
)
if i == (number_of_forward_predictions[metric] - 1):
print(
f"time_0 difference seconds {time_0 + (i + 1) *prediction_horizon // 1000 - int(time.time())}"
)
if predictions is not None:
predictions = pd.concat(
[predictions, prediction], ignore_index=True
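The new bookkeeping above snaps time_0 forward by whole multiples of the sub-step prediction_cycle // number_of_forward_predictions[metric], so the anchor follows wall-clock time instead of drifting by a fixed cycle, and each forward prediction is stamped time_0 + (i + 1) * prediction_horizon. A small worked example with hypothetical values (seconds throughout, matching the comparison against int(time.time()) in the code above):

import time

prediction_cycle = 120   # seconds, example value
n_forward = 8            # example number_of_forward_predictions for the metric

step = prediction_cycle // n_forward      # 15-second sub-step
time_0 = int(time.time()) - 47            # pretend the previous anchor is 47 s old

# Same arithmetic as the diff: advance by as many whole steps as fit the elapsed time.
time_0 = time_0 + int((int(time.time()) - time_0) / step) * step
print(int(time.time()) - time_0)          # now at most `step` seconds behind the clock (~2 s here)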
@@ -14,11 +14,15 @@ RETRAIN_CYCLE = 2 # minutes
HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613))
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
APP_NAME = os.environ.get("APP_NAME", "demo")
def main(predicted_metrics, prediction_horizon):
start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_conn.connect()
tl = Timeloop()
@@ -30,7 +30,7 @@ def predict(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
dataset = pd.read_csv(data_path)
dataset = pd.read_csv(data_path).head(1000)
if extra_data is not None:
dataset = pd.concat([dataset, extra_data], ignore_index=True)
@@ -80,13 +80,13 @@ def predict(
target_column: {
"metricValue": prediction[-1][-1][2].item(),
"level": 1, # TODO
"timestamp": time.time_ns() // 1_000_000,
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
prediction[-1][-1][0].item(),
prediction[-1][-1][-1].item(),
], # quantiles difference
"predictionTime": timestamp + prediction_hor * m,
"predictionTime": timestamp + prediction_hor * m // 1000,
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
@@ -32,6 +32,7 @@ class Dataset(object):
self.prediction_length = prediction_length
self.add_obligatory_columns(dataset)
self.dataset = self.convert_formats(dataset)
# self.dataset.columns[2] = "AvgResponseTime"
self.n = dataset.shape[0]
self.ts_dataset = self.create_time_series_dataset()
AMQ_HOSTNAME=localhost
AMQ_USER=admin
AMQ_PASSWORD=admin
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_PORT=61613
APP_NAME=demo
METHOD=tft
@@ -10,3 +10,5 @@ INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
AMQ_HOST=147.102.17.76
AMQ_PORT_BROKER=61610
@@ -7,6 +7,8 @@ import logging
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "tft")
START_TOPIC = f"start_forecasting.{METHOD}"
@@ -75,9 +77,13 @@ class Msg(object):
def main():
logging.getLogger().setLevel(logging.DEBUG)
start_app_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_app_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_app_conn.connect()
start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_APP_TOPIC}", "1", ack="auto")
@@ -88,18 +94,18 @@ def main():
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
# msg1 = Msg()
# msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60000}]'
# msg2 = Msg()
# msg2.body = """{
# "metrics": ["cpu_usage"],
# "timestamp": 0,
# "epoch_start": 0,
# "number_of_forward_predictions": 8,
# "prediction_horizon": 600}"""
# StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
# StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
msg1 = Msg()
msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60000}]'
msg2 = Msg()
msg2.body = """{
"metrics": ["cpu_usage"],
"timestamp": 0,
"epoch_start": 0,
"number_of_forward_predictions": 8,
"prediction_horizon": 120}"""
StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
pass
@@ -19,6 +19,6 @@ model:
hidden_continuous_size: 16
output_size: 7
prediction:
bs: 8
bs: 64
save_path:
models
@@ -22,6 +22,8 @@ PREDICTION_CYCLE = 1 # minutes
APP_NAME = os.environ.get("APP_NAME", "demo")
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
class CustomListener(stomp.ConnectionListener):
@@ -65,10 +67,14 @@ class CustomListener(stomp.ConnectionListener):
def main():
logging.debug(f"metrics to predict {predicted_metrics}")
start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_conn.connect()
stop_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
stop_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
stop_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_TOPIC}", "1", ack="auto")
@@ -86,39 +92,46 @@ def main():
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
logging.debug(
f"waiting {(msg['epoch_start'] - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
)
time.sleep(
max(0, (msg["epoch_start"] - int(time.time()) * 1000 - prediction_cycle * 1000))
max(
0,
(
msg["epoch_start"] * 1000
- int(time.time()) * 1000
- prediction_cycle * 1000
),
)
// 1000
)
tl = Timeloop()
dataset_preprocessor.prepare_csv()
for metric in predicted_metrics:
predictions = None
global time_0
time_0 = time_0 + prediction_cycle * 1000
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, prediction = predict(
metric,
(prediction_cycle * 1000) // msg["publish_rate"],
extra_data=predictions,
m=i + 1,
prediction_hor=prediction_horizon,
timestamp=time_0,
)
if predictions is not None:
predictions = pd.concat([predictions, prediction], ignore_index=True)
else:
predictions = prediction
if prediction_msgs:
dest = f"{PRED_TOPIC_PREF}.{metric}"
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
# dataset_preprocessor.prepare_csv()
# for metric in predicted_metrics:
# predictions = None
# # global time_0
# # time_0 = time_0 + prediction_cycle
# for i in range(number_of_forward_predictions[metric]):
# prediction_msgs, prediction = predict(
# metric,
# (prediction_cycle * 1000) // msg["publish_rate"],
# extra_data=predictions,
# m=i + 1,
# prediction_hor=prediction_horizon,
# timestamp=time_0,
# )
# if predictions is not None:
# predictions = pd.concat([predictions, prediction], ignore_index=True)
# else:
# predictions = prediction
# if prediction_msgs:
# dest = f"{PRED_TOPIC_PREF}.{metric}"
# start_conn.send_to_topic(dest, prediction_msgs[metric])
# influxdb_conn.send_to_influxdb(metric, prediction_msgs)
@tl.job(interval=timedelta(seconds=prediction_cycle))
def metric_predict():
@@ -127,7 +140,20 @@ def main():
for metric in predicted_metrics:
predictions = None
global time_0
time_0 = time_0 + prediction_cycle * 1000
time_1 = time_0
print("time 00000000 ", time_0)
# time_0 = time_0 + prediction_cycle
time_0 = time_0 + int(
(
(int(time.time()) - time_0)
/ (prediction_cycle // number_of_forward_predictions[metric])
)
) * (prediction_cycle // number_of_forward_predictions[metric])
print("time 00000000 ", int(time.time()) - time_1)
# print(f"time_0 difference seconds {int(time.time()) - time_0}")
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, prediction = predict(
metric,
@@ -135,8 +161,12 @@ def main():
extra_data=predictions,
m=i + 1,
prediction_hor=prediction_horizon,
timestamp=time_0,
timestamp=time_0 + (i + 1) * prediction_horizon,
)
if i == (number_of_forward_predictions[metric] - 1):
print(
f"time_0 difference seconds {time_0 + (i + 1) *prediction_horizon // 1000 - int(time.time())}"
)
if predictions is not None:
predictions = pd.concat(
[predictions, prediction], ignore_index=True
@@ -14,11 +14,15 @@ RETRAIN_CYCLE = 2 # minutes
HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613))
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
APP_NAME = os.environ.get("APP_NAME", "demo")
def main(predicted_metrics, prediction_horizon):
start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_conn.connect()
tl = Timeloop()
@@ -33,7 +33,7 @@ def predict(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
dataset = pd.read_csv(data_path)
dataset = pd.read_csv(data_path).head(1000)
if extra_data is not None:
dataset = pd.concat([dataset, extra_data], ignore_index=True)
@@ -87,13 +87,13 @@ def predict(
target_column: {
"metricValue": prediction[-1][-1][2].item(),
"level": 1, # TODO
"timestamp": time.time_ns() // 1_000_000,
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
prediction[-1][-1][0].item(),
prediction[-1][-1][-1].item(),
], # quantiles difference
"predictionTime": timestamp + prediction_hor * m,
"predictionTime": timestamp + prediction_hor * m // 1000,
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
@@ -34,7 +34,8 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
dataset = pd.read_csv(data_path)
dataset = pd.read_csv(data_path).head(1000)
print(dataset)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
@@ -32,6 +32,7 @@ class Dataset(object):
self.prediction_length = prediction_length
self.add_obligatory_columns(dataset)
self.dataset = self.convert_formats(dataset)
# self.dataset.columns[2] = "AvgResponseTime"
self.n = dataset.shape[0]
self.ts_dataset = self.create_time_series_dataset()