From 53cf424f0a547bc0aeea75a58b0c52f1f2028f17 Mon Sep 17 00:00:00 2001
From: szymon sadkkowski <szysad108@gmail.com>
Date: Thu, 4 Mar 2021 20:29:01 +0100
Subject: [PATCH 01/10] Added some interpolator tests, more to come (see TODO),
 and a few interpolator bugfixes

---
 FCRgendata/src/FCRGenData/interpolator.py     |  23 +++-
 FCRgendata/tests/iterpolator/mock_data.py     |  17 ++-
 .../tests/iterpolator/test_interpolator.py    | 111 ++++++++++++++++--
 3 files changed, 136 insertions(+), 15 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py
index 2f9c285..15f71d5 100644
--- a/FCRgendata/src/FCRGenData/interpolator.py
+++ b/FCRgendata/src/FCRGenData/interpolator.py
@@ -2,10 +2,19 @@ import datetime as dt
 from typing import Dict, Generator
 from numbers import Number
 from scipy import interpolate
+import itertools
 
 from FCRGenData.rawDataReader import IRowDataProvider
 
 
+class InterpolatedDataStreamException(Exception):
+    pass
+
+
+class EmptyReaderException(InterpolatedDataStreamException):
+    pass
+
+
 class InterpolatedDataStream:
     '''
         generator that interpolates data given by
@@ -46,8 +55,12 @@ class InterpolatedDataStream:
                  timestamp_generator: Generator[dt.datetime, None, None],
                  datasource=None,
                  ):
-        self.ts_gen = timestamp_generator
-        self.t0 = next(iter(self.ts_gen))
+
+        gen = iter(timestamp_generator)
+        self.t0 = next(gen)
+        self.ts_gen = itertools.chain([self.t0], gen)
+        # re-chain the consumed first element so we can look ahead without losing it
+
         self.last_ts = self.t0
         self.ts_col_name = reader.timestamp_column_name
         ts_col_embed = reader.column_names.index(self.ts_col_name)
@@ -70,6 +83,9 @@ class InterpolatedDataStream:
                 intp_t = self.raw_data[cname]
                 intp_t[0].append(tdelta_embed)
                 intp_t[1].append(row[col_idx])
+
+        if any(map(lambda v: len(v[0]) == len(v[1]) == 0, self.raw_data.values())):
+            raise EmptyReaderException()
 
         # interpolate aggregated data
         self.interpolants = dict()
@@ -77,11 +93,10 @@ class InterpolatedDataStream:
             self.interpolants[cname] = interpolate.interp1d(x=cdata[0], y=cdata[1], kind='linear')
 
     def __iter__(self):
-        self.ts_gen_iter = iter(self.ts_gen)
         return self
 
     def __next__(self) -> Dict:
-        ts = next(self.ts_gen_iter)
+        ts = next(self.ts_gen)
         if ts > self.last_ts:
             raise StopIteration
 
diff --git a/FCRgendata/tests/iterpolator/mock_data.py b/FCRgendata/tests/iterpolator/mock_data.py
index e61fee1..331cee5 100644
--- a/FCRgendata/tests/iterpolator/mock_data.py
+++ b/FCRgendata/tests/iterpolator/mock_data.py
@@ -1,5 +1,5 @@
 import datetime as dt
-from typing import TypedDict, List, Any, Iterator, Tuple
+from typing import TypedDict, List, Any, Iterator, Tuple, Callable
 
 
 class MockData(TypedDict):
@@ -35,3 +35,18 @@ def mock_data_factory(
         ts_col_name=ts_col_name,
         data=[row for row in zip(*col_gens)]
     )
+
+
+def lambda_generator(lamb: Callable[[int], Any], iters: int) -> Iterator[Any]:
+    '''
+        simple generator wrapper around a per-index callable.
+        if iters == -1, the generated sequence is infinite
+    '''
+
+    i = 0
+    if iters == -1:
+        iters = float('inf')
+
+    while i < iters:
+        yield lamb(i)
+        i += 1
diff --git a/FCRgendata/tests/iterpolator/test_interpolator.py b/FCRgendata/tests/iterpolator/test_interpolator.py
index d19bfe7..c7b24ed 100644
--- a/FCRgendata/tests/iterpolator/test_interpolator.py
+++ b/FCRgendata/tests/iterpolator/test_interpolator.py
@@ -1,11 +1,12 @@
 import pytest
 import datetime as dt
+import random
 from typing import Dict, Any
 from numbers import Number
 
-from FCRGenData.interpolator import InterpolatedDataStream
+from FCRGenData.interpolator import InterpolatedDataStream, EmptyReaderException
 from .mock_row_provider import MockRowDataProvider
-from .mock_data import mock_data_factory
+from .mock_data import mock_data_factory, lambda_generator as lg
 
 
 def dict_eq_w_margin(d1: Dict[str, Any], d2: Dict[str, Any], margin: float) -> bool:
@@ -30,17 +31,25 @@ def dict_eq_w_margin(d1: Dict[str, Any], d2: Dict[str, Any], margin: float) -> b
 
 @pytest.mark.interpolation
 def test_same_vals_at_interpolation_nodes():
+    '''
+        checks that values at interpolation nodes
+        are unchanged (within a small margin)
+    '''
+
     MAX_ERR = 1e-4
-    ROWS = 10
+    ITERS = 10
     t0 = dt.datetime(year=2000, month=1, day=1)
-    ts_gen = [t0 + dt.timedelta(minutes=m) for m in range(ROWS)]
-    timestamp_cname = 'ts'
 
     data = mock_data_factory(
-        header_names=[timestamp_cname, 'const1'],
-        ts_col_name=timestamp_cname,
-        col_types=[dt.datetime, int],
-        col_gens=[ts_gen, [1.] * ROWS]
+        header_names=['ts', 'rand1', 'rand2', 'rand3'],
+        ts_col_name='ts',
+        col_types=[dt.datetime, float, float, float],
+        col_gens=[
+            lg(lambda n: t0 + dt.timedelta(minutes=n), ITERS),
+            lg(lambda n: -random.random(), ITERS),
+            lg(lambda n: random.random() * 100, ITERS),
+            lg(lambda n: random.random() * 1000, ITERS),
+        ]
     )
 
     rowProvider = MockRowDataProvider(
@@ -49,9 +58,91 @@ def test_same_vals_at_interpolation_nodes():
     )
     interp = InterpolatedDataStream(
         reader=rowProvider,
-        timestamp_generator=ts_gen,
+        timestamp_generator=lg(lambda n: t0 + dt.timedelta(minutes=n), ITERS * 2),
     )
 
     rowProvGen = rowProvider.reader_annotated()
     for d1, d2 in zip(interp, rowProvGen):
         assert dict_eq_w_margin(d1, d2, MAX_ERR)
+
+
+@pytest.mark.interpolation
+def test_no_data_but_timestamps():
+    '''
+        checks that the interpolator raises
+        the correct exception when rowProvider returns
+        no rows
+    '''
+    data = mock_data_factory(
+        header_names=['ts', 'v1', 'v2'],
+        ts_col_name='ts',
+        col_types=[dt.datetime, float, float],
+        col_gens=[range(0), range(0), range(0)]
+    )
+
+    rowProv = MockRowDataProvider(
+        data=data,
+        max_time_difference=dt.timedelta(seconds=1)
+    )
+
+    t0 = dt.datetime(year=2000, month=1, day=1)
+    def ts_gen():
+        i = 0
+        while True:
+            yield t0 + dt.timedelta(seconds=i); i += 1
+    try:
+        interp = InterpolatedDataStream(reader=rowProv, timestamp_generator=ts_gen())
+    except EmptyReaderException:
+        assert True
+    else:
+        assert False
+
+
+@pytest.mark.interpolation
+def test_only_none_data():
+    '''
+        checks that the interpolator raises
+        the correct exception when rowProvider returns
+        only None values alongside valid timestamps
+    '''
+    ITERS = 29
+    t0 = dt.datetime(year=2000, month=1, day=1)
+    data = mock_data_factory(
+        header_names=['time stamp', 'i1', 'c2'],
+        ts_col_name='time stamp',
+        col_types=[dt.datetime, int, complex],
+        col_gens=[
+            lg(lambda n: t0 + dt.timedelta(hours=n), ITERS),
+            lg(lambda n: None, ITERS),
+            lg(lambda n: None, ITERS)
+        ]
+    )
+
+    t0 = dt.datetime(year=2000, month=1, day=1)
+    rowProv = MockRowDataProvider(data, dt.timedelta(seconds=1))
+    try:
+        interp = InterpolatedDataStream(
+            reader=rowProv,
+            timestamp_generator=lg(lambda n: t0 + dt.timedelta(days=2*n), -1)
+        )
+    except EmptyReaderException:
+        assert True
+    else:
+        assert False
+
+
+'''
+    TODO:
+        test if an exception is raised when some columns contain full data
+        and some columns contain only None data
+
+        test what happens when some columns contain so little data
+        at the boundaries that extrapolation is needed
+
+        test if an exception is raised when the client requests data
+        from outside the interpolation (and extrapolation) bounds
+
+        test if data taken between sample nodes is reasonable
+
+        test what happens on a larger dataset (> 5000 sample nodes)
+'''
-- 
GitLab


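The look-ahead trick in patch 01 (consume the generator's first element, then re-chain it with itertools.chain) is the standard way to peek at a lazy stream without losing values. A minimal stand-alone sketch of the pattern, with hypothetical names, not code from the series:

    import itertools
    from typing import Iterable, Iterator, Tuple, TypeVar

    T = TypeVar('T')

    def peek_first(iterable: Iterable[T]) -> Tuple[T, Iterator[T]]:
        """Return (first element, restored iterator); the iterator still yields that element."""
        it = iter(iterable)
        first = next(it)  # raises StopIteration if the stream is empty
        return first, itertools.chain([first], it)

    t0, ts_gen = peek_first(range(5))
    assert t0 == 0 and list(ts_gen) == [0, 1, 2, 3, 4]
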
From f72476663ab1554080c4a0a36d6ef472edca3b6e Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Thu, 4 Mar 2021 21:10:05 +0100
Subject: [PATCH 02/10] Data aggregator integration with the rest of the code

---
 FCRgendata/src/FCRGenData/__main__.py         | 89 ++++++++-----------
 FCRgendata/src/FCRGenData/config_schemas.py   |  6 +-
 FCRgendata/src/FCRGenData/data_aggregator.py  | 41 +++++++--
 FCRgendata/src/FCRGenData/interpolator.py     |  7 +-
 .../FCRGenData/rawDataReader/csv_reader.py    | 10 ++-
 .../rawDataReader/row_data_reader.py          |  2 +
 FCRgendata/src/FCRGenData/solv_summoner.py    | 21 +++--
 .../tests/data_reader/data_reader_template.py |  3 +-
 .../tests/data_reader/test_csv_reader.py      |  7 +-
 9 files changed, 102 insertions(+), 84 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index a81e509..6c49b6f 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -1,81 +1,62 @@
 """ Generates data using CP-solver and FCR time series for network training """
-
+import argparse
 import csv
 import logging
-import sys
 from contextlib import ExitStack
 from pathlib import Path
 
-import numpy as np
-import numpy.lib.recfunctions as rfn
-from progress.bar import IncrementalBar as IBar
-
+from FCRGenData.data_aggregator import DataAggregator
 from .solv_summoner import CPSolverSolutionSummoner, CPSolverSolutionSummonerError
-from .validate_config import validate_config
 
 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
-CONFIG_PATH: Path = None
 
 
-def run():
-    if len(sys.argv) != 2:
-        logging.critical(
-            f"received {len(sys.argv) - 1} arguments, required one: config file path.")
-        sys.exit(1)
+def _json_path(path_str: str) -> Path:
+    if Path(path_str).is_file() and path_str.endswith('.json'):
+        return Path(path_str)
     else:
-        CONFIG_PATH = Path(sys.argv[1])
-
-    j_conf = validate_config(CONFIG_PATH)
-    avg_rsp_times = np.genfromtxt(  # shape (n, )
-        j_conf['AvgResponseTimeTableFilePath'],
-        delimiter=',',
-        dtype=np.dtype([('avg_rsp_time', np.float64), ('datetime', 'datetime64[s]')]),
-        skip_header=1,
-        usecols=(6, 1)
-    )
-
-    avg_rsp_prediction = np.genfromtxt(  # shape (n, )
-        j_conf['predictionsFilePath'],
-        delimiter=',',
-        dtype=np.dtype([('avg_rsp_time_pred', np.float64), ('datetime', 'datetime64[s]'), ('split', np.int)]),
-        skip_header=1,
-        usecols=(1, 3, 4),
-        converters={4: lambda split: {b"train": 0, b"val": 1, b"test": -1}[split]}
-    )
-
-    joined = rfn.join_by('datetime', avg_rsp_times, avg_rsp_prediction, jointype='inner', usemask=False)
-    assert len(joined) == len(avg_rsp_prediction)
-    #  joined is structured array of tuples with dtype
-    #  [('datetime', '<M8[s]'), ('avg_rsp_time', '<f8'), ('avg_rsp_time_pred', '<f8'), ('split', '<i8')]
-
-    solver_host = j_conf['cpSolverHost']
-    csv_header = ['timestamp', 'AvgResponseTime', 'AvgResponseTimePrediction', 'split',
-                  'cardinality_Component_LB', 'provider_Component_LB', 'AppCardinality',
-                  'cardinality_Component_DB', 'provider_Component_App', 'provider_Component_DB'
-                  ]
+        raise argparse.ArgumentTypeError(f"{path_str} is not a valid path")
+
+
+def run():
+    # Argument parsing
+    parser = argparse.ArgumentParser(description="Simple data aggregator with interpolation")
+
+    parser.add_argument('cp_config', action='store', type=_json_path, help="Path to cpsolver json config file")
+    parser.add_argument('gendata_config', action='store', type=_json_path,
+                        help="Path to gendata json config file")
+
+    args = parser.parse_args()
+
+    cpsolver_config_path = args.cp_config
+    gendata_config = args.gendata_config
+
+    # Create aggregator
+    cpsolver = CPSolverSolutionSummoner(cpsolver_config_path)
+
+    aggregator = DataAggregator(json_path=gendata_config)
 
     with ExitStack() as stack:  # substitute for a multiline with statement
-        summoner = stack.enter_context(CPSolverSolutionSummoner(j_conf))
-        csvfile = stack.enter_context(open(j_conf['outpath'], 'w', newline=''))
-        bar = IBar('Progress', max=len(joined))
+        summoner = stack.enter_context(cpsolver)
+        csvfile = stack.enter_context(open(aggregator.outpath, 'w', newline=''))
 
         writer = csv.writer(csvfile, delimiter=',')
-        writer.writerow(csv_header)
+        writer.writerow(aggregator.column_names)
 
-        for i, row in enumerate(joined):
+        it = 0
+        cpsolver_column_index = aggregator.column_names.index(aggregator.avg_response_time_colname)
+        for row in aggregator.row_generator():
+            it += 1
+            print(f'Row {it}')
             try:
-                solution = summoner.get_solution(row[1])
+                solution = summoner.get_solution(row[cpsolver_column_index])
                 writer.writerow((*row, *solution))
             except CPSolverSolutionSummonerError as err:
-                bar.finish()
                 logging.error(
                     "CPSolverSolutionSummoner raised exception while "
-                    f"summoning cp-solver near line {i} "
+                    f"summoning cp-solver near line {it} "
                     f"exception:\n{err}"
                 )
-                bar = IBar('Progress', max=len(joined) - i)
-            bar.next()
-    bar.finish()
 
 
 if __name__ == "__main__":
diff --git a/FCRgendata/src/FCRGenData/config_schemas.py b/FCRgendata/src/FCRGenData/config_schemas.py
index 9dfcdd5..a11ddf8 100644
--- a/FCRgendata/src/FCRGenData/config_schemas.py
+++ b/FCRgendata/src/FCRGenData/config_schemas.py
@@ -77,7 +77,9 @@ DATA_CONFIG_SCHEMA = {
         "timedelta":
             {"type": "int"},
         "max_time_gap":
-            {"type": "int"}
+            {"type": "int"},
+        "avg_response_time_colname": # temporary, input for cpsolver
+            {"type": "string"}
     },
-    "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap"]
+    "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap", "avg_response_time_colname"]
 }
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index 784f46d..0f8618e 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -1,7 +1,8 @@
 import datetime as dt
+import functools
 import logging
 from pathlib import Path
-from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator
+from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator, Iterable
 
 from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.interpolator import InterpolatedDataStream
@@ -16,7 +17,7 @@ def _timestamp_generator(t0: dt.datetime, tstep: dt.timedelta) -> Generator[dt.d
         t += tstep
 
 
-class AggregatedData:
+class DataAggregator:
     """Aggregates data from multiple sources and uneven time periods
 
     Args:
@@ -42,7 +43,7 @@ class AggregatedData:
 
         # Roottime data source
         roottime_data = None
-
+        max_time_gap = dt.timedelta(seconds=self.__conf['max_time_gap'])
         # Parse data sources
         for datasource in self.__conf["datasources"]:
             desc = datasource['desc']
@@ -53,7 +54,8 @@ class AggregatedData:
 
             if source['type'] == 'csv':
                 csv_path = Path(source['path'])
-                reader = RowCSVReader(csv_path, timestamp_column_name=timestamp_column)
+                reader = RowCSVReader(csv_path, timestamp_column_name=timestamp_column,
+                                      max_time_difference=max_time_gap)
                 self.__datasources.append((datasource, reader))
             else:
                 raise NotImplementedError("Unsupported source type")
@@ -79,6 +81,32 @@ class AggregatedData:
         self.__roottime_reader = roottime_data[1]
         self.__roottime_datasource = roottime_data[0]
 
+    @functools.cached_property
+    def avg_response_time_colname(self) -> str: # todo export
+        return self.__conf['avg_response_time_colname']
+
+    @functools.cached_property
+    def outpath(self) -> Path:
+        return Path(self.__conf['outpath'])
+
+    @functools.cached_property
+    def column_names(self) -> List[str]:
+        """Names of the variable columns"""
+        header = []
+        for value_name in self.__conf["aggregated_columns"]:
+            for datasource, rowdict in self.__datasources:
+                for value_info in datasource["values"]:
+                    # if alias present
+                    if 'alias' in value_info:
+                        if value_info['alias'] == value_name:
+                            header.append(value_info['colname'])
+                            break
+                    else:
+                        if value_info['colname'] == value_name:
+                            header.append(value_info['colname'])
+                            break
+        return header
+
     def row_generator(self) -> Generator[Mapping[str, any], None, None]:
         """Generates values mapping with correct and even time periods.
         Combines all values from different sources and unify their timestamps.
@@ -91,8 +119,9 @@ class AggregatedData:
         for datasource, reader in self.__datasources:
             interpolated_streams.append(
                 (datasource,
-                 InterpolatedDataStream(datasource, reader, _timestamp_generator(self.__t0, self.__delta))), self.__max_time_gap)
-
+                 InterpolatedDataStream(datasource=datasource,
+                                        reader=reader,
+                                        timestamp_generator=_timestamp_generator(self.__t0, self.__delta))))
         while True:
             grouped_values = []
             timestamp = None
diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py
index 15f71d5..fbe06aa 100644
--- a/FCRgendata/src/FCRGenData/interpolator.py
+++ b/FCRgendata/src/FCRGenData/interpolator.py
@@ -47,8 +47,9 @@ class InterpolatedDataStream:
             performance issues.
     '''
 
-    def __ts_embedding(self, ts: dt.datetime, ts0: dt.datetime) -> int:
-        return (ts - ts0).seconds
+    @staticmethod
+    def __ts_embedding(t1: dt.datetime, t2: dt.datetime) -> int:
+        return int((t1 - t2).total_seconds())
 
     def __init__(self,
                  reader: IRowDataProvider,
@@ -69,7 +70,7 @@ class InterpolatedDataStream:
 
         # col name -> (timestamp embedding, given value)
         self.raw_data = {cname: ([], []) for cname in self.intp_col_embed}
-        row_generator = reader.reader()
+        row_generator = reader.reader_annotated()
 
         # populate self.inp_data
         for row in row_generator:
diff --git a/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
index 66d526e..9bf3591 100644
--- a/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
+++ b/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
@@ -29,7 +29,7 @@ class RowCSVReader(IRowDataProvider):
     __column_names: Tuple[str]
     __column_types: Dict[str, type]
 
-    __timestamp_column_name: str
+    _timestamp_column_name: str
 
     def __init__(self,
                  path: Union[Path, str],
@@ -130,11 +130,13 @@ class RowCSVReader(IRowDataProvider):
         Raises:
             TooBigTimeDifference
         """
+        time_index = list(self.columns).index(self.timestamp_column_name)
+
         for index, row in self.__arr.iterrows():
             values = row.values.tolist()
-            values = map(RowCSVReader._convert_to_pytype, values)
-            self._check_time_difference(getattr(values, self.__timestamp_column_name))
-            yield list(values)
+            values = list(map(RowCSVReader._convert_to_pytype, values))
+            self._check_time_difference(values[time_index])
+            yield values
 
     def reader_annotated(self) -> Generator[Dict[str, any], None, None]:
         """Returns dict iterator over rows.
diff --git a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
index ff7d436..58686e2 100644
--- a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
+++ b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
@@ -38,6 +38,8 @@ class IRowDataProvider(abc.ABC):
             if self.__previous_timestamp + self.__max_time_difference < timestamp:
                 raise TooBigTimeDifference(
                     f'Previous timestamp: {self.__previous_timestamp}, actual timestamp: {timestamp}, max difference: {self.__max_time_difference}')
+            else:
+                self.__previous_timestamp = timestamp
 
     @property
     @abc.abstractmethod
diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py
index 0378848..c351288 100644
--- a/FCRgendata/src/FCRGenData/solv_summoner.py
+++ b/FCRgendata/src/FCRGenData/solv_summoner.py
@@ -1,26 +1,31 @@
 from pathlib import Path
+
 from lxml import etree
-from tempfile import NamedTemporaryFile
+import json
+import os
 from contextlib import ContextDecorator
-from typing import List
 from functools import lru_cache
-import logging
+from tempfile import NamedTemporaryFile
+from typing import List
+
 import requests
-import json
-import os
+from lxml import etree
 
+from FCRGenData.config_schemas import CPSOLVER_CONFIG_SCHEMA
+from FCRGenData.validate_config import validate_config
 
-CACHE_SIZE = 2**10
+CACHE_SIZE = 2 ** 10
 
 
 class CPSolverSolutionSummonerError(Exception):
     pass
 
 
-class CPSolverSolutionSummoner(ContextDecorator):
+class CPSolverSolutionSummoner:
     """ calls cp-solver for solution, and manages additional resources """
 
-    def __init__(self, config):
+    def __init__(self, cpsolver_config_path: Path):
+        config = validate_config(cpsolver_config_path, CPSOLVER_CONFIG_SCHEMA)
         self.solver_host = config['cpSolverHost']
         self.solution_file = f"{config['request']['camelModelFilePath'][:-4]}-solution.xmi"
         self.FCRcpxml = etree.parse(config['request']['cpProblemFilePath'])
diff --git a/FCRgendata/tests/data_reader/data_reader_template.py b/FCRgendata/tests/data_reader/data_reader_template.py
index e984732..ce0ae2f 100644
--- a/FCRgendata/tests/data_reader/data_reader_template.py
+++ b/FCRgendata/tests/data_reader/data_reader_template.py
@@ -102,8 +102,9 @@ class RawDataProviderTestTemplate(abc.ABC):
     def test_read(self, reader_factory):
         header = ['time', 'int', 'str', 'float', 'bool']
         content = []
+        t0 = dt.datetime.now()
         for i in range(5):
-            row = [dt.datetime.now() + dt.timedelta(days=i), i, str(i) + 'pln', i / 10, i % 2 == 0]
+            row = [t0 + dt.timedelta(days=i), i, str(i) + 'pln', i / 10, i % 2 == 0]
             assert len(row) == len(header)
             content.append(row)
 
diff --git a/FCRgendata/tests/data_reader/test_csv_reader.py b/FCRgendata/tests/data_reader/test_csv_reader.py
index a0f6ed6..8c92650 100644
--- a/FCRgendata/tests/data_reader/test_csv_reader.py
+++ b/FCRgendata/tests/data_reader/test_csv_reader.py
@@ -2,6 +2,7 @@ import tempfile
 from typing import Callable
 
 import pytest
+import datetime as dt
 
 from FCRGenData.rawDataReader import RowCSVReader
 from .data_reader_template import RawDataProviderTestTemplate
@@ -9,12 +10,6 @@ from .data_reader_template import RawDataProviderTestTemplate
 
 class TestCSVReader(RawDataProviderTestTemplate):
 
-    @pytest.fixture
-    def empty_reader(self) -> RowCSVReader:
-        tfile = tempfile.NamedTemporaryFile(suffix='.csv')
-        yield RowCSVReader(tfile.name)
-        tfile.close()
-
     @pytest.fixture
     def reader_factory(self) -> Callable[[str], RowCSVReader]:
         tfile = tempfile.NamedTemporaryFile(suffix='.csv', mode='w')
-- 
GitLab


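Patch 02 replaces manual sys.argv handling with argparse and a custom type= callable (_json_path). The contract for such callables: take the raw CLI string, return the converted value, and raise argparse.ArgumentTypeError on bad input so the parser reports a clean usage error. A minimal sketch of the same pattern (the prog name is a placeholder):

    import argparse
    from pathlib import Path

    def json_path(path_str: str) -> Path:
        """Accept only an existing .json file, mirroring the patch's _json_path."""
        p = Path(path_str)
        if p.is_file() and p.suffix == '.json':
            return p
        raise argparse.ArgumentTypeError(f"{path_str} is not an existing .json file")

    parser = argparse.ArgumentParser(prog='fcrgendata')
    parser.add_argument('config', type=json_path, help="Path to a json config file")
    # parser.parse_args(['nonexistent.json']) exits with:
    #   error: argument config: nonexistent.json is not an existing .json file
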
From fa7d14259656a636342e2cea976665da18de689d Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Fri, 5 Mar 2021 17:25:27 +0100
Subject: [PATCH 03/10] Missing function definitions, integration with
 aggregator

---
 FCRgendata/src/FCRGenData/__main__.py         |  7 ++-
 FCRgendata/src/FCRGenData/config_schemas.py   |  7 ++-
 FCRgendata/src/FCRGenData/data_aggregator.py  | 45 +++++++++----------
 FCRgendata/src/FCRGenData/interpolator.py     |  8 ++--
 .../FCRGenData/rawDataReader/csv_reader.py    |  6 +--
 .../rawDataReader/row_data_reader.py          |  4 +-
 FCRgendata/src/FCRGenData/solv_summoner.py    |  7 +--
 FCRgendata/src/FCRGenData/validate_config.py  |  8 ++--
 .../tests/data_reader/data_reader_template.py |  4 +-
 .../tests/iterpolator/mock_row_provider.py    |  4 +-
 .../tests/iterpolator/test_interpolator.py    |  2 +-
 11 files changed, 51 insertions(+), 51 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index 6c49b6f..4db0c2a 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -44,12 +44,15 @@ def run():
         writer.writerow(aggregator.column_names)
 
         it = 0
-        cpsolver_column_index = aggregator.column_names.index(aggregator.avg_response_time_colname)
+        solver_cols_indexes = [aggregator.column_names.index(colname) for colname in aggregator.solver_column_names]
+
+        # We could also consume annotated rows (which might be simpler)
         for row in aggregator.row_generator():
             it += 1
             print(f'Row {it}')
             try:
-                solution = summoner.get_solution(row[cpsolver_column_index])
+                solver_params = map(lambda index: row[index], solver_cols_indexes)
+                solution = summoner.get_solution(solver_params)
                 writer.writerow((*row, *solution))
             except CPSolverSolutionSummonerError as err:
                 logging.error(
diff --git a/FCRgendata/src/FCRGenData/config_schemas.py b/FCRgendata/src/FCRGenData/config_schemas.py
index a11ddf8..31c29eb 100644
--- a/FCRgendata/src/FCRGenData/config_schemas.py
+++ b/FCRgendata/src/FCRGenData/config_schemas.py
@@ -78,8 +78,11 @@ DATA_CONFIG_SCHEMA = {
             {"type": "int"},
         "max_time_gap":
             {"type": "int"},
-        "avg_response_time_colname": # temporary, input for cpsolver
-            {"type": "string"}
+        "solver_cols":
+            {"type": "array",
+             "minItems": 1,
+             "items": {"type": "string"}
+             }
     },
     "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap", "avg_response_time_colname"]
 }
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index 0f8618e..b56e7ff 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -1,8 +1,7 @@
 import datetime as dt
-import functools
 import logging
 from pathlib import Path
-from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator, Iterable
+from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator
 
 from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.interpolator import InterpolatedDataStream
@@ -81,33 +80,33 @@ class DataAggregator:
         self.__roottime_reader = roottime_data[1]
         self.__roottime_datasource = roottime_data[0]
 
-    @functools.cached_property
-    def avg_response_time_colname(self) -> str: # todo export
-        return self.__conf['avg_response_time_colname']
-
-    @functools.cached_property
+    @property
     def outpath(self) -> Path:
         return Path(self.__conf['outpath'])
 
-    @functools.cached_property
+    @property
+    def solver_column_names(self) -> List[str]:
+        """List of the column names reuired by the cpsolver"""
+        return self.__conf['solver_cols']
+
+    @property
     def column_names(self) -> List[str]:
         """Names of the variable columns"""
-        header = []
-        for value_name in self.__conf["aggregated_columns"]:
-            for datasource, rowdict in self.__datasources:
-                for value_info in datasource["values"]:
-                    # if alias present
-                    if 'alias' in value_info:
-                        if value_info['alias'] == value_name:
-                            header.append(value_info['colname'])
-                            break
-                    else:
-                        if value_info['colname'] == value_name:
-                            header.append(value_info['colname'])
-                            break
-        return header
+        return self.__conf['aggregated_columns']
+
+    def row_generator(self) -> Generator[List[type], None, None]:
+        """Generator over raw data, provides rows of data in increasing order (by timestamp).
+        Yields lists of values ordered as in aggregated_columns.
+        """
+        for values_dict in self.row_generator_annotated():
+            values_list = []
+
+            for value_name in self.__conf["aggregated_columns"]:
+                values_list.append(values_dict[value_name])
+
+            yield values_list
 
-    def row_generator(self) -> Generator[Mapping[str, any], None, None]:
+    def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
         """Generates values mapping with correct and even time periods.
         Combines all values from different sources and unify their timestamps.
 
diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py
index fbe06aa..e06d445 100644
--- a/FCRgendata/src/FCRGenData/interpolator.py
+++ b/FCRgendata/src/FCRGenData/interpolator.py
@@ -18,15 +18,15 @@ class EmptyReaderException(InterpolatedDataStreamException):
 class InterpolatedDataStream:
     '''
         generator that interpolates data given by
-        reader in points given by timestamp_generator
+        row_generator in points given by timestamp_generator
         with additional config from datasource.
 
         Interpolator returns dict which includes:
-            1. Mapping column names given by reader
+            1. Mapping column names given by row_generator
                 (only numerical types) to its
                 interpolated values in points returned
                 by timestamp_generator.
-            2. Mapping from reader column name that
+            2. Mapping from row_generator column name that
                 contains data timestamp to timestamp
                 given by timestamp_generator
 
@@ -70,7 +70,7 @@ class InterpolatedDataStream:
 
         # col name -> (timestamp embedding, given value)
         self.raw_data = {cname: ([], []) for cname in self.intp_col_embed}
-        row_generator = reader.reader_annotated()
+        row_generator = reader.row_generator_annotated()
 
         # populate self.inp_data
         for row in row_generator:
diff --git a/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
index 9bf3591..4b79499 100644
--- a/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
+++ b/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
@@ -17,7 +17,7 @@ def _match_columns(column: List[str], keys: List[str]) -> List[str]:
 
 
 class RowCSVReader(IRowDataProvider):
-    """CSV data reader implementation."""
+    """CSV data row_generator implementation."""
 
     __path: Path
 
@@ -121,7 +121,7 @@ class RowCSVReader(IRowDataProvider):
         else:
             return value
 
-    def reader(self) -> Generator[Iterable[type], None, None]:
+    def row_generator(self) -> Generator[Iterable[type], None, None]:
         """Returns iterator over rows.
 
         Yields:
@@ -138,7 +138,7 @@ class RowCSVReader(IRowDataProvider):
             self._check_time_difference(values[time_index])
             yield values
 
-    def reader_annotated(self) -> Generator[Dict[str, any], None, None]:
+    def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
         """Returns dict iterator over rows.
 
         Yields:
diff --git a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
index 58686e2..01f7aed 100644
--- a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
+++ b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
@@ -65,14 +65,14 @@ class IRowDataProvider(abc.ABC):
         pass
 
     @abc.abstractmethod
-    def reader(self) -> Generator[Iterable[type], None, None]:
+    def row_generator(self) -> Generator[Iterable[type], None, None]:
         """Generator over raw data, provides rows of data in increasing order (by timestamp)
         Returns list with values (order same as in column_names).
         """
         pass
 
     @abc.abstractmethod
-    def reader_annotated(self) -> Generator[Dict[str, any], None, None]:
+    def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
         """Generator over raw data, provides rows of data in increasing order (by timestamp).
         Returns dict with column names mapping to current values.
         """
diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py
index c351288..ba01b90 100644
--- a/FCRgendata/src/FCRGenData/solv_summoner.py
+++ b/FCRgendata/src/FCRGenData/solv_summoner.py
@@ -1,10 +1,7 @@
-from pathlib import Path
-
-from lxml import etree
 import json
 import os
-from contextlib import ContextDecorator
 from functools import lru_cache
+from pathlib import Path
 from tempfile import NamedTemporaryFile
 from typing import List
 
@@ -45,7 +42,7 @@ class CPSolverSolutionSummoner:
         return False
 
     @lru_cache(maxsize=CACHE_SIZE)
-    def get_solution(self, avg_rsp_time: float) -> List[int]:
+    def get_solution(self, avg_rsp_time: float) -> List[int]:  # TODO fix
         """ returns solution as list of int """
 
         self.avg_rsp_time_xelem.set('value', str(avg_rsp_time))
diff --git a/FCRgendata/src/FCRGenData/validate_config.py b/FCRgendata/src/FCRGenData/validate_config.py
index f7b5a6b..5448a68 100644
--- a/FCRgendata/src/FCRGenData/validate_config.py
+++ b/FCRgendata/src/FCRGenData/validate_config.py
 Defines the validate_config function, responsible for validating the path to the config file,
 its structure and content
 """
 
-from pathlib import Path
-import logging
 import json
+import logging
+import sys
+from pathlib import Path
 from typing import Dict
 
 import jsonschema
-import sys
-
-
 
 # create two custom types: "filepath" (path to existing file)
 # and "creatablepath" (path in which new file can be created)
diff --git a/FCRgendata/tests/data_reader/data_reader_template.py b/FCRgendata/tests/data_reader/data_reader_template.py
index ce0ae2f..ab4c3ee 100644
--- a/FCRgendata/tests/data_reader/data_reader_template.py
+++ b/FCRgendata/tests/data_reader/data_reader_template.py
@@ -115,11 +115,11 @@ class RawDataProviderTestTemplate(abc.ABC):
 
         reader: IRowDataProvider = reader_factory(s.read(), timestamp_column_name='time', max_time_difference=dt.timedelta(days=1))
 
-        row_gen = reader.reader()
+        row_gen = reader.row_generator()
         for row, grow in zip(content, row_gen):
             assert row == grow
 
-        dict_gen = reader.reader_annotated()
+        dict_gen = reader.row_generator_annotated()
         for row, row_dict in zip(content, dict_gen):
             for col_name, value in zip(header, row):
                 assert row_dict[col_name] == value
diff --git a/FCRgendata/tests/iterpolator/mock_row_provider.py b/FCRgendata/tests/iterpolator/mock_row_provider.py
index 0f694bd..f48afdc 100644
--- a/FCRgendata/tests/iterpolator/mock_row_provider.py
+++ b/FCRgendata/tests/iterpolator/mock_row_provider.py
@@ -37,8 +37,8 @@ class MockRowDataProvider(IRowDataProvider):
     def columns(self) -> Dict[str, type]:
         return {cname: ctype for cname, ctype in zip(self.headers, self.types)}
 
-    def reader_annotated(self) -> Generator[Dict[str, any], None, None]:
+    def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
         return map(lambda row: dict(zip(self.headers, row)), self.data)
 
-    def reader(self) -> Generator[List[type], None, None]:
+    def row_generator(self) -> Generator[List[type], None, None]:
         yield from self.data
diff --git a/FCRgendata/tests/iterpolator/test_interpolator.py b/FCRgendata/tests/iterpolator/test_interpolator.py
index c7b24ed..84e9f85 100644
--- a/FCRgendata/tests/iterpolator/test_interpolator.py
+++ b/FCRgendata/tests/iterpolator/test_interpolator.py
@@ -61,7 +61,7 @@ def test_same_vals_at_interpolation_nodes():
         timestamp_generator=lg(lambda n: t0 + dt.timedelta(minutes=n), ITERS * 2),
     )
 
-    rowProvGen = rowProvider.reader_annotated()
+    rowProvGen = rowProvider.row_generator_annotated()
     for d1, d2 in zip(interp, rowProvGen):
         assert dict_eq_w_margin(d1, d2, MAX_ERR)
 
-- 
GitLab


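Patch 03 derives DataAggregator.row_generator from row_generator_annotated by projecting each dict row onto the configured column order. The adapter's shape, sketched stand-alone with made-up rows (the real class reads the order from its aggregated_columns config):

    from typing import Any, Dict, Iterable, Iterator, List

    def project_rows(dict_rows: Iterable[Dict[str, Any]],
                     columns: List[str]) -> Iterator[List[Any]]:
        """Turn annotated (dict) rows into plain lists ordered by `columns`."""
        for row in dict_rows:
            yield [row[name] for name in columns]

    rows = [{'ts': 0, 'v1': 1.0, 'v2': 2.0}, {'ts': 1, 'v1': 1.5, 'v2': 2.5}]
    assert list(project_rows(rows, ['ts', 'v2'])) == [[0, 2.0], [1, 2.5]]
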
From 4047411e9758a50995da008d2cd0bfff633b4c2d Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sat, 6 Mar 2021 19:22:28 +0100
Subject: [PATCH 04/10] Added support for multiple params in solver summoner +
 docker support

---
 FCRgendata/src/FCRGenData/solv_summoner.py | 187 ++++++++++++++++++---
 1 file changed, 165 insertions(+), 22 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py
index ba01b90..910c7ee 100644
--- a/FCRgendata/src/FCRGenData/solv_summoner.py
+++ b/FCRgendata/src/FCRGenData/solv_summoner.py
@@ -1,11 +1,17 @@
+import abc
 import json
-import os
+import tarfile
+import time
 from functools import lru_cache
+from io import BytesIO
 from pathlib import Path
 from tempfile import NamedTemporaryFile
-from typing import List
+from typing import List, Optional, Dict
+from xml.etree.ElementTree import ElementTree, Element
 
+import docker
 import requests
+from docker.models.containers import Container
 from lxml import etree
 
 from FCRGenData.config_schemas import CPSOLVER_CONFIG_SCHEMA
@@ -14,49 +20,186 @@ from FCRGenData.validate_config import validate_config
 CACHE_SIZE = 2 ** 10
 
 
+def create_archive(filepath: Path) -> BytesIO:
+    """Packs content of the file to the tar archive.
+
+    Args:
+        filepath(Path): path to the file.
+
+    Returns:
+        (BytesIO) stream of the tarballed file.
+    """
+    pw_tarstream = BytesIO()
+    pw_tar = tarfile.TarFile(fileobj=pw_tarstream, mode='w')
+    with open(filepath, 'rb') as file:
+        file_data = file.read()
+        tarinfo = tarfile.TarInfo(name=filepath.name)
+        tarinfo.size = len(file_data)
+        tarinfo.mtime = time.time()
+        pw_tar.addfile(tarinfo, BytesIO(file_data))
+        pw_tar.close()
+        pw_tarstream.seek(0)
+        return pw_tarstream
+
+
+def docker_put(container: Container, src: Path, dst: Path) -> None:
+    """Copies specified file to the container.
+
+    Args:
+        container(Container): destination container.
+        src(Path): path to the source file.
+        dst(Path): destination path in the container.
+    """
+    with create_archive(src) as archive:
+        container.put_archive(str(dst), archive)
+
+
+def docker_get(container: Container, src: Path, dst: Path) -> None:
+    """Copies specified file from container.
+
+    Args:
+        container(Container): source container.
+        src(Path): path to the source file inside the container.
+        dst(Path): destination path where to store file.
+    """
+    bits, _ = container.get_archive(str(src))  # (tar stream chunks, stat) pair
+    tarfile.open(fileobj=BytesIO(b''.join(bits))).extract(src.name, path=dst.parent)
+
+
 class CPSolverSolutionSummonerError(Exception):
     pass
 
 
-class CPSolverSolutionSummoner:
+class CPSolverSolutionSummoner(abc.ABC):
+    _config: Dict
+
+    _solver_host: str
+    _camelModelFilePath: Path
+    _solutionFilePath: Path
+    _FCRcpxml_path: Path
+    _FCRcpxml: ElementTree  # TODO: verify
+    _nodeCandidatesFilePath: Path
+
+    _xml_fillings: Dict[str, Element]  # TODO check
+    _request_part: Dict
+
+    def __init__(self,
+                 cpsolver_config_path: Path,
+                 args_names: List[str]):
+        self._config = validate_config(cpsolver_config_path, CPSOLVER_CONFIG_SCHEMA)
+        self._solver_host = self._config['cpSolverHost']
+        self._camelModelFilePath = Path(self._config['request']['camelModelFilePath'])
+
+        self._solutionFilePath = Path(f"{self._config['request']['camelModelFilePath'][:-4]}-solution.xmi")
+        self._FCRcpxml_path = Path(self._config['request']['cpProblemFilePath'])
+        self._FCRcpxml = etree.parse(self._FCRcpxml_path)
+        self._nodeCandidatesFilePath = Path(self._config['request']['nodeCandidatesFilePath'])
+
+        self._xml_fillings = {}
+        for name in args_names:
+            self._xml_fillings[name] = self._FCRcpxml.find(
+                f"cpMetrics[@id='{name}']")[0]  # TODO check
+        self._request_part = self._config['request']
+
+    @abc.abstractmethod
+    def get_solution(self, solver_params: Dict) -> List[int]:
+        pass
+
+
+class LocalCPSolverSolutionSummoner(CPSolverSolutionSummoner):
     """ calls cp-solver for solution, and manages additional resources """
 
-    def __init__(self, cpsolver_config_path: Path):
-        config = validate_config(cpsolver_config_path, CPSOLVER_CONFIG_SCHEMA)
-        self.solver_host = config['cpSolverHost']
-        self.solution_file = f"{config['request']['camelModelFilePath'][:-4]}-solution.xmi"
-        self.FCRcpxml = etree.parse(config['request']['cpProblemFilePath'])
-        self.avg_rsp_time_xelem, = self.FCRcpxml.find(
-            "cpMetrics[@id='AvgResponseTime']")
-        self.req = config['request']
+    def __enter__(self):
+        self.tempfile = NamedTemporaryFile()
+        self._request_part['cpProblemFilePath'] = self.tempfile.name
+        return self
+
+    def __exit__(self, *exc):
+        self.tempfile.close()
+        if self._solutionFilePath.exists():
+            self._solutionFilePath.unlink()
+        return False
+
+    @lru_cache(maxsize=CACHE_SIZE)
+    def get_solution(self, solver_params: Dict) -> List[int]:  # TODO fix
+        """ returns solution as list of int """
+        for arg_name, arg_loc in self._xml_fillings.items():
+            arg_loc.set('value', str(solver_params[arg_name]))
+
+        self._FCRcpxml.write(self.tempfile.name,
+                             xml_declaration=True,
+                             encoding="ASCII")
+        r = requests.post(f'http://{self._solver_host}/constraintProblemSolutionFromFile',
+                          data=json.dumps(self._request_part),
+                          headers={'Content-Type': 'application/json'}
+                          )
+
+        if r.status_code != 200:
+            raise CPSolverSolutionSummonerError("cp-solver returned non-200 status code")
+
+        sol_xelem = etree.parse(self._solutionFilePath).find("solution")
+        if sol_xelem is None:
+            raise CPSolverSolutionSummonerError(f"cp-solver didn't return solution for avg_rsp_time")
+        return [int(v.get('value', 0)) for v in sol_xelem.iterfind(".//value")]
+
+
+class DockerCPSolverSolutionSummoner(CPSolverSolutionSummoner):
+    __container: Container = None
+
+    def __init__(self,
+                 cpsolver_config_path: Path,
+                 args_names: List[str],
+                 container_name: str = None,
+                 docker_url: Optional[str] = None):
+        super().__init__(cpsolver_config_path, args_names)
+        if docker_url:
+            docker_client = docker.DockerClient(base_url=docker_url)
+        else:
+            docker_client = docker.from_env()
+
+        assert container_name is not None, 'Container name not specified!'
+        self.__container = docker_client.containers.get(container_name)
+        self._request_part['camelModelFilePath'] = f'/res/{self._camelModelFilePath.name}'
+        docker_put(self.__container, self._camelModelFilePath, Path('/res'))
+
+        self._request_part['cpProblemFilePath'] = f'/res/{self._FCRcpxml_path.name}'
+        docker_put(self.__container, self._FCRcpxml_path, Path('/res'))
+
+        self._request_part['nodeCandidatesFilePath'] = f'/res/{self._nodeCandidatesFilePath.name}'
+        docker_put(self.__container, self._nodeCandidatesFilePath, Path('/res'))
 
     def __enter__(self):
         self.tempfile = NamedTemporaryFile()
-        self.req['cpProblemFilePath'] = self.tempfile.name
+        self._request_part['cpProblemFilePath'] = f'/tmp/{Path(self.tempfile.name).name}'
         return self
 
     def __exit__(self, *exc):
         self.tempfile.close()
-        if os.path.exists(self.solution_file):
-            os.remove(self.solution_file)
+        if self._solutionFilePath.exists():
+            self._solutionFilePath.unlink()
         return False
 
     @lru_cache(maxsize=CACHE_SIZE)
-    def get_solution(self, avg_rsp_time: float) -> List[int]:  # TODO fix
+    def get_solution(self, solver_params: Dict) -> List[int]:
         """ returns solution as list of int """
 
-        self.avg_rsp_time_xelem.set('value', str(avg_rsp_time))
-        self.FCRcpxml.write(self.tempfile.name,
-                            xml_declaration=True, encoding="ASCII")
-        r = requests.post(f'http://{self.solver_host}/constraintProblemSolutionFromFile',
-                          data=json.dumps(self.req),
+        for arg_name, arg_loc in self._xml_fillings.items():
+            arg_loc.set('value', str(solver_params[arg_name]))
+        self._FCRcpxml.write(self.tempfile.name,
+                             xml_declaration=True, encoding="ASCII")
+        docker_put(self.__container, src=Path(self.tempfile.name), dst=Path('/tmp'))
+
+        r = requests.post(f'http://{self._solver_host}/constraintProblemSolutionFromFile',
+                          data=json.dumps(self._request_part),
                           headers={'Content-Type': 'application/json'}
                           )
 
         if r.status_code != 200:
             raise Exception("cp-solver returned non 200 status code")
 
-        sol_xelem = etree.parse(self.solution_file).find("solution")
+        docker_get(self.__container, Path(f'/res/{self._solutionFilePath.name}'), self._solutionFilePath)
+
+        sol_xelem = etree.parse(self._solutionFilePath).find("solution")
         if sol_xelem is None:
-            raise CPSolverSolutionSummonerError(f"cp-solver didn't return solution for avg_rsp_time = {avg_rsp_time}")
+            raise CPSolverSolutionSummonerError(f"cp-solver didn't return solution")
         return [int(v.get('value', 0)) for v in sol_xelem.iterfind(".//value")]
-- 
GitLab


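docker-py's put_archive only accepts a tar stream, which is why patch 04 packs each file into an in-memory tarball before shipping it into the container. A condensed sketch of that round trip under the same assumptions (the container name and paths are placeholders):

    import tarfile
    import time
    from io import BytesIO
    from pathlib import Path

    import docker

    def put_file(container, src: Path, dst_dir: str) -> None:
        """Copy a local file into dst_dir inside the container via an in-memory tar."""
        data = src.read_bytes()
        stream = BytesIO()
        with tarfile.TarFile(fileobj=stream, mode='w') as tar:
            info = tarfile.TarInfo(name=src.name)
            info.size = len(data)
            info.mtime = time.time()
            tar.addfile(info, BytesIO(data))
        stream.seek(0)
        container.put_archive(dst_dir, stream)

    # hypothetical usage against a running container named 'cpsolver':
    # put_file(docker.from_env().containers.get('cpsolver'), Path('problem.xml'), '/res')
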
From 4c8eb68490a60d6ee717513b617d674b72ac68c8 Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sat, 6 Mar 2021 21:10:11 +0100
Subject: [PATCH 05/10] Main script compatible with new interface

---
 FCRgendata/data_config.json                  |  1 +
 FCRgendata/src/FCRGenData/__main__.py        | 35 +++++++++++++++-----
 FCRgendata/src/FCRGenData/data_aggregator.py | 18 ++--------
 3 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/FCRgendata/data_config.json b/FCRgendata/data_config.json
index f563394..29017d0 100644
--- a/FCRgendata/data_config.json
+++ b/FCRgendata/data_config.json
@@ -21,6 +21,7 @@
         }
     ],
     "aggregated_columns": ["value1", "value2", "value3"],
+    "solver_cols":["value1", "value2"],
     "timedelta": 10,
     "max_time_gap": 100
 }
\ No newline at end of file
diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index 4db0c2a..f51ae26 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -5,8 +5,11 @@ import logging
 from contextlib import ExitStack
 from pathlib import Path
 
+from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.data_aggregator import DataAggregator
-from .solv_summoner import CPSolverSolutionSummoner, CPSolverSolutionSummonerError
+from FCRGenData.validate_config import validate_config
+from .solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
+    LocalCPSolverSolutionSummoner
 
 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
 
@@ -25,33 +28,47 @@ def run():
     parser.add_argument('cp_config', action='store', type=_json_path, help="Path to cpsolver json config file")
     parser.add_argument('gendata_config', action='store', type=_json_path,
                         help="Path to gendata json config file")
+    parser.add_argument('-r', '--docker', action='store_true', default=False,
+                        help="Use dockerized cpsolver instead of localhost")
+    parser.add_argument('--container', action='store', type=str, default=None,
+                        help='Name of the docker container')
+    parser.add_argument('--docker-url', action='store', type=str, default=None,
+                        help='URL to the remote docker (local by default)')
 
     args = parser.parse_args()
 
     cpsolver_config_path = args.cp_config
-    gendata_config = args.gendata_config
+    gendata_config_path = Path(args.gendata_config)
+
+    gendata_config = validate_config(gendata_config_path, DATA_CONFIG_SCHEMA)
 
     # Create aggregator
-    cpsolver = CPSolverSolutionSummoner(cpsolver_config_path)
+    if args.docker:
+        cpsolver = DockerCPSolverSolutionSummoner(cpsolver_config_path,
+                                                  gendata_config['solver_cols'],
+                                                  args.container,
+                                                  args.docker_url)
+    else:
+
+        cpsolver = LocalCPSolverSolutionSummoner(cpsolver_config_path,
+                                                 gendata_config['solver_cols'])
 
-    aggregator = DataAggregator(json_path=gendata_config)
+    aggregator = DataAggregator(gendata_config)
 
     with ExitStack() as stack:  # substitute for a multiline with statement
         summoner = stack.enter_context(cpsolver)
-        csvfile = stack.enter_context(open(aggregator.outpath, 'w', newline=''))
+        csvfile = stack.enter_context(open(gendata_config['outpath'], 'w', newline=''))
 
         writer = csv.writer(csvfile, delimiter=',')
         writer.writerow(aggregator.column_names)
 
         it = 0
-        solver_cols_indexes = [aggregator.column_names.index(colname) for colname in aggregator.solver_column_names]
 
-        # We could also consume annotated rows (which might be simpler)
-        for row in aggregator.row_generator():
+        for row in aggregator.row_generator_annotated():
             it += 1
             print(f'Row {it}')
             try:
-                solver_params = map(lambda index: row[index], solver_cols_indexes)
+                solver_params = {name: row[name] for name in gendata_config['solver_cols']}
                 solution = summoner.get_solution(solver_params)
                 writer.writerow((*row, *solution))
             except CPSolverSolutionSummonerError as err:
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index b56e7ff..80b45dc 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -3,10 +3,8 @@ import logging
 from pathlib import Path
 from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator
 
-from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.interpolator import InterpolatedDataStream
 from FCRGenData.rawDataReader import RowCSVReader, IRowDataProvider
-from FCRGenData.validate_config import validate_config
 
 
 def _timestamp_generator(t0: dt.datetime, tstep: dt.timedelta) -> Generator[dt.datetime, None, None]:
@@ -33,11 +31,8 @@ class DataAggregator:
     __delta: dt.timedelta
     __max_time_gap: dt.timedelta
 
-    def __init__(self, json_path: Union[str, Path]):
-        if isinstance(json_path, str):
-            json_path = Path(json_path)
-
-        self.__conf = validate_config(json_path, DATA_CONFIG_SCHEMA)
+    def __init__(self, config: Dict):
+        self.__conf = config
         self.__datasources = []
 
         # Roottime data source
@@ -80,15 +75,6 @@ class DataAggregator:
         self.__roottime_reader = roottime_data[1]
         self.__roottime_datasource = roottime_data[0]
 
-    @property
-    def outpath(self) -> Path:
-        return Path(self.__conf['outpath'])
-
-    @property
-    def solver_column_names(self) -> List[str]:
-        """List of the column names reuired by the cpsolver"""
-        return self.__conf['solver_cols']
-
     @property
     def column_names(self) -> List[str]:
         """Names of the variable columns"""
-- 
GitLab


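Patches 05 and 06 hand solver_params to the lru_cache-decorated get_solution as a dict; dicts are unhashable, so a cache-friendly variant would key the call on a frozen view of the parameters. A sketch of that idea, illustrative only and not the series' code:

    from functools import lru_cache
    from typing import Dict, List, Tuple

    @lru_cache(maxsize=1024)
    def _solve_cached(params_key: Tuple[Tuple[str, float], ...]) -> List[int]:
        params = dict(params_key)  # rebuild the mapping inside the cached call
        return [int(v) for v in params.values()]  # stand-in for the real solver request

    def get_solution(solver_params: Dict[str, float]) -> List[int]:
        # sorting the items makes logically-equal dicts share one cache entry
        return _solve_cached(tuple(sorted(solver_params.items())))

    assert get_solution({'a': 1.0, 'b': 2.5}) == get_solution({'b': 2.5, 'a': 1.0})
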
From 847929d3eb1ac5cff685a92d924a62d67da4f96e Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sat, 6 Mar 2021 21:10:11 +0100
Subject: [PATCH 06/10] Main script compatible with new interface

---
 FCRgendata/data_config.json                  |  3 +-
 FCRgendata/src/FCRGenData/__main__.py        | 35 +++++++++++++++-----
 FCRgendata/src/FCRGenData/data_aggregator.py | 18 ++--------
 3 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/FCRgendata/data_config.json b/FCRgendata/data_config.json
index f563394..9be9b42 100644
--- a/FCRgendata/data_config.json
+++ b/FCRgendata/data_config.json
@@ -1,5 +1,5 @@
 {
-    "outpath": "/home/szysad/mimuw/3rok/ZPP/my-time-series/FCR-time-series/output/generated-data4.csv",
+    "outpath": "output.csv",
     "datasources": [
         {
             "desc": "desc <optinal>",
@@ -21,6 +21,7 @@
         }
     ],
     "aggregated_columns": ["value1", "value2", "value3"],
+    "solver_cols":["value1", "value2"],
     "timedelta": 10,
     "max_time_gap": 100
 }
\ No newline at end of file
diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index 4db0c2a..f51ae26 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -5,8 +5,11 @@ import logging
 from contextlib import ExitStack
 from pathlib import Path
 
+from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.data_aggregator import DataAggregator
-from .solv_summoner import CPSolverSolutionSummoner, CPSolverSolutionSummonerError
+from FCRGenData.validate_config import validate_config
+from .solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
+    LocalCPSolverSolutionSummoner
 
 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
 
@@ -25,33 +28,47 @@ def run():
     parser.add_argument('cp_config', action='store', type=_json_path, help="Path to cpsolver json config file")
     parser.add_argument('gendata_config', action='store', type=_json_path,
                         help="Path to gendata json config file")
+    parser.add_argument('-r', '--docker', action='store_true', default=False,
+                        help="Use dockerized cpsolver instead of localhost")
+    parser.add_argument('--container', action='store', type=str, default=None,
+                        help='Name of the docker container')
+    parser.add_argument('--docker-url', action='store', type=str, default=None,
+                        help='URL to the remote docker (local by default)')
 
     args = parser.parse_args()
 
     cpsolver_config_path = args.cp_config
-    gendata_config = args.gendata_config
+    gendata_config_path = Path(args.gendata_config)
+
+    gendata_config = validate_config(gendata_config_path, DATA_CONFIG_SCHEMA)
 
     # Create aggregator
-    cpsolver = CPSolverSolutionSummoner(cpsolver_config_path)
+    if args.docker:
+        cpsolver = DockerCPSolverSolutionSummoner(cpsolver_config_path,
+                                                  gendata_config['solver_cols'],
+                                                  args.container,
+                                                  args.docker_url)
+    else:
+
+        cpsolver = LocalCPSolverSolutionSummoner(cpsolver_config_path,
+                                                 gendata_config['solver_cols'])
 
-    aggregator = DataAggregator(json_path=gendata_config)
+    aggregator = DataAggregator(gendata_config)
 
     with ExitStack() as stack:  # substitute for a multiline with statement
         summoner = stack.enter_context(cpsolver)
-        csvfile = stack.enter_context(open(aggregator.outpath, 'w', newline=''))
+        csvfile = stack.enter_context(open(gendata_config['outpath'], 'w', newline=''))
 
         writer = csv.writer(csvfile, delimiter=',')
         writer.writerow(aggregator.column_names)
 
         it = 0
-        solver_cols_indexes = [aggregator.column_names.index(colname) for colname in aggregator.solver_column_names]
 
-        # We can also get annotated rows (what should be easier?)
-        for row in aggregator.row_generator():
+        for row in aggregator.row_generator_annotated():
             it += 1
             print(f'Row {it}')
             try:
-                solver_params = map(lambda index: row[index], solver_cols_indexes)
+                solver_params = {name: row[name] for name in gendata_config['solver_cols']}
                 solution = summoner.get_solution(solver_params)
+                writer.writerow((*row.values(), *solution))
             except CPSolverSolutionSummonerError as err:
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index b56e7ff..80b45dc 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -3,10 +3,8 @@ import logging
 from pathlib import Path
 from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator
 
-from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.interpolator import InterpolatedDataStream
 from FCRGenData.rawDataReader import RowCSVReader, IRowDataProvider
-from FCRGenData.validate_config import validate_config
 
 
 def _timestamp_generator(t0: dt.datetime, tstep: dt.timedelta) -> Generator[dt.datetime, None, None]:
@@ -33,11 +31,8 @@ class DataAggregator:
     __delta: dt.timedelta
     __max_time_gap: dt.timedelta
 
-    def __init__(self, json_path: Union[str, Path]):
-        if isinstance(json_path, str):
-            json_path = Path(json_path)
-
-        self.__conf = validate_config(json_path, DATA_CONFIG_SCHEMA)
+    def __init__(self, config: Dict):
+        self.__conf = config
         self.__datasources = []
 
         # Roottime data source
@@ -80,15 +75,6 @@ class DataAggregator:
         self.__roottime_reader = roottime_data[1]
         self.__roottime_datasource = roottime_data[0]
 
-    @property
-    def outpath(self) -> Path:
-        return Path(self.__conf['outpath'])
-
-    @property
-    def solver_column_names(self) -> List[str]:
-        """List of the column names reuired by the cpsolver"""
-        return self.__conf['solver_cols']
-
     @property
     def column_names(self) -> List[str]:
         """Names of the variable columns"""
-- 
GitLab


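Patch 06 starts validating the gendata config against DATA_CONFIG_SCHEMA via validate_config before it ever reaches DataAggregator. The body of validate_config is outside this series; a minimal sketch of such a helper, assuming the jsonschema package and a plain-dict schema like the one in config_schemas.py:

    import json
    from pathlib import Path

    import jsonschema  # assumed dependency; the real validate_config is not shown here


    def validate_config(json_path: Path, schema: dict) -> dict:
        """Load a JSON config and validate it against a JSON Schema, failing fast."""
        with open(json_path, encoding='utf-8') as f:
            config = json.load(f)
        # raises jsonschema.ValidationError on a bad config, SchemaError on a bad schema
        jsonschema.validate(instance=config, schema=schema)
        return config
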
From aed967bd7263272d2e543400385b5b410f98b80a Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sun, 7 Mar 2021 01:19:50 +0100
Subject: [PATCH 07/10] Bug fixes

---
 FCRgendata/data_config.json                | 4 ++--
 FCRgendata/src/FCRGenData/solv_summoner.py | 2 --
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/FCRgendata/data_config.json b/FCRgendata/data_config.json
index 9be9b42..b72a2e6 100644
--- a/FCRgendata/data_config.json
+++ b/FCRgendata/data_config.json
@@ -2,7 +2,7 @@
     "outpath": "output.csv",
     "datasources": [
         {
-            "desc": "desc <optinal>",
+            "desc": "desc <optional>",
             "files": [
                 {
                     "desc": "desc",
@@ -13,7 +13,7 @@
             "values": [
                 {
                     "colname": "value",
-                    "alias": "valiue1",
+                    "alias": "value1",
                     "cumulation_period": 60
                 }
             ],
diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py
index 7928a1f..910c7ee 100644
--- a/FCRgendata/src/FCRGenData/solv_summoner.py
+++ b/FCRgendata/src/FCRGenData/solv_summoner.py
@@ -19,8 +19,6 @@ from FCRGenData.validate_config import validate_config
 
 CACHE_SIZE = 2 ** 10
 
-def create_archive(filepath: Path) -> BytesIO:
-    """Packs content of the file to the tar archive.
 
 def create_archive(filepath: Path) -> BytesIO:
     """Packs content of the file to the tar archive.
-- 
GitLab


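Patch 07 also removes an accidentally duplicated create_archive definition in solv_summoner.py (the first copy had an unterminated docstring). Only the signature and docstring of the surviving copy are visible in this excerpt; a sketch of how such a helper can be written with the standard library:

    import tarfile
    from io import BytesIO
    from pathlib import Path


    def create_archive(filepath: Path) -> BytesIO:
        """Packs content of the file to the tar archive."""
        buffer = BytesIO()
        # mode='w' writes an uncompressed tar stream into the in-memory buffer
        with tarfile.open(fileobj=buffer, mode='w') as tar:
            tar.add(filepath, arcname=filepath.name)
        buffer.seek(0)  # rewind so the caller can read the archive from the start
        return buffer
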
From df1e013a7721f2199c47faf086ad3a95d7c3d642 Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sun, 7 Mar 2021 02:13:25 +0100
Subject: [PATCH 08/10] Bug fixes

---
 FCRgendata/config.json                       |  5 +-
 FCRgendata/setup.py                          | 79 +++-----------------
 FCRgendata/src/FCRGenData/__main__.py        | 15 ++--
 FCRgendata/src/FCRGenData/config_schemas.py  | 13 ++--
 FCRgendata/src/FCRGenData/data_aggregator.py |  2 +-
 FCRgendata/src/FCRGenData/solv_summoner.py   |  8 +-
 6 files changed, 31 insertions(+), 91 deletions(-)

diff --git a/FCRgendata/config.json b/FCRgendata/config.json
index 18f8785..6a6144f 100644
--- a/FCRgendata/config.json
+++ b/FCRgendata/config.json
@@ -11,8 +11,5 @@
             "uuid": "fb6280ec-1ab8-11e7-93ae-92361f002671"
             }
         },
-    "cpSolverHost": "localhost:8080",
-    "outpath": "<path>/generated_data.csv",
-    "AvgResponseTimeTableFilePath": "<path to avgresponsedata>",
-    "predictionsFilePath": "<path to predictions>"
+    "cpSolverHost": "localhost:8080"
 }
\ No newline at end of file
diff --git a/FCRgendata/setup.py b/FCRgendata/setup.py
index 30f974b..7f578e9 100644
--- a/FCRgendata/setup.py
+++ b/FCRgendata/setup.py
@@ -5,15 +5,15 @@ https://github.com/pypa/sampleproject
 Modified by Madoshakalaka@Github (dependency links added)
 """
 
-# Always prefer setuptools over distutils
-from setuptools import setup, find_packages
-from os import path
-
 # io.open is needed for projects that support Python 2.7
 # It ensures open() defaults to text mode with universal newlines,
 # and accepts an argument to specify the text encoding
 # Python 3 only projects can skip this import
 from io import open
+from os import path
+
+# Always prefer setuptools over distutils
+from setuptools import setup
 
 here = path.abspath(path.dirname(__file__))
 
@@ -25,63 +25,8 @@ with open(path.join(here, "README.md"), encoding="utf-8") as f:
 # Fields marked as "Optional" may be commented out.
 
 setup(
-    # This is the name of your project. The first time you publish this
-    # package, this name will be registered for you. It will determine how
-    # users can install this project, e.g.:
-    #
-    # $ pip install sampleproject
-    #
-    # And where it will live on PyPI: https://pypi.org/project/sampleproject/
-    #
-    # There are some restrictions on what makes a valid project name
-    # specification here:
-    # https://packaging.python.org/specifications/core-metadata/#name
-    name="FCRgenData",  # Required
-    # Versions should comply with PEP 440:
-    # https://www.python.org/dev/peps/pep-0440/
-    #
-    # For a discussion on single-sourcing the version across setup.py and the
-    # project code, see
-    # https://packaging.python.org/en/latest/single_source_version.html
-    version="0.0.0",  # Required
-    # This is a one-line description or tagline of what your project does. This
-    # corresponds to the "Summary" metadata field:
-    # https://packaging.python.org/specifications/core-metadata/#summary
-    #description="A sample Python project",  # Optional
-    # This is an optional longer description of your project that represents
-    # the body of text which users will see when they visit PyPI.
-    #
-    # Often, this is the same as your README, so you can just read it in from
-    # that file directly (as we have already done above)
-    #
-    # This field corresponds to the "Description" metadata field:
-    # https://packaging.python.org/specifications/core-metadata/#description-optional
-    #long_description=long_description,  # Optional
-    # Denotes that our long_description is in Markdown; valid values are
-    # text/plain, text/x-rst, and text/markdown
-    #
-    # Optional if long_description is written in reStructuredText (rst) but
-    # required for plain-text or Markdown; if unspecified, "applications should
-    # attempt to render [the long_description] as text/x-rst; charset=UTF-8 and
-    # fall back to text/plain if it is not valid rst" (see link below)
-    #
-    # This field corresponds to the "Description-Content-Type" metadata field:
-    # https://packaging.python.org/specifications/core-metadata/#description-content-type-optional
-    #long_description_content_type="text/markdown",  # Optional (see note above)
-    # This should be a valid link to your project's main homepage.
-    #
-    # This field corresponds to the "Home-Page" metadata field:
-    # https://packaging.python.org/specifications/core-metadata/#home-page-optional
-    #url="https://github.com/pypa/sampleproject",  # Optional
-    # This should be your name or the name of the organization which owns the
-    # project.
-    #author="The Python Packaging Authority",  # Optional
-    # This should be a valid email address corresponding to the author listed
-    # above.
-    #author_email="pypa-dev@googlegroups.com",  # Optional
-    # Classifiers help users find your project by categorizing it.
-    #
-    # For a list of valid classifiers, see https://pypi.org/classifiers/
+    name="FCRgenData",
+    version="0.0.1",
     classifiers=[  # Optional
         # How mature is this project? Common values are
         #   3 - Alpha
@@ -108,8 +53,8 @@ setup(
     # project page. What does your project relate to?
     #
     # Note that this is a string of words separated by whitespace, not a list.
-    #keywords="sample setuptools development",  # Optional
-    #keywords="sample setuptools development",  # Optional
+    # keywords="sample setuptools development",  # Optional
+    # keywords="sample setuptools development",  # Optional
     # You can just specify package directories manually here if your project is
     # simple. Or you can use find_packages().
     #
@@ -119,12 +64,8 @@ setup(
     #
     #   py_modules=["my_module"],
     #
-    packages=find_packages(),  # Required
-    # Specify which Python versions you support. In contrast to the
-    # 'Programming Language' classifiers above, 'pip install' will check this
-    # and refuse to install the project if the version does not match. If you
-    # do not support Python 2, you can simplify this to '>=3.5' or similar, see
-    # https://packaging.python.org/guides/distributing-packages-using-setuptools/#python-requires
+    packages=['FCRGenData'],
+    package_dir={'FCRGenData': 'src/FCRGenData'},
     python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4",
     # This field lists other packages that your project depends on to run.
     # Any package you put here will be installed by pip when your project is
diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index f51ae26..10a6fa1 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -8,7 +8,7 @@ from pathlib import Path
 from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.data_aggregator import DataAggregator
 from FCRGenData.validate_config import validate_config
-from .solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
+from FCRGenData.solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
     LocalCPSolverSolutionSummoner
 
 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
@@ -25,15 +25,16 @@ def run():
     # Argument parsing
     parser = argparse.ArgumentParser(description="Very not advanced data aggregator with interpolation")
 
-    parser.add_argument('cp_config', action='store', type=_json_path, description="Path to cpsolver json config file")
+    parser.add_argument('cp_config', action='store', type=_json_path,
+                        help="Path to cpsolver json config file")
     parser.add_argument('gendata_config', action='store', type=_json_path,
-                        description="Path to cpsolver json config file")
-    parser.add_argument('-r', '--docker', action='store_true', type=bool, default=False,
-                        description="Use dockerized cpsolver instead of localhost")
+                        help="Path to cpsolver json config file")
+    parser.add_argument('-r', '--docker', action='store_true',  default=False,
+                        help="Use dockerized cpsolver instead of localhost")
     parser.add_argument('--container', action='store', type=str, default=None,
-                        description='Name of the docker container')
+                        help='Name of the docker container')
     parser.add_argument('--docker-url', action='store', type=str, default=None,
-                        description='URL to the remote docker (local by default)')
+                        help='URL to the remote docker (local by default)')
 
     args = parser.parse_args()
 
diff --git a/FCRgendata/src/FCRGenData/config_schemas.py b/FCRgendata/src/FCRGenData/config_schemas.py
index 31c29eb..c6dbc79 100644
--- a/FCRgendata/src/FCRGenData/config_schemas.py
+++ b/FCRgendata/src/FCRGenData/config_schemas.py
@@ -39,6 +39,7 @@ DATA_CONFIG_SCHEMA = {
                 "type": "object",
                 "properties": {
                     "desc": {"type": "string"},
+                    "type": {"type": "string"},
                     "source":
                         {
                             "type": "object",
@@ -57,15 +58,15 @@ DATA_CONFIG_SCHEMA = {
                             "properties": {
                                 "column_name": {"type": "string"},
                                 "alias": {"type": "string"},
-                                "cumulation_period": {"type": "int"}
+                                "cumulation_period": {"type": "integer"}
                             },
                             "required": ["column_name"]
                         }
                     },
                     "time_root":
-                        {"type": "bool"}
+                        {"type": "boolean"}
                 },
-                "required": ["desc", "source", "values"]
+                "required": ["desc", "type", "source", "values"]
             }
         },
         "aggregated_columns":
@@ -75,14 +76,14 @@ DATA_CONFIG_SCHEMA = {
                 "items": {"type": "string"}
             },
         "timedelta":
-            {"type": "int"},
+            {"type": "integer"},
         "max_time_gap":
-            {"type": "int"},
+            {"type": "integer"},
         "solver_cols":
             {"type": "array",
              "minItems": 1,
              "items": {"type": "string"}
              }
     },
-    "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap", "avg_response_time_colname"]
+    "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap", "solver_cols"]
 }
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index 80b45dc..6891278 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -34,7 +34,7 @@ class DataAggregator:
     def __init__(self, config: Dict):
         self.__conf = config
         self.__datasources = []
-
+        self.__header = []
         # Roottime data source
         roottime_data = None
         max_time_gap = dt.timedelta(seconds=self.__conf['max_time_gap'])
diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py
index 910c7ee..939a9f9 100644
--- a/FCRgendata/src/FCRGenData/solv_summoner.py
+++ b/FCRgendata/src/FCRGenData/solv_summoner.py
@@ -77,10 +77,10 @@ class CPSolverSolutionSummoner(abc.ABC):
     _camelModelFilePath: Path
     _solutionFilePath: Path
     _FCRcpxml_path: Path
-    _FCRcpxml: ElementTree  # TODO    VERIFY
+    _FCRcpxml: ElementTree
     _nodeCandidatesFilePath: Path
 
-    _xml_fillings: Dict[str, Element]  # TODO check
+    _xml_fillings: Dict[str, Element]
     _request_part: Dict
 
     def __init__(self,
@@ -92,13 +92,13 @@ class CPSolverSolutionSummoner(abc.ABC):
 
         self._solutionFilePath = Path(f"{self._config['request']['camelModelFilePath'][:-4]}-solution.xmi")
         self._FCRcpxml_path = Path(self._config['request']['cpProblemFilePath'])
-        self._FCRcpxml = etree.parse(self._FCRcpxml_path)
+        self._FCRcpxml = etree.parse(str(self._FCRcpxml_path))
         self._nodeCandidatesFilePath = Path(self._config['request']['nodeCandidatesFilePath'])
 
         self._xml_fillings = {}
         for name in args_names:
             self._xml_fillings[name] = self._FCRcpxml.find(
-                f"cpMetrics[@id='{name}']")[0]  # TODO check
+                f"cpMetrics[@id='{name}']")[0]
         self._request_part = self._config['request']
 
     @abc.abstractmethod
-- 
GitLab


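Two of the patch 08 fixes deserve a note. In argparse, per-argument text is passed as help= (description= belongs to ArgumentParser itself), and action='store_true' already implies a boolean, so combining it with type= raises a TypeError when the parser is built. Similarly, JSON Schema defines "integer" and "boolean" but no "int" or "bool", so a validator such as jsonschema rejects the old schema outright. The corrected argparse pattern in isolation (flag names mirror the script above):

    import argparse

    # description= configures the parser; individual arguments take help=
    parser = argparse.ArgumentParser(description="demo parser")

    # store_true needs no type=: the flag is False by default and True when present
    parser.add_argument('-r', '--docker', action='store_true', default=False,
                        help="Use dockerized cpsolver instead of localhost")

    args = parser.parse_args(['-r'])
    assert args.docker is True
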
From 040a52443859ed232fdd13267aa32c89238786d7 Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sun, 7 Mar 2021 20:43:15 +0100
Subject: [PATCH 09/10] Bug fixes

---
 FCRgendata/src/FCRGenData/__main__.py         | 16 ++--
 FCRgendata/src/FCRGenData/data_aggregator.py  | 10 ++-
 FCRgendata/src/FCRGenData/interpolator.py     | 88 +++++++++++--------
 .../rawDataReader/row_data_reader.py          |  2 +-
 FCRgendata/src/FCRGenData/solv_summoner.py    | 27 +++---
 .../tests/iterpolator/mock_row_provider.py    |  4 +-
 6 files changed, 90 insertions(+), 57 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index 10a6fa1..13199ef 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -8,7 +8,7 @@ from pathlib import Path
 from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.data_aggregator import DataAggregator
 from FCRGenData.validate_config import validate_config
-from FCRGenData.solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
+from .solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
     LocalCPSolverSolutionSummoner
 
 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
@@ -25,11 +25,10 @@ def run():
     # Argument parsing
     parser = argparse.ArgumentParser(description="Very not advanced data aggregator with interpolation")
 
-    parser.add_argument('cp_config', action='store', type=_json_path,
-                        help="Path to cpsolver json config file")
+    parser.add_argument('cp_config', action='store', type=_json_path, help="Path to cpsolver json config file")
     parser.add_argument('gendata_config', action='store', type=_json_path,
                         help="Path to cpsolver json config file")
-    parser.add_argument('-r', '--docker', action='store_true',  default=False,
+    parser.add_argument('-r', '--docker', action='store_true', default=False,
                         help="Use dockerized cpsolver instead of localhost")
     parser.add_argument('--container', action='store', type=str, default=None,
                         help='Name of the docker container')
@@ -57,7 +56,7 @@ def run():
     aggregator = DataAggregator(gendata_config)
 
     with ExitStack() as stack:  # substitute for a multiline with statement
-        summoner = stack.enter_context(cpsolver)
+        # summoner = stack.enter_context(cpsolver)
         csvfile = stack.enter_context(open(gendata_config['outpath'], 'w', newline=''))
 
         writer = csv.writer(csvfile, delimiter=',')
@@ -69,9 +68,10 @@ def run():
             it += 1
             print(f'Row {it}')
             try:
-                solver_params = {name: row[name] for name in gendata_config['solver_cols']}
-                solution = summoner.get_solution(solver_params)
-                writer.writerow((*row.values(), *solution))
+                with cpsolver as summoner:
+                    solver_params = [row[name] for name in gendata_config['solver_cols']]
+                    solution = summoner.get_solution(*solver_params)
+                    writer.writerow((*row.values(), *solution))
             except CPSolverSolutionSummonerError as err:
                 logging.error(
                     "CPSolverSolutionSummoner raised exception while "
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index 6891278..7bd5f63 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -3,6 +3,8 @@ import logging
 from pathlib import Path
 from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator
 
+import numpy as np
+
 from FCRGenData.interpolator import InterpolatedDataStream
 from FCRGenData.rawDataReader import RowCSVReader, IRowDataProvider
 
@@ -132,11 +134,17 @@ class DataAggregator:
                                 break
                         else:
                             if value_info['colname'] == value_name:
-                                filtered_row[value_name] = rowdict[value_info['colname']]
+                                filtered_row[value_name] = rowdict[value_name]
                                 break
                     if value_name in filtered_row:
                         break
                 if value_name not in filtered_row:
                     raise KeyError("Value specified in aggregated_values not found among provided datasources")
 
+            # Convert values to pythonic
+            for key, value in filtered_row.items():
+                if any(map(lambda numpy_type: isinstance(value, numpy_type),
+                           [np.float64, np.float32, np.int16, np.uint32])):
+                    filtered_row[key] = value.item()
+
             yield filtered_row
diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py
index e06d445..aac3319 100644
--- a/FCRgendata/src/FCRGenData/interpolator.py
+++ b/FCRgendata/src/FCRGenData/interpolator.py
@@ -1,8 +1,8 @@
 import datetime as dt
-from typing import Dict, Generator
 from numbers import Number
+from typing import Dict, Generator, Optional
+
 from scipy import interpolate
-import itertools
 
 from FCRGenData.rawDataReader import IRowDataProvider
 
@@ -47,6 +47,9 @@ class InterpolatedDataStream:
             performance issues.
     '''
 
+    _last_ts: dt.datetime
+    _t0: dt.datetime
+
     @staticmethod
     def __ts_embedding(t1: dt.datetime, t2: dt.datetime) -> int:
         return (t1 - t2).seconds
@@ -54,57 +57,72 @@ class InterpolatedDataStream:
     def __init__(self,
                  reader: IRowDataProvider,
                  timestamp_generator: Generator[dt.datetime, None, None],
-                 datasource=None,
-                 ):
-        
-        gen = iter(timestamp_generator)
-        self.t0 = next(gen)
-        self.ts_gen = itertools.chain([self.t0], gen)
-        #  trick so that we can look ahead for first elem of generator
-
-        self.last_ts = self.t0
-        self.ts_col_name = reader.timestamp_column_name
-        ts_col_embed = reader.column_names.index(self.ts_col_name)
+                 datasource=None):
+
+        self._gen = timestamp_generator
+        self._t0 = self._last_ts = reader.peek_t0()
+        previous_timestamp: Optional[dt.datetime] = None
+
+        self._ts_col_name = reader.timestamp_column_name
         col_types = reader.columns
-        self.intp_col_embed = {cname: i for i, cname in enumerate(col_types) if issubclass(col_types[cname], Number)}
+        self.intp_col_embed = [colname for colname, coltype in col_types.items() if issubclass(coltype, Number)]
+        self.intp_col_str = [colname for colname, coltype in col_types.items() if issubclass(coltype, str)]
 
         # col name -> (timestamp embedding, given value)
-        self.raw_data = {cname: ([], []) for cname in self.intp_col_embed}
-        row_generator = reader.row_generator_annotated()
+        self.raw_data = {colname: ([], []) for colname in self.intp_col_embed}
+        self._str_interpolants = {colname: [] for colname in self.intp_col_str}
 
         # populate self.inp_data
-        for row in row_generator:
-            for cname, col_idx in self.intp_col_embed.items():
-                if row[col_idx] is None:
+        for row in reader.row_generator_annotated():
+
+            row_ts = row[self._ts_col_name]
+            if previous_timestamp:
+                tdelta_embed = self.__ts_embedding(row_ts, previous_timestamp)
+            else:
+                tdelta_embed = 0
+
+            previous_timestamp = row_ts
+            for colname in self.intp_col_embed:
+                if row[colname] is None:
                     continue
 
-                row_ts = row[ts_col_embed]
-                self.last_ts = max(self.last_ts, row_ts)
-                tdelta_embed = self.__ts_embedding(row_ts, self.t0)
-                intp_t = self.raw_data[cname]
-                intp_t[0].append(tdelta_embed)
-                intp_t[1].append(row[col_idx])
-        
+                self.raw_data[colname][0].append(tdelta_embed)
+                self.raw_data[colname][1].append(row[colname])
+
+            for colname_str in self.intp_col_str:
+                if row[colname_str] is None:
+                    continue
+
+                row_ts = row[self._ts_col_name]
+                self._str_interpolants[colname_str].append((row_ts, row[colname_str]))
+
         if any(map(lambda v: len(v[0]) == len(v[1]) == 0, self.raw_data.values())):
             raise EmptyReaderExcpetion()
 
         # interpolate aggregated data
-        self.interpolants = dict()
-        for cname, cdata in self.raw_data.items():
-            self.interpolants[cname] = interpolate.interp1d(x=cdata[0], y=cdata[1], kind='linear')
+        self.interpolants = {}
+        for colname, cdata in self.raw_data.items():
+            self.interpolants[colname] = interpolate.interp1d(x=cdata[0], y=cdata[1], kind='linear')
 
     def __iter__(self):
         return self
 
     def __next__(self) -> Dict:
-        ts = next(self.ts_gen)
-        if ts > self.last_ts:
+        ts = next(self._gen)
+        if ts > self._last_ts:
             raise StopIteration
 
-        row_dict = {self.ts_col_name: ts}
-        ts_embed = self.__ts_embedding(ts, self.t0)
+        row_dict = {self._ts_col_name: ts}
+        ts_embed = self.__ts_embedding(ts, self._t0)
+
+        for colname, interpolant in self.interpolants.items():
+            row_dict[colname] = interpolant(ts_embed).flat[0]
 
-        for cname in self.interpolants:
-            row_dict[cname] = self.interpolants[cname](ts_embed).flat[0]
+        for colname, interpolant in self._str_interpolants.items():
+            timestamp, value = interpolant[0]
+            while ts > timestamp and len(interpolant) > 1:
+                interpolant.pop(0)
+                timestamp, value = interpolant[0]
+            row_dict[colname] = value
 
         return row_dict
diff --git a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
index 01f7aed..4bb1b2e 100644
--- a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
+++ b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
@@ -65,7 +65,7 @@ class IRowDataProvider(abc.ABC):
         pass
 
     @abc.abstractmethod
-    def row_generator(self) -> Generator[Iterable[type], None, None]:
+    def row_generator(self) -> Generator[List[any], None, None]:
         """Generator over raw data, provides rows of data in increasing order (by timestamp)
         Returns list with values (order same as in column_names).
         """
diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py
index 939a9f9..a26b434 100644
--- a/FCRgendata/src/FCRGenData/solv_summoner.py
+++ b/FCRgendata/src/FCRGenData/solv_summoner.py
@@ -77,10 +77,11 @@ class CPSolverSolutionSummoner(abc.ABC):
     _camelModelFilePath: Path
     _solutionFilePath: Path
     _FCRcpxml_path: Path
-    _FCRcpxml: ElementTree
+    _FCRcpxml: ElementTree  # TODO    VERIFY
     _nodeCandidatesFilePath: Path
 
-    _xml_fillings: Dict[str, Element]
+    _args_names: List[str]
+    _xml_fillings: Dict[str, Element]  # TODO check
     _request_part: Dict
 
     def __init__(self,
@@ -90,11 +91,13 @@ class CPSolverSolutionSummoner(abc.ABC):
         self._solver_host = self._config['cpSolverHost']
         self._camelModelFilePath = Path(self._config['request']['camelModelFilePath'])
 
-        self._solutionFilePath = Path(f"{self._config['request']['camelModelFilePath'][:-4]}-solution.xmi")
+        # self._solutionFilePath = Path(f"{self._config['request']['camelModelFilePath'][:-4]}-solution.xmi").name
+        self._solutionFilePath = self._camelModelFilePath.parent.joinpath(f"{self._camelModelFilePath.stem}-solution.xmi")
         self._FCRcpxml_path = Path(self._config['request']['cpProblemFilePath'])
         self._FCRcpxml = etree.parse(str(self._FCRcpxml_path))
         self._nodeCandidatesFilePath = Path(self._config['request']['nodeCandidatesFilePath'])
 
+        self._args_names = args_names
         self._xml_fillings = {}
         for name in args_names:
             self._xml_fillings[name] = self._FCRcpxml.find(
@@ -102,7 +105,7 @@ class CPSolverSolutionSummoner(abc.ABC):
         self._request_part = self._config['request']
 
     @abc.abstractmethod
-    def get_solution(self, avg_rsp_time: float) -> List[int]:
+    def get_solution(self, *params_list) -> List[int]:
         pass
 
 
@@ -121,10 +124,12 @@ class LocalCPSolverSolutionSummoner(CPSolverSolutionSummoner):
         return False
 
     @lru_cache(maxsize=CACHE_SIZE)
-    def get_solution(self, solver_params: Dict) -> List[int]:  # TODO fix
+    def get_solution(self, *params_list) -> List[int]:  # TODO fix
         """ returns solution as list of int """
-        for arg_name, arg_loc in self._xml_fillings:
-            arg_loc.set('value', str(solver_params[arg_name]))
+
+        for arg_name, arg_value in zip(self._args_names, params_list):
+            arg_loc = self._xml_fillings[arg_name]
+            arg_loc.set('value', str(arg_value))
 
         self._FCRcpxml.write(self.tempfile.name,
                              xml_declaration=True,
@@ -180,11 +185,13 @@ class DockerCPSolverSolutionSummoner(CPSolverSolutionSummoner):
         return False
 
     @lru_cache(maxsize=CACHE_SIZE)
-    def get_solution(self, solver_params: Dict) -> List[int]:
+    def get_solution(self, *params_list) -> List[int]:
         """ returns solution as list of int """
 
-        for arg_name, arg_loc in self._xml_fillings:
-            arg_loc.set('value', str(solver_params[arg_name]))
+        for arg_name, arg_value in zip(self._args_names, params_list):
+            arg_loc = self._xml_fillings[arg_name]
+            arg_loc.set('value', str(arg_value))
+
         self._FCRcpxml.write(self.tempfile.name,
                              xml_declaration=True, encoding="ASCII")
         docker_put(self.__container, src=Path(self.tempfile.name), dst=Path('/tmp'))
diff --git a/FCRgendata/tests/iterpolator/mock_row_provider.py b/FCRgendata/tests/iterpolator/mock_row_provider.py
index f48afdc..388f365 100644
--- a/FCRgendata/tests/iterpolator/mock_row_provider.py
+++ b/FCRgendata/tests/iterpolator/mock_row_provider.py
@@ -14,11 +14,11 @@ class MockRowDataProvider(IRowDataProvider):
     def __init__(self,
                  data: MockData,
                  max_time_difference: dt.timedelta):
-        super().__init__(timestamp_column_name=data['ts_col_name'],
+        super().__init__(timestamp_column_name=data['_ts_col_name'],
                          max_time_difference=max_time_difference)
         self.data = data['data']
         self.headers = data['headers']
-        self.ts_col_name = data['ts_col_name']
+        self.ts_col_name = data['_ts_col_name']
         self.types = data['types']
 
     @property
-- 
GitLab


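Patch 09's data_aggregator coerces numpy scalars to builtin Python values with .item() before yielding a row, but only for a hard-coded list of dtypes (np.float64, np.float32, np.int16, np.uint32). A more general variant of the same idea, sketched here rather than taken from the patch, keys off np.generic, the common base class of every numpy scalar type:

    import numpy as np


    def to_pythonic(value):
        """Convert a numpy scalar to the equivalent builtin; pass other values through."""
        if isinstance(value, np.generic):
            return value.item()
        return value


    row = {'value1': np.float64(3.5), 'value2': np.int16(7), 'value3': 'n/a'}
    row = {key: to_pythonic(val) for key, val in row.items()}
    print(row)  # {'value1': 3.5, 'value2': 7, 'value3': 'n/a'} -- plain float, int, str
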
From bcff3d1b30ec583ccf5c8affa1799c4050a8f328 Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sun, 7 Mar 2021 22:58:16 +0100
Subject: [PATCH 10/10] test fixes

---
 FCRgendata/src/FCRGenData/interpolator.py         | 11 ++++++++---
 FCRgendata/tests/iterpolator/mock_row_provider.py |  8 ++++----
 FCRgendata/tests/iterpolator/test_interpolator.py |  6 +-----
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py
index aac3319..b80bd38 100644
--- a/FCRgendata/src/FCRGenData/interpolator.py
+++ b/FCRgendata/src/FCRGenData/interpolator.py
@@ -47,8 +47,8 @@ class InterpolatedDataStream:
             performance issues.
     '''
 
-    _last_ts: dt.datetime
-    _t0: dt.datetime
+    _last_ts: Optional[dt.datetime]
+    _t0: Optional[dt.datetime]
 
     @staticmethod
     def __ts_embedding(t1: dt.datetime, t2: dt.datetime) -> int:
@@ -60,7 +60,7 @@ class InterpolatedDataStream:
                  datasource=None):
 
         self._gen = timestamp_generator
-        self._t0 = self._last_ts = reader.peek_t0()
+        self._t0 = self._last_ts = None
         previous_timestamp: Optional[dt.datetime] = None
 
         self._ts_col_name = reader.timestamp_column_name
@@ -109,6 +109,11 @@ class InterpolatedDataStream:
 
     def __next__(self) -> Dict:
         ts = next(self._gen)
+        if self._t0 is None:
+            self._t0 = ts
+        if self._last_ts is None:
+            self._last_ts = ts
+
         if ts > self._last_ts:
             raise StopIteration
 
diff --git a/FCRgendata/tests/iterpolator/mock_row_provider.py b/FCRgendata/tests/iterpolator/mock_row_provider.py
index 388f365..3000671 100644
--- a/FCRgendata/tests/iterpolator/mock_row_provider.py
+++ b/FCRgendata/tests/iterpolator/mock_row_provider.py
@@ -14,11 +14,11 @@ class MockRowDataProvider(IRowDataProvider):
     def __init__(self,
                  data: MockData,
                  max_time_difference: dt.timedelta):
-        super().__init__(timestamp_column_name=data['_ts_col_name'],
+        super().__init__(timestamp_column_name=data['ts_col_name'],
                          max_time_difference=max_time_difference)
         self.data = data['data']
         self.headers = data['headers']
-        self.ts_col_name = data['_ts_col_name']
+        self.ts_col_name = data['ts_col_name']
         self.types = data['types']
 
     @property
@@ -30,7 +30,7 @@ class MockRowDataProvider(IRowDataProvider):
         return self.data[0][ts_col_idx]
 
     @property
-    def column_names(self) -> Tuple[str]:
+    def column_names(self) -> List[str]:
         return self.headers
 
     @property
@@ -38,7 +38,7 @@ class MockRowDataProvider(IRowDataProvider):
         return {cname: ctype for cname, ctype in zip(self.headers, self.types)}
 
     def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
-        return map(lambda row: dict(zip(self.headers, row)), self.data)
+        yield from map(lambda row: dict(zip(self.headers, row)), self.data)
 
     def row_generator(self) -> Generator[List[type], None, None]:
         yield from self.data
diff --git a/FCRgendata/tests/iterpolator/test_interpolator.py b/FCRgendata/tests/iterpolator/test_interpolator.py
index 84e9f85..42ad5f6 100644
--- a/FCRgendata/tests/iterpolator/test_interpolator.py
+++ b/FCRgendata/tests/iterpolator/test_interpolator.py
@@ -90,12 +90,8 @@ def test_no_data_but_timestamps():
         i = 0
         while True:
             yield t0 + dt.timedelta(seconds=i)
-    try:
+    with pytest.raises(EmptyReaderExcpetion):
         interp = InterpolatedDataStream(reader=rowProv, timestamp_generator=ts_gen())
-    except EmptyReaderExcpetion:
-        assert True
-    else:
-        assert False
         
 
 @pytest.mark.interpolation
-- 
GitLab
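
Patch 10 replaces the try/except/else assertion dance in test_no_data_but_timestamps with pytest.raises, which fails the test automatically when the expected exception never materializes. The pattern in isolation (exception class spelled as in the codebase; a stub stands in for the real InterpolatedDataStream constructor):

    import pytest


    class EmptyReaderExcpetion(Exception):  # name spelled as in FCRGenData.interpolator
        pass


    def build_stream():
        # stub for InterpolatedDataStream(reader=..., timestamp_generator=...)
        raise EmptyReaderExcpetion()


    def test_raises_on_empty_reader():
        # the with-block asserts the exception is raised; anything else fails the test
        with pytest.raises(EmptyReaderExcpetion):
            build_stream()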