Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Melodic
morphemic-preprocessor
Commits
a9f525e9
Commit
a9f525e9
authored
Jun 28, 2022
by
Bizid Imen
Committed by
maciek riedl
Jun 28, 2022
Browse files
Fix and Enhance Prophet and Gluonts forecasters
parent
f350aa61
Changes
2
Hide whitespace changes
Inline
Side-by-side
forecasting_gluonts/gluonts_listener.py
View file @
a9f525e9
...
...
@@ -46,7 +46,8 @@ def worker(self, body, metric):
with
open
(
directory_path
+
"models/gluonts_"
+
metric
+
".pkl"
,
'rb'
)
as
f
:
models
[
metric
]
=
pickle
.
load
(
f
)
timestamp
=
int
(
time
())
if
timestamp
>=
predictionTimes
[
metric
]:
timestamp_horizon
=
timestamp
+
(
prediction_horizon
*
number_of_forward_predictions
)
if
timestamp_horizon
>=
predictionTimes
[
metric
]:
logging
.
debug
(
f
"Start the prediction for metric:
{
metric
}
"
)
predictions
=
gluonts_forecaster
.
predict
(
models
[
metric
],
number_of_forward_predictions
,
prediction_horizon
,
epoch_start
,
metric
)
...
...
@@ -77,8 +78,9 @@ def worker(self, body, metric):
})
prediction_time
=
prediction_time
+
prediction_horizon
epoch_start
=
epoch_start
+
prediction_horizon
sleep
(
prediction_horizon
-
5
)
execution_time
=
int
(
time
())
-
timestamp
sleep_time
=
prediction_horizon
-
execution_time
-
5
sleep
(
sleep_time
)
class
Gluonts
(
morphemic
.
handler
.
ModelHandler
,
messaging
.
listener
.
MorphemicListener
):
id
=
"gluonts"
...
...
@@ -89,7 +91,16 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
port
=
ACTIVEMQ_PORT
)
def
run
(
self
):
self
.
connector
.
connect
()
retries
=
11
while
retries
>
10
:
try
:
self
.
connector
.
connect
()
retries
=
1
logging
.
debug
(
f
"GluonTS successefuly connected to ActiveMQ"
)
except
:
logging
.
debug
(
f
"GluonTS failed to connect to ActiveMQ"
)
sleep
(
5
)
retries
=
retries
+
1
self
.
connector
.
set_listener
(
self
.
id
,
self
)
self
.
connector
.
topic
(
"start_forecasting.gluonts"
,
self
.
id
)
self
.
connector
.
topic
(
"stop_forecasting.gluonts"
,
self
.
id
)
...
...
@@ -153,4 +164,3 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
def
on_disconnected
(
self
):
logging
.
debug
(
f
'Disconnected from ActiveMQ'
)
self
.
reconnect
()
forecasting_prophet/prophet_listener.py
View file @
a9f525e9
...
...
@@ -14,8 +14,8 @@ import pickle
import
ast
import
logging.config
from
stomp
import
exception
from
time
import
time
from
time
import
sleep
from
time
import
time
from
dataset_maker
import
CSVData
from
multiprocessing
import
Process
...
...
@@ -28,42 +28,40 @@ ACTIVEMQ_PORT = os.environ.get("ACTIVEMQ_PORT")
predictionTimes
=
dict
()
models
=
dict
()
metrics_processes
=
dict
()
metrics
=
set
()
directory_path
=
"/morphemic_project/"
def
worker
(
self
,
body
,
metric
):
timestamp
=
body
[
'timestamp'
]
prediction_horizon
=
body
[
"prediction_horizon"
]
number_of_forward_predictions
=
body
[
"number_of_forward_predictions"
]
number_of_forward_predictions
=
body
[
"number_of_forward_predictions"
]
epoch_start
=
body
[
"epoch_start"
]
predictionTimes
[
metric
]
=
epoch_start
messages
=
list
()
f
=
0
while
not
os
.
path
.
isfile
(
directory_path
+
'models/prophet_'
+
metric
+
".pkl"
):
sleep
(
30
)
logging
.
debug
(
f
"Waiting for the trained model for metric:
{
metric
}
"
)
while
True
:
with
open
(
directory_path
+
"models/prophet_"
+
metric
+
".pkl"
,
'rb'
)
as
f
:
with
open
(
directory_path
+
"models/prophet_"
+
metric
+
".pkl"
,
'rb'
)
as
f
:
models
[
metric
]
=
pickle
.
load
(
f
)
timestamp
=
int
(
time
())
if
timestamp
>=
predictionTimes
[
metric
]:
predictions
=
prophet_forecaster
.
predict
(
models
[
metric
],
number_of_forward_predictions
,
prediction_horizon
,
epoch_start
)
timestamp
=
int
(
time
())
timestamp_horizon
=
timestamp
+
(
prediction_horizon
*
number_of_forward_predictions
)
if
timestamp_horizon
>=
predictionTimes
[
metric
]:
predictions
=
prophet_forecaster
.
predict
(
models
[
metric
],
number_of_forward_predictions
,
prediction_horizon
,
epoch_start
)
yhats
=
predictions
[
'yhat'
].
values
.
tolist
()
yhat_lowers
=
predictions
[
'yhat_lower'
].
values
.
tolist
()
yhat_uppers
=
predictions
[
'yhat_upper'
].
values
.
tolist
()
prediction_time
=
epoch_start
+
prediction_horizon
# change it to the time of the start_forecasting was sent
# read probabilities file
probs
=
np
.
load
(
directory_path
+
'prob_file.npy'
,
allow_pickle
=
'TRUE'
).
item
()
for
k
in
range
(
0
,
len
(
predictions
[
'yhat'
].
values
.
tolist
())):
probs
=
np
.
load
(
directory_path
+
'prob_file.npy'
,
allow_pickle
=
'TRUE'
).
item
()
for
k
in
range
(
0
,
len
(
predictions
[
'yhat'
].
values
.
tolist
())):
yhat
=
yhats
[
k
]
yhat_lower
=
yhat_lowers
[
k
]
yhat_upper
=
yhat_uppers
[
k
]
# wait until epoch_start to send
start_sending_time
=
time
.
time
()
message
=
{
"metricValue"
:
yhat
,
"level"
:
3
,
...
...
@@ -71,28 +69,41 @@ def worker(self, body, metric):
"probability"
:
probs
[
metric
],
"confidence_interval"
:
[
yhat_lower
,
yhat_upper
],
"horizon"
:
prediction_horizon
,
"predictionTime"
:
int
(
prediction_time
),
#
"predictionTime"
:
int
(
prediction_time
),
"refersTo"
:
"todo"
,
"cloud"
:
"todo"
,
"provider"
:
"todo"
}
self
.
connector
.
send_to_topic
(
'intermediate_prediction.prophet.'
+
metric
,
message
)
"provider"
:
"todo"
}
self
.
connector
.
send_to_topic
(
'intermediate_prediction.prophet.'
+
metric
,
message
)
prediction_time
=
prediction_time
+
prediction_horizon
epoch_start
=
epoch_start
+
prediction_horizon
execution_time
=
time
.
time
()
-
start_sending_time
sleep
(
prediction_horizon
-
execution_time
)
execution_time
=
int
(
time
())
-
timestamp
sleep_time
=
prediction_horizon
-
execution_time
sleep
(
sleep_time
)
class
Prophet
(
morphemic
.
handler
.
ModelHandler
,
messaging
.
listener
.
MorphemicListener
):
id
=
"prophet"
metrics
=
set
()
last_training_time
=
0
deployed_version
=
1
def
__init__
(
self
):
self
.
_run
=
False
self
.
connector
=
messaging
.
morphemic
.
Connection
(
ACTIVEMQ_USER
,
ACTIVEMQ_PASSWORD
,
host
=
ACTIVEMQ_HOSTNAME
,
port
=
ACTIVEMQ_PORT
)
self
.
connector
=
messaging
.
morphemic
.
Connection
(
ACTIVEMQ_USER
,
ACTIVEMQ_PASSWORD
,
host
=
ACTIVEMQ_HOSTNAME
,
port
=
ACTIVEMQ_PORT
)
def
run
(
self
):
self
.
connector
.
connect
()
retries
=
11
while
retries
>
10
:
try
:
self
.
connector
.
connect
()
retries
=
1
logging
.
debug
(
f
"Prophet connected to ActiveMQ"
)
except
:
logging
.
debug
(
f
"Prophet failed to connect to ActiveMQ"
)
sleep
(
5
)
retries
+=
1
self
.
connector
.
set_listener
(
self
.
id
,
self
)
self
.
connector
.
topic
(
"start_forecasting.prophet"
,
self
.
id
)
self
.
connector
.
topic
(
"stop_forecasting.prophet"
,
self
.
id
)
...
...
@@ -104,45 +115,68 @@ class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
self
.
run
()
pass
def
retrain
(
self
):
while
True
:
time
.
sleep
(
300
)
dataset_preprocessor
=
CSVData
(
APP_NAME
)
data_file_path
=
os
.
path
.
join
(
os
.
environ
.
get
(
"DATA_PATH"
,
"./"
),
f
'
{
os
.
environ
.
get
(
"APP_NAME"
,
"demo"
)
}
.csv'
)
dataset_preprocessor
.
prepare_csv
()
while
not
os
.
path
.
isfile
(
data_file_path
):
logging
.
debug
(
f
"Waiting for dataset to be loaded"
)
sleep
(
30
)
dataset_preprocessor
.
prepare_csv
()
for
metric
in
self
.
metrics
:
if
os
.
path
.
isfile
(
directory_path
+
'models/prophet_'
+
metric
+
"_"
+
self
.
deployed_version
+
".pkl"
):
logging
.
debug
(
f
"Retraining the Prophet model for metric:
{
metric
}
"
)
model
=
prophet_forecaster
.
train
(
metric
)
self
.
deployed_version
+=
1
new_version_pkl_path
=
directory_path
+
"models/prophet_"
+
metric
+
"_"
+
self
.
deployed_version
+
".pkl"
with
open
(
new_version_pkl_path
,
"wb"
)
as
f
:
pickle
.
dump
(
model
,
f
)
def
on_start_forecasting_prophet
(
self
,
body
):
logging
.
debug
(
f
"Prophet Start Forecasting the following metrics:"
)
sent_metrics
=
body
[
"metrics"
]
logging
.
debug
(
sent_metrics
)
for
metric
in
sent_metrics
:
if
metric
not
in
metrics
:
metrics
.
add
(
metric
)
if
metric
not
in
self
.
metrics
:
self
.
metrics
.
add
(
metric
)
if
metric
not
in
metrics_processes
:
metrics_processes
[
metric
]
=
Process
(
target
=
worker
,
args
=
(
self
,
body
,
metric
,))
metrics_processes
[
metric
]
.
start
()
metrics_processes
[
metric
]
=
Process
(
target
=
worker
,
args
=
(
self
,
body
,
metric
))
metrics_processes
[
metric
].
start
()
#self.retrain(sent_metrics)
def
on_metrics_to_predict
(
self
,
body
):
# getting data from dataset maker
dataset_preprocessor
=
CSVData
(
APP_NAME
)
data_file_path
=
os
.
path
.
join
(
os
.
environ
.
get
(
"DATA_PATH"
,
"./"
),
f
'
{
os
.
environ
.
get
(
"APP_NAME"
,
"demo"
)
}
.csv'
)
dataset_preprocessor
.
prepare_csv
()
while
not
os
.
path
.
isfile
(
data_file_path
):
logging
.
debug
(
f
"Waiting for dataset to be loaded"
)
sleep
(
30
)
dataset_preprocessor
.
prepare_csv
()
for
r
in
body
:
metric
=
r
[
'metric'
]
if
not
os
.
path
.
isfile
(
directory_path
+
'models/prophet_'
+
metric
+
".pkl"
):
if
not
os
.
path
.
isfile
(
directory_path
+
'models/prophet_'
+
metric
+
".pkl"
):
logging
.
debug
(
f
"Training a Prophet model for metric:
{
metric
}
"
)
model
=
prophet_forecaster
.
train
(
metric
)
pkl_path
=
directory_path
+
"models/prophet_"
+
metric
+
".pkl"
last_training_time
=
int
(
time
())
pkl_path
=
directory_path
+
"models/prophet_"
+
metric
+
".pkl"
with
open
(
pkl_path
,
"wb"
)
as
f
:
pickle
.
dump
(
model
,
f
)
metrics
.
add
(
metric
)
self
.
metrics
.
add
(
metric
)
self
.
connector
.
send_to_topic
(
"training_models"
,
{
"metrics"
:
list
(
metrics
),
"forecasting_method"
:
"Prophet"
,
"timestamp"
:
int
(
time
())
}
)
{
"metrics"
:
list
(
self
.
metrics
),
"forecasting_method"
:
"Prophet"
,
"timestamp"
:
int
(
time
())
}
)
def
on_stop_forecasting_prophet
(
self
,
body
):
logging
.
debug
(
f
"Prophet Stop Forecasting the following metrics:
{
body
[
'metrics'
]
}
"
)
...
...
@@ -156,7 +190,7 @@ class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
def
start
(
self
):
logging
.
debug
(
f
"Staring Prophet Forecaster"
)
self
.
run
()
self
.
_run
=
True
self
.
_run
=
True
def
on_disconnected
(
self
):
logging
.
debug
(
f
"Disconnected from ActiveMQ"
)
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment