Predictor.py 14.9 KB
Newer Older
1
import json
2
import threading
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import time
import os, sys
import multiprocessing
import traceback
from subprocess import PIPE, run

import logging
import messaging
from messaging import morphemic
from runtime.predictions.Prediction import Prediction
from runtime.operational_status.State import State
from runtime.utilities.Utilities import Utilities


def fix_path_ending(path):
    """Return *path* guaranteed to end with the OS path separator.

    BUGFIX: the original compared ``path[-1] is os.sep`` — an identity check on
    strings that only works by CPython's interning of 1-char strings — and
    raised IndexError for an empty path. ``str.endswith`` handles both
    correctly (an empty path yields just ``os.sep``).
    """
    if path.endswith(os.sep):
        return path
    return path + os.sep


def get_prediction_data_filename(configuration_file_location):
    """Derive the CSV dataset path from the .properties configuration file.

    Reads ``path_to_datasets`` and ``application_name`` from the properties
    file and returns ``<path_to_datasets><application_name>.csv`` with the
    directory part normalised to end in a path separator.
    """
    from jproperties import Properties
    properties = Properties()
    with open(configuration_file_location, "rb") as config_file:
        properties.load(config_file, "utf-8")
        # jproperties returns (value, metadata) tuples; metadata is unused here.
        datasets_path, _ = properties["path_to_datasets"]
        application_name, _ = properties["application_name"]
        return str(fix_path_ending(datasets_path)) + str(application_name) + ".csv"


def predict_attribute(attribute, configuration_file_location,next_prediction_time):
    """Invoke the R forecasting script for one metric and parse its stdout.

    Changes the working directory to the configuration file's directory (so the
    R script and its data are resolved relative to it), runs
    ``forecasting_real_workload.R`` via Rscript, and scrapes the prediction,
    confidence interval and error metrics from the script's textual output.

    Returns a Prediction object; its ``prediction_valid`` flag is True only
    when both a prediction value and a confidence interval were found in the
    R output.
    """
    prediction_confidence_interval_produced = False
    prediction_value_produced = False
    prediction_valid = False
    # NOTE(review): os.chdir is a process-wide side effect; this function is run
    # inside a multiprocessing worker (see predict_attributes), which confines it.
    os.chdir(os.path.dirname(configuration_file_location))
    prediction_data_file = get_prediction_data_filename(configuration_file_location)

    if State.testing_prediction_functionality:
        print("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
        print("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute)
        # In testing mode, no target prediction time is passed to the R script.
        command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute]
    else:
        print ("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time)

        command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]

    process_output = run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    if (process_output.stdout==""):
        print("Empty output from R predictions")
        print("The error output is the following")
        print(process_output.stderr) #There was an error during the calculation of the predicted value

    # R prints vectors prefixed with "[1] " and may quote strings; strip both,
    # then scan whitespace-separated tokens for "key:value" markers.
    process_output_string_list = process_output.stdout.replace("[1] ", "").replace("\"", "").split()
    prediction_value = 0
    # Default interval is effectively (-inf, +inf) until the script provides one.
    prediction_confidence_interval = "-10000000000000000000000000,10000000000000000000000000"
    prediction_mae = 0
    prediction_mse = 0
    prediction_mape = 0
    prediction_smape = 0
    for string in process_output_string_list:
        if (string.startswith("Prediction:")):
            prediction_value = string.replace("Prediction:", "")
            prediction_value_produced = True
        if (string.startswith("Confidence_interval:")):
            prediction_confidence_interval = string.replace("Confidence_interval:", "")
            prediction_confidence_interval_produced = True
        elif (string.startswith("mae:")):
            prediction_mae = string.replace("mae:", "")
        elif (string.startswith("mse:")):
            prediction_mse = string.replace("mse:", "")
        elif (string.startswith("mape:")):
            prediction_mape = string.replace("mape:", "")
        elif (string.startswith("smape:")):
            prediction_smape = string.replace("smape:", "")
    if (prediction_confidence_interval_produced and prediction_value_produced):
        prediction_valid = True
        print("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
    else:
        print("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
        print(process_output.stderr)

    # NOTE(review): values parsed from the script remain strings here (the 0
    # defaults are ints); Prediction presumably converts/stores them — confirm.
    output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape)
    return output_prediction


def predict_attributes(attributes,next_prediction_time):
    """Predict all attributes in parallel and return {attribute: Prediction}.

    One worker process per attribute is spawned; each runs predict_attribute
    with the shared configuration file location. Each Prediction is stamped
    with the wall-clock seconds its own computation took.

    BUGFIX: the original recorded a single ``start_time`` that was overwritten
    on every submission, so every attribute's ``last_prediction_time_needed``
    was measured from the LAST submission instead of its own; per-attribute
    start times are now kept.
    """
    pool = multiprocessing.Pool(len(attributes))
    print("Prediction thread pool size set to " + str(len(attributes)))
    attribute_predictions = {}
    submission_times = {}

    for attribute in attributes:
        print("Starting " + attribute + " prediction thread")
        submission_times[attribute] = time.time()
        attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, State.configuration_file_location,str(next_prediction_time)])

    for attribute in attributes:
        attribute_predictions[attribute] = attribute_predictions[attribute].get() #get the results of the processing
        attribute_predictions[attribute].set_last_prediction_time_needed(int(time.time() - submission_times[attribute]))

    pool.close()
    pool.join()
    return attribute_predictions


