Commit 994e528a authored by Anna Warno

arima corrected

parent 0a697b69
@@ -108,11 +108,11 @@ def main():
     )
     # msg1 = Msg()
-    # msg1.body = '[{"metric": "value", "level": 3, "publish_rate": 30000}]'
+    # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "metric_0", "level": 3, "publish_rate": 30000}, {"metric": "metric_1", "level": 3, "publish_rate": 30000}]'
     # msg2 = Msg()
     # msg2.body = (
     #     "{"
-    #     + f'"metrics": ["value"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
+    #     + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
     #     + "}"
     # )
......
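Note: the commented-out test messages above imply the shape of the start-forecasting payload. Below is a minimal sketch of that payload, reconstructed from the fields the script reads (metrics, all_metrics, publish_rate, epoch_start, prediction_horizon, number_of_forward_predictions); the field semantics are assumptions, not a documented contract.

import json
import time

# Hypothetical payload mirroring the commented msg1/msg2 examples; values are illustrative.
example_msg = {
    "metrics": ["MinimumCores", "metric_0", "metric_1"],  # metrics to forecast
    "timestamp": int(time.time()),
    "epoch_start": int(time.time()) + 30,  # first prediction epoch (seconds)
    "number_of_forward_predictions": 8,
    "prediction_horizon": 30,  # seconds
    "all_metrics": [
        {"metric": "MinimumCores", "level": 3, "publish_rate": 30000},  # rate in ms
        {"metric": "metric_0", "level": 3, "publish_rate": 30000},
        {"metric": "metric_1", "level": 3, "publish_rate": 30000},
    ],
}
# The script receives it as its first CLI argument: msg = json.loads(sys.argv[1])
print(json.dumps(example_msg))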
@@ -122,7 +122,7 @@ def main():
     for metric in predicted_metrics:
         prediction_msgs = predict(
             metric,
-            prediction_length,
+            prediction_lengths[metric],
             prediction_hor=prediction_horizon,
             timestamp=time_0,
         )
@@ -153,15 +153,9 @@ def main():
 if __name__ == "__main__":
     logging.basicConfig(
-        filename=f"logs/{os.environ.get('METHOD', 'arima')}.out",
-        level=logging.INFO,
-        datefmt="%Y-%m-%d %H:%M:%S",
-        format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
+        filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
     )
-    logging.Formatter.converter = lambda *args: datetime.now(
-        tz=timezone(TZ)
-    ).timetuple()
     msg = json.loads(sys.argv[1])
     metrics_info = {
         m["metric"]: {
@@ -173,15 +167,20 @@ if __name__ == "__main__":
     time_0 = msg["epoch_start"]
     prediction_horizon = msg["prediction_horizon"] * 1000
-    prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"]
+    prediction_points_horizons = {
+        metric["metric"]: msg["prediction_horizon"] * 1000 // metric["publish_rate"]
+        for metric in msg["all_metrics"]
+    }
+    print(prediction_points_horizons)
     predicted_metrics = set(msg["metrics"])
     prediction_cycle = msg["prediction_horizon"]
-    prediction_length = (
-        msg["prediction_horizon"]
-        * 1000
-        // msg["publish_rate"]
-        * msg["number_of_forward_predictions"]
-    )
+    prediction_lengths = {
+        metric["metric"]: msg["prediction_horizon"]
+        * 1000
+        // metric["publish_rate"]
+        * msg["number_of_forward_predictions"]
+        for metric in msg["all_metrics"]
+    }
     logging.info(f"Predicted metrics: {predicted_metrics}")
     number_of_forward_predictions = {
         metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
......
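The substance of the fix above: prediction lengths were a single global value derived from one publish_rate, and are now computed per metric from each metric's own publish_rate. A standalone sketch of the arithmetic (sample rates are made up; prediction_horizon is in seconds, publish_rate in milliseconds):

# Per-metric prediction length, mirroring the new dict comprehension.
msg = {
    "prediction_horizon": 30,  # seconds
    "number_of_forward_predictions": 8,
    "all_metrics": [
        {"metric": "MinimumCores", "publish_rate": 30000},  # one point per 30 s
        {"metric": "metric_0", "publish_rate": 10000},      # one point per 10 s
    ],
}
prediction_lengths = {
    m["metric"]: msg["prediction_horizon"] * 1000
    // m["publish_rate"]
    * msg["number_of_forward_predictions"]
    for m in msg["all_metrics"]
}
print(prediction_lengths)  # {'MinimumCores': 8, 'metric_0': 24}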
def square(x):
    return x * x
\ No newline at end of file
@@ -44,30 +44,55 @@ def predict(
         logging.info(
             f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
         )
-        print("Not enough fresh data, unable to predict TIME:")
+        print(
+            f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
         return None
     dataset = pd.read_csv(data_path)
     ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
+    if ts_dataset.dataset.shape[0] < 1:
+        logging.info(
+            f"Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        print(
+            f"Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        return None
     ts_dataset = ts_dataset.dataset[
         ts_dataset.dataset.series == ts_dataset.dataset.series.max()
     ]
     pred_len = params["dataset"]["prediction_length"]
     best_model_aic = None
-    for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0)]:
-        sarima = sm.tsa.statespace.SARIMAX(ts_dataset[target_column], order=order)
-        model = sarima.fit()
-        if best_model_aic:
-            if model.aic < best_model_aic:
-                predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
-                best_model_aic = model.aic
-        else:
-            predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
-            best_model_aic = model.aic
+    for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0), (1, 0, 0), (4, 0, 2)]:
+        try:
+            sarima = sm.tsa.statespace.SARIMAX(
+                ts_dataset[target_column], order=order, enforce_stationarity=False
+            )
+            model = sarima.fit()
+            if best_model_aic:
+                if model.aic < best_model_aic:
+                    predictions = model.get_forecast(
+                        pred_len, return_conf_int=True, alpha=0.05
+                    )
+                    best_model_aic = model.aic
+            else:
+                predictions = model.get_forecast(
+                    pred_len, return_conf_int=True, alpha=0.05
+                )
+                best_model_aic = model.aic
+        except np.linalg.LinAlgError:
+            logging.info(
+                f"SARIMAX model order: {order} failed for metric {target_column} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+            )
+    if not best_model_aic:
+        logging.info(
+            f"All SARIMAX failed for metric: {target_column}, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        return None
     msgs = []
......
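The loop above boils down to a small model-selection pattern: fit a handful of candidate (p, d, q) orders, keep the forecast from the lowest-AIC fit, and skip orders whose state-space filter blows up numerically. A self-contained sketch with synthetic data (statsmodels API as used in the diff; the series and the order list are illustrative):

import numpy as np
import pandas as pd
import statsmodels.api as sm

rng = np.random.default_rng(0)
y = pd.Series(rng.normal(size=200)).cumsum()  # synthetic random-walk series

best_aic = None
best_forecast = None
for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0), (1, 0, 0), (4, 0, 2)]:
    try:
        model = sm.tsa.statespace.SARIMAX(
            y, order=order, enforce_stationarity=False
        ).fit(disp=False)
    except np.linalg.LinAlgError:
        continue  # this order is numerically unstable for the series; try the next
    if best_aic is None or model.aic < best_aic:
        best_aic = model.aic
        best_forecast = model.get_forecast(steps=12)

if best_forecast is not None:
    print(best_forecast.predicted_mean.tail(3))
    print(best_forecast.conf_int(alpha=0.05).tail(3))  # 95% interval

Passing enforce_stationarity=False lets the optimizer search coefficients outside the stationary region, which tends to reduce hard failures on short or trending series at the cost of less well-behaved fits.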
@@ -34,13 +34,18 @@ class Dataset(object):
         self.context_length = context_length
         self.prediction_length = prediction_length
         self.dataset = dataset
-        self.check_gap()
+        self.dropped_recent_series = True  # default set to be true
+        if self.dataset.shape[0] > 0:
+            self.check_gap()
         self.n = dataset.shape[0]
 
     def cut_nan_start(self, dataset):
         dataset.index = range(dataset.shape[0])
         first_not_nan_index = dataset[self.target_column].first_valid_index()
-        return dataset[dataset.index > first_not_nan_index]
+        if first_not_nan_index > -1:
+            return dataset[dataset.index > first_not_nan_index]
+        else:
+            return dataset.dropna()
 
     def fill_na(self, dataset):
         dataset = dataset.replace("None", np.nan)
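One caveat on the new guard in cut_nan_start: pandas first_valid_index() returns None, not -1, when the column has no valid values, so first_not_nan_index > -1 would raise a TypeError in that case under Python 3. A minimal sketch of the safer spelling (same logic, explicit None check):

import numpy as np
import pandas as pd

df = pd.DataFrame({"value": [np.nan, np.nan]})
idx = df["value"].first_valid_index()
print(idx)  # None: no valid rows at all

# `idx > -1` raises TypeError when idx is None; compare against None instead.
rows = df[df.index > idx] if idx is not None else df.dropna()
print(rows.shape[0])  # 0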
@@ -68,43 +68,63 @@ class Dataset(object):
         return dataset
 
     def check_gap(self):
-        max_gap = self.dataset["time"].diff().abs().max()
-        logging.info(f"Max time gap in series {max_gap}")
-        print(f"Max time gap in series {max_gap}")
-        series_freq = self.dataset["time"].diff().value_counts().index.values[0]
-        logging.info(f"Detected series with {series_freq} frequency")
-        print(f"Detected series with {series_freq} frequency")
-        # check series length
-        series = np.split(
-            self.dataset,
-            *np.where(
-                self.dataset["time"].diff().abs().fillna(0).astype(int)
-                >= np.abs(self.max_missing_values * series_freq)
-            ),
-        )
-        logging.info(f"{len(series)} series found")
-        print(f"{len(series)} series found")
-        preprocessed_series = []
-        for i, s in enumerate(series):
-            s = self.fill_na(s)
-            s = self.cut_nan_start(s)
-            s = self.add_obligatory_columns(s)
-            s = self.convert_formats(s)
-            s["split"] = "train"
-            if s.shape[0] > self.prediction_length * 2 + self.context_length:
-                s["series"] = i
-                preprocessed_series.append(s)
-            if i == len(series) - 1:
-                logging.info(
-                    f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
-                )
-        logging.info(f"{len(preprocessed_series)} long enough series found")
-        print(f"{len(preprocessed_series)} long enough series found")
-        # logging.info(f"")
-        self.dataset = pd.concat(preprocessed_series)
-        if self.dataset["series"].max() != len(series) - 1:
-            self.dropped_recent_series = True
-        else:
-            self.dropped_recent_series = False
-        self.dataset.index = range(self.dataset.shape[0])
+        self.dataset[self.target_column] = pd.to_numeric(
+            self.dataset[self.target_column], errors="coerce"
+        ).fillna(np.nan)
+        self.dataset = self.dataset.dropna(subset=[self.target_column])
+        if self.dataset.shape[0] > 0:
+            max_gap = self.dataset["time"].diff().abs().max()
+            logging.info(
+                f"Metric: {self.target_column} Max time gap in series {max_gap}"
+            )
+            print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
+            self.series_freq = (
+                self.dataset["time"] // 1e9
+            ).diff().value_counts().index.values[0] * 1e9
+            logging.info(
+                f"Metric: {self.target_column} Detected series with {self.series_freq} frequency"
+            )
+            print(
+                f"Metric: {self.target_column} Detected series with {self.series_freq} frequency"
+            )
+            # check series length
+            series = np.split(
+                self.dataset,
+                *np.where(
+                    self.dataset["time"].diff().abs().fillna(0).astype(int)
+                    >= np.abs(self.max_missing_values * self.series_freq)
+                ),
+            )
+            logging.info(f"Metric: {self.target_column} {len(series)} series found")
+            print(f"{len(series)} series found")
+            preprocessed_series = []
+            for i, s in enumerate(series):
+                s = self.fill_na(s)
+                s = self.cut_nan_start(s)
+                s = self.add_obligatory_columns(s)
+                s = self.convert_formats(s)
+                s["split"] = "train"
+                logging.info(
+                    f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
+                )
+                if s.shape[0] > self.prediction_length * 2 + self.context_length:
+                    s["series"] = i
+                    preprocessed_series.append(s)
+                if i == len(series) - 1:
+                    logging.info(
+                        f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
+                    )
+            logging.info(
+                f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found"
+            )
+            print(f"{len(preprocessed_series)} long enough series found")
+            if preprocessed_series:
+                self.dataset = pd.concat(preprocessed_series)
+                if self.dataset["series"].max() != len(series) - 1:
+                    self.dropped_recent_series = True
+                else:
+                    self.dropped_recent_series = False
+            else:
+                self.dataset = pd.DataFrame()
+        self.dataset.index = range(self.dataset.shape[0])
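check_gap's core move is splitting one time-ordered frame into sub-series wherever the timestamp gap exceeds max_missing_values times the detected (modal) frequency, then keeping only sub-series long enough for training. A self-contained sketch of that np.split pattern (timestamps in ms; the gap threshold is illustrative):

import numpy as np
import pandas as pd

# A 10-second cadence with one large gap in the middle.
times = [0, 10_000, 20_000, 30_000, 100_000, 110_000, 120_000]
df = pd.DataFrame({"time": times, "value": range(len(times))})

series_freq = df["time"].diff().value_counts().index.values[0]  # modal step: 10000.0
max_missing_values = 2  # split once more than ~2 consecutive points are missing

# np.where yields the row positions whose gap to the previous row exceeds the
# threshold; np.split starts a new sub-series at each such position.
series = np.split(
    df,
    *np.where(
        df["time"].diff().abs().fillna(0).astype(int)
        >= abs(max_missing_values * series_freq)
    ),
)
print(len(series))                # 2 sub-series
print(series[1]["time"].iloc[0])  # 100000, first point after the gap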