Commit 0949cd6f authored by Anna Warno

prediction speed corrected

parent 3aef9ba4
@@ -112,11 +112,11 @@ def main():
)
# msg1 = Msg()
- # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]'
+ # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 10000}, {"metric": "ETPercentile_Ctx", "level": 3, "publish_rate": 10000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext", "ETPercentile_Ctx"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + "}"
# )
......
@@ -14,6 +14,7 @@ from src.dataset_maker import CSVData
from pytz import timezone
import pytz
from datetime import datetime
+ import random
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
@@ -146,6 +147,7 @@ def main():
print(
f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
+ print()
end_time = int(time.time())
......
+ def square(x):
+ return x * x
\ No newline at end of file
@@ -41,7 +41,12 @@ def predict(
return None
dataset = pd.read_csv(data_path)
- new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate)
+ new_ts_dataset = Dataset(
+ dataset,
+ target_column=target_column,
+ **params["dataset"],
+ publish_rate=publish_rate,
+ )
if new_ts_dataset.dropped_recent_series: # series with recent data was too short
logging.info(
f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
@@ -53,7 +58,12 @@ def predict(
dataset = pd.read_csv(data_path)
- ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate)
+ ts_dataset = Dataset(
+ dataset,
+ target_column=target_column,
+ **params["dataset"],
+ publish_rate=publish_rate,
+ )
if ts_dataset.dataset.shape[0] < 1:
logging.info(
f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
@@ -110,7 +120,9 @@ def predict(
predictions.conf_int(alpha=0.05).iloc[i].values[0],
predictions.conf_int(alpha=0.05).iloc[i].values[1],
], # quantiles difference
"predictionTime": timestamp + (i + 1) * (prediction_hor // 1000),
"predictionTime": timestamp
+ (i // single_prediction_points_length + 1)
* (prediction_hor // 1000),
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
......
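Context for the predictionTime change above: predict() now emits one message per horizon from a single forecast, so each message's predictionTime advances by a whole number of horizons instead of a per-call (i + 1) step. A self-contained sketch of the arithmetic, with invented numbers (30 s horizon, 5 forecast points per horizon):

timestamp = 1_600_000_000  # epoch seconds when the forecast starts (invented)
prediction_hor = 30_000  # prediction horizon in milliseconds (invented)
single_prediction_points_length = 5  # forecast points per horizon (invented)

for i in range(15):
    if (i + 1) % single_prediction_points_length == 0:  # same filter as in predict()
        offset = (i // single_prediction_points_length + 1) * (prediction_hor // 1000)
        print(i, timestamp + offset)  # i=4 -> +30 s, i=9 -> +60 s, i=14 -> +90 s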
@@ -101,7 +101,6 @@ class Dataset(object):
)
def check_gap(self):
- print(self.dataset)
if self.dataset.shape[0] > 0:
self.dataset = self.dataset.groupby(by=[self.time_column]).min()
self.dataset[self.time_column] = self.dataset.index
......
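A side note on the check_gap context shown above: the groupby(...).min() call collapses rows that share a timestamp into a single row (taking the per-column minimum), and the resulting index is written back into the time column. A minimal pandas illustration with invented data and a generic "time" column name:

import pandas as pd

df = pd.DataFrame({"time": [0, 0, 30], "value": [2.0, 1.0, 3.0]})  # duplicate t=0 rows (invented)
df = df.groupby(by=["time"]).min()  # one row per timestamp, per-column minimum
df["time"] = df.index  # restore the time column, as check_gap does
print(df)  # t=0 keeps value 1.0, t=30 keeps value 3.0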
@@ -122,48 +122,32 @@ def main():
dataset_preprocessor.prepare_csv()
global time_0
for metric in predicted_metrics:
- predictions = None
- for i in range(number_of_forward_predictions[metric]):
- prediction_msgs, prediction = predict(
- metric,
- prediction_lengths[metric],
- extra_data=None,
- m=i + 1,
- prediction_hor=prediction_horizon,
- timestamp=time_0 + (i + 1) * (prediction_horizon // 1000),
- predicted_point_idx=int(
- (i + 1) * prediction_points_horizons[metric] - 1
- ),
- publish_rate=metrics_info[metric]["publish_rate"],
- )
- if i == (number_of_forward_predictions[metric] - 1):
- print(
- f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
- )
- logging.info(
- f"time difference in seconds between last preiction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
- )
+ prediction_msgs = predict(
+ metric,
+ prediction_lengths[metric],
+ single_prediction_points_length=prediction_points_horizons[metric],
+ prediction_hor=prediction_horizon,
+ timestamp=time_0,
+ publish_rate=metrics_info[metric]["publish_rate"],
+ )
- else:
- predictions = prediction
+ if prediction_msgs:
+ dest = f"{PRED_TOPIC_PREF}.{metric}"
- if prediction_msgs:
+ for message in prediction_msgs:
+ logging.info(
+ f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+ )
+ start_conn.send_to_topic(dest, message[metric])
+ influxdb_conn.send_to_influxdb(metric, message)
dest = f"{PRED_TOPIC_PREF}.{metric}"
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds'
)
logging.info(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}'
)
logging.info(
f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
+ current_time = int(time.time())
+ logging.info(
+ f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+ )
+ print(
+ f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+ )
end_time = int(time.time())
time_0 = time_0 + prediction_cycle
......
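This main() rewrite is where the commit title's speedup comes from: the old loop re-ran the model once per forward prediction, while the new code calls predict() once per metric and receives the full list of horizon messages. A rough, illustrative cost model (timings invented):

t_predict = 1.5  # seconds per model evaluation (invented)
n_forward = 8  # number_of_forward_predictions for the metric (invented)
old_cost = n_forward * t_predict  # one predict() call per horizon -> 12.0 s
new_cost = 1 * t_predict  # one call returning all messages -> 1.5 s
print(f"approximate speedup: {old_cost / new_cost:.0f}x")  # -> 8x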
@@ -18,13 +18,14 @@ TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
"""Script for nbeats fusion transformer prediction"""
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
filename=f"logs/{os.environ.get('METHOD', 'model')}.out", level=logging.INFO
)
def predict(
target_column,
prediction_length,
+ single_prediction_points_length=1,
yaml_file="model.yaml",
extra_data=None,
m=1,
@@ -48,14 +49,14 @@ def predict(
f"no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print("no pretrained model, unable to predict")
- return (None, None)
+ return None
data_path = os.path.join(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
if not os.path.isfile(data_path):
- return (None, None)
+ return None
dataset = pd.read_csv(data_path)
new_ts_dataset = Dataset(
@@ -69,7 +70,7 @@ def predict(
f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print("Not enough fresh data, unable to predict TIME:")
- return (None, None)
+ return None
dataset = new_ts_dataset.dataset
@@ -118,7 +119,7 @@ def predict(
model.load_state_dict(torch.load(model_path))
logging.info("Model corrupted unable to predict")
else:
- return (None, None)
+ return None
prediction_input = ts_dataset.get_from_dataset(future_df)
prediction_input = prediction_input.to_dataloader(train=False)
@@ -133,43 +134,43 @@ def predict(
for x, _ in prediction_input:
# make prediction
out = model(x)
- out = torch.flatten(model.transform_output(out))[predicted_point_idx]
- out = out.item()
+ out = torch.flatten(model.transform_output(out))
predictions_with_dropout.append(out)
- conf_intervals = st.t.interval(
- alpha=0.95,
- df=len(predictions_with_dropout) - 1,
- loc=np.mean(predictions_with_dropout),
- scale=st.sem(predictions_with_dropout),
- )
- predicted_values = list(conf_intervals) + [
- torch.flatten(prediction)[predicted_point_idx].item()
- ]
- predicted_values.sort() # ensure that predictions and confidence intervals are in correct order
- msg = {
- target_column: {
- "metricValue": predicted_values[1], # middle predicted value
- "level": 1, # TODO
- "timestamp": int(time.time()),
- "probability": 0.95,
- "confidence_interval": [
- predicted_values[0],
- predicted_values[-1],
- ], # quantiles difference
- "predictionTime": timestamp,
- "refersTo": "TODO",
- "cloud": "TODO",
- "provider": "TODO",
- }
- }
- logging.info(
- f"prediction msg: {msg} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
- )
- future_df = future_df.tail(pred_len)
- future_df["split"] = "val"
- future_df[target_column] = prediction[0]
- return (msg, future_df)
+ msgs = []
+ for i in range(pred_len):
+ if ((i + 1) % single_prediction_points_length) == 0:
+ predictions_intervals = [p[i].item() for p in predictions_with_dropout]
+ conf_intervals = st.t.interval(
+ alpha=0.95,
+ df=len(predictions_intervals) - 1,
+ loc=np.mean(predictions_intervals),
+ scale=st.sem(predictions_intervals),
+ )
+ predicted_values = list(conf_intervals) + [
+ torch.flatten(prediction)[i].item()
+ ]
+ predicted_values.sort() # ensure that predictions and confidence intervals are in correct order
+ msg = {
+ target_column: {
+ "metricValue": predicted_values[1], # middle predicted value
+ "level": 1, # TODO
+ "timestamp": int(time.time()),
+ "probability": 0.95,
+ "confidence_interval": [
+ predicted_values[0],
+ predicted_values[-1],
+ ], # quantiles difference
+ "predictionTime": timestamp
+ + (i // single_prediction_points_length + 1)
+ * (prediction_hor // 1000),
+ "refersTo": "TODO",
+ "cloud": "TODO",
+ "provider": "TODO",
+ }
+ }
+ msgs.append(msg)
+ return msgs
@@ -122,48 +122,32 @@ def main():
dataset_preprocessor.prepare_csv()
global time_0
for metric in predicted_metrics:
- predictions = None
- for i in range(number_of_forward_predictions[metric]):
- prediction_msgs, prediction = predict(
- metric,
- prediction_lengths[metric],
- extra_data=None,
- m=i + 1,
- prediction_hor=prediction_horizon,
- timestamp=time_0 + (i + 1) * (prediction_horizon // 1000),
- predicted_point_idx=int(
- (i + 1) * prediction_points_horizons[metric] - 1
- ),
- publish_rate=metrics_info[metric]["publish_rate"],
- )
- if i == (number_of_forward_predictions[metric] - 1):
- print(
- f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
- )
- logging.info(
- f"time difference in seconds between last preiction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
- )
+ prediction_msgs = predict(
+ metric,
+ prediction_lengths[metric],
+ single_prediction_points_length=prediction_points_horizons[metric],
+ prediction_hor=prediction_horizon,
+ timestamp=time_0,
+ publish_rate=metrics_info[metric]["publish_rate"],
+ )
- else:
- predictions = prediction
+ if prediction_msgs:
+ dest = f"{PRED_TOPIC_PREF}.{metric}"
- if prediction_msgs:
+ for message in prediction_msgs:
+ logging.info(
+ f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+ )
+ start_conn.send_to_topic(dest, message[metric])
+ influxdb_conn.send_to_influxdb(metric, message)
dest = f"{PRED_TOPIC_PREF}.{metric}"
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds'
)
logging.info(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}'
)
logging.info(
f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
+ current_time = int(time.time())
+ logging.info(
+ f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+ )
+ print(
+ f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+ )
end_time = int(time.time())
time_0 = time_0 + prediction_cycle
......
@@ -27,6 +27,7 @@ pd.options.mode.chained_assignment = None
def predict(
target_column,
prediction_length,
+ single_prediction_points_length=1,
yaml_file="model.yaml",
extra_data=None,
m=1,
@@ -47,14 +48,14 @@ def predict(
f"METRIC {target_column} no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print("no pretrained model, unable to predict")
- return (None, None)
+ return None
data_path = os.path.join(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
if not os.path.isfile(data_path):
- return (None, None)
+ return None
dataset = pd.read_csv(data_path)
new_ts_dataset = Dataset(
@@ -68,7 +69,7 @@ def predict(
f"METRIC {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print("METRIC {target_column} Not enough fresh data, unable to predict TIME:")
- return (None, None)
+ return None
dataset = new_ts_dataset.dataset
@@ -119,48 +120,50 @@ def predict(
logging.info(
f"METRIC {target_column} No pretrained model unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
- return (None, None)
+ return None
with lock:
if os.path.isfile(model_path):
model.load_state_dict(torch.load(model_path))
logging.info("Model corrupted unable to predict")
else:
- return (None, None)
+ return None
prediction_input = ts_dataset.get_from_dataset(future_df)
prediction_input = prediction_input.to_dataloader(train=False)
prediction = model.predict(future_df, mode="raw")["prediction"]
- predicted_values = [
- prediction[-1][predicted_point_idx][0].item(),
- prediction[-1][predicted_point_idx][3].item(),
- prediction[-1][predicted_point_idx][-1].item(),
- ]
- predicted_values.sort()
- msg = {
- target_column: {
- "metricValue": predicted_values[1],
- "level": 1, # TODO
- "timestamp": int(time.time()),
- "probability": 0.95,
- "confidence_interval": [
- predicted_values[0],
- predicted_values[-1],
- ], # quantiles difference
- "predictionTime": timestamp,
- "refersTo": "TODO",
- "cloud": "TODO",
- "provider": "TODO",
- }
- }
- logging.debug(f"prediction msg: {msg}")
- future_df["split"] = "val"
- future_df = future_df.tail(pred_len)
- future_df["split"] = "val"
- future_df[target_column] = prediction.permute(0, 2, 1)[0][3]
- return (msg, future_df)
+ msgs = []
+ for i in range(pred_len):
+ if ((i + 1) % single_prediction_points_length) == 0:
+ predicted_values = [
+ prediction[-1][i][0].item(),
+ prediction[-1][i][3].item(),
+ prediction[-1][i][-1].item(),
+ ]
+ predicted_values.sort()
+ msg = {
+ target_column: {
+ "metricValue": predicted_values[1],
+ "level": 1, # TODO
+ "timestamp": int(time.time()),
+ "probability": 0.95,
+ "confidence_interval": [
+ predicted_values[0],
+ predicted_values[-1],
+ ], # quantiles difference
+ "predictionTime": timestamp
+ + (i // single_prediction_points_length + 1)
+ * (prediction_hor // 1000),
+ "refersTo": "TODO",
+ "cloud": "TODO",
+ "provider": "TODO",
+ }
+ }
+ msgs.append(msg)
+ return msgs