def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_prediction):
    """Choose the next time point to predict for.

    The next interval boundary after "now" is the earliest candidate; one more
    horizon is added so the prediction targets the interval after that. If the
    expected prediction cost would overrun the earliest boundary, enough whole
    intervals are skipped to compensate.
    """
    now = time.time()
    elapsed_intervals = ((now - epoch_start)//prediction_horizon)
    # First interval boundary strictly after the current moment.
    earliest_boundary = epoch_start + (elapsed_intervals+1)*prediction_horizon #these predictions will concern the next prediction interval
    expected_finish = now+maximum_time_for_prediction

    if (expected_finish <= earliest_boundary):
        # Normal case: we will finish in time, target one horizon beyond the boundary.
        return earliest_boundary + prediction_horizon

    # Slow case: skip as many whole intervals as the overrun requires.
    skip_factor = 1+(expected_finish-earliest_boundary)//prediction_horizon
    print("Due to slowness of the prediction, skipping next prediction interval and targeting "+str(skip_factor)+" intervals ahead (instead of simply waiting until the time that the next interval begins)")
    return earliest_boundary+ skip_factor*prediction_horizon


127
128
129
130
131
132
133
134
135
136
137
def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_for_prediction):
    """Forecasting loop run in a background thread (see Listener).

    While Listener.start_forecasting is set: pick the next prediction time,
    sleep until one horizon before it, compute predictions for every metric in
    State.metrics_to_predict, and publish each valid prediction (plus a
    training event) to the broker, reconnecting on connection errors.

    BUGFIXES vs. original:
    - the exception message concatenated the numeric State.next_prediction_time
      to a string (TypeError inside the handler); now wrapped in str().
    - after a failed prediction the loop fell through and used the unbound
      local ``prediction`` (NameError); now it skips to the next interval.
    - the final adaptive update divided by ``wait_time``, which can be zero or
      negative; now guarded.
    """
    while Listener.start_forecasting:
        print("Using " + State.configuration_file_location + " for configuration details...")
        State.next_prediction_time = update_prediction_time(State.epoch_start, prediction_horizon,maximum_time_required_for_prediction)

        # Adapt the assumed prediction cost upwards if any metric recently took longer.
        for attribute in State.metrics_to_predict:
            if ((State.previous_prediction is not None) and (State.previous_prediction[attribute] is not None) and (State.previous_prediction[attribute].last_prediction_time_needed>maximum_time_required_for_prediction)):
                maximum_time_required_for_prediction = State.previous_prediction[attribute].last_prediction_time_needed

        #Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
        wait_time = State.next_prediction_time - prediction_horizon - time.time()
        print("Waiting for "+str((int(wait_time*100))/100)+" seconds")
        if (wait_time>0):
            time.sleep(wait_time)
            if(not Listener.start_forecasting):
                break

        Utilities.load_configuration()
        Utilities.update_monitoring_data()
        try:
            prediction = predict_attributes(State.metrics_to_predict,State.next_prediction_time)
        except Exception as e:
            print("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time))
            print(traceback.format_exc())
            continue #nothing to publish for this interval; try again on the next one

        for attribute in State.metrics_to_predict:
            if(not prediction[attribute].prediction_valid):
                continue
            if (State.disconnected or State.check_stale_connection()):
                State.connection.connect()
            message_not_sent = True
            current_time = int(time.time())
            prediction_message_body = {
                "metricValues": prediction[attribute].value,
                "level": 3,
                "timestamp": current_time,
                "probability": 0.95,
                "confidence_interval": str(prediction[attribute].lower_confidence_interval_value) + "," + str(
                    prediction[attribute].upper_confidence_interval_value),
                "predictionTime": int(State.next_prediction_time),
                "refersTo": "Undefined",#"MySQL_12345",
                "cloud": "Undefined",#"AWS-Dublin",
                "provider": "Undefined",#"AWS"
            }
            training_events_message_body = {
                "metrics": State.metrics_to_predict,
                "forecasting_method": "exponentialsmoothing",
                "timestamp": current_time,
            }
            # Retry until the broker accepts both messages, rebuilding the
            # connection on ConnectionError.
            while (message_not_sent):
                try:
                    State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body)
                    State.connection.send_to_topic('training_events',training_events_message_body)
                    message_not_sent = False
                    print("Successfully sent prediction message for "+attribute)
                except ConnectionError as exception:
                    State.connection.disconnect()
                    State.connection = messaging.morphemic.Connection('admin', 'admin')
                    State.connection.connect()
                    State.disconnected = False

        State.previous_prediction = prediction
        # Adapt how much history to use; only meaningful when we actually waited
        # (wait_time <= 0 would divide by zero or produce a negative value).
        if (wait_time > 0):
            State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from)

189
190
191
192

class Listener(messaging.listener.MorphemicListener):
    """Broker listener controlling the exponential-smoothing forecaster.

    Reacts to four topics:
    - ``metrics_to_predict``: records that the initial metric list arrived.
    - ``test.exponentialsmoothing``: switches the predictor to testing mode.
    - ``start_forecasting.exponentialsmoothing``: stores the metric names,
      epoch start and prediction horizon, then launches the forecasting thread.
    - ``stop_forecasting.exponentialsmoothing``: removes metrics and, when none
      remain, stops and joins the forecasting thread.
    """

    start_forecasting = None # Whether the component should start (or keep on) forecasting
    prediction_thread = None # Background thread running calculate_and_publish_predictions
    #previous_prediction = None
    #metrics_to_predict = [] # The metrics to be predicted, will be received in a special message

    #def on_message(self, headers,body):
    def on_message(self, frame):
        """Dispatch an incoming frame according to its topic name."""
        headers = frame.headers
        body = frame.body

        logging.debug("Headers %s", headers)
        logging.debug("        %s", body)

        print("New message on "+self.get_topic_name(headers))
        print("---------------------------------------------")

        if self.get_topic_name(headers) == 'metrics_to_predict':
            # Only the arrival of the list is recorded; the metric names
            # themselves are taken from the start_forecasting message below.
            State.initial_metric_list_received = True
            #body = json.loads(body)
            #for element in body:
            #    State.metrics_to_predict.append(element["metric"])

        elif self.get_topic_name(headers) == 'test.exponentialsmoothing':
            State.testing_prediction_functionality = True

        elif self.get_topic_name(headers) == 'start_forecasting.exponentialsmoothing':
            State.metrics_to_predict = json.loads(body)["metrics"]
            #waitfor(first period)
            if (not State.initial_metric_list_received):
                print("The initial metric list has not been received, therefore no predictions are generated")
                return
            prediction_horizon = None
            try:
                Listener.start_forecasting = True
                State.epoch_start = json.loads(body)["epoch_start"]
                prediction_horizon = int(json.loads(body)["prediction_horizon"])
                State.next_prediction_time = State.epoch_start
            except Exception as e:
                # NOTE(review): on failure prediction_horizon stays None but the
                # thread below is still started with it — confirm this is intended.
                print("Problem while retrieving epoch start and/or prediction_horizon")

            maximum_time_required_for_prediction = 20 #initialization, assuming 20 seconds processing time to derive a first prediction
            # Start (or restart) the forecasting loop unless one is already alive.
            if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
                self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
                self.prediction_thread.start()

        elif self.get_topic_name(headers) == 'stop_forecasting.exponentialsmoothing':
            #waitfor(first period)
            print("Received message to stop predicting some of the metrics")
            metrics_to_remove = json.loads(body)["metrics"]
            for metric in metrics_to_remove:
                if (State.metrics_to_predict.__contains__(metric)):
                    print("Stopping generating predictions for metric "+metric)
                    State.metrics_to_predict.remove(metric)
            # When no metrics remain, signal the loop to stop and wait for it.
            # NOTE(review): join() blocks this listener callback until the
            # forecasting thread finishes its current interval.
            if len(State.metrics_to_predict)==0:
                Listener.start_forecasting = False
                self.prediction_thread.join()

    #def on_error(self, headers, body):
    def on_error(self, frame):
        """Log broker error frames."""
        headers = frame.headers
        body = frame.body
        logging.error("Headers %s", headers)
        logging.error("        %s", body)

    def on_disconnected(self):
        """Record loss of connectivity; reconnection is handled by the main loop."""
        print('disconnected')
        State.disconnected=True
        #connection.connect()


def get_dataset_file(attribute):
    """Unimplemented placeholder.

    Presumably meant to return the dataset file for *attribute* (cf.
    get_prediction_data_filename) — TODO: implement or remove.
    """
    pass


if __name__ == "__main__":
    # Entry point: read the configuration file path from the command line,
    # then connect to the broker and subscribe to the control topics,
    # reconnecting whenever the connection is lost.
    State.configuration_file_location = sys.argv[1]
    Utilities.load_configuration()

    # Subscribe to retrieve the metrics which should be used

    # NOTE(review): 'id' shadows the builtin and is also read by
    # calculate_and_publish_predictions as a global topic-name component.
    id = "exponentialsmoothing"
    State.disconnected = True

    #while(True):
    #    State.connection = messaging.morphemic.Connection('admin', 'admin')
    #    State.connection.connect()
    #    State.connection.set_listener(id, Listener())
    #    State.connection.topic("test","helloid")
    #    State.connection.send_to_topic("test","HELLO!!!")
    #exit(100)

    while True:
        State.connection = messaging.morphemic.Connection(State.broker_username,State.broker_password,State.broker_address,State.broker_port)
        # morphemic = morphemic.model.Model(connection)
        State.connection.set_listener(id, Listener())
        if (State.disconnected or State.check_stale_connection()):
            try:
                State.connection.disconnect() #required to avoid the already connected exception
                State.connection.connect()
                State.disconnected = False
                # Subscribe to the three control topics handled by Listener.
                State.connection.topic("metrics_to_predict", id)
                print("Successfully connected to metrics_to_predict")
                State.connection.topic("start_forecasting.exponentialsmoothing", id)
                print("Successfully connected to start_forecasting.exponentialsmoothing")
                State.connection.topic("stop_forecasting.exponentialsmoothing", id)
                print("Successfully connected to stop_forecasting.exponentialsmoothing")
            except Exception as e:
                print("Encountered exception while trying to connect to broker")
                print(traceback.format_exc())
                State.disconnected = True
                continue
        # Block until a disconnect is signalled, then loop to reconnect.
        State.disconnection_handler.acquire()
        State.disconnection_handler.wait()
        State.disconnection_handler.release()
    # data = {}
    # data["testvalue"]="Test"
    # connection.send_to_topic("metrics_to_predict",data)

    # connection.topic("stop_forecasting",id)
    # connection.topic("start_forecasting",id)

    # NOTE(review): unreachable — the while True loop above never exits normally.
    State.connection.disconnect()