predict.py 5.97 KB
Newer Older
1
2
3
4
import time
import os
import stomp
import threading
Anna Warno's avatar
Anna Warno committed
5
from src.model_predict import predict
Anna Warno's avatar
Anna Warno committed
6
7
from amq_message_python_library import *
from src.influxdb_predictions import InfluxdbPredictionsSender
8
9
10
11
12
from timeloop import Timeloop
from datetime import timedelta
import json
import sys
import pandas as pd
Anna Warno's avatar
Anna Warno committed
13
14
import logging
from src.dataset_maker import CSVData
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44


# Forecasting method served by this process; it is embedded in every topic name.
METHOD = os.getenv("METHOD", "tft")

# Topics that add / remove metrics from the forecasted set, and the prefix of
# the per-metric topics the predictions are published to.
START_TOPIC = f"start_forecasting.{METHOD}"
STOP_TOPIC = f"stop_forecasting.{METHOD}"
PRED_TOPIC_PREF = f"intermediate_prediction.{METHOD}"

PREDICTION_CYCLE = 1  # minutes

# Application name and message-broker credentials, overridable via environment.
APP_NAME = os.getenv("APP_NAME", "demo")
AMQ_USER = os.getenv("AMQ_USER", "admin")
AMQ_PASSWORD = os.getenv("AMQ_PASSWORD", "admin")


class CustomListener(stomp.ConnectionListener):
    """STOMP listener that maintains the module-level set of predicted metrics.

    Parameters:
    - conn: stomp connection the listener is attached to (stored only),
    - topic_name: name of the subscribed topic (stored only),
    - start: if true, metrics named in a received message are added to
      ``predicted_metrics`` (start_forecasting mode); otherwise they are
      removed from it (stop_forecasting mode).
    """

    def __init__(self, conn, topic_name, start=True):
        self.conn = conn
        self.topic_name = topic_name
        self.start = start

    def on_error(self, frame):
        """Print the body of an error frame."""
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        """Update the shared forecasting state from a JSON message frame.

        The frame body must be a JSON object with a ``"metrics"`` list. In
        start mode with at least one metric it must also carry
        ``"number_of_forward_predictions"`` and ``"prediction_horizon"``.

        Raises:
            KeyError: if a required field is missing. Required fields are read
                before any shared state is touched, so a malformed message
                leaves the state unchanged.
        """
        # Only `predicted_metrics` is rebound; the dict is mutated in place.
        global predicted_metrics

        # Parse once, outside the lock, instead of once per metric.
        frame_body = json.loads(frame.body)
        logging.debug(frame_body)
        metrics = set(frame_body["metrics"])

        if not self.start:
            with lock:
                predicted_metrics = predicted_metrics.difference(metrics)
            return

        if not metrics:
            # Nothing to add; the union would be a no-op.
            return

        forward_predictions = frame_body["number_of_forward_predictions"]
        horizon = frame_body["prediction_horizon"]

        with lock:
            predicted_metrics = predicted_metrics.union(metrics)
            for metric in metrics:
                number_of_forward_predictions[metric] = forward_predictions
            # NOTE(review): this is stored on the listener instance only; the
            # module-level `prediction_cycle` read by main() is not updated.
            # Confirm whether that is intended.
            self.prediction_cycle = horizon


def _run_prediction_cycle(dataset_preprocessor, amq_conn, influxdb_conn):
    """Refresh the dataset, then forecast and publish every tracked metric once.

    Parameters:
    - dataset_preprocessor: object whose ``prepare_csv()`` refreshes the data,
    - amq_conn: connection whose ``send_to_topic(dest, body)`` publishes a
      prediction message,
    - influxdb_conn: sender whose ``send_to_influxdb(metric, msgs)`` stores it.

    Side effects: advances the module-level ``time_0`` by one prediction cycle
    (``prediction_cycle * 1000``).
    """
    global time_0

    dataset_preprocessor.prepare_csv()

    # The listeners update these under `lock` from another thread; take a
    # consistent snapshot so a metric is never seen without its count.
    with lock:
        metrics = set(predicted_metrics)
        forward_counts = {
            metric: number_of_forward_predictions[metric] for metric in metrics
        }

    # Advance the reference timestamp once per cycle. Doing it per metric (as
    # before) made the timestamp jump N cycles per run with N metrics and gave
    # metrics of the same cycle different timestamps.
    time_0 = time_0 + prediction_cycle * 1000
    timestamp = time_0
    prediction_length = (prediction_cycle * 1000) // msg["publish_rate"]

    for metric in metrics:
        # Predictions accumulated so far are fed back as extra data for the
        # next forward step.
        predictions = None
        for step in range(forward_counts[metric]):
            prediction_msgs, prediction = predict(
                metric,
                prediction_length,
                extra_data=predictions,
                m=step + 1,
                prediction_hor=prediction_horizon,
                timestamp=timestamp,
            )
            if predictions is not None:
                predictions = pd.concat([predictions, prediction], ignore_index=True)
            else:
                predictions = prediction

            if prediction_msgs:
                dest = f"{PRED_TOPIC_PREF}.{metric}"
                amq_conn.send_to_topic(dest, prediction_msgs[metric])
                influxdb_conn.send_to_influxdb(metric, prediction_msgs)


