Melodic / morphemic-preprocessor / Commits / 72ea6e67

Commit 72ea6e67, authored Jun 28, 2021 by Anna Warno

confidence intervals corrected

parent e81ffcab
Changes: 12 files
deployment/nbeats/Dockerfile

@@ -2,11 +2,11 @@ FROM python:3.8-slim-buster
 # Install Python dependencies.
 WORKDIR /wd
-COPY deployment/tft/requirements.txt .
+COPY deployment/nbeats/requirements.txt .
 RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
 # Copy the rest of the codebase into the image
-COPY deployment/tft ./
+COPY deployment/nbeats ./
 COPY morphemic-datasetmaker ./morphemic-datasetmaker
 COPY amq-message-python-library ./amq-message-python-library
deployment/nbeats/main.py

@@ -76,6 +76,7 @@ class Msg(object):
 def main():
+    print("STARTTTTT")
     logging.getLogger().setLevel(logging.DEBUG)
     start_app_conn = morphemic.Connection(

@@ -95,17 +96,18 @@ def main():
         "2",
         StartForecastingListener(start_conn.conn, START_TOPIC)
     )
-    # msg1 = Msg()
-    # msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60000}]'
-    # msg2 = Msg()
-    # msg2.body = (
-    #     "{"
-    #     + f'"metrics": ["cpu_usage"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60'
-    #     + "}"
-    # )
-    # StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
-    # StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
+    msg1 = Msg()
+    msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60000}]'
+    msg2 = Msg()
+    msg2.body = (
+        "{"
+        + f'"metrics": ["cpu_usage"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60'
+        + "}"
+    )
+    print(msg1)
+    StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
+    StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
     while True:
         pass
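The previously commented-out block is now active: main() hand-feeds two hard-coded test messages to the listeners instead of waiting for real broker traffic. A minimal sketch of what those bodies decode to, using only the standard library (the field values are the ones hard-coded in the hunk above; the interpretation in the comments is inferred from the listener names):

import json
import time

msg1_body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60000}]'
msg2_body = (
    "{"
    + f'"metrics": ["cpu_usage"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60'
    + "}"
)

# msg1 feeds StartListener (which metrics to watch), msg2 feeds
# StartForecastingListener (forecast window and horizon).
print(json.loads(msg1_body))  # [{'metric': 'cpu_usage', 'level': 3, 'publish_rate': 60000}]
print(json.loads(msg2_body))  # {'metrics': ['cpu_usage'], 'timestamp': ..., 'prediction_horizon': 60, ...}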
deployment/nbeats/model.yaml

@@ -2,8 +2,8 @@ data:
   csv_path: demo.csv
 training:
   bs: 8
-  max_epochs: 1
-  loss: quantile
+  max_epochs: 5
+  loss: rmse
 dataset:
   tv_unknown_reals: []
   known_reals: []
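Training now runs for 5 epochs with an rmse loss instead of 1 epoch with a quantile loss; with rmse the N-BEATS model produces point forecasts rather than quantiles, which presumably is what forces the confidence-interval change in nbeats/src/model_predict.py further down. A minimal sketch of how such a config is typically consumed (the loading code itself is not part of this diff; PyYAML and the key paths below are assumed from the file layout shown):

import yaml

with open("model.yaml") as f:
    params = yaml.safe_load(f)

bs = params["training"]["bs"]                  # 8
max_epochs = params["training"]["max_epochs"]  # 5 after this commit
loss = params["training"]["loss"]              # "rmse" instead of "quantile"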
deployment/nbeats/predict.py

@@ -66,12 +66,12 @@ class CustomListener(stomp.ConnectionListener):
 def main():
     logging.debug(f"metrics to predict {predicted_metrics}")
     start_conn = morphemic.Connection(
-        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
+        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
     )
     start_conn.connect()
     stop_conn = morphemic.Connection(
-        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
+        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
     )
     stop_conn.connect()
deployment/nbeats/retrain.py

@@ -10,7 +10,7 @@ from datetime import timedelta
 from src.dataset_maker import CSVData
 TOPIC_NAME = "training_models"
-RETRAIN_CYCLE = 2  # minutes
+RETRAIN_CYCLE = 10  # minutes
 HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613))
 AMQ_USER = os.environ.get("AMQ_USER", "admin")
 AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")

@@ -28,10 +28,6 @@ def main(predicted_metrics, prediction_horizon):
     dataset_preprocessor = CSVData(APP_NAME)
     dataset_preprocessor.prepare_csv()
-    for metric in predicted_metrics:
-        retrain_msg = train(metric, prediction_horizon)
-        start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
     while True:
         start_time = int(time.time())
         logging.debug("TRAINING")
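RETRAIN_CYCLE moves from 2 to 10 minutes, and the one-off retraining pass before the loop is dropped, leaving only the periodic cycle. A minimal sketch of how a minute-based cycle like this is usually paced (the loop body below the marker comment is outside the shown hunk, so the sleep arithmetic is an assumption, not the repo's code):

import time

RETRAIN_CYCLE = 10  # minutes, as set in this commit

while True:
    start_time = int(time.time())
    # ... retrain models and publish retrain_msg to TOPIC_NAME here ...
    elapsed = int(time.time()) - start_time
    time.sleep(max(0, RETRAIN_CYCLE * 60 - elapsed))  # wait out the rest of the cycle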
deployment/nbeats/src/model_predict.py

@@ -78,13 +78,13 @@ def predict(
     msg = {
         target_column: {
-            "metricValue": prediction[-1][-1][2].item(),
+            "metricValue": prediction[-1].item(),
             "level": 1,  # TODO
             "timestamp": int(time.time()),
             "probability": 0.95,
             "confidence_interval": [
-                prediction[-1][-1][0].item(),
-                prediction[-1][-1][-1].item(),
+                prediction[-1].item(),
+                prediction[-1].item(),
             ],  # quantiles difference
             "predictionTime": timestamp + prediction_hor * m // 1000,
             "refersTo": "TODO",

@@ -95,5 +95,5 @@ def predict(
     logging.debug(f"prediction msg: {msg}")
     future_df["split"] = "val"
-    future_df[target_column] = prediction.permute(0, 2, 1)[0][2]
+    future_df[target_column] = prediction[-1].item()
     return (msg, future_df)
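With the N-BEATS loss switched from quantile to rmse (model.yaml above), the model returns a single point forecast rather than a set of quantiles, so the per-quantile indexing prediction[-1][-1][q] no longer applies; metricValue and both confidence_interval entries now come from the same scalar. A minimal sketch of the resulting message, mirroring the structure built in model_predict.py (the prediction_msg helper and the numeric values are illustrative only; the field names are the ones in the diff):

import time

def prediction_msg(target_column, point_forecast, prediction_time):
    # Hypothetical helper assembling the message shape shown above.
    return {
        target_column: {
            "metricValue": point_forecast,
            "level": 1,
            "timestamp": int(time.time()),
            "probability": 0.95,
            # With a point forecast there are no quantiles left to spread,
            # so the interval degenerates to [point, point].
            "confidence_interval": [point_forecast, point_forecast],
            "predictionTime": prediction_time,
            "refersTo": "TODO",
        }
    }

print(prediction_msg("cpu_usage", 0.42, int(time.time()) + 60))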
deployment/nbeats/src/model_train.py

@@ -33,7 +33,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
         os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
     )
-    dataset = pd.read_csv(data_path)
+    dataset = pd.read_csv(data_path).head(1000)
     ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
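Training data is now capped at the first 1000 rows of the CSV, presumably to keep retraining cycles short. A minimal illustration of the effect (pandas only; demo.csv is the default csv_path from model.yaml):

import pandas as pd

dataset = pd.read_csv("demo.csv").head(1000)  # at most the first 1000 rows
print(len(dataset))  # <= 1000, regardless of the file's full length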
deployment/nbeats/src/preprocess_dataset.py

@@ -72,23 +72,14 @@ class Dataset(object):
             self.dataset[lambda x: x.split == "train"],
             time_idx="time_idx",
             target=self.target_column,
+            categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)},
             group_ids=["series"],
-            min_encoder_length=self.context_length,  # keep encoder length long (as it is in the validation set)
+            time_varying_unknown_reals=[self.target_column],
+            min_encoder_length=self.context_length,
             max_encoder_length=self.context_length,
             min_prediction_length=self.prediction_length,
             max_prediction_length=self.prediction_length,
-            static_categoricals=[],
-            static_reals=self.static_reals,
-            time_varying_known_categoricals=[],
-            categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)},
-            variable_groups={},  # group of categorical variables can be treated as one variable
-            time_varying_known_reals=["time_idx"] + self.known_reals,
-            time_varying_unknown_categoricals=self.tv_unknown_cat,
-            time_varying_unknown_reals=[self.target_column] + self.tv_unknown_reals,
-            add_relative_time_idx=True,
-            add_target_scales=True if not self.classification else False,
-            add_encoder_length=True,
             allow_missings=True,
+            add_relative_time_idx=False,
         )
         return ts_dataset
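The N-BEATS dataset definition is pared down to the minimum a univariate point-forecast model needs: one group ("series"), one unknown real (the target), and fixed encoder and prediction windows. A minimal, self-contained sketch of the same construction (assuming this is pytorch_forecasting's TimeSeriesDataSet, which the NaNLabelEncoder usage suggests; the toy dataframe, the cpu_usage target name, and the window lengths are illustrative):

import pandas as pd
from pytorch_forecasting import TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder

# Toy frame with the columns the simplified constructor expects
# (column names "time_idx" and "series" are taken from the hunk above).
df = pd.DataFrame({
    "time_idx": range(100),
    "series": "cpu_usage",
    "cpu_usage": [float(i % 10) for i in range(100)],
})

ts = TimeSeriesDataSet(
    df,
    time_idx="time_idx",
    target="cpu_usage",
    categorical_encoders={"series": NaNLabelEncoder().fit(df.series)},
    group_ids=["series"],
    time_varying_unknown_reals=["cpu_usage"],
    min_encoder_length=12,
    max_encoder_length=12,
    min_prediction_length=8,
    max_prediction_length=8,
    add_relative_time_idx=False,
)
loader = ts.to_dataloader(train=True, batch_size=8)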
deployment/tft/main.py

@@ -79,11 +79,11 @@ def main():
     logging.getLogger().setLevel(logging.DEBUG)
     start_app_conn = morphemic.Connection(
-        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
+        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
     )
     start_app_conn.connect()
     start_conn = morphemic.Connection(
-        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
+        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
     )
     start_conn.connect()
deployment/tft/predict.py

@@ -66,12 +66,12 @@ class CustomListener(stomp.ConnectionListener):
 def main():
     logging.debug(f"metrics to predict {predicted_metrics}")
     start_conn = morphemic.Connection(
-        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
+        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
     )
     start_conn.connect()
     stop_conn = morphemic.Connection(
-        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
+        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
     )
     stop_conn.connect()
deployment/tft/retrain.py

@@ -21,7 +21,7 @@ APP_NAME = os.environ.get("APP_NAME", "demo")
 def main(predicted_metrics, prediction_horizon):
     start_conn = morphemic.Connection(
-        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
+        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
     )
     start_conn.connect()
deployment/tft/src/model_predict.py

@@ -83,15 +83,23 @@ def predict(
     prediction_input = prediction_input.to_dataloader(train=False)
     prediction = tft.predict(prediction_input, mode="raw")["prediction"]
+    predicted_values = [
+        prediction[-1][-1][0].item(),
+        prediction[-1][-1][3].item(),
+        prediction[-1][-1][-1].item(),
+    ]
+    predicted_values.sort()
     msg = {
         target_column: {
-            "metricValue": prediction[-1][-1][2].item(),
+            "metricValue": predicted_values[1],
             "level": 1,  # TODO
             "timestamp": int(time.time()),
             "probability": 0.95,
             "confidence_interval": [
-                prediction[-1][-1][0].item(),
-                prediction[-1][-1][-1].item(),
+                predicted_values[0],
+                predicted_values[-1],
             ],  # quantiles difference
             "predictionTime": timestamp,
             "refersTo": "TODO",

@@ -102,5 +110,5 @@ def predict(
     logging.debug(f"prediction msg: {msg}")
     future_df["split"] = "val"
-    future_df[target_column] = prediction.permute(0, 2, 1)[0][2]
+    future_df[target_column] = prediction.permute(0, 2, 1)[0][3]
     return (msg, future_df)
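This hunk is the core of the commit for TFT: instead of trusting fixed quantile positions, the code now picks the raw outputs at indices 0, 3, and -1, sorts them, and uses the middle value as the point estimate and the extremes as the interval bounds, so the reported interval can no longer come out inverted. A minimal sketch of that logic on a plain list of hypothetical quantile outputs (the seven values and quantile levels below are illustrative, not taken from the model):

# Hypothetical raw quantile outputs for one horizon step
# (e.g. the 2%, 10%, 25%, 50%, 75%, 90%, 98% quantiles).
quantile_outputs = [0.31, 0.35, 0.38, 0.42, 0.47, 0.52, 0.58]

predicted_values = [quantile_outputs[0], quantile_outputs[3], quantile_outputs[-1]]
predicted_values.sort()  # guards against quantile crossing, where lower > upper

metric_value = predicted_values[1]                                  # median-like point estimate
confidence_interval = [predicted_values[0], predicted_values[-1]]   # [lower, upper]

print(metric_value, confidence_interval)  # 0.42 [0.31, 0.58]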