Commit 424b5b56 authored by Anna Warno's avatar Anna Warno
Browse files

models files added

parent 347ade46
......@@ -4,6 +4,7 @@ FROM python:3.8-slim-buster
WORKDIR /wd
COPY deployment/tft/requirements.txt .
RUN pip3 install -r requirements.txt
RUN mkdir models
# Copy the rest of the codebase into the image
COPY deployment/tft ./
......
......@@ -13,6 +13,7 @@ START_TOPIC = f"start_forecasting.{METHOD}"
def run_process(args):
print("running")
os.system(f"python {args[0]} '{args[1]}'")
......@@ -64,12 +65,17 @@ class StartForecastingListener(stomp.ConnectionListener):
message = json.dumps(message)
start_training(message)
self.conn.disconnect()
class Msg(object):
def __init__(self):
self.body = None
def main():
logging.getLogger().setLevel(logging.DEBUG)
print("start")
print()
start_app_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_app_conn.connect()
start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
......@@ -82,6 +88,19 @@ def main():
start_app_conn.conn.set_listener(
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
print("start")
msg1 = Msg()
msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60}]'
msg2 = Msg()
msg2.body = """{
"metrics": ["cpu_usage"],
"timestamp": 0,
"epoch_start": 0,
"number_of_forward_predictions": 5,
"prediction_horizon": 120}"""
StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
pass
......
......@@ -73,7 +73,7 @@ def main():
stop_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_TOPIC}", "1", ack="auto")
start_conn.conn.subscribe(f"/topic/{STOP_TOPIC}", "2", ack="auto")
stop_conn.conn.subscribe(f"/topic/{STOP_TOPIC}", "2", ack="auto")
start_conn.set_listener(
"1", CustomListener(start_conn.conn, START_TOPIC, start=True)
......@@ -90,9 +90,8 @@ def main():
f"waiting {msg['epoch_start'] - int(time.time()) - prediction_cycle} seconds"
)
time.sleep(max(0
msg["epoch_start"] - int(time.time()) - prediction_cycle)
) # time units???
print("waiting: ", msg["epoch_start"] - int(time.time()) - prediction_cycle)
time.sleep(max(0, msg["epoch_start"] - int(time.time()) - prediction_cycle))
tl = Timeloop()
......@@ -104,7 +103,7 @@ def main():
predictions = None
time_0 = int(time.time())
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, predictions = predict(
prediction_msgs, prediction = predict(
metric,
prediction_cycle,
extra_data=predictions,
......@@ -112,6 +111,13 @@ def main():
prediction_hor=prediction_horizon,
timestamp=time_0,
)
if predictions is not None:
predictions = pd.concat(
[predictions, prediction], ignore_index=True
)
else:
predictions = prediction
if prediction_msgs:
dest = f"{PRED_TOPIC_PREF}.{metric}"
start_conn.send_to_topic(dest, prediction_msgs[metric])
......
......@@ -23,7 +23,7 @@ def predict(target_column, yaml_file="nbeats.yaml", extra_data=None):
dataset = pd.read_csv(data_path)
if extra_data is not None:
dataset = pd.concat([dataset, extra_data])
dataset = pd.concat([dataset, extra_data], ignore_index=True)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
......
......@@ -82,8 +82,10 @@ def train(target_column, yaml_file="nbeats.yaml"):
train_dataloader=train_dataloader,
val_dataloaders=val_dataloader,
)
with lock:
if os.path.isfile(lockfile):
with lock:
torch.save(model.state_dict(), model_path)
else:
torch.save(model.state_dict(), model_path)
msg = {
......
......@@ -4,6 +4,7 @@ FROM python:3.8-slim-buster
WORKDIR /wd
COPY deployment/tft/requirements.txt .
RUN pip3 install -r requirements.txt
RUN mkdir models
# Copy the rest of the codebase into the image
COPY deployment/tft ./
......
......@@ -65,7 +65,11 @@ class StartForecastingListener(stomp.ConnectionListener):
message = json.dumps(message)
start_training(message)
self.conn.disconnect()
class Msg(object):
def __init__(self):
self.body = None
def main():
......@@ -84,6 +88,19 @@ def main():
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
msg1 = Msg()
msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60}]'
msg2 = Msg()
msg2.body = """{
"metrics": ["cpu_usage"],
"timestamp": 0,
"epoch_start": 0,
"number_of_forward_predictions": 5,
"prediction_horizon": 120}"""
StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
pass
......
......@@ -89,10 +89,9 @@ def main():
logging.debug(
f"waiting {msg['epoch_start'] - int(time.time()) - prediction_cycle} seconds"
)
print()
# time.sleep(max(0
# msg["epoch_start"] - int(time.time()) - prediction_cycle)
# ) # time units???
print("waiting: ", msg["epoch_start"] - int(time.time()) - prediction_cycle)
time.sleep(max(0, msg["epoch_start"] - int(time.time()) - prediction_cycle))
tl = Timeloop()
......@@ -104,7 +103,7 @@ def main():
predictions = None
time_0 = int(time.time())
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, predictions = predict(
prediction_msgs, prediction = predict(
metric,
prediction_cycle,
extra_data=predictions,
......@@ -112,6 +111,13 @@ def main():
prediction_hor=prediction_horizon,
timestamp=time_0,
)
if predictions is not None:
predictions = pd.concat(
[predictions, prediction], ignore_index=True
)
else:
predictions = prediction
if prediction_msgs:
dest = f"{PRED_TOPIC_PREF}.{metric}"
start_conn.send_to_topic(dest, prediction_msgs[metric])
......
......@@ -34,7 +34,7 @@ def predict(
dataset = pd.read_csv(data_path)
if extra_data is not None:
dataset = pd.concat([dataset, extra_data])
dataset = pd.concat([dataset, extra_data], ignore_index=True)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
......
......@@ -86,7 +86,10 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
val_dataloaders=val_dataloader,
)
with lock:
if os.path.isfile(lockfile):
with lock:
torch.save(tft.state_dict(), model_path)
else:
torch.save(tft.state_dict(), model_path)
msg = {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment