Commit 3e2ecefa authored by maciek riedl

Merge branch 'tft_nbeats' into 'morphemic-rc2.0'

Ensembler - influx integration

See merge request !276
parents dd28e8b7 f52e2733
Pipeline #20425 passed with stages in 30 minutes and 12 seconds
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
FROM python:3.8-slim-buster
# Install Python dependencies.
WORKDIR /wd
COPY deployment/ensembler/poetry.lock deployment/ensembler/pyproject.toml /wd/
RUN apt-get update && apt-get install -y build-essential g++ libgl1-mesa-glx libx11-6 \
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir poetry \
    && poetry install --no-dev && mkdir models
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
# Copy the rest of the codebase into the image
COPY deployment/ensembler/ ./
RUN cd /var/lib/morphemic/ \
&& tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc1.5 \
&& cd morphemic-datasetmaker && python3 setup.py install \
&& cd ../.. \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc1.5/amq-message-python-library /wd/amq_message_python_library \
&& rm -rf /var/lib/morphemic \
&& mkdir -p /wd/logs
CMD ["poetry", "run" , "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
# For local tests:
# CMD ["poetry", "run" , "uvicorn", "app:app", "--reload"]
# Ensembler
To test locally:
cd ../../
docker build -t ensembler -f ./deployment/ensembler/Dockerfile .
docker run -t --env-file=deployment/ensembler/env -v <LOGS PATH>:/wd/logs --network=host ensembler
Go to the local ActiveMQ web console: http://localhost:8161/admin
Send an example start_ensembler message:
{
  "metrics": [
    {
      "metric": "MaxCPULoad",
      "level": 3,
      "publish_rate": 60000
    },
    {
      "metric": "MinCPULoad",
      "level": 3,
      "publish_rate": 50000
    }
  ],
  "models": [
    "tft",
    "nbeats",
    "gluon"
  ]
}
Send an ensemble request:
curl -i http://127.0.0.1:8000/ensemble -X POST -H 'Content-Type: application/json' -d '{"method":"BestSubset", "metric": "MaxCPULoad", "predictionTime": 1234567, "predictionsToEnsemble": {"tft": 0, "nbeats": null, "gluon": 9}}' -w '\n'
"""main script for ensembler module"""
import logging
from datetime import datetime
import setproctitle
from fastapi import FastAPI
from pytz import timezone
from ensembler.env_config import create_env_config
from ensembler.messages_schemas import EnsembleResponse, Prediction
from ensembler.services import AMQService
env_config = create_env_config()
setproctitle.setproctitle("Ensembler")
logging.basicConfig(
filename=f"logs/{env_config['LOGING_FILE_NAME']}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(env_config["TZ"])
).timetuple()
amq_service = AMQService(env_config)
app = FastAPI()
@app.post("/ensemble", response_model=EnsembleResponse)
async def ensemble(prediction: Prediction):
"""Function for returning ensembled value on request"""
return amq_service.ens.on_ensemble(prediction.dict())
log = logging.getLogger()
log.info(f"Ensebler service started for application: {env_config['APP_NAME']}")
"""Script for data frame with predictions class
"""
import time
class PredictionsDF:
"""Predictions data frame class
df: data frame with predictions and real value,
target_column: target column name"""
def __init__(self, data_frame, target_column="y"):
"""Init method"""
self.df = data_frame
self.last_update_time = int(time.time())
self.target_column = target_column
    def update(self, data_frame):
        """Update method: replaces the stored data frame and refreshes last_update_time"""
        self.df = data_frame
        self.last_update_time = int(time.time())
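# Usage sketch (hypothetical, for illustration only): wrap a merged predictions
# frame, e.g. the one returned by InfluxDataDownloader.download_data, and refresh
# it when new data arrives.
#
# import pandas as pd
# preds = PredictionsDF(pd.DataFrame({"tft": [1.0], "nbeats": [1.2], "y": [1.1]}))
# preds.update(pd.DataFrame({"tft": [0.9], "nbeats": [1.0], "y": [0.95]}))
# print(preds.df, preds.last_update_time)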
"""Script for downloading data from influx
"""
import os
import pandas as pd
from influxdb import DataFrameClient
class InfluxDataDownloader:
    def __init__(self) -> None:
        """Class for downloading data from InfluxDB;
        columns with predictions and real values are required.
        """
self.influx_client = DataFrameClient(
host=os.environ.get("INFLUXDB_HOSTNAME", "localhost"),
port=int(os.environ.get("INFLUXDB_PORT", "8086")),
username=os.environ.get("INFLUXDB_USERNAME", "morphemic"),
password=os.environ.get("INFLUXDB_PASSWORD", "password"),
)
self.influx_client.switch_database(
os.environ.get("INFLUXDB_DBNAME", "morphemic")
)
    @staticmethod
    def convert_timestamp(data_frame: pd.DataFrame) -> pd.DataFrame:
        """Convert the date index to the desired format
        Args:
        -------
            data_frame (pd.DataFrame): data frame with a
                time index (pandas time index)
        Returns:
        -------
            pd.DataFrame: data frame with the date index
                converted to the desired format
        """
        data_frame.index = pd.to_datetime(data_frame.index, unit="s").tz_convert(None)
        return data_frame
def download_predictions(self, metric_name: str) -> pd.DataFrame:
"""Download predicted values from influx
Returns:
-------
pd.DataFrame: pandas data
frame with predictions values
"""
return self.influx_client.query(
f'SELECT * FROM "{metric_name}Predictions" WHERE time > now() - {os.environ.get("MAX_PAST_DAYS", 100)}d'
)[f"{metric_name}Predictions"]
def download_real(self, start_time: pd.DatetimeIndex) -> pd.DataFrame:
"""Download real values from influx
Args:
-------
start_time (pd.DatetimeIndex): first
date with predictions,
Returns:
-------
pd.DataFrame: pandas data
frame with real values from PS
"""
return self.influx_client.query(
f'SELECT * FROM "{os.environ.get("APP_NAME", "default_application")}" WHERE time > {start_time}'
)[os.environ.get("APP_NAME", "default_application")]
def download_data(self, metric_name: str, predictions_freq: int) -> pd.DataFrame:
"""
Download data from inlux
(2 tables one with predictions, second with
real values from PS), merge data and save them to csv
Args:
------
metric_name (str): metric name
predictions_freq (int): predictions
frequency (in seconds)
Returns:
-------
pd.DataFrame: pandas data
frame with real and predicted values
"""
predictions = self.download_predictions(metric_name)
predictions = self.convert_timestamp(predictions)
start_time = predictions.index.values[0]
        real = self.download_real(start_time)
real.index = real["ems_time"]
real = self.convert_timestamp(real)
predictions = predictions.resample(
f"{predictions_freq}S", origin=start_time
).mean()
real = (
real.resample(f"{predictions_freq}S", origin=start_time)
.mean()
.rename({metric_name: "y"}, axis=1)
)["y"]
merged = pd.merge(
predictions,
real,
how="left",
left_index=True,
right_index=True,
).dropna(subset=["y"])
return merged
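# Usage sketch (hypothetical, for illustration only): assumes the INFLUXDB_* and
# APP_NAME environment variables point at a reachable InfluxDB instance holding the
# "<metric>Predictions" and application measurements.
#
# downloader = InfluxDataDownloader()
# merged = downloader.download_data("MaxCPULoad", predictions_freq=60)
# print(merged.head())  # per-model prediction columns plus real values in column "y"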
"""Script for preparing a time series dataset from the pytorch-forecasting package
TODO: add checking whether data consists of multiple series, handle NaN values"""
import logging
import time

import numpy as np
import pandas as pd
from pytorch_forecasting import TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder

pd.options.mode.chained_assignment = None
class Dataset(object):
def __init__(
self,
dataset,
target_column="value",
time_column="ems_time",
tv_unknown_reals=[],
known_reals=[],
tv_unknown_cat=[],
static_reals=[],
classification=0,
context_length=40,
prediction_length=5,
publish_rate=10000,
):
self.max_missing_values = (
20 # max consecutive missing values allowed per series
)
self.target_column = target_column
self.time_column = time_column
self.tv_unknown_cat = tv_unknown_cat
self.known_reals = known_reals
self.tv_unknown_reals = tv_unknown_reals
self.static_reals = static_reals
self.classification = classification
self.context_length = context_length
self.prediction_length = prediction_length
self.publish_rate = publish_rate
self.dataset = dataset
self.dropped_recent_series = True # default set to be true
if self.dataset.shape[0] > 0:
self.check_gap()
self.n = dataset.shape[0]
if self.dataset.shape[0] > 0:
self.ts_dataset = self.create_time_series_dataset()
def cut_nan_start(self, dataset):
dataset.index = range(dataset.shape[0])
first_not_nan_index = dataset[self.target_column].first_valid_index()
        if first_not_nan_index == first_not_nan_index:  # NaN != NaN, so this check filters out a NaN index
if first_not_nan_index is not None:
if first_not_nan_index > -1:
return dataset[dataset.index > first_not_nan_index]
else:
return dataset.dropna()
def fill_na(self, dataset):
dataset = dataset.replace(np.inf, np.nan)
dataset = dataset.ffill(axis="rows")
return dataset
def convert_formats(self, dataset):
if not self.classification:
dataset[self.target_column] = dataset[self.target_column].astype(float)
else:
dataset[self.target_column] = dataset[self.target_column].astype(int)
for name in self.tv_unknown_cat:
dataset[name] = dataset[name].astype(str)
return dataset
    def convert_time_to_ms(self):
        """Normalize timestamps to 13-digit (millisecond) precision and round
        them down to a multiple of 10 seconds."""
        if self.dataset.shape[0] > 0:
            digit_len = len(str(int(self.dataset[self.time_column].values[0])))
            if digit_len >= 13:
                # already millisecond (or finer) resolution: truncate to 13 digits
                self.dataset[self.time_column] = self.dataset[self.time_column].apply(
                    lambda x: int(str(int(x))[:13])
                )
            else:
                # coarser resolution (e.g. seconds): pad up to 13 digits
                self.dataset[self.time_column] = self.dataset[self.time_column].apply(
                    lambda x: int(int(str(int(x))[:digit_len]) * 10 ** (13 - digit_len))
                )
            # round down to a 10-second boundary (10 000 ms)
            self.dataset[self.time_column] = self.dataset[self.time_column].apply(
                lambda x: int(x // 1e4 * 1e4)
            )
def add_obligatory_columns(self, dataset):
n = dataset.shape[0]
dataset["time_idx"] = range(n) # TODO check time gaps
return dataset
def get_time_difference_current(self):
if self.dataset.shape[0] > 0:
last_timestamp_database = self.dataset[self.time_column].values[-1]
current_time = int(time.time())
print(
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
)
logging.info(
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
)
def check_gap(self):
if (self.dataset.shape[0] > 0) and (self.target_column in self.dataset.columns):
self.dataset = self.dataset.groupby(by=[self.time_column]).min()
self.dataset[self.time_column] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
self.convert_time_to_ms()
self.dataset[self.target_column] = pd.to_numeric(
self.dataset[self.target_column], errors="coerce"
).fillna(np.nan)
self.dataset = self.dataset.replace(np.inf, np.nan)
self.dataset = self.dataset.dropna(subset=[self.target_column])
if self.dataset.shape[0] > 0:
max_gap = self.dataset[self.time_column].diff().abs().max()
logging.info(
f"Metric: {self.target_column} Max time gap in series {max_gap}"
)
print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
series_freq = (
(self.dataset[self.time_column])
.diff()
.fillna(0)
.value_counts()
.index.values[0]
)
logging.info(
f"Metric: {self.target_column} Detected series with {series_freq} frequency"
)
print(
f"Metric: {self.target_column} Detected series with {series_freq} frequency"
)
if series_freq != self.publish_rate:
logging.info(
f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!"
)
print(
f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!"
)
            # split into sub-series wherever the time gap exceeds
            # max_missing_values * publish_rate, then check each sub-series length
series = np.split(
self.dataset,
*np.where(
self.dataset[self.time_column]
.diff()
.abs()
.fillna(0)
.astype(int)
>= np.abs(self.max_missing_values * self.publish_rate)
),
)
logging.info(f"Metric: {self.target_column} {len(series)} series found")
print(f"{len(series)} series found")
preprocessed_series = []
for i, s in enumerate(series):
s = self.fill_na(s)
s = self.cut_nan_start(s)
s = self.add_obligatory_columns(s)
s["split"] = "train"
s = self.convert_formats(s)
logging.info(
f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
)
if s.shape[0] > self.prediction_length * 2 + self.context_length:
s["series"] = i
preprocessed_series.append(s)
if i == len(series) - 1:
logging.info(
f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
)
logging.info(
f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found"
)
print(f"{len(preprocessed_series)} long enough series found")