Commit 92b62634 authored by Anna Warno

not enough data error fixed

parent 58e2a97f
@@ -9,7 +9,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
 ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
 # Copy the rest of the codebase into the image
-COPY . ./
+COPY deployment/nbeats/ ./
 RUN cd /var/lib/morphemic/ \
     && tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
...
@@ -3,10 +3,10 @@ AMQ_USER=morphemic
 AMQ_PASSWORD=morphemic
 AMQ_PORT=61613
 APP_NAME=demo
-METHOD=nbeats
+METHOD=tft
 DATA_PATH=./
 INFLUXDB_HOSTNAME=localhost
 INFLUXDB_PORT=8086
 INFLUXDB_USERNAME=morphemic
 INFLUXDB_PASSWORD=password
-INFLUXDB_DBNAME=morphemic
+INFLUXDB_DBNAME=morphemic
\ No newline at end of file
@@ -33,6 +33,10 @@ def main(predicted_metrics, prediction_horizon):
         logging.debug("TRAINING")
         for metric in predicted_metrics:
             retrain_msg = train(metric, prediction_horizon)
-            start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
+            if retrain_msg:
+                start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
+            else:
+                print("Not enough data for model training, waiting ...")
         end_time = int(time.time())
...
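For reference, the pattern introduced above: `train()` now returns `None` when the CSV is too short, and the caller publishes to the `training_models` topic only when a message actually exists. Before this commit the `None` was published regardless, which is presumably the "not enough data" error the commit title refers to. A minimal, runnable sketch of the control flow, with the broker and trainer stubbed out (`train_stub`, `publish`, and the row counts are illustrative, not the project's real code):

```python
def train_stub(metric, prediction_horizon, n_rows):
    # Stand-in for train(): None signals "not enough data yet".
    if n_rows < 12 * prediction_horizon:
        return None
    return {"metric": metric, "status": "retrained"}

def retrain_once(metrics, prediction_horizon, n_rows, publish):
    # Mirrors the fixed loop: publish only when training succeeded.
    for metric in metrics:
        retrain_msg = train_stub(metric, prediction_horizon, n_rows)
        if retrain_msg:
            publish(retrain_msg)
        else:
            print("Not enough data for model training, waiting ...")

retrain_once(["memory", "avgResponseTime"], 50, 300, print)  # too short: waits
retrain_once(["memory", "avgResponseTime"], 50, 900, print)  # long enough: publishes
```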
@@ -35,6 +35,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
     dataset = pd.read_csv(data_path).head(1000)
+    if dataset.shape[0] < 12 * prediction_length:
+        return None
+
     ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
     training = ts_dataset.ts_dataset
...
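The `12 * prediction_length` threshold ties the minimum history to the forecast horizon. The debug print added later in this commit suggests an encoder length of `10 * prediction_length`, so 12x would leave one full encoder-plus-decoder window with some slack; that rationale is an inference, and the numbers below are example values:

```python
prediction_length = 50                   # example horizon in samples
encoder_length = 10 * prediction_length  # matches the "ENC LEN" debug print below
min_rows = 12 * prediction_length        # the new guard threshold

# One training window needs encoder + decoder samples:
window = encoder_length + prediction_length  # 550
print(min_rows, window, min_rows >= window)  # 600 550 True
```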
@@ -30,12 +30,23 @@ class Dataset(object):
         self.classification = classification
         self.context_length = context_length
         self.prediction_length = prediction_length
-        self.add_obligatory_columns(dataset)
-        self.dataset = self.convert_formats(dataset)
-        # self.dataset.columns[2] = "AvgResponseTime"
+        self.dataset = self.cut_nan_start(dataset)
+        self.fill_na()
+        self.dataset = self.add_obligatory_columns(self.dataset)
+        self.dataset = self.convert_formats(self.dataset)
         self.n = dataset.shape[0]
         self.ts_dataset = self.create_time_series_dataset()

+    def cut_nan_start(self, dataset):
+        first_not_nan_index = dataset[self.target_column][
+            dataset[self.target_column] != "None"
+        ].index[0]
+        return dataset[dataset.index > first_not_nan_index]
+
+    def fill_na(self):
+        self.dataset = self.dataset.replace("None", np.nan)
+        self.dataset = self.dataset.ffill(axis="rows")
+
     def convert_formats(self, dataset):
         if not self.classification:
             dataset[self.target_column] = dataset[self.target_column].astype(float)
...
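To see what the two new preprocessing steps do, here is a toy run on a frame whose target column starts with the string `"None"` (apparently how missing values arrive in the CSV) and has a gap later. Column names and values are made up for illustration:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(
    {"AvgResponseTime": ["None", "None", "3.0", "4.0", "None", "6.0"]}
)

# cut_nan_start: find the first real observation, keep only rows after it.
# Note the strict ">" also drops that first valid row itself, as in the commit.
first_not_nan_index = df["AvgResponseTime"][df["AvgResponseTime"] != "None"].index[0]
df = df[df.index > first_not_nan_index]

# fill_na: turn "None" strings into NaN, then forward-fill down the column
# (the commit passes axis="rows", i.e. the default fill along the index).
df = df.replace("None", np.nan).ffill()

print(df)  # rows 3-5 remain; the NaN at index 4 is filled with "4.0"
```

Values are still strings at this point; `convert_formats()` casts the target to float afterwards.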
@@ -9,7 +9,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
 ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
 # Copy the rest of the codebase into the image
-COPY . ./
+COPY deployment/tft ./
 RUN cd /var/lib/morphemic/ \
     && tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
...
@@ -9,6 +9,4 @@ INFLUXDB_HOSTNAME=localhost
 INFLUXDB_PORT=8086
 INFLUXDB_USERNAME=morphemic
 INFLUXDB_PASSWORD=password
-INFLUXDB_DBNAME=morphemic
+INFLUXDB_DBNAME=morphemic
\ No newline at end of file
@@ -95,12 +95,14 @@ def main():
         "2", StartForecastingListener(start_conn.conn, START_TOPIC)
     )
     # print("START")
     # msg1 = Msg()
-    # msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 10000}, {"metric": "response_time", "level": 3, "publish_rate": 10000}, {"metric": "latency", "level": 3, "publish_rate": 10000}, {"metric": "memory", "level": 3, "publish_rate": 10000}]'
+    # msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}, {"metric": "avgResponseTime", "level": 3, "publish_rate": 10000}]'
     # msg2 = Msg()
     # msg2.body = (
     #     "{"
-    #     + f'"metrics": ["cpu_usage", "response_time", "latency", "memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 200}, "number_of_forward_predictions": 8,"prediction_horizon": 120'
+    #     + f'"metrics": ["memory", "avgResponseTime"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 50'
     #     + "}"
     # )
...
@@ -19,6 +19,6 @@ model:
   hidden_continuous_size: 16
   output_size: 7
 prediction:
-  bs: 64
+  bs: 32
 save_path:
   models
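The `bs` value lives under the `prediction:` key of model.yaml and is read wherever the file is loaded into `params`. A minimal sketch of that lookup (requires PyYAML; the yaml is inlined here so the snippet runs standalone):

```python
import yaml

cfg = """
prediction:
  bs: 32
save_path:
  models
"""

params = yaml.safe_load(cfg)
print(params["prediction"]["bs"])  # -> 32, the new prediction batch size
print(params["save_path"])         # -> "models"
```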
@@ -10,7 +10,7 @@ from datetime import timedelta
 from src.dataset_maker import CSVData

 TOPIC_NAME = "training_models"
-RETRAIN_CYCLE = 2  # minutes
+RETRAIN_CYCLE = 10  # minutes
 HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613))
 AMQ_USER = os.environ.get("AMQ_USER", "admin")
 AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
@@ -21,22 +21,22 @@ APP_NAME = os.environ.get("APP_NAME", "demo")
 def main(predicted_metrics, prediction_horizon):
     start_conn = morphemic.Connection(
-        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
+        AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
     )
     start_conn.connect()

     dataset_preprocessor = CSVData(APP_NAME)
     dataset_preprocessor.prepare_csv()

-    for metric in predicted_metrics:
-        retrain_msg = train(metric, prediction_horizon)
-        start_conn.send_to_topic(TOPIC_NAME, retrain_msg)

     while True:
         start_time = int(time.time())
         logging.debug("TRAINING")
         for metric in predicted_metrics:
             retrain_msg = train(metric, prediction_horizon)
-            start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
+            if retrain_msg:
+                start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
+            else:
+                print("Not enough data for model training, waiting ...")
         end_time = int(time.time())
...
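`RETRAIN_CYCLE` rises from 2 to 10 minutes, and the `start_time`/`end_time` bookkeeping around the loop suggests each pass sleeps away whatever is left of the cycle. The sleep itself is outside the visible hunk, so this pacing sketch is an assumption:

```python
import time

RETRAIN_CYCLE = 10  # minutes, as set in this commit

def run_forever(do_training_pass):
    while True:
        start_time = int(time.time())
        do_training_pass()  # e.g. the train-and-publish loop above
        end_time = int(time.time())
        # Assumed pacing: wait out the remainder of the cycle.
        remaining = RETRAIN_CYCLE * 60 - (end_time - start_time)
        if remaining > 0:
            time.sleep(remaining)
```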
@@ -33,14 +33,15 @@ def predict(
         os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
     )
-    dataset = pd.read_csv(data_path).head(1000)
+    dataset = pd.read_csv(data_path)
+    print(dataset)
     if extra_data is not None:
         dataset = pd.concat([dataset, extra_data], ignore_index=True)
-    dataset = dataset[
+    ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
+    dataset = ts_dataset.dataset[
         lambda x: x.series == x.series.max()
     ]  # multiple or single series training?
-    ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
...
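Two things changed in `predict()`: the `Dataset` is now built before filtering (so the new `cut_nan_start`/`fill_na` cleaning runs first), and the frame is then narrowed to the most recent series. The `df[lambda x: ...]` idiom is ordinary pandas callable indexing, shown here on a toy frame with an assumed `series` column:

```python
import pandas as pd

df = pd.DataFrame(
    {"series": [0, 0, 1, 1, 1], "value": [1.0, 2.0, 3.0, 4.0, 5.0]}
)

# Callable indexing: the lambda receives the frame and returns a boolean mask.
latest = df[lambda x: x.series == x.series.max()]
print(latest)  # only the three rows of series 1, the newest run
```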
@@ -34,8 +34,10 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
         os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
     )
-    dataset = pd.read_csv(data_path).head(1000)
-    print(dataset)
+    dataset = pd.read_csv(data_path).head(4000)
+    if dataset.shape[0] < 12 * prediction_length:
+        return None
+
     ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
@@ -44,6 +46,13 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
         "train", "val"
     )  # only train and val splits will be used
+    print(
+        ts_dataset.dataset,
+        prediction_length,
+        "PR LEN",
+        "ENC LEN",
+        prediction_length * 10,
+    )
     bs = params["training"]["bs"]
     train_dataloader = training.to_dataloader(
...
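`training` here is a pytorch_forecasting `TimeSeriesDataSet`, whose `to_dataloader` method wraps it in a PyTorch `DataLoader` (`train=True` enables shuffling). A self-contained sketch with a tiny synthetic series; the column names and lengths are illustrative, only the library calls are real:

```python
import pandas as pd
from pytorch_forecasting import TimeSeriesDataSet

df = pd.DataFrame(
    {
        "time_idx": list(range(30)),
        "series": ["s0"] * 30,
        "value": [float(i % 7) for i in range(30)],
    }
)

# Stand-in for ts_dataset.ts_dataset.
training = TimeSeriesDataSet(
    df,
    time_idx="time_idx",
    target="value",
    group_ids=["series"],
    max_encoder_length=10,
    max_prediction_length=2,
    time_varying_unknown_reals=["value"],
)

bs = 32  # params["training"]["bs"] in the real code
train_dataloader = training.to_dataloader(train=True, batch_size=bs, num_workers=0)
x, y = next(iter(train_dataloader))  # x: dict of tensors, y: (target, weight)
```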
 import pandas as pd
+import numpy as np
 from pytorch_forecasting import TimeSeriesDataSet
 from pytorch_forecasting.data import NaNLabelEncoder
@@ -30,12 +31,23 @@ class Dataset(object):
         self.classification = classification
         self.context_length = context_length
         self.prediction_length = prediction_length
-        self.add_obligatory_columns(dataset)
-        self.dataset = self.convert_formats(dataset)
-        # self.dataset.columns[2] = "AvgResponseTime"
+        self.dataset = self.cut_nan_start(dataset)
+        self.fill_na()
+        self.dataset = self.add_obligatory_columns(self.dataset)
+        self.dataset = self.convert_formats(self.dataset)
         self.n = dataset.shape[0]
         self.ts_dataset = self.create_time_series_dataset()

+    def cut_nan_start(self, dataset):
+        first_not_nan_index = dataset[self.target_column][
+            dataset[self.target_column] != "None"
+        ].index[0]
+        return dataset[dataset.index > first_not_nan_index]
+
+    def fill_na(self):
+        self.dataset = self.dataset.replace("None", np.nan)
+        self.dataset = self.dataset.ffill(axis="rows")
+
     def convert_formats(self, dataset):
         if not self.classification:
             dataset[self.target_column] = dataset[self.target_column].astype(float)
...