Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Melodic
morphemic-preprocessor
Commits
323213ae
Commit
323213ae
authored
Apr 07, 2022
by
maciek riedl
Browse files
Merge branch 'iccs-eshybrid-2.0' into 'morphemic-rc2.0'
More logs, and initialization sequence See merge request
!281
parents
3a03dfb3
e59bf1f8
Pipeline
#20536
failed with stages
in 24 minutes and 5 seconds
Changes
4
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
morphemic-forecasting-eshybrid/forecasting/eshybrid.py
View file @
323213ae
...
...
@@ -13,7 +13,9 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
forecast_config
=
dict
()
scheduler
=
False
application
=
'default_application'
state
=
'pending_metrics'
forecasting_version
=
False
_last_training_time
=
False
def
__init__
(
self
,
config
):
self
.
_run
=
False
...
...
@@ -117,16 +119,24 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self
.
_run
=
self
.
run
()
if
self
.
_run
:
logging
.
info
(
"ESHYBRID_STARTED"
)
while
self
.
_run
:
if
self
.
scheduler
:
self
.
scheduler
.
check
(
self
)
else
:
if
len
(
self
.
metrics
)
<=
0
:
logging
.
info
(
"Waiting for metrics_to_predict"
)
else
:
logging
.
info
(
"Waiting for start_forecasting"
)
time
.
sleep
(
1
)
if
self
.
state
==
'forecasting'
:
self
.
scheduler
.
check
(
self
)
continue
if
self
.
state
==
'pending_metrics'
:
logging
.
info
(
"Waiting for metrics_to_predict"
)
continue
if
self
.
state
==
'pending_forecast'
:
logging
.
info
(
"Waiting for start_forecasting"
)
continue
self
.
connector
.
disconnect
()
...
...
@@ -156,13 +166,19 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
)
#adding simple method to retrain the model
if
time
.
time
()
-
self
.
_last_training_time
>
3000
:
if
self
.
_last_training_time
and
time
.
time
()
-
self
.
_last_training_time
>
3000
:
self
.
_train_model
()
def
_train_model
(
self
):
self
.
dataset
.
make
()
self
.
_last_training_time
=
time
.
time
()
if
not
os
.
path
.
exists
(
"%s/%s.csv"
%
(
self
.
data_set_path
,
self
.
application
)):
logging
.
error
(
"**** NO DATA FROM DATASET MAKER ****"
)
return
self
.
model
.
train
(
"%s/%s.csv"
%
(
self
.
data_set_path
,
self
.
application
),
self
.
metrics
)
self
.
_last_training_time
=
time
.
time
()
def
on_train
(
self
,
model
):
...
...
@@ -176,7 +192,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def
on_metrics_to_predict
(
self
,
res
):
logging
.
debug
(
"[2] Metrics to predict %s "
%
res
)
if
len
(
self
.
metrics
)
:
if
self
.
state
!=
'pending_
metrics
'
:
logging
.
warning
(
"[2] We already have metrics >> %s "
%
self
.
metrics
)
return
...
...
@@ -185,12 +201,13 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self
.
metrics
.
remove
(
metric
)
self
.
metrics
=
[
x
[
'metric'
]
for
x
in
res
]
self
.
state
=
'pending_forecast'
def
on_stop_forecasting_eshybrid
(
self
,
res
):
logging
.
debug
(
"[6] Stop Subscribing %s "
%
res
)
if
not
self
.
forecasting
_version
:
if
self
.
state
!=
'
forecasting
'
:
return
for
metric
in
res
[
messaging
.
events
.
StopForecasting
.
METRICS
]:
...
...
@@ -202,12 +219,13 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self
.
forecast_config
=
res
if
not
len
(
self
.
metrics
):
self
.
scheduler
=
False
self
.
state
=
'pending_forecast'
def
on_start_forecasting_eshybrid
(
self
,
res
):
logging
.
debug
(
"[7] Start Forecasting %s "
%
res
)
if
self
.
forecasting_version
and
self
.
forecasting_version
==
res
[
messaging
.
events
.
StartF
orecasting
.
VERSION
]
:
if
self
.
state
==
'f
orecasting
'
:
logging
.
warning
(
"Already forecasting"
)
return
...
...
@@ -216,6 +234,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
logging
.
error
(
"Start forecasting before metrics to predict "
)
return
for
metric
in
res
[
messaging
.
events
.
StartForecasting
.
METRICS
]:
if
metric
not
in
self
.
metrics
:
logging
.
debug
(
"Subscribing to %s "
%
metric
)
...
...
@@ -228,6 +247,8 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
horizon
=
res
[
messaging
.
events
.
StartForecasting
.
PREDICTION_HORIZON
]
)
self
.
state
=
'forecasting'
def
on_error
(
self
,
headers
,
body
):
logging
.
error
(
"Headers %s"
,
headers
)
logging
.
error
(
" %s"
,
body
)
...
...
morphemic-forecasting-eshybrid/morphemic/dataset.py
deleted
100644 → 0
View file @
3a03dfb3
import
time
,
json
,
requests
class
Dataset
:
def
__init__
(
self
,
url
,
port
,
database
,
application
,
usernaname
,
password
):
self
.
_url
=
url
self
.
_port
=
port
self
.
_database
=
database
self
.
_application
=
application
self
.
_usernaname
=
usernaname
self
.
_password
=
password
def
count
(
self
):
"""
curl -XPOST localhost:8086/api/v2/query -sS -H 'Accept:application/csv' -H 'Content-type:application/vnd.flux' -H 'Authorization: Token username:password' -d 'from(bucket:"telegraf")|> range(start:-5m) |> filter(fn:(r) => r._measurement == "cpu")'
"""
url
=
self
.
_url
username
=
self
.
_port
password
=
self
.
_password
database
=
self
.
_database
application
=
'demo'
params
=
'-sS'
headers
=
{
'Accept'
:
'application/csv'
,
'Content-type'
:
'application/vnd.flux'
,
'Authorization'
:
'Token '
+
username
+
':'
+
password
}
data_post
=
'from(bucket:"'
+
database
+
'")|> range(start:-5m)|> filter(fn:(r) => r._measurement == "'
+
application
+
'")'
response
=
requests
.
post
(
url
+
'/api/v2/query'
,
data
=
json
.
dumps
(
data_post
),
headers
=
headers
)
morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py
View file @
323213ae
import
os
,
json
,
time
import
os
,
json
,
time
from
influxdb
import
InfluxDBClient
import
pandas
as
pd
import
pandas
as
pd
from
datetime
import
datetime
url_path_dataset
=
None
url_path_dataset
=
None
class
Row
():
def
__init__
(
self
,
features
,
metricsname
):
self
.
features
=
features
self
.
features
=
features
if
"time"
in
self
.
features
:
time_str
=
self
.
features
[
"time"
]
_obj
=
None
_obj
=
None
try
:
_obj
=
datetime
.
strptime
(
time_str
,
'%Y-%m-%dT%H:%M:%S.%fZ'
)
except
:
...
...
@@ -20,7 +20,7 @@ class Row():
metricsname
.
remove
(
'application'
)
for
field_name
in
metricsname
:
if
not
field_name
in
self
.
features
:
self
.
features
[
field_name
]
=
None
self
.
features
[
field_name
]
=
None
def
getTime
(
self
):
...
...
@@ -28,7 +28,7 @@ class Row():
return
self
.
features
[
"time"
]
if
"timestamp"
in
self
.
features
:
return
self
.
features
[
"timestamp"
]
return
None
return
None
def
makeCsvRow
(
self
):
if
"application"
in
self
.
features
:
...
...
@@ -43,14 +43,14 @@ class Dataset():
self
.
rows
=
{}
self
.
size
=
0
def
addRow
(
self
,
row
):
self
.
rows
[
row
.
getTime
()]
=
row
self
.
rows
[
row
.
getTime
()]
=
row
self
.
size
+=
1
def
reset
(
self
):
self
.
rows
=
{}
self
.
size
=
0
print
(
"Dataset reset"
)
def
getSize
(
self
):
return
self
.
size
return
self
.
size
def
sortRows
(
self
):
return
sorted
(
list
(
self
.
rows
.
values
()),
key
=
lambda
x
:
x
.
getTime
(),
reverse
=
True
)
def
getRows
(
self
):
...
...
@@ -59,7 +59,7 @@ class Dataset():
for
i
in
range
(
tolerance
):
if
int
(
_time
+
i
)
in
self
.
rows
:
return
self
.
rows
[
int
(
_time
+
i
)]
return
None
return
None
def
save
(
self
,
metricnames
,
application_name
):
if
"application"
in
metricnames
:
metricnames
.
remove
(
"application"
)
...
...
@@ -95,7 +95,7 @@ class DatasetMaker():
for
column
in
columns
:
row
[
column
]
=
values
[
index
]
index
+=
1
return
row
return
row
def
prepareResultSet
(
self
,
result_set
):
result
=
[]
...
...
@@ -121,7 +121,7 @@ class DatasetMaker():
for
_row
in
_data
:
row
=
Row
(
_row
,
metricnames
)
self
.
dataset
.
addRow
(
row
)
print
(
"Rows construction completed"
)
print
(
"{0} rows found"
.
format
(
self
.
dataset
.
getSize
()))
#self.dataset.sortRows()
...
...
@@ -130,26 +130,25 @@ class DatasetMaker():
if
features
==
None
:
return
{
'status'
:
False
,
'message'
:
'An error occured while building dataset'
}
return
{
'status'
:
True
,
'url'
:
url
,
'application'
:
self
.
application
,
'features'
:
features
}
def
getFeatures
(
self
,
url
):
try
:
df
=
pd
.
read_csv
(
url
)
return
df
.
columns
.
to_list
()
except
Exception
as
e
:
print
(
"Cannot extract data feature list"
)
return
None
return
None
def
extractMeasurement
(
self
,
_json
):
return
_json
[
"series"
][
0
][
"columns"
]
def
getData
(
self
):
query
=
None
query
=
None
try
:
if
self
.
start_filter
!=
None
and
self
.
start_filter
!=
""
:
query
=
"SELECT * FROM "
+
self
.
application
+
" WHERE time > now() - "
+
self
.
start_filter
else
:
query
=
"SELECT * FROM "
+
self
.
application
query
=
"SELECT * FROM "
+
self
.
application
result_set
=
self
.
influxdb
.
query
(
query
=
query
)
series
=
self
.
extractMeasurement
(
result_set
.
raw
)
#self.influxdb.close() #closing connexion
...
...
morphemic-forecasting-eshybrid/morphemic/model.py
View file @
323213ae
...
...
@@ -152,6 +152,7 @@ class Model:
self
.
status
=
ModelStatus
.
IDLE
def
train
(
self
,
dataset_path
,
metrics
):
_logger
.
info
(
"Start training for %s in %s "
%
(
metrics
,
dataset_path
,))
if
self
.
status
==
ModelStatus
.
IDLE
:
t
=
threading
.
Thread
(
target
=
self
.
_retrain
,
...
...
@@ -162,6 +163,7 @@ class Model:
def
predict
(
self
,
metric
,
times
):
_logger
.
debug
(
"Request prediction for %s @ %s "
%
(
metric
,
times
,))
if
not
self
.
_model
:
_logger
.
error
(
"No model trained yet"
)
return
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment