predict.py 5.97 KB
Newer Older
Anna Warno's avatar
Anna Warno committed
1
import time
2
import os
Anna Warno's avatar
Anna Warno committed
3
4
import stomp
import threading
Anna Warno's avatar
Anna Warno committed
5
from src.model_predict import predict
Anna Warno's avatar
Anna Warno committed
6
7
from amq_message_python_library import *
from src.influxdb_predictions import InfluxdbPredictionsSender
Anna Warno's avatar
Anna Warno committed
8
9
10
from timeloop import Timeloop
from datetime import timedelta
import json
11
12
import sys
import pandas as pd
Anna Warno's avatar
Anna Warno committed
13
14
import logging
from src.dataset_maker import CSVData
Anna Warno's avatar
Anna Warno committed
15
16


Anna Warno's avatar
Anna Warno committed
17
METHOD = os.environ.get("METHOD", "tft")
18
19
20
21
22
23
24
START_TOPIC = f"start_forecasting.{METHOD}"
STOP_TOPIC = f"stop_forecasting.{METHOD}"
PRED_TOPIC_PREF = f"intermediate_prediction.{METHOD}"
PREDICTION_CYCLE = 1  # minutes
APP_NAME = os.environ.get("APP_NAME", "demo")
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
Anna Warno's avatar
Anna Warno committed
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44


class CustomListener(stomp.ConnectionListener):
    """Custom listener, parameters:
    - conn (stomp connector)
    - topic_name, name of topic to subscribe,
    - start , if start is set to be true recived metrics
    are added to predicted metric, otherwise recived
    metrics are removed from predicted metric (start
    mode corresponds to start_prediction)"""

    def __init__(self, conn, topic_name, start=True):
        self.conn = conn
        self.topic_name = topic_name
        self.start = start

    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
Anna Warno's avatar
Anna Warno committed
45
        global predicted_metrics, lock, number_of_forward_predictions, metrics_info
Anna Warno's avatar
Anna Warno committed
46
47
48
49
50
51
        lock.acquire()
        try:
            metrics = set(json.loads(frame.body)["metrics"])

            if self.start:
                predicted_metrics = predicted_metrics.union(metrics)
52
                for metric in metrics:
Anna Warno's avatar
Anna Warno committed
53
54
55
                    frame_body = json.loads(frame.body)
                    logging.debug(frame_body)
                    number_of_forward_predictions[metric] = frame_body[
56
57
                        "number_of_forward_predictions"
                    ]
Anna Warno's avatar
Anna Warno committed
58
                    self.prediction_cycle = frame_body["prediction_horizon"]
Anna Warno's avatar
Anna Warno committed
59
60
61
62
63
64
65
66
            else:
                predicted_metrics = predicted_metrics.difference(metrics)

        finally:
            lock.release()


def main():
Anna Warno's avatar
Anna Warno committed
67
68
    logging.debug(f"metrics to predict {predicted_metrics}")
    start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
69
    start_conn.connect()
Anna Warno's avatar
Anna Warno committed
70

Anna Warno's avatar
Anna Warno committed
71
72
73
    stop_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
    stop_conn.connect()

74
    start_conn.conn.subscribe(f"/topic/{START_TOPIC}", "1", ack="auto")
Anna Warno's avatar
Anna Warno committed
75
    stop_conn.conn.subscribe(f"/topic/{STOP_TOPIC}", "2", ack="auto")
Anna Warno's avatar
Anna Warno committed
76

Anna Warno's avatar
Anna Warno committed
77
78
79
80
    start_conn.set_listener(
        "1", CustomListener(start_conn.conn, START_TOPIC, start=True)
    )
    stop_conn.set_listener("2", CustomListener(stop_conn.conn, STOP_TOPIC, start=False))
Anna Warno's avatar
Anna Warno committed
81

82
83
    dataset_preprocessor = CSVData(APP_NAME)
    dataset_preprocessor.prepare_csv()
Anna Warno's avatar
Anna Warno committed
84
85
86
    logging.debug("dataset downloaded")

    influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
87

Anna Warno's avatar
Anna Warno committed
88
    logging.debug(
Anna Warno's avatar
Anna Warno committed
89
        f"waiting {(msg['epoch_start'] - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
Anna Warno's avatar
Anna Warno committed
90
    )
Anna Warno's avatar
Anna Warno committed
91

92
    time.sleep(
Anna Warno's avatar
Anna Warno committed
93
        max(0, (msg["epoch_start"] - int(time.time()) * 1000 - prediction_cycle * 1000))
94
95
        // 1000
    )
Anna Warno's avatar
Anna Warno committed
96
97
98

    tl = Timeloop()

Anna Warno's avatar
Anna Warno committed
99
100
101
    dataset_preprocessor.prepare_csv()
    for metric in predicted_metrics:
        predictions = None
Anna Warno's avatar
Anna Warno committed
102
        global time_0
Anna Warno's avatar
Anna Warno committed
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
        time_0 = time_0 + prediction_cycle * 1000
        for i in range(number_of_forward_predictions[metric]):
            prediction_msgs, prediction = predict(
                metric,
                (prediction_cycle * 1000) // msg["publish_rate"],
                extra_data=predictions,
                m=i + 1,
                prediction_hor=prediction_horizon,
                timestamp=time_0,
            )
            if predictions is not None:
                predictions = pd.concat([predictions, prediction], ignore_index=True)
            else:
                predictions = prediction

            if prediction_msgs:
                dest = f"{PRED_TOPIC_PREF}.{metric}"
                start_conn.send_to_topic(dest, prediction_msgs[metric])
                influxdb_conn.send_to_influxdb(metric, prediction_msgs)

Anna Warno's avatar
Anna Warno committed
123
    @tl.job(interval=timedelta(seconds=prediction_cycle))
Anna Warno's avatar
Anna Warno committed
124
    def metric_predict():
Anna Warno's avatar
Anna Warno committed
125
        logging.debug("prediction")
126
127
128
        dataset_preprocessor.prepare_csv()
        for metric in predicted_metrics:
            predictions = None
Anna Warno's avatar
Anna Warno committed
129
            global time_0
Anna Warno's avatar
Anna Warno committed
130
            time_0 = time_0 + prediction_cycle * 1000
Anna Warno's avatar
Anna Warno committed
131
            for i in range(number_of_forward_predictions[metric]):
Anna Warno's avatar
Anna Warno committed
132
                prediction_msgs, prediction = predict(
Anna Warno's avatar
Anna Warno committed
133
                    metric,
Anna Warno's avatar
Anna Warno committed
134
                    (prediction_cycle * 1000) // msg["publish_rate"],
Anna Warno's avatar
Anna Warno committed
135
136
137
138
139
                    extra_data=predictions,
                    m=i + 1,
                    prediction_hor=prediction_horizon,
                    timestamp=time_0,
                )
Anna Warno's avatar
Anna Warno committed
140
141
142
143
144
145
146
                if predictions is not None:
                    predictions = pd.concat(
                        [predictions, prediction], ignore_index=True
                    )
                else:
                    predictions = prediction

147
148
149
                if prediction_msgs:
                    dest = f"{PRED_TOPIC_PREF}.{metric}"
                    start_conn.send_to_topic(dest, prediction_msgs[metric])
Anna Warno's avatar
Anna Warno committed
150
                    influxdb_conn.send_to_influxdb(metric, prediction_msgs)
Anna Warno's avatar
Anna Warno committed
151
152
153

    tl.start(block=True)

154
155
156
    while True:
        pass

Anna Warno's avatar
Anna Warno committed
157
158

if __name__ == "__main__":
Anna Warno's avatar
Anna Warno committed
159
    msg = json.loads(sys.argv[1])
Anna Warno's avatar
Anna Warno committed
160
    metrics_info = {
Anna Warno's avatar
Anna Warno committed
161
162
163
164
165
        m["metric"]: {
            "level": m["level"],
            "publish_rate": m["publish_rate"],
        }
        for m in msg["all_metrics"]
Anna Warno's avatar
Anna Warno committed
166
    }
Anna Warno's avatar
Anna Warno committed
167
    time_0 = msg["epoch_start"]
Anna Warno's avatar
Anna Warno committed
168
    prediction_horizon = msg["prediction_horizon"] * 1000
Anna Warno's avatar
Anna Warno committed
169
170
171
    predicted_metrics = set(msg["metrics"])
    prediction_cycle = msg["prediction_horizon"]

Anna Warno's avatar
Anna Warno committed
172
173
    logging.debug(f"Predicted metrics: {predicted_metrics}")
    number_of_forward_predictions = {
Anna Warno's avatar
Anna Warno committed
174
        metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
Anna Warno's avatar
Anna Warno committed
175
    }  # deafult number of forward predictions
176
177
    lock = threading.Lock()

Anna Warno's avatar
Anna Warno committed
178
    main()