def main():
    """Subscribe to start/stop topics and publish forecasts every cycle.

    Reads the module-level state set up by the script entry point (``msg``,
    ``prediction_cycle``, ``prediction_horizon``, ``predicted_metrics``,
    ``number_of_forward_predictions``, ``time_0``, ``lock``).

    Flow: connect and subscribe to the start/stop topics, download the
    dataset, sleep until one prediction cycle before ``msg["epoch_start"]``,
    run a first prediction cycle, then repeat it every ``prediction_cycle``
    seconds via Timeloop. Blocks until Timeloop is stopped.
    """
    logging.debug(f"metrics to predict {predicted_metrics}")

    start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
    start_conn.connect()

    stop_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
    stop_conn.connect()

    start_conn.conn.subscribe(f"/topic/{START_TOPIC}", "1", ack="auto")
    stop_conn.conn.subscribe(f"/topic/{STOP_TOPIC}", "2", ack="auto")

    start_conn.set_listener(
        "1", CustomListener(start_conn.conn, START_TOPIC, start=True)
    )
    stop_conn.set_listener("2", CustomListener(stop_conn.conn, STOP_TOPIC, start=False))

    dataset_preprocessor = CSVData(APP_NAME)
    dataset_preprocessor.prepare_csv()
    logging.debug("dataset downloaded")

    influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)

    # epoch_start is compared against the current time in milliseconds; start
    # one prediction cycle ahead of it. A negative delay means we are late.
    delay_ms = msg["epoch_start"] - int(time.time()) * 1000 - prediction_cycle * 1000
    logging.debug(f"waiting {delay_ms // 1000} seconds")
    time.sleep(max(0, delay_ms) // 1000)

    tl = Timeloop()

    # First cycle runs immediately; subsequent ones are scheduled below.
    _run_prediction_cycle(dataset_preprocessor, start_conn, influxdb_conn)

    @tl.job(interval=timedelta(seconds=prediction_cycle))
    def metric_predict():
        logging.debug("prediction")
        _run_prediction_cycle(dataset_preprocessor, start_conn, influxdb_conn)

    # start(block=True) keeps the main thread parked until Timeloop is
    # stopped. The former `while True: pass` after it only spun a CPU core
    # and kept the process from exiting after shutdown.
    tl.start(block=True)


if __name__ == "__main__":
    # The start message is passed as a single JSON string on the command line.
    if len(sys.argv) < 2:
        sys.exit("usage: predict.py '<start message as JSON>'")
    msg = json.loads(sys.argv[1])

    # Per-metric level and publish rate, keyed by metric name.
    metrics_info = {
        m["metric"]: {
            "level": m["level"],
            "publish_rate": m["publish_rate"],
        }
        for m in msg["all_metrics"]
    }

    # Reference timestamp of the forecasts; main() advances it by
    # prediction_cycle * 1000 and compares it with time.time() * 1000.
    time_0 = msg["epoch_start"]

    # The same message field is kept in two scales: multiplied by 1000 for
    # predict(), and as-is for the scheduler interval (used as seconds).
    prediction_horizon = msg["prediction_horizon"] * 1000
    prediction_cycle = msg["prediction_horizon"]

    predicted_metrics = set(msg["metrics"])
    logging.debug(f"Predicted metrics: {predicted_metrics}")

    # Default number of forward predictions for every initially tracked metric.
    number_of_forward_predictions = {
        metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
    }

    # Guards predicted_metrics / number_of_forward_predictions, which the
    # message listeners update from another thread.
    lock = threading.Lock()

    main()