Commit f52e2733 authored by Anna Warno

ensembler - influx integration

parent b37d3188
Pipeline #20419 passed with stage in 2 minutes and 3 seconds
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
@@ -2,11 +2,15 @@ FROM python:3.8-slim-buster
# Install Python dependencies.
WORKDIR /wd
# COPY deployment/nbeats/requirements.txt .
COPY deployment/ensembler/requirements.txt .
RUN apt-get update && apt-get install -y build-essential g++ libgl1-mesa-glx libx11-6
RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
COPY deployment/ensembler/poetry.lock deployment/ensembler/pyproject.toml /wd/
RUN apt-get update && apt-get install -y build-essential g++ libgl1-mesa-glx libx11-6 \
&& pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir poetry \
&& poetry install --no-dev && mkdir models
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
@@ -23,6 +27,9 @@ RUN cd /var/lib/morphemic/ \
&& mkdir -p /wd/logs
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
CMD ["poetry", "run" , "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
# For local tests:
# CMD ["poetry", "run" , "uvicorn", "app:app", "--reload"]
@@ -2,12 +2,14 @@
import logging
from datetime import datetime
from pytz import timezone
import setproctitle
from fastapi import FastAPI
from src.messages_schemas import Prediction, EnsembleResponse
from src.env_config import create_env_config
from src.services import AMQService
from pytz import timezone
from ensembler.env_config import create_env_config
from ensembler.messages_schemas import EnsembleResponse, Prediction
from ensembler.services import AMQService
env_config = create_env_config()
setproctitle.setproctitle("Ensembler")
......
"""Script for data frame with prediction class"""
"""Script for data frame with predictions class
"""
import time
......
"""Script for downloading data from influx
"""
import os
import pandas as pd
from influxdb import DataFrameClient
class InfluxDataDownloader:
def __init__(
self,
) -> None:
"""class for downloading data from inlux,
necessary are columns with predictions and
real values
"""
self.influx_client = DataFrameClient(
host=os.environ.get("INFLUXDB_HOSTNAME", "localhost"),
port=int(os.environ.get("INFLUXDB_PORT", "8086")),
username=os.environ.get("INFLUXDB_USERNAME", "morphemic"),
password=os.environ.get("INFLUXDB_PASSWORD", "password"),
)
self.influx_client.switch_database(
os.environ.get("INFLUXDB_DBNAME", "morphemic")
)
@staticmethod
def convert_timestamp(data_frame: pd.DataFrame) -> pd.DataFrame:
"""Convert the date index to the desired format
Args:
-------
data_frame (pd.DataFrame): data frame with
a time index (pandas time index)
Returns:
-------
pd.DataFrame: data frame with a date index
in the desired format
"""
# set the converted index on the frame and return the frame itself;
# returning only the index would break the callers below
data_frame.index = pd.to_datetime(data_frame.index, unit="s").tz_convert(None)
return data_frame
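For illustration, a minimal use of the conversion with hypothetical values (influxdb's DataFrameClient returns a UTC-aware index, which this strips to naive timestamps):

import pandas as pd

# hypothetical frame with a UTC-aware epoch-second index
idx = pd.to_datetime([1700000000, 1700000060], unit="s", utc=True)
df = pd.DataFrame({"cnn_prediction": [1.0, 2.0]}, index=idx)
df = InfluxDataDownloader.convert_timestamp(df)
print(df.index)  # tz-naive DatetimeIndex: 2023-11-14 22:13:20, 22:14:20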
def download_predictions(self, metric_name: str) -> pd.DataFrame:
"""Download predicted values from influx
Returns:
-------
pd.DataFrame: pandas data
frame with predictions values
"""
return self.influx_client.query(
f'SELECT * FROM "{metric_name}Predictions" WHERE time > now() - {os.environ.get("MAX_PAST_DAYS", 100)}d'
)[f"{metric_name}Predictions"]
def download_real(self, start_time: pd.DatetimeIndex) -> pd.DataFrame:
"""Download real values from influx
Args:
-------
start_time (pd.DatetimeIndex): first
date with predictions
Returns:
-------
pd.DataFrame: pandas data
frame with real values from PS
"""
# the timestamp literal must be quoted in InfluxQL
return self.influx_client.query(
f'SELECT * FROM "{os.environ.get("APP_NAME", "default_application")}" WHERE time > \'{start_time}\''
)[os.environ.get("APP_NAME", "default_application")]
def download_data(self, metric_name: str, predictions_freq: int) -> pd.DataFrame:
"""
Download data from influx
(2 tables: one with predictions, the second with
real values from PS) and merge the data
Args:
------
metric_name (str): metric name
predictions_freq (int): predictions
frequency (in seconds)
Returns:
-------
pd.DataFrame: pandas data
frame with real and predicted values
"""
predictions = self.download_predictions(metric_name)
predictions = self.convert_timestamp(predictions)
start_time = predictions.index.values[0]
real = self.download_real(start_time)
real.index = real["ems_time"]
real = self.convert_timestamp(real)
predictions = predictions.resample(
f"{predictions_freq}S", origin=start_time
).mean()
real = (
real.resample(f"{predictions_freq}S", origin=start_time)
.mean()
.rename({metric_name: "y"}, axis=1)
)["y"]
merged = pd.merge(
predictions,
real,
how="left",
left_index=True,
right_index=True,
).dropna(subset=["y"])
return merged
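A minimal driver sketch for the class above (the metric name and frequency are made-up example values; the INFLUXDB_* and APP_NAME environment variables are read as shown in __init__):

from ensembler.dataset.download_data import InfluxDataDownloader

downloader = InfluxDataDownloader()
# "cpu_usage" and the 60-second prediction frequency are hypothetical
merged = downloader.download_data("cpu_usage", predictions_freq=60)
print(merged.tail())  # forecaster prediction columns plus the real-value column "y"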
import logging
import time
import numpy as np
import pandas as pd
from pytorch_forecasting import TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
import numpy as np
import logging
import time
pd.options.mode.chained_assignment = None
......
"""Script for ensembler class"""
"""Script for ensembler class, currenntly
3 types of ensembling models are available:
* Mean ensembler
* Linnear programming
* Top k on last n
"""
import json
import logging
import time
import threading
from slugify import slugify
import time
from typing import List
import stomp
from src.mocking.helpers import mock_predictions_df
from src.models.ensembler_models import (
from slugify import slugify
from ensembler.dataset.data import PredictionsDF
from ensembler.dataset.download_data import InfluxDataDownloader
from ensembler.mocking.helpers import mock_predictions_df
from ensembler.models.ensembler_models import (
AverageEnsembler,
BestSubsetEnsembler,
LinnearProgrammingEnsembler,
)
from src.dataset.data import PredictionsDF
class Ensembler(stomp.ConnectionListener):
@@ -28,17 +37,41 @@ class Ensembler(stomp.ConnectionListener):
self.prediction_dfs = {}
self.metrics_frequency = {}
self.predictions_tables_names = None
self.influx_data_downloader = InfluxDataDownloader()
def get_data(self, metric, columns):
def get_data(self, metric: str, columns: List[str], mock: bool = True) -> None:
"""Download data frame with predictions and real
values for given metric,
currently mocked."""
if not self.prediction_dfs[metric]:
self.prediction_dfs[metric] = PredictionsDF(mock_predictions_df(columns))
mocked when mock=True (the default).
Args:
-----
metric (str): metric name
columns (List[str]): list of columns
mock (bool): if True, use mocked predictions
instead of influx data
"""
if mock:
if not self.prediction_dfs[metric]:
self.prediction_dfs[metric] = PredictionsDF(
mock_predictions_df(columns)
)
else:
self.prediction_dfs[metric].update(mock_predictions_df(columns))
else:
self.prediction_dfs[metric].update(mock_predictions_df(columns))
if not self.prediction_dfs[metric]:
self.prediction_dfs[metric] = PredictionsDF(
self.influx_data_downloader.download_data(
metric, self.metrics_frequency[metric]
)
)
else:
self.prediction_dfs[metric].update(
self.influx_data_downloader.download_data(
metric, self.metrics_frequency[metric]
)
)
def get_predictions_fields(self):
def get_predictions_fields(self) -> None:
"""Get predictions columns names"""
self.predictions_tables_names = {
metric: [
......
"""Script with helpers for testing ensemblers, mocking ActiveMQ messages etc."""
import json
import pandas as pd
import numpy as np
import pandas as pd
class Msg(object):
......
"""Script for pythorch dataset class"""
import torch.utils.data as data
import torch
import re
import random
import re
import numpy as np
import torch
import torch.utils.data as data
class DatasetHistMask(torch.utils.data.Dataset):
@@ -22,7 +23,7 @@ class DatasetHistMask(torch.utils.data.Dataset):
self,
networks_prediction_df,
models_list=["arima", "cnn"],
max_pred_len=5,
max_pred_len=1,
series_len=15,
target_column="y",
start_idx=0,
@@ -34,13 +35,14 @@ class DatasetHistMask(torch.utils.data.Dataset):
self.get_rows(last_idx, start_idx)
self.models_list = models_list
assert len(self.models_list) > 1, "There must be more than one forecaster!"
self.series_len = series_len
self.max_pred_len = max_pred_len
self.x_columns = self.get_x_col_names()
self.series_lengths = self.get_series_lengths()
self.valid_indices = self.get_valid_indices()
self.df = self.df[self.x_columns + [target_column, "series_id"]]
self.df = self.df.loc[:, ~self.df.columns.duplicated()]
self.target_column = target_column
self.max_pred_len = max_pred_len
self.series_len = series_len
self.nan_fill_value = nan_fill_value
def get_x_col_names(self):
@@ -61,7 +63,8 @@ class DatasetHistMask(torch.utils.data.Dataset):
indices = [
i
for i in range(self.df.shape[0])
if self.series_lengths.loc[self.df.iloc[i]["series_id"]] - i >= 15
if self.series_lengths.loc[self.df.iloc[i]["series_id"]] - i
>= self.series_len + self.max_pred_len
]
return indices
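As a sanity check of the new bound (hypothetical sizes): a window starting at row i is kept only while its series can still supply the input window plus the prediction horizon.

series_len, max_pred_len, n_rows = 15, 1, 20  # example sizes
valid = [i for i in range(n_rows) if n_rows - i >= series_len + max_pred_len]
assert valid == [0, 1, 2, 3, 4]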
@@ -79,11 +82,14 @@ class DatasetHistMask(torch.utils.data.Dataset):
def get_one_series_df_part(self, idx):
"""Get single series (consecutive rows from
data frame of length: series_len)"""
return self.df.iloc[idx : idx + self.series_len].reset_index()
series = self.df.iloc[idx : idx + self.series_len].reset_index()
series = series.loc[:, ~series.columns.duplicated()]
return series
def get_target_value(self, df):
"""Get target value, single y true value which
we want to approximate with ensembling"""
assert not np.isnan(df[self.target_column].values[-1])
return df[self.target_column].values[-1]
def replace_future_values(self, col, hist_len):
@@ -101,7 +107,7 @@ class DatasetHistMask(torch.utils.data.Dataset):
0 for _ in range(self.series_len - hist_len)
]
for col in self.x_columns:
x[f"{col}_mask"] = x[col].isna().astype(int)
x[f"{col}_mask"] = x[col].notna().astype(int)
return x
def get_predictions_to_ensemble(self, x):
@@ -114,7 +120,7 @@ class DatasetHistMask(torch.utils.data.Dataset):
def to_tensors(x, preds, y):
"""Convert network input to tensor"""
return (
torch.tensor(x.to_numpy().astype(np.float32)),
torch.tensor(x.fillna(0).to_numpy().astype(np.float32)),
torch.tensor(preds.fillna(0).values.astype(np.float32)).squeeze(),
torch.tensor(y),
)
......
"""SCript for deep learning helpers fucnctions"""
import torch.nn as nn
import torch
import torch.nn as nn
from torch.nn.utils import weight_norm
......
"""Script for Ensembler torch network"""
import torch.nn as nn
import torch
from dl_helpers import (
masked_softmax,
calculate_matching_padding,
base_conv_layer,
attention_module,
)
import torch.nn as nn
from dl_helpers import (attention_module, base_conv_layer, base_linear_module,
calculate_matching_padding, masked_softmax)
class FCNN(nn.Module):
"""Torch ensemble network"""
def __init__(self, forecasters_num=6, n_inputs=[11, 32, 64], series_len=21):
super().__init__()
self.forecasters_num = forecasters_num
n_inputs = [n_inputs[0] * series_len] + n_inputs[1:]  # avoid mutating the shared default list
layers = [
base_linear_module(in_f, out_f)
for in_f, out_f in zip(n_inputs[:-1], n_inputs[1:])
] + [nn.Linear(n_inputs[-1], forecasters_num)]
self.fcnn = nn.Sequential(*layers)
def get_mask_and_preds(self, prediction):
"""Get binary mask for predictions which
are not present and predicted values"""
return (
prediction[:, self.forecasters_num :],
prediction[:, : self.forecasters_num],
)
def forward(self, batch):
"""model forward function"""
x, preds_to_ensemble = batch
x = torch.flatten(x, start_dim=1)
x = self.fcnn(x)
mask, preds = self.get_mask_and_preds(preds_to_ensemble)
x = masked_softmax(x, mask)
return torch.sum(preds * x, 1)
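masked_softmax is imported from dl_helpers, which this diff does not show; a typical formulation (an assumption, not necessarily the repo's implementation) suppresses the weights of absent forecasters and renormalizes the rest:

import torch

def masked_softmax_sketch(logits: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
    # mask: 1 where a forecaster's prediction is present, 0 where it is missing
    logits = logits.masked_fill(mask == 0, float("-inf"))
    return torch.softmax(logits, dim=-1)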
class EnsemblerRegressorModel(nn.Module):
@@ -15,14 +42,14 @@ class EnsemblerRegressorModel(nn.Module):
def __init__(
self,
forecasters_num=2,
n_inputs=[11, 16, 32],
n_outputs=[16, 32, 64],
kernel_size=[5, 4, 4],
stride=[1, 1, 2],
dilation=[1, 2, 1],
series_len=16,
with_att=True,
forecasters_num=6,
n_inputs=[11, 32, 64],
n_outputs=[32, 64, 128],
kernel_size=[5, 5, 3],
stride=[1, 1, 1],
dilation=[1, 1, 1],
series_len=21,
with_att=False,
):
super().__init__()