From 53cf424f0a547bc0aeea75a58b0c52f1f2028f17 Mon Sep 17 00:00:00 2001 From: szymon sadkkowski <szysad108@gmail.com> Date: Thu, 4 Mar 2021 20:29:01 +0100 Subject: [PATCH 01/10] added some interpolator test, more to come (see TODO), few interpolator bugfixes --- FCRgendata/src/FCRGenData/interpolator.py | 23 +++- FCRgendata/tests/iterpolator/mock_data.py | 17 ++- .../tests/iterpolator/test_interpolator.py | 111 ++++++++++++++++-- 3 files changed, 136 insertions(+), 15 deletions(-) diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py index 2f9c285..15f71d5 100644 --- a/FCRgendata/src/FCRGenData/interpolator.py +++ b/FCRgendata/src/FCRGenData/interpolator.py @@ -2,10 +2,19 @@ import datetime as dt from typing import Dict, Generator from numbers import Number from scipy import interpolate +import itertools from FCRGenData.rawDataReader import IRowDataProvider +class InterpolatedDataStreamException(Exception): + pass + + +class EmptyReaderExcpetion(InterpolatedDataStreamException): + pass + + class InterpolatedDataStream: ''' generator that interpolates data given by @@ -46,8 +55,12 @@ class InterpolatedDataStream: timestamp_generator: Generator[dt.datetime, None, None], datasource=None, ): - self.ts_gen = timestamp_generator - self.t0 = next(iter(self.ts_gen)) + + gen = iter(timestamp_generator) + self.t0 = next(gen) + self.ts_gen = itertools.chain([self.t0], gen) + # trick so that we can look ahead for first elem of generator + self.last_ts = self.t0 self.ts_col_name = reader.timestamp_column_name ts_col_embed = reader.column_names.index(self.ts_col_name) @@ -70,6 +83,9 @@ class InterpolatedDataStream: intp_t = self.raw_data[cname] intp_t[0].append(tdelta_embed) intp_t[1].append(row[col_idx]) + + if any(map(lambda v: len(v[0]) == len(v[1]) == 0, self.raw_data.values())): + raise EmptyReaderExcpetion() # interpolate aggregated data self.interpolants = dict() @@ -77,11 +93,10 @@ class InterpolatedDataStream: self.interpolants[cname] = interpolate.interp1d(x=cdata[0], y=cdata[1], kind='linear') def __iter__(self): - self.ts_gen_iter = iter(self.ts_gen) return self def __next__(self) -> Dict: - ts = next(self.ts_gen_iter) + ts = next(self.ts_gen) if ts > self.last_ts: raise StopIteration diff --git a/FCRgendata/tests/iterpolator/mock_data.py b/FCRgendata/tests/iterpolator/mock_data.py index e61fee1..331cee5 100644 --- a/FCRgendata/tests/iterpolator/mock_data.py +++ b/FCRgendata/tests/iterpolator/mock_data.py @@ -1,5 +1,5 @@ import datetime as dt -from typing import TypedDict, List, Any, Iterator, Tuple +from typing import TypedDict, List, Any, Iterator, Tuple, Callable class MockData(TypedDict): @@ -35,3 +35,18 @@ def mock_data_factory( ts_col_name=ts_col_name, data=[row for row in zip(*col_gens)] ) + + +def lambda_generator(lamb: Callable[[int], Any], iters: int) -> Iterator[Any]: + ''' + simple wrapper for generator. 
+    if iters == -1, the generated sequence is infinite
+    '''
+
+    i = 0
+    if iters == -1:
+        iters = float('inf')
+
+    while i < iters:
+        yield lamb(i)
+        i += 1
diff --git a/FCRgendata/tests/iterpolator/test_interpolator.py b/FCRgendata/tests/iterpolator/test_interpolator.py
index d19bfe7..c7b24ed 100644
--- a/FCRgendata/tests/iterpolator/test_interpolator.py
+++ b/FCRgendata/tests/iterpolator/test_interpolator.py
@@ -1,11 +1,12 @@
 import pytest
 import datetime as dt
+import random
 from typing import Dict, Any
 from numbers import Number
 
-from FCRGenData.interpolator import InterpolatedDataStream
+from FCRGenData.interpolator import InterpolatedDataStream, EmptyReaderExcpetion
 from .mock_row_provider import MockRowDataProvider
-from .mock_data import mock_data_factory
+from .mock_data import mock_data_factory, lambda_generator as lg
 
 
 def dict_eq_w_margin(d1: Dict[str, Any], d2: Dict[str, Any], margin: float) -> bool:
@@ -30,17 +31,25 @@ def dict_eq_w_margin(d1: Dict[str, Any], d2: Dict[str, Any], margin: float) -> b
 
 @pytest.mark.interpolation
 def test_same_vals_at_interpolation_nodes():
+    '''
+    checks that values at interpolation nodes
+    are still the same (within a small margin)
+    '''
+
     MAX_ERR = 1e-4
-    ROWS = 10
+    ITERS = 10
     t0 = dt.datetime(year=2000, month=1, day=1)
-    ts_gen = [t0 + dt.timedelta(minutes=m) for m in range(ROWS)]
-    timestamp_cname = 'ts'
 
     data = mock_data_factory(
-        header_names=[timestamp_cname, 'const1'],
-        ts_col_name=timestamp_cname,
-        col_types=[dt.datetime, int],
-        col_gens=[ts_gen, [1.] * ROWS]
+        header_names=['ts', 'rand1', 'rand2', 'rand3'],
+        ts_col_name='ts',
+        col_types=[dt.datetime, float, float, float],
+        col_gens=[
+            lg(lambda n: t0 + dt.timedelta(minutes=n), ITERS),
+            lg(lambda n: -random.random(), ITERS),
+            lg(lambda n: random.random() * 100, ITERS),
+            lg(lambda n: random.random() * 1000, ITERS),
+        ]
     )
 
     rowProvider = MockRowDataProvider(
@@ -49,9 +58,91 @@ def test_same_vals_at_interpolation_nodes():
     )
     interp = InterpolatedDataStream(
         reader=rowProvider,
-        timestamp_generator=ts_gen,
+        timestamp_generator=lg(lambda n: t0 + dt.timedelta(minutes=n), ITERS * 2),
     )
 
     rowProvGen = rowProvider.reader_annotated()
     for d1, d2 in zip(interp, rowProvGen):
         assert dict_eq_w_margin(d1, d2, MAX_ERR)
+
+
+@pytest.mark.interpolation
+def test_no_data_but_timestamps():
+    '''
+    checks that the interpolator raises the
+    correct exception when rowProvider returns
+    no rows
+    '''
+    data = mock_data_factory(
+        header_names=['ts', 'v1', 'v2'],
+        ts_col_name='ts',
+        col_types=[dt.datetime, float, float],
+        col_gens=[range(0), range(0), range(0)]
+    )
+
+    rowProv = MockRowDataProvider(
+        data=data,
+        max_time_difference=dt.timedelta(seconds=1)
+    )
+
+    t0 = dt.datetime(year=2000, month=1, day=1)
+    def ts_gen():
+        i = 0
+        while True:
+            yield t0 + dt.timedelta(seconds=i)
+            i += 1
+    try:
+        interp = InterpolatedDataStream(reader=rowProv, timestamp_generator=ts_gen())
+    except EmptyReaderExcpetion:
+        assert True
+    else:
+        assert False
+
+
+@pytest.mark.interpolation
+def test_only_none_data():
+    '''
+    checks that the interpolator raises the
+    correct exception when rowProvider returns
+    only None values and correct timestamps
+    '''
+    ITERS = 29
+    t0 = dt.datetime(year=2000, month=1, day=1)
+    data = mock_data_factory(
+        header_names=['time stamp', 'i1', 'c2'],
+        ts_col_name='time stamp',
+        col_types=[dt.datetime, int, complex],
+        col_gens=[
+            lg(lambda n: t0 + dt.timedelta(hours=n), ITERS),
+            lg(lambda n: None, ITERS),
+            lg(lambda n: None, ITERS)
+        ]
+    )
+
+    rowProv = MockRowDataProvider(data,
+                                  dt.timedelta(seconds=1))
+    try:
+        interp = InterpolatedDataStream(
+            reader=rowProv,
+            timestamp_generator=lg(lambda n: t0 + dt.timedelta(days=2*n), -1)
+        )
+    except EmptyReaderExcpetion:
+        assert True
+    else:
+        assert False
+
+
+'''
+    TODO:
+    test that an exception is raised if some columns contain full data
+    and some columns contain only None data
+
+    test what happens when some columns contain so little data
+    at the boundaries that extrapolation is needed
+
+    test that an exception is raised if the client tries to get data
+    from out of bounds of the interpolation (and extrapolation) range
+    (a tentative sketch of this case follows below)
+
+    test that data taken between sample nodes stays within a reasonable
+    error margin
+
+    test what happens on a larger dataset, > 5000 sample nodes
+'''
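+
+
+# A tentative sketch for the out-of-bounds TODO item above; it assumes, based
+# on __next__ in interpolator.py, that iteration simply stops (StopIteration)
+# once the requested timestamps pass the last sample node:
+@pytest.mark.interpolation
+def test_iteration_stops_past_last_node():
+    ITERS = 5
+    t0 = dt.datetime(year=2000, month=1, day=1)
+    data = mock_data_factory(
+        header_names=['ts', 'v1'],
+        ts_col_name='ts',
+        col_types=[dt.datetime, float],
+        col_gens=[
+            lg(lambda n: t0 + dt.timedelta(minutes=n), ITERS),
+            lg(lambda n: float(n), ITERS),
+        ]
+    )
+    rowProv = MockRowDataProvider(data, dt.timedelta(minutes=2))
+    interp = InterpolatedDataStream(
+        reader=rowProv,
+        # infinite timestamp generator running far past the data
+        timestamp_generator=lg(lambda n: t0 + dt.timedelta(minutes=n), -1)
+    )
+    # must terminate despite the infinite generator and yield no rows
+    # past the last sample node
+    rows = list(interp)
+    assert len(rows) <= ITERS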
-- GitLab


From f72476663ab1554080c4a0a36d6ef472edca3b6e Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Thu, 4 Mar 2021 21:10:05 +0100
Subject: [PATCH 02/10] Data aggregator integration with the rest of the code

---
 FCRgendata/src/FCRGenData/__main__.py         | 89 ++++++++-----------
 FCRgendata/src/FCRGenData/config_schemas.py   |  6 +-
 FCRgendata/src/FCRGenData/data_aggregator.py  | 41 +++++++--
 FCRgendata/src/FCRGenData/interpolator.py     |  7 +-
 .../FCRGenData/rawDataReader/csv_reader.py    | 10 ++-
 .../rawDataReader/row_data_reader.py          |  2 +
 FCRgendata/src/FCRGenData/solv_summoner.py    | 21 +++--
 .../tests/data_reader/data_reader_template.py |  3 +-
 .../tests/data_reader/test_csv_reader.py      |  7 +-
 9 files changed, 102 insertions(+), 84 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index a81e509..6c49b6f 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -1,81 +1,62 @@
 """
 Generates data using CP-solver and FCR time series for network training
 """
-
+import argparse
 import csv
 import logging
-import sys
 from contextlib import ExitStack
 from pathlib import Path
 
-import numpy as np
-import numpy.lib.recfunctions as rfn
-from progress.bar import IncrementalBar as IBar
-
+from FCRGenData.data_aggregator import DataAggregator
 from .solv_summoner import CPSolverSolutionSummoner, CPSolverSolutionSummonerError
-from .validate_config import validate_config
 
 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
-CONFIG_PATH: Path = None
 
-def run():
-    if len(sys.argv) != 2:
-        logging.critical(
-            f"received {len(sys.argv) - 1} arguments, required one: config file path.")
-        sys.exit(1)
+
+def _json_path(path_str: str) -> Path:
+    if Path(path_str).is_file() and path_str.endswith('.json'):
+        return Path(path_str)
     else:
-        CONFIG_PATH = Path(sys.argv[1])
-
-    j_conf = validate_config(CONFIG_PATH)
-    avg_rsp_times = np.genfromtxt(  # shape (n, )
-        j_conf['AvgResponseTimeTableFilePath'],
-        delimiter=',',
-        dtype=np.dtype([('avg_rsp_time', np.float64), ('datetime', 'datetime64[s]')]),
-        skip_header=1,
-        usecols=(6, 1)
-    )
-
-    avg_rsp_prediction = np.genfromtxt(  # shape (n, )
-        j_conf['predictionsFilePath'],
-        delimiter=',',
-        dtype=np.dtype([('avg_rsp_time_pred', np.float64), ('datetime', 'datetime64[s]'), ('split', np.int)]),
-        skip_header=1,
-        usecols=(1, 3, 4),
-        converters={4: lambda split: {b"train": 0, b"val": 1, b"test": -1}[split]}
-    )
-
-    joined = rfn.join_by('datetime', avg_rsp_times, avg_rsp_prediction, jointype='inner', usemask=False)
-    assert len(joined) == len(avg_rsp_prediction)
-    # joined is structured array of tuples with dtype
-    # [('datetime', '<M8[s]'), ('avg_rsp_time', '<f8'), ('avg_rsp_time_pred', '<f8'), ('split', '<i8')]
-
-    solver_host = j_conf['cpSolverHost']
-    csv_header = 
['timestamp', 'AvgResponseTime', 'AvgResponseTimePrediction', 'split', - 'cardinality_Component_LB', 'provider_Component_LB', 'AppCardinality', - 'cardinality_Component_DB', 'provider_Component_App', 'provider_Component_DB' - ] + raise argparse.ArgumentTypeError(f"{path_str} is not a valid path") + + +def run(): + # Argument parsing + parser = argparse.ArgumentParser(description="Very not advanced data aggregator with interpolation") + + parser.add_argument('cp_config', action='store', type=_json_path, description="Path to cpsolver json config file") + parser.add_argument('gendata_config', action='store', type=_json_path, + description="Path to cpsolver json config file") + + args = parser.parse_args() + + cpsolver_config_path = args.cp_config + gendata_config = args.gendata_config + + # Create aggregator + cpsolver = CPSolverSolutionSummoner(cpsolver_config_path) + + aggregator = DataAggregator(json_path=gendata_config) with ExitStack() as stack: # substiture for multiline with statement - summoner = stack.enter_context(CPSolverSolutionSummoner(j_conf)) - csvfile = stack.enter_context(open(j_conf['outpath'], 'w', newline='')) - bar = IBar('Progress', max=len(joined)) + summoner = stack.enter_context(cpsolver) + csvfile = stack.enter_context(open(aggregator.outpath, 'w', newline='')) writer = csv.writer(csvfile, delimiter=',') - writer.writerow(csv_header) + writer.writerow(aggregator.column_names) - for i, row in enumerate(joined): + it = 0 + cpsolver_column_index = aggregator.column_names.index(aggregator.avg_response_time_colname) + for row in aggregator.row_generator(): + it += 1 + print(f'Row {it}') try: - solution = summoner.get_solution(row[1]) + solution = summoner.get_solution(row[cpsolver_column_index]) writer.writerow((*row, *solution)) except CPSolverSolutionSummonerError as err: - bar.finish() logging.error( "CPSolverSolutionSummoner raised exception while " - f"summoning cp-solver near line {i} " + f"summoning cp-solver near line {it} " f"exception:\n{err}" ) - bar = IBar('Progress', max=len(joined) - i) - bar.next() - bar.finish() if __name__ == "__main__": diff --git a/FCRgendata/src/FCRGenData/config_schemas.py b/FCRgendata/src/FCRGenData/config_schemas.py index 9dfcdd5..a11ddf8 100644 --- a/FCRgendata/src/FCRGenData/config_schemas.py +++ b/FCRgendata/src/FCRGenData/config_schemas.py @@ -77,7 +77,9 @@ DATA_CONFIG_SCHEMA = { "timedelta": {"type": "int"}, "max_time_gap": - {"type": "int"} + {"type": "int"}, + "avg_response_time_colname": # temporary, input for cpsolver + {"type": "string"} }, - "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap"] + "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap", "avg_response_time_colname"] } diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py index 784f46d..0f8618e 100644 --- a/FCRgendata/src/FCRGenData/data_aggregator.py +++ b/FCRgendata/src/FCRGenData/data_aggregator.py @@ -1,7 +1,8 @@ import datetime as dt +import functools import logging from pathlib import Path -from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator +from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator, Iterable from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA from FCRGenData.interpolator import InterpolatedDataStream @@ -16,7 +17,7 @@ def _timestamp_generator(t0: dt.datetime, tstep: dt.timedelta) -> Generator[dt.d t += tstep -class AggregatedData: +class DataAggregator: """Aggregates data from 
multiple sources and uneven time periods Args: @@ -42,7 +43,7 @@ class AggregatedData: # Roottime data source roottime_data = None - + max_time_gap = dt.timedelta(seconds=self.__conf['max_time_gap']) # Parse data sources for datasource in self.__conf["datasources"]: desc = datasource['desc'] @@ -53,7 +54,8 @@ class AggregatedData: if source['type'] == 'csv': csv_path = Path(source['path']) - reader = RowCSVReader(csv_path, timestamp_column_name=timestamp_column) + reader = RowCSVReader(csv_path, timestamp_column_name=timestamp_column, + max_time_difference=max_time_gap) self.__datasources.append((datasource, reader)) else: raise NotImplementedError("Unsupported source type") @@ -79,6 +81,32 @@ class AggregatedData: self.__roottime_reader = roottime_data[1] self.__roottime_datasource = roottime_data[0] + @functools.cached_property + def avg_response_time_colname(self) -> str: # todo export + return self.__conf['avg_response_time_colname'] + + @functools.cached_property + def outpath(self) -> Path: + return Path(self.__conf['outpath']) + + @functools.cached_property + def column_names(self) -> List[str]: + """Names of the variable columns""" + header = [] + for value_name in self.__conf["aggregated_columns"]: + for datasource, rowdict in self.__datasources: + for value_info in datasource["values"]: + # if alias present + if 'alias' in value_info: + if value_info['alias'] == value_name: + header.append(value_info['colname']) + break + else: + if value_info['colname'] == value_name: + header.append(value_info['colname']) + break + return header + def row_generator(self) -> Generator[Mapping[str, any], None, None]: """Generates values mapping with correct and even time periods. Combines all values from different sources and unify their timestamps. @@ -91,8 +119,9 @@ class AggregatedData: for datasource, reader in self.__datasources: interpolated_streams.append( (datasource, - InterpolatedDataStream(datasource, reader, _timestamp_generator(self.__t0, self.__delta))), self.__max_time_gap) - + InterpolatedDataStream(datasource=datasource, + reader=reader, + timestamp_generator=_timestamp_generator(self.__t0, self.__delta)))) while True: grouped_values = [] timestamp = None diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py index 15f71d5..fbe06aa 100644 --- a/FCRgendata/src/FCRGenData/interpolator.py +++ b/FCRgendata/src/FCRGenData/interpolator.py @@ -47,8 +47,9 @@ class InterpolatedDataStream: performance issues. 
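 
         A minimal usage sketch (the reader instance and the column
         names 'ts' and 'cpu' are illustrative, not part of this module):
 
             ts_gen = (t0 + dt.timedelta(minutes=m) for m in itertools.count())
             for row in InterpolatedDataStream(reader=reader, timestamp_generator=ts_gen):
                 print(row['ts'], row['cpu'])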
''' - def __ts_embedding(self, ts: dt.datetime, ts0: dt.datetime) -> int: - return (ts - ts0).seconds + @staticmethod + def __ts_embedding(t1: dt.datetime, t2: dt.datetime) -> int: + return (t1 - t2).seconds def __init__(self, reader: IRowDataProvider, @@ -69,7 +70,7 @@ class InterpolatedDataStream: # col name -> (timestamp embedding, given value) self.raw_data = {cname: ([], []) for cname in self.intp_col_embed} - row_generator = reader.reader() + row_generator = reader.reader_annotated() # populate self.inp_data for row in row_generator: diff --git a/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py index 66d526e..9bf3591 100644 --- a/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py +++ b/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py @@ -29,7 +29,7 @@ class RowCSVReader(IRowDataProvider): __column_names: Tuple[str] __column_types: Dict[str, type] - __timestamp_column_name: str + _timestamp_column_name: str def __init__(self, path: Union[Path, str], @@ -130,11 +130,13 @@ class RowCSVReader(IRowDataProvider): Raises: TooBigTimeDifference """ + time_index = list(self.columns).index(self.timestamp_column_name) + for index, row in self.__arr.iterrows(): values = row.values.tolist() - values = map(RowCSVReader._convert_to_pytype, values) - self._check_time_difference(getattr(values, self.__timestamp_column_name)) - yield list(values) + values = list(map(RowCSVReader._convert_to_pytype, values)) + self._check_time_difference(values[time_index]) + yield values def reader_annotated(self) -> Generator[Dict[str, any], None, None]: """Returns dict iterator over rows. diff --git a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py index ff7d436..58686e2 100644 --- a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py +++ b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py @@ -38,6 +38,8 @@ class IRowDataProvider(abc.ABC): if self.__previous_timestamp + self.__max_time_difference < timestamp: raise TooBigTimeDifference( f'Previous timestamp: {self.__previous_timestamp}, actual timestamp: {timestamp}, max difference: {self.__max_time_difference}') + else: + self.__previous_timestamp = timestamp @property @abc.abstractmethod diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py index 0378848..c351288 100644 --- a/FCRgendata/src/FCRGenData/solv_summoner.py +++ b/FCRgendata/src/FCRGenData/solv_summoner.py @@ -1,26 +1,31 @@ from pathlib import Path + from lxml import etree -from tempfile import NamedTemporaryFile +import json +import os from contextlib import ContextDecorator -from typing import List from functools import lru_cache -import logging +from tempfile import NamedTemporaryFile +from typing import List + import requests -import json -import os +from lxml import etree +from FCRGenData.config_schemas import CPSOLVER_CONFIG_SCHEMA +from FCRGenData.validate_config import validate_config -CACHE_SIZE = 2**10 +CACHE_SIZE = 2 ** 10 class CPSolverSolutionSummonerError(Exception): pass -class CPSolverSolutionSummoner(ContextDecorator): +class CPSolverSolutionSummoner: """ calls cp-solver for solution, and manages additional resources """ - def __init__(self, config): + def __init__(self, cpsolver_config_path: Path): + config = validate_config(cpsolver_config_path, CPSOLVER_CONFIG_SCHEMA) self.solver_host = config['cpSolverHost'] self.solution_file = 
f"{config['request']['camelModelFilePath'][:-4]}-solution.xmi" self.FCRcpxml = etree.parse(config['request']['cpProblemFilePath']) diff --git a/FCRgendata/tests/data_reader/data_reader_template.py b/FCRgendata/tests/data_reader/data_reader_template.py index e984732..ce0ae2f 100644 --- a/FCRgendata/tests/data_reader/data_reader_template.py +++ b/FCRgendata/tests/data_reader/data_reader_template.py @@ -102,8 +102,9 @@ class RawDataProviderTestTemplate(abc.ABC): def test_read(self, reader_factory): header = ['time', 'int', 'str', 'float', 'bool'] content = [] + t0 = dt.datetime.now() for i in range(5): - row = [dt.datetime.now() + dt.timedelta(days=i), i, str(i) + 'pln', i / 10, i % 2 == 0] + row = [t0 + dt.timedelta(days=i), i, str(i) + 'pln', i / 10, i % 2 == 0] assert len(row) == len(header) content.append(row) diff --git a/FCRgendata/tests/data_reader/test_csv_reader.py b/FCRgendata/tests/data_reader/test_csv_reader.py index a0f6ed6..8c92650 100644 --- a/FCRgendata/tests/data_reader/test_csv_reader.py +++ b/FCRgendata/tests/data_reader/test_csv_reader.py @@ -2,6 +2,7 @@ import tempfile from typing import Callable import pytest +import datetime as dt from FCRGenData.rawDataReader import RowCSVReader from .data_reader_template import RawDataProviderTestTemplate @@ -9,12 +10,6 @@ from .data_reader_template import RawDataProviderTestTemplate class TestCSVReader(RawDataProviderTestTemplate): - @pytest.fixture - def empty_reader(self) -> RowCSVReader: - tfile = tempfile.NamedTemporaryFile(suffix='.csv') - yield RowCSVReader(tfile.name) - tfile.close() - @pytest.fixture def reader_factory(self) -> Callable[[str], RowCSVReader]: tfile = tempfile.NamedTemporaryFile(suffix='.csv', mode='w') -- GitLab From fa7d14259656a636342e2cea976665da18de689d Mon Sep 17 00:00:00 2001 From: jkk <jk394387@students.mimuw.edu.pl> Date: Fri, 5 Mar 2021 17:25:27 +0100 Subject: [PATCH 03/10] Missing functions definition, integration with aggregator --- FCRgendata/src/FCRGenData/__main__.py | 7 ++- FCRgendata/src/FCRGenData/config_schemas.py | 7 ++- FCRgendata/src/FCRGenData/data_aggregator.py | 45 +++++++++---------- FCRgendata/src/FCRGenData/interpolator.py | 8 ++-- .../FCRGenData/rawDataReader/csv_reader.py | 6 +-- .../rawDataReader/row_data_reader.py | 4 +- FCRgendata/src/FCRGenData/solv_summoner.py | 7 +-- FCRgendata/src/FCRGenData/validate_config.py | 8 ++-- .../tests/data_reader/data_reader_template.py | 4 +- .../tests/iterpolator/mock_row_provider.py | 4 +- .../tests/iterpolator/test_interpolator.py | 2 +- 11 files changed, 51 insertions(+), 51 deletions(-) diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py index 6c49b6f..4db0c2a 100644 --- a/FCRgendata/src/FCRGenData/__main__.py +++ b/FCRgendata/src/FCRGenData/__main__.py @@ -44,12 +44,15 @@ def run(): writer.writerow(aggregator.column_names) it = 0 - cpsolver_column_index = aggregator.column_names.index(aggregator.avg_response_time_colname) + solver_cols_indexes = [aggregator.column_names.index(colname) for colname in aggregator.solver_column_names] + + # We can also get annotated rows (what should be easier?) 
     for row in aggregator.row_generator():
         it += 1
         print(f'Row {it}')
         try:
-            solution = summoner.get_solution(row[cpsolver_column_index])
+            solver_params = map(lambda index: row[index], solver_cols_indexes)
+            solution = summoner.get_solution(solver_params)
             writer.writerow((*row, *solution))
         except CPSolverSolutionSummonerError as err:
             logging.error(
diff --git a/FCRgendata/src/FCRGenData/config_schemas.py b/FCRgendata/src/FCRGenData/config_schemas.py
index a11ddf8..31c29eb 100644
--- a/FCRgendata/src/FCRGenData/config_schemas.py
+++ b/FCRgendata/src/FCRGenData/config_schemas.py
@@ -78,8 +78,11 @@ DATA_CONFIG_SCHEMA = {
         {"type": "int"},
     "max_time_gap":
         {"type": "int"},
-    "avg_response_time_colname":  # temporary, input for cpsolver
-        {"type": "string"}
+    "solver_cols":
+        {"type": "array",
+         "minItems": 1,
+         "items": {"type": "string"}
+         }
     },
     "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap", "avg_response_time_colname"]
 }
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index 0f8618e..b56e7ff 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -1,8 +1,7 @@
 import datetime as dt
-import functools
 import logging
 from pathlib import Path
-from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator, Iterable
+from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator
 
 from FCRGenData.interpolator import InterpolatedDataStream
 from FCRGenData.rawDataReader import RowCSVReader, IRowDataProvider
@@ -81,33 +80,33 @@ class DataAggregator:
         self.__roottime_reader = roottime_data[1]
         self.__roottime_datasource = roottime_data[0]
 
-    @functools.cached_property
-    def avg_response_time_colname(self) -> str:  # todo export
-        return self.__conf['avg_response_time_colname']
-
-    @functools.cached_property
+    @property
     def outpath(self) -> Path:
         return Path(self.__conf['outpath'])
 
-    @functools.cached_property
+    @property
+    def solver_column_names(self) -> List[str]:
+        """List of the column names required by the cpsolver"""
+        return self.__conf['solver_cols']
+
+    @property
     def column_names(self) -> List[str]:
         """Names of the variable columns"""
-        header = []
-        for value_name in self.__conf["aggregated_columns"]:
-            for datasource, rowdict in self.__datasources:
-                for value_info in datasource["values"]:
-                    # if alias present
-                    if 'alias' in value_info:
-                        if value_info['alias'] == value_name:
-                            header.append(value_info['colname'])
-                            break
-                    else:
-                        if value_info['colname'] == value_name:
-                            header.append(value_info['colname'])
-                            break
-        return header
+        return self.__conf['aggregated_columns']
+
+    def row_generator(self) -> Generator[List[type], None, None]:
+        """Generator over raw data, provides rows of data in increasing order (by timestamp).
+        Yields a list of values per row (order matches aggregated_columns).
+        """
+        for values_dict in self.row_generator_annotated():
+            values_list = []
+
+            for value_name in self.__conf["aggregated_columns"]:
+                values_list.append(values_dict[value_name])
+
+            yield values_list
 
-    def row_generator(self) -> Generator[Mapping[str, any], None, None]:
+    def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
         """Generates values mapping with correct and even time periods.
         Combines all values from different sources and unifies their timestamps.
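 
         Example yielded mapping, using the aliases from the sample
         data_config.json ('value1'..'value3'); the actual keys come from
         the "aggregated_columns" entry of the config:
 
             {'value1': 1.5, 'value2': 0.25, 'value3': 7.0}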
diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py
index fbe06aa..e06d445 100644
--- a/FCRgendata/src/FCRGenData/interpolator.py
+++ b/FCRgendata/src/FCRGenData/interpolator.py
@@ -18,15 +18,15 @@ class EmptyReaderExcpetion(InterpolatedDataStreamException):
 class InterpolatedDataStream:
     '''
         generator that interpolates data given by
-        reader in points given by timestamp_generator
+        the reader at points given by timestamp_generator
         with additional config from datasource.
 
         Interpolator returns dict which includes:
-            1. Mapping column names given by reader
+            1. Mapping from column names given by the reader
                (only numerical types) to its interpolated
                values in points returned by timestamp_generator.
-            2. Mapping from reader column name that
+            2. Mapping from the reader column name that
                contains data timestamp to timestamp
                given by timestamp_generator
@@ -70,7 +70,7 @@ class InterpolatedDataStream:
         # col name -> (timestamp embedding, given value)
         self.raw_data = {cname: ([], []) for cname in self.intp_col_embed}
 
-        row_generator = reader.reader_annotated()
+        row_generator = reader.row_generator_annotated()
 
         # populate self.raw_data
         for row in row_generator:
diff --git a/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
index 9bf3591..4b79499 100644
--- a/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
+++ b/FCRgendata/src/FCRGenData/rawDataReader/csv_reader.py
@@ -17,7 +17,7 @@ def _match_columns(column: List[str], keys: List[str]) -> List[str]:
 
 class RowCSVReader(IRowDataProvider):
-    """CSV data reader implementation."""
+    """CSV row data provider implementation."""
 
     __path: Path
@@ -121,7 +121,7 @@ class RowCSVReader(IRowDataProvider):
         else:
             return value
 
-    def reader(self) -> Generator[Iterable[type], None, None]:
+    def row_generator(self) -> Generator[Iterable[type], None, None]:
         """Returns iterator over rows.
 
         Yields:
@@ -138,7 +138,7 @@ class RowCSVReader(IRowDataProvider):
             self._check_time_difference(values[time_index])
             yield values
 
-    def reader_annotated(self) -> Generator[Dict[str, any], None, None]:
+    def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
         """Returns dict iterator over rows.
 
         Yields:
diff --git a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
index 58686e2..01f7aed 100644
--- a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
+++ b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
@@ -65,14 +65,14 @@ class IRowDataProvider(abc.ABC):
         pass
 
     @abc.abstractmethod
-    def reader(self) -> Generator[Iterable[type], None, None]:
+    def row_generator(self) -> Generator[Iterable[type], None, None]:
         """Generator over raw data, provides rows of data in increasing order (by timestamp)
         Returns list with values (order same as in column_names).
         """
         pass
 
     @abc.abstractmethod
-    def reader_annotated(self) -> Generator[Dict[str, any], None, None]:
+    def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
         """Generator over raw data, provides rows of data in increasing order (by timestamp).
         Returns dict with column names mapping to current values.
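 
         Example yielded dict for a provider with columns 'time' and 'load'
         (column names are illustrative):
 
             {'time': dt.datetime(2021, 3, 4, 12, 0), 'load': 0.42}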
""" diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py index c351288..ba01b90 100644 --- a/FCRgendata/src/FCRGenData/solv_summoner.py +++ b/FCRgendata/src/FCRGenData/solv_summoner.py @@ -1,10 +1,7 @@ -from pathlib import Path - -from lxml import etree import json import os -from contextlib import ContextDecorator from functools import lru_cache +from pathlib import Path from tempfile import NamedTemporaryFile from typing import List @@ -45,7 +42,7 @@ class CPSolverSolutionSummoner: return False @lru_cache(maxsize=CACHE_SIZE) - def get_solution(self, avg_rsp_time: float) -> List[int]: + def get_solution(self, avg_rsp_time: float) -> List[int]: # TODO fix """ returns solution as list of int """ self.avg_rsp_time_xelem.set('value', str(avg_rsp_time)) diff --git a/FCRgendata/src/FCRGenData/validate_config.py b/FCRgendata/src/FCRGenData/validate_config.py index f7b5a6b..5448a68 100644 --- a/FCRgendata/src/FCRGenData/validate_config.py +++ b/FCRgendata/src/FCRGenData/validate_config.py @@ -4,15 +4,13 @@ Defines validate_config function responsible for validating path to config file, it's structure and content """ -from pathlib import Path -import logging import json +import logging +import sys +from pathlib import Path from typing import Dict import jsonschema -import sys - - # create two custom types: "filepath" (path to exsisting file) # and "creatablepath" (path in which new file can be created) diff --git a/FCRgendata/tests/data_reader/data_reader_template.py b/FCRgendata/tests/data_reader/data_reader_template.py index ce0ae2f..ab4c3ee 100644 --- a/FCRgendata/tests/data_reader/data_reader_template.py +++ b/FCRgendata/tests/data_reader/data_reader_template.py @@ -115,11 +115,11 @@ class RawDataProviderTestTemplate(abc.ABC): reader: IRowDataProvider = reader_factory(s.read(), timestamp_column_name='time', max_time_difference=dt.timedelta(days=1)) - row_gen = reader.reader() + row_gen = reader.row_generator() for row, grow in zip(content, row_gen): assert row == grow - dict_gen = reader.reader_annotated() + dict_gen = reader.row_generator_annotated() for row, row_dict in zip(content, dict_gen): for col_name, value in zip(header, row): assert row_dict[col_name] == value diff --git a/FCRgendata/tests/iterpolator/mock_row_provider.py b/FCRgendata/tests/iterpolator/mock_row_provider.py index 0f694bd..f48afdc 100644 --- a/FCRgendata/tests/iterpolator/mock_row_provider.py +++ b/FCRgendata/tests/iterpolator/mock_row_provider.py @@ -37,8 +37,8 @@ class MockRowDataProvider(IRowDataProvider): def columns(self) -> Dict[str, type]: return {cname: ctype for cname, ctype in zip(self.headers, self.types)} - def reader_annotated(self) -> Generator[Dict[str, any], None, None]: + def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]: return map(lambda row: dict(zip(self.headers, row)), self.data) - def reader(self) -> Generator[List[type], None, None]: + def row_generator(self) -> Generator[List[type], None, None]: yield from self.data diff --git a/FCRgendata/tests/iterpolator/test_interpolator.py b/FCRgendata/tests/iterpolator/test_interpolator.py index c7b24ed..84e9f85 100644 --- a/FCRgendata/tests/iterpolator/test_interpolator.py +++ b/FCRgendata/tests/iterpolator/test_interpolator.py @@ -61,7 +61,7 @@ def test_same_vals_at_interpolation_nodes(): timestamp_generator=lg(lambda n: t0 + dt.timedelta(minutes=n), ITERS * 2), ) - rowProvGen = rowProvider.reader_annotated() + rowProvGen = rowProvider.row_generator_annotated() for d1, d2 in 
zip(interp, rowProvGen): assert dict_eq_w_margin(d1, d2, MAX_ERR) -- GitLab From 4047411e9758a50995da008d2cd0bfff633b4c2d Mon Sep 17 00:00:00 2001 From: jkk <jk394387@students.mimuw.edu.pl> Date: Sat, 6 Mar 2021 19:22:28 +0100 Subject: [PATCH 04/10] Added support for multiple params in solver summoner + docker support --- FCRgendata/src/FCRGenData/solv_summoner.py | 187 ++++++++++++++++++--- 1 file changed, 165 insertions(+), 22 deletions(-) diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py index ba01b90..910c7ee 100644 --- a/FCRgendata/src/FCRGenData/solv_summoner.py +++ b/FCRgendata/src/FCRGenData/solv_summoner.py @@ -1,11 +1,17 @@ +import abc import json -import os +import tarfile +import time from functools import lru_cache +from io import BytesIO from pathlib import Path from tempfile import NamedTemporaryFile -from typing import List +from typing import List, Optional, Dict +from xml.etree.ElementTree import ElementTree, Element +import docker import requests +from docker.models.containers import Container from lxml import etree from FCRGenData.config_schemas import CPSOLVER_CONFIG_SCHEMA @@ -14,49 +20,186 @@ from FCRGenData.validate_config import validate_config CACHE_SIZE = 2 ** 10 +def create_archive(filepath: Path) -> BytesIO: + """Packs content of the file to the tar archive. + + Args: + filepath(Path): path to the file. + + Returns: + (BytesIO) stream of the tarballed file. + """ + pw_tarstream = BytesIO() + pw_tar = tarfile.TarFile(fileobj=pw_tarstream, mode='w') + with open(filepath, 'rb') as file: + file_data = file.read() + tarinfo = tarfile.TarInfo(name=filepath.name) + tarinfo.size = len(file_data) + tarinfo.mtime = time.time() + pw_tar.addfile(tarinfo, BytesIO(file_data)) + pw_tar.close() + pw_tarstream.seek(0) + return pw_tarstream + + +def docker_put(container: Container, src: Path, dst: Path) -> None: + """Copies specified file to the container. + + Args: + container(Container): destination container. + src(Path): path to the source file. + dst(Path): destination path in the container. + """ + with create_archive(src) as archive: + container.put_archive(str(dst), archive) + + +def docker_get(container, src, dst) -> None: + """Copies specified file from container. + + Args: + container(Container): source container. + src(Path): path to the source file inside the container. + dst(Path): destination path where to store file. 
+    """
+    with open(dst, 'wb') as file:
+        file.write(container.get_archive(str(src)))
+
+
class CPSolverSolutionSummonerError(Exception):
    pass


class CPSolverSolutionSummoner(abc.ABC):
    _config: Dict

    _solver_host: str
    _camelModelFilePath: Path
    _solutionFilePath: Path
    _FCRcpxml_path: Path
    _FCRcpxml: ElementTree  # TODO VERIFY
    _nodeCandidatesFilePath: Path

    _xml_fillings: Dict[str, Element]  # TODO check
    _request_part: Dict

    def __init__(self,
                 cpsolver_config_path: Path,
                 args_names: List[str]):
        self._config = validate_config(cpsolver_config_path, CPSOLVER_CONFIG_SCHEMA)
        self._solver_host = self._config['cpSolverHost']
        self._camelModelFilePath = Path(self._config['request']['camelModelFilePath'])

        self._solutionFilePath = Path(f"{self._config['request']['camelModelFilePath'][:-4]}-solution.xmi")
        self._FCRcpxml_path = Path(self._config['request']['cpProblemFilePath'])
        self._FCRcpxml = etree.parse(self._FCRcpxml_path)
        self._nodeCandidatesFilePath = Path(self._config['request']['nodeCandidatesFilePath'])

        self._xml_fillings = {}
        for name in args_names:
            self._xml_fillings[name] = self._FCRcpxml.find(
                f"cpMetrics[@id='{name}']")[0]  # TODO check
        self._request_part = self._config['request']

    @abc.abstractmethod
    def get_solution(self, solver_params: Dict) -> List[int]:
        pass
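

# For orientation: __init__ above grabs, for each solver argument name, the
# first child of the matching <cpMetrics id='...'> element, and get_solution
# later sets that child's 'value' attribute. A sketch of the cpProblem XML
# shape this implies (the child tag name is a guess, inferred from the find()
# and set() calls rather than from a schema):
#
#   <cpMetrics id="AvgResponseTime">
#     <metric value="0.0"/>
#   </cpMetrics>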


class LocalCPSolverSolutionSummoner(CPSolverSolutionSummoner):
    """ calls cp-solver for solution, and manages additional resources """

    def __enter__(self):
        self.tempfile = NamedTemporaryFile()
        self._request_part['cpProblemFilePath'] = self.tempfile.name
        return self

    def __exit__(self, *exc):
        self.tempfile.close()
        if self._solutionFilePath.exists():
            self._solutionFilePath.unlink()
        return False

    @lru_cache(maxsize=CACHE_SIZE)
    def get_solution(self, solver_params: Dict) -> List[int]:  # TODO fix
        """ returns solution as list of int """
        for arg_name, arg_loc in self._xml_fillings.items():
            arg_loc.set('value', str(solver_params[arg_name]))

        self._FCRcpxml.write(self.tempfile.name,
                             xml_declaration=True,
                             encoding="ASCII")
        r = requests.post(f'http://{self._solver_host}/constraintProblemSolutionFromFile',
                          data=json.dumps(self._request_part),
                          headers={'Content-Type': 'application/json'}
                          )

        if r.status_code != 200:
            raise Exception("cp-solver returned non 200 status code")

        sol_xelem = etree.parse(self._solutionFilePath).find("solution")
        if sol_xelem is None:
            raise CPSolverSolutionSummonerError("cp-solver didn't return a solution for the given parameters")
        return [int(v.get('value', 0)) for v in sol_xelem.iterfind(".//value")]


class DockerCPSolverSolutionSummoner(CPSolverSolutionSummoner):
    __container: Container = None

    def __init__(self,
                 cpsolver_config_path: Path,
                 args_names: List[str],
                 container_name: Optional[str] = None,
                 docker_url: Optional[str] = None):
        super().__init__(cpsolver_config_path, args_names)
        if docker_url:
            docker_client = docker.DockerClient(base_url=docker_url)
        else:
            docker_client = docker.from_env()

        assert container_name is not None, 'Container name not specified!'
        self.__container = docker_client.containers.get(container_name)

        self._request_part['camelModelFilePath'] = f'/res/{self._camelModelFilePath.name}'
        docker_put(self.__container, self._camelModelFilePath, Path('/res'))

        self._request_part['cpProblemFilePath'] = f'/res/{self._FCRcpxml_path.name}'
        docker_put(self.__container, self._FCRcpxml_path, Path('/res'))

        self._request_part['nodeCandidatesFilePath'] = f'/res/{self._nodeCandidatesFilePath.name}'
        docker_put(self.__container, self._nodeCandidatesFilePath, Path('/res'))

    def __enter__(self):
        self.tempfile = NamedTemporaryFile()
        self._request_part['cpProblemFilePath'] = f'/tmp/{Path(self.tempfile.name).name}'
        return self

    def __exit__(self, *exc):
        self.tempfile.close()
        if self._solutionFilePath.exists():
            self._solutionFilePath.unlink()
        return False

    @lru_cache(maxsize=CACHE_SIZE)
    def get_solution(self, solver_params: Dict) -> List[int]:
        """ returns solution as list of int """
        for arg_name, arg_loc in self._xml_fillings.items():
            arg_loc.set('value', str(solver_params[arg_name]))
        self._FCRcpxml.write(self.tempfile.name,
                             xml_declaration=True, encoding="ASCII")
        docker_put(self.__container, src=Path(self.tempfile.name), dst=Path('/tmp'))

        r = requests.post(f'http://{self._solver_host}/constraintProblemSolutionFromFile',
                          data=json.dumps(self._request_part),
                          headers={'Content-Type': 'application/json'}
                          )
        if r.status_code != 200:
            raise Exception("cp-solver returned non 200 status code")

        docker_get(self.__container, Path(f'/res/{self._solutionFilePath.name}'), self._solutionFilePath)

        sol_xelem = etree.parse(self._solutionFilePath).find("solution")
        if sol_xelem is None:
            raise CPSolverSolutionSummonerError("cp-solver didn't return a solution")
        return [int(v.get('value', 0)) for v in sol_xelem.iterfind(".//value")]
-- GitLab


From 4c8eb68490a60d6ee717513b617d674b72ac68c8 Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sat, 6 Mar 2021 21:10:11 +0100
Subject: [PATCH 05/10] Main script compatible with new interface

---
 FCRgendata/data_config.json                  |  1 +
 FCRgendata/src/FCRGenData/__main__.py        | 35 +++++++++++++++-----
 FCRgendata/src/FCRGenData/data_aggregator.py | 18 ++--------
 3 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/FCRgendata/data_config.json b/FCRgendata/data_config.json
index f563394..29017d0 100644
--- a/FCRgendata/data_config.json
+++ b/FCRgendata/data_config.json
@@ -21,6 +21,7 @@
     }
   ],
   "aggregated_columns": ["value1", "value2", "value3"],
+  "solver_cols":["value1", "value2"],
  "timedelta": 10,
  "max_time_gap": 100
 }
\ No newline at end of file
diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index 4db0c2a..f51ae26 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -5,8 +5,11 @@ import logging
 from contextlib import ExitStack
 from pathlib 
import Path +from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA from FCRGenData.data_aggregator import DataAggregator -from .solv_summoner import CPSolverSolutionSummoner, CPSolverSolutionSummonerError +from FCRGenData.validate_config import validate_config +from .solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \ + LocalCPSolverSolutionSummoner logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO) @@ -25,33 +28,47 @@ def run(): parser.add_argument('cp_config', action='store', type=_json_path, description="Path to cpsolver json config file") parser.add_argument('gendata_config', action='store', type=_json_path, description="Path to cpsolver json config file") + parser.add_argument('-r', '--docker', action='store_true', type=bool, default=False, + description="Use dockerized cpsolver instead of localhost") + parser.add_argument('--container', action='store', type=str, default=None, + description='Name of the docker container') + parser.add_argument('--docker-url', action='store', type=str, default=None, + description='URL to the remote docker (local by default)') args = parser.parse_args() cpsolver_config_path = args.cp_config - gendata_config = args.gendata_config + gendata_config_path = Path(args.gendata_config) + + gendata_config = validate_config(gendata_config_path, DATA_CONFIG_SCHEMA) # Create aggregator - cpsolver = CPSolverSolutionSummoner(cpsolver_config_path) + if args.docker: + cpsolver = DockerCPSolverSolutionSummoner(cpsolver_config_path, + gendata_config['solver_cols'], + args.container, + args.docker_url) + else: + + cpsolver = LocalCPSolverSolutionSummoner(cpsolver_config_path, + gendata_config['solver_cols']) - aggregator = DataAggregator(json_path=gendata_config) + aggregator = DataAggregator(gendata_config) with ExitStack() as stack: # substiture for multiline with statement summoner = stack.enter_context(cpsolver) - csvfile = stack.enter_context(open(aggregator.outpath, 'w', newline='')) + csvfile = stack.enter_context(open(gendata_config['outpath'], 'w', newline='')) writer = csv.writer(csvfile, delimiter=',') writer.writerow(aggregator.column_names) it = 0 - solver_cols_indexes = [aggregator.column_names.index(colname) for colname in aggregator.solver_column_names] - # We can also get annotated rows (what should be easier?) 
- for row in aggregator.row_generator(): + for row in aggregator.row_generator_annotated(): it += 1 print(f'Row {it}') try: - solver_params = map(lambda index: row[index], solver_cols_indexes) + solver_params = {name: row[name] for name in gendata_config['solver_cols']} solution = summoner.get_solution(solver_params) writer.writerow((*row, *solution)) except CPSolverSolutionSummonerError as err: diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py index b56e7ff..80b45dc 100644 --- a/FCRgendata/src/FCRGenData/data_aggregator.py +++ b/FCRgendata/src/FCRGenData/data_aggregator.py @@ -3,10 +3,8 @@ import logging from pathlib import Path from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator -from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA from FCRGenData.interpolator import InterpolatedDataStream from FCRGenData.rawDataReader import RowCSVReader, IRowDataProvider -from FCRGenData.validate_config import validate_config def _timestamp_generator(t0: dt.datetime, tstep: dt.timedelta) -> Generator[dt.datetime, None, None]: @@ -33,11 +31,8 @@ class DataAggregator: __delta: dt.timedelta __max_time_gap: dt.timedelta - def __init__(self, json_path: Union[str, Path]): - if isinstance(json_path, str): - json_path = Path(json_path) - - self.__conf = validate_config(json_path, DATA_CONFIG_SCHEMA) + def __init__(self, config: Dict): + self.__conf = config self.__datasources = [] # Roottime data source @@ -80,15 +75,6 @@ class DataAggregator: self.__roottime_reader = roottime_data[1] self.__roottime_datasource = roottime_data[0] - @property - def outpath(self) -> Path: - return Path(self.__conf['outpath']) - - @property - def solver_column_names(self) -> List[str]: - """List of the column names reuired by the cpsolver""" - return self.__conf['solver_cols'] - @property def column_names(self) -> List[str]: """Names of the variable columns""" -- GitLab From 847929d3eb1ac5cff685a92d924a62d67da4f96e Mon Sep 17 00:00:00 2001 From: jkk <jk394387@students.mimuw.edu.pl> Date: Sat, 6 Mar 2021 21:10:11 +0100 Subject: [PATCH 06/10] Main script compatible with new interface --- FCRgendata/data_config.json | 3 +- FCRgendata/src/FCRGenData/__main__.py | 35 +++++++++++++++----- FCRgendata/src/FCRGenData/data_aggregator.py | 18 ++-------- 3 files changed, 30 insertions(+), 26 deletions(-) diff --git a/FCRgendata/data_config.json b/FCRgendata/data_config.json index f563394..9be9b42 100644 --- a/FCRgendata/data_config.json +++ b/FCRgendata/data_config.json @@ -1,5 +1,5 @@ { - "outpath": "/home/szysad/mimuw/3rok/ZPP/my-time-series/FCR-time-series/output/generated-data4.csv", + "outpath": "output.csv", "datasources": [ { "desc": "desc <optinal>", @@ -21,6 +21,7 @@ } ], "aggregated_columns": ["value1", "value2", "value3"], + "solver_cols":["value1", "value2"], "timedelta": 10, "max_time_gap": 100 } \ No newline at end of file diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py index 4db0c2a..f51ae26 100644 --- a/FCRgendata/src/FCRGenData/__main__.py +++ b/FCRgendata/src/FCRGenData/__main__.py @@ -5,8 +5,11 @@ import logging from contextlib import ExitStack from pathlib import Path +from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA from FCRGenData.data_aggregator import DataAggregator -from .solv_summoner import CPSolverSolutionSummoner, CPSolverSolutionSummonerError +from FCRGenData.validate_config import validate_config +from .solv_summoner import CPSolverSolutionSummonerError, 
DockerCPSolverSolutionSummoner, \ + LocalCPSolverSolutionSummoner logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO) @@ -25,33 +28,47 @@ def run(): parser.add_argument('cp_config', action='store', type=_json_path, description="Path to cpsolver json config file") parser.add_argument('gendata_config', action='store', type=_json_path, description="Path to cpsolver json config file") + parser.add_argument('-r', '--docker', action='store_true', type=bool, default=False, + description="Use dockerized cpsolver instead of localhost") + parser.add_argument('--container', action='store', type=str, default=None, + description='Name of the docker container') + parser.add_argument('--docker-url', action='store', type=str, default=None, + description='URL to the remote docker (local by default)') args = parser.parse_args() cpsolver_config_path = args.cp_config - gendata_config = args.gendata_config + gendata_config_path = Path(args.gendata_config) + + gendata_config = validate_config(gendata_config_path, DATA_CONFIG_SCHEMA) # Create aggregator - cpsolver = CPSolverSolutionSummoner(cpsolver_config_path) + if args.docker: + cpsolver = DockerCPSolverSolutionSummoner(cpsolver_config_path, + gendata_config['solver_cols'], + args.container, + args.docker_url) + else: + + cpsolver = LocalCPSolverSolutionSummoner(cpsolver_config_path, + gendata_config['solver_cols']) - aggregator = DataAggregator(json_path=gendata_config) + aggregator = DataAggregator(gendata_config) with ExitStack() as stack: # substiture for multiline with statement summoner = stack.enter_context(cpsolver) - csvfile = stack.enter_context(open(aggregator.outpath, 'w', newline='')) + csvfile = stack.enter_context(open(gendata_config['outpath'], 'w', newline='')) writer = csv.writer(csvfile, delimiter=',') writer.writerow(aggregator.column_names) it = 0 - solver_cols_indexes = [aggregator.column_names.index(colname) for colname in aggregator.solver_column_names] - # We can also get annotated rows (what should be easier?) 
- for row in aggregator.row_generator(): + for row in aggregator.row_generator_annotated(): it += 1 print(f'Row {it}') try: - solver_params = map(lambda index: row[index], solver_cols_indexes) + solver_params = {name: row[name] for name in gendata_config['solver_cols']} solution = summoner.get_solution(solver_params) writer.writerow((*row, *solution)) except CPSolverSolutionSummonerError as err: diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py index b56e7ff..80b45dc 100644 --- a/FCRgendata/src/FCRGenData/data_aggregator.py +++ b/FCRgendata/src/FCRGenData/data_aggregator.py @@ -3,10 +3,8 @@ import logging from pathlib import Path from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator -from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA from FCRGenData.interpolator import InterpolatedDataStream from FCRGenData.rawDataReader import RowCSVReader, IRowDataProvider -from FCRGenData.validate_config import validate_config def _timestamp_generator(t0: dt.datetime, tstep: dt.timedelta) -> Generator[dt.datetime, None, None]: @@ -33,11 +31,8 @@ class DataAggregator: __delta: dt.timedelta __max_time_gap: dt.timedelta - def __init__(self, json_path: Union[str, Path]): - if isinstance(json_path, str): - json_path = Path(json_path) - - self.__conf = validate_config(json_path, DATA_CONFIG_SCHEMA) + def __init__(self, config: Dict): + self.__conf = config self.__datasources = [] # Roottime data source @@ -80,15 +75,6 @@ class DataAggregator: self.__roottime_reader = roottime_data[1] self.__roottime_datasource = roottime_data[0] - @property - def outpath(self) -> Path: - return Path(self.__conf['outpath']) - - @property - def solver_column_names(self) -> List[str]: - """List of the column names reuired by the cpsolver""" - return self.__conf['solver_cols'] - @property def column_names(self) -> List[str]: """Names of the variable columns""" -- GitLab From aed967bd7263272d2e543400385b5b410f98b80a Mon Sep 17 00:00:00 2001 From: jkk <jk394387@students.mimuw.edu.pl> Date: Sun, 7 Mar 2021 01:19:50 +0100 Subject: [PATCH 07/10] Bug fixes --- FCRgendata/data_config.json | 4 ++-- FCRgendata/src/FCRGenData/solv_summoner.py | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/FCRgendata/data_config.json b/FCRgendata/data_config.json index 9be9b42..b72a2e6 100644 --- a/FCRgendata/data_config.json +++ b/FCRgendata/data_config.json @@ -2,7 +2,7 @@ "outpath": "output.csv", "datasources": [ { - "desc": "desc <optinal>", + "desc": "desc <optional>", "files": [ { "desc": "desc", @@ -13,7 +13,7 @@ "values": [ { "colname": "value", - "alias": "valiue1", + "alias": "value1", "cumulation_period": 60 } ], diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py index 7928a1f..910c7ee 100644 --- a/FCRgendata/src/FCRGenData/solv_summoner.py +++ b/FCRgendata/src/FCRGenData/solv_summoner.py @@ -19,8 +19,6 @@ from FCRGenData.validate_config import validate_config CACHE_SIZE = 2 ** 10 -def create_archive(filepath: Path) -> BytesIO: - """Packs content of the file to the tar archive. def create_archive(filepath: Path) -> BytesIO: """Packs content of the file to the tar archive. 
-- GitLab From df1e013a7721f2199c47faf086ad3a95d7c3d642 Mon Sep 17 00:00:00 2001 From: jkk <jk394387@students.mimuw.edu.pl> Date: Sun, 7 Mar 2021 02:13:25 +0100 Subject: [PATCH 08/10] Bug fixes --- FCRgendata/config.json | 5 +- FCRgendata/setup.py | 79 +++----------------- FCRgendata/src/FCRGenData/__main__.py | 15 ++-- FCRgendata/src/FCRGenData/config_schemas.py | 13 ++-- FCRgendata/src/FCRGenData/data_aggregator.py | 2 +- FCRgendata/src/FCRGenData/solv_summoner.py | 8 +- 6 files changed, 31 insertions(+), 91 deletions(-) diff --git a/FCRgendata/config.json b/FCRgendata/config.json index 18f8785..6a6144f 100644 --- a/FCRgendata/config.json +++ b/FCRgendata/config.json @@ -11,8 +11,5 @@ "uuid": "fb6280ec-1ab8-11e7-93ae-92361f002671" } }, - "cpSolverHost": "localhost:8080", - "outpath": "<path>/generated_data.csv", - "AvgResponseTimeTableFilePath": "<path to avgresponsedata>", - "predictionsFilePath": "<path to predictions>" + "cpSolverHost": "localhost:8080" } \ No newline at end of file diff --git a/FCRgendata/setup.py b/FCRgendata/setup.py index 30f974b..7f578e9 100644 --- a/FCRgendata/setup.py +++ b/FCRgendata/setup.py @@ -5,15 +5,15 @@ https://github.com/pypa/sampleproject Modified by Madoshakalaka@Github (dependency links added) """ -# Always prefer setuptools over distutils -from setuptools import setup, find_packages -from os import path - # io.open is needed for projects that support Python 2.7 # It ensures open() defaults to text mode with universal newlines, # and accepts an argument to specify the text encoding # Python 3 only projects can skip this import from io import open +from os import path + +# Always prefer setuptools over distutils +from setuptools import setup here = path.abspath(path.dirname(__file__)) @@ -25,63 +25,8 @@ with open(path.join(here, "README.md"), encoding="utf-8") as f: # Fields marked as "Optional" may be commented out. setup( - # This is the name of your project. The first time you publish this - # package, this name will be registered for you. It will determine how - # users can install this project, e.g.: - # - # $ pip install sampleproject - # - # And where it will live on PyPI: https://pypi.org/project/sampleproject/ - # - # There are some restrictions on what makes a valid project name - # specification here: - # https://packaging.python.org/specifications/core-metadata/#name - name="FCRgenData", # Required - # Versions should comply with PEP 440: - # https://www.python.org/dev/peps/pep-0440/ - # - # For a discussion on single-sourcing the version across setup.py and the - # project code, see - # https://packaging.python.org/en/latest/single_source_version.html - version="0.0.0", # Required - # This is a one-line description or tagline of what your project does. This - # corresponds to the "Summary" metadata field: - # https://packaging.python.org/specifications/core-metadata/#summary - #description="A sample Python project", # Optional - # This is an optional longer description of your project that represents - # the body of text which users will see when they visit PyPI. 
- # - # Often, this is the same as your README, so you can just read it in from - # that file directly (as we have already done above) - # - # This field corresponds to the "Description" metadata field: - # https://packaging.python.org/specifications/core-metadata/#description-optional - #long_description=long_description, # Optional - # Denotes that our long_description is in Markdown; valid values are - # text/plain, text/x-rst, and text/markdown - # - # Optional if long_description is written in reStructuredText (rst) but - # required for plain-text or Markdown; if unspecified, "applications should - # attempt to render [the long_description] as text/x-rst; charset=UTF-8 and - # fall back to text/plain if it is not valid rst" (see link below) - # - # This field corresponds to the "Description-Content-Type" metadata field: - # https://packaging.python.org/specifications/core-metadata/#description-content-type-optional - #long_description_content_type="text/markdown", # Optional (see note above) - # This should be a valid link to your project's main homepage. - # - # This field corresponds to the "Home-Page" metadata field: - # https://packaging.python.org/specifications/core-metadata/#home-page-optional - #url="https://github.com/pypa/sampleproject", # Optional - # This should be your name or the name of the organization which owns the - # project. - #author="The Python Packaging Authority", # Optional - # This should be a valid email address corresponding to the author listed - # above. - #author_email="pypa-dev@googlegroups.com", # Optional - # Classifiers help users find your project by categorizing it. - # - # For a list of valid classifiers, see https://pypi.org/classifiers/ + name="FCRgenData", + version="0.0.1", classifiers=[ # Optional # How mature is this project? Common values are # 3 - Alpha @@ -108,8 +53,8 @@ setup( # project page. What does your project relate to? # # Note that this is a string of words separated by whitespace, not a list. - #keywords="sample setuptools development", # Optional - #keywords="sample setuptools development", # Optional + # keywords="sample setuptools development", # Optional + # keywords="sample setuptools development", # Optional # You can just specify package directories manually here if your project is # simple. Or you can use find_packages(). # @@ -119,12 +64,8 @@ setup( # # py_modules=["my_module"], # - packages=find_packages(), # Required - # Specify which Python versions you support. In contrast to the - # 'Programming Language' classifiers above, 'pip install' will check this - # and refuse to install the project if the version does not match. If you - # do not support Python 2, you can simplify this to '>=3.5' or similar, see - # https://packaging.python.org/guides/distributing-packages-using-setuptools/#python-requires + packages=['FCRGenData'], + packages_dir={'FCRGenData': 'src'}, python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4", # This field lists other packages that your project depends on to run. 
     # Any package you put here will be installed by pip when your project is
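One caveat in the new packaging stanza: setup() takes package_dir (singular), and distutils merely emits an "Unknown distribution option" warning for an unrecognized keyword such as packages_dir, so the src/ layout would silently stay unmapped. A corrected sketch, assuming the package lives at src/FCRGenData (illustrative, not the committed file):

from setuptools import setup

setup(
    name="FCRgenData",
    version="0.0.1",
    # package_dir (not packages_dir) maps package names to directories;
    # {'': 'src'} would work equally well for a conventional src/ layout.
    package_dir={'FCRGenData': 'src/FCRGenData'},
    packages=['FCRGenData'],
    python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4",
)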
diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index f51ae26..10a6fa1 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -8,7 +8,7 @@ from pathlib import Path
 from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.data_aggregator import DataAggregator
 from FCRGenData.validate_config import validate_config
-from .solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
+from FCRGenData.solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
     LocalCPSolverSolutionSummoner
 
 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
@@ -25,15 +25,16 @@ def run():
 
     # Argument parsing
     parser = argparse.ArgumentParser(description="Very not advanced data aggregator with interpolation")
-    parser.add_argument('cp_config', action='store', type=_json_path, description="Path to cpsolver json config file")
+    parser.add_argument('cp_config', action='store', type=_json_path,
+                        help="Path to cpsolver json config file")
     parser.add_argument('gendata_config', action='store', type=_json_path,
-                        description="Path to cpsolver json config file")
-    parser.add_argument('-r', '--docker', action='store_true', type=bool, default=False,
-                        description="Use dockerized cpsolver instead of localhost")
+                        help="Path to cpsolver json config file")
+    parser.add_argument('-r', '--docker', action='store_true', default=False,
+                        help="Use dockerized cpsolver instead of localhost")
     parser.add_argument('--container', action='store', type=str, default=None,
-                        description='Name of the docker container')
+                        help='Name of the docker container')
     parser.add_argument('--docker-url', action='store', type=str, default=None,
-                        description='URL to the remote docker (local by default)')
+                        help='URL to the remote docker (local by default)')
 
     args = parser.parse_args()
 
diff --git a/FCRgendata/src/FCRGenData/config_schemas.py b/FCRgendata/src/FCRGenData/config_schemas.py
index 31c29eb..c6dbc79 100644
--- a/FCRgendata/src/FCRGenData/config_schemas.py
+++ b/FCRgendata/src/FCRGenData/config_schemas.py
@@ -39,6 +39,7 @@ DATA_CONFIG_SCHEMA = {
                 "type": "object",
                 "properties": {
                     "desc": {"type": "string"},
+                    "type": {"type": "string"},
                     "source":
                         {
                             "type": "object",
@@ -57,15 +58,15 @@
                                 "properties": {
                                     "column_name": {"type": "string"},
                                     "alias": {"type": "string"},
-                                    "cumulation_period": {"type": "int"}
+                                    "cumulation_period": {"type": "integer"}
                                 },
                                 "required": ["column_name"]
                             }
                         },
                     "time_root":
-                        {"type": "bool"}
+                        {"type": "boolean"}
                 },
-                "required": ["desc", "source", "values"]
+                "required": ["desc", "type", "source", "values"]
             }
         },
     "aggregated_columns":
        {"type": "array",
         "minItems": 1,
         "items": {"type": "string"}
        },
    "timedelta":
-       {"type": "int"},
+       {"type": "integer"},
    "max_time_gap":
-       {"type": "int"},
+       {"type": "integer"},
    "solver_cols":
        {"type": "array",
         "minItems": 1,
         "items": {"type": "string"}
        }
    },
-    "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap", "avg_response_time_colname"]
+    "required": ["outpath", "datafiles", "aggregated_columns", "timedelta", "max_time_gap", "solver_cols"]
 }
 
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index 80b45dc..6891278 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -34,7 +34,7 @@ class DataAggregator:
     def __init__(self, config: Dict):
         self.__conf = config
         self.__datasources = []
-
+        self.__header = []
         # Roottime data source
         roottime_data = None
         max_time_gap = dt.timedelta(seconds=self.__conf['max_time_gap'])
 
diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py
index 910c7ee..939a9f9 100644
--- a/FCRgendata/src/FCRGenData/solv_summoner.py
+++ b/FCRgendata/src/FCRGenData/solv_summoner.py
@@ -77,10 +77,10 @@ class CPSolverSolutionSummoner(abc.ABC):
     _camelModelFilePath: Path
     _solutionFilePath: Path
     _FCRcpxml_path: Path
-    _FCRcpxml: ElementTree  # TODO VERIFY
+    _FCRcpxml: ElementTree
     _nodeCandidatesFilePath: Path
 
-    _xml_fillings: Dict[str, Element]  # TODO check
+    _xml_fillings: Dict[str, Element]
     _request_part: Dict
 
     def __init__(self,
@@ -92,13 +92,13 @@
 
         self._solutionFilePath = Path(f"{self._config['request']['camelModelFilePath'][:-4]}-solution.xmi")
         self._FCRcpxml_path = Path(self._config['request']['cpProblemFilePath'])
-        self._FCRcpxml = etree.parse(self._FCRcpxml_path)
+        self._FCRcpxml = etree.parse(str(self._FCRcpxml_path))
         self._nodeCandidatesFilePath = Path(self._config['request']['nodeCandidatesFilePath'])
 
         self._xml_fillings = {}
         for name in args_names:
             self._xml_fillings[name] = self._FCRcpxml.find(
-                f"cpMetrics[@id='{name}']")[0]  # TODO check
+                f"cpMetrics[@id='{name}']")[0]
 
         self._request_part = self._config['request']
 
     @abc.abstractmethod
-- GitLab

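The argparse fixes above correct two separate mistakes: add_argument() takes help=, not description= (description belongs to the ArgumentParser itself), and a store_true action must not be given type= at all, because the underlying action class accepts no such parameter and argparse raises TypeError while the parser is being built. A small self-contained demonstration (flag names borrowed from the diff; the example is a sketch, not project code):

import argparse

parser = argparse.ArgumentParser(description='demo of the corrected flags')
# Per-argument docs go in help=; description= here would raise TypeError.
parser.add_argument('--container', action='store', type=str, default=None,
                    help='Name of the docker container')
# store_true already yields a bool; adding type=bool raises TypeError
# because the store_true action class has no `type` parameter.
parser.add_argument('-r', '--docker', action='store_true', default=False,
                    help='Use dockerized cpsolver instead of localhost')

args = parser.parse_args(['-r', '--container', 'cpsolver'])
assert args.docker is True and args.container == 'cpsolver'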
From 040a52443859ed232fdd13267aa32c89238786d7 Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sun, 7 Mar 2021 20:43:15 +0100
Subject: [PATCH 09/10] Bug fixes

---
 FCRgendata/src/FCRGenData/__main__.py        | 16 ++--
 FCRgendata/src/FCRGenData/data_aggregator.py | 10 ++-
 FCRgendata/src/FCRGenData/interpolator.py    | 88 +++++++++++--------
 .../rawDataReader/row_data_reader.py         |  2 +-
 FCRgendata/src/FCRGenData/solv_summoner.py   | 27 +++---
 .../tests/iterpolator/mock_row_provider.py   |  4 +-
 6 files changed, 90 insertions(+), 57 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/__main__.py b/FCRgendata/src/FCRGenData/__main__.py
index 10a6fa1..13199ef 100644
--- a/FCRgendata/src/FCRGenData/__main__.py
+++ b/FCRgendata/src/FCRGenData/__main__.py
@@ -8,7 +8,7 @@ from pathlib import Path
 from FCRGenData.config_schemas import DATA_CONFIG_SCHEMA
 from FCRGenData.data_aggregator import DataAggregator
 from FCRGenData.validate_config import validate_config
-from FCRGenData.solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
+from .solv_summoner import CPSolverSolutionSummonerError, DockerCPSolverSolutionSummoner, \
     LocalCPSolverSolutionSummoner
 
 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
@@ -25,11 +25,10 @@ def run():
 
     # Argument parsing
     parser = argparse.ArgumentParser(description="Very not advanced data aggregator with interpolation")
-    parser.add_argument('cp_config', action='store', type=_json_path,
-                        help="Path to cpsolver json config file")
+    parser.add_argument('cp_config', action='store', type=_json_path, help="Path to cpsolver json config file")
     parser.add_argument('gendata_config', action='store', type=_json_path,
                         help="Path to cpsolver json config file")
-    parser.add_argument('-r', '--docker', action='store_true', default=False, 
+    parser.add_argument('-r', '--docker', action='store_true', default=False,
                         help="Use dockerized cpsolver instead of localhost")
     parser.add_argument('--container', action='store', type=str, default=None,
                         help='Name of the docker container')
@@ -57,7 +56,7 @@ def run():
     aggregator = DataAggregator(gendata_config)
 
     with ExitStack() as stack:  # substitute for multiline with statement
-        summoner = stack.enter_context(cpsolver)
+        # summoner = stack.enter_context(cpsolver)
         csvfile = stack.enter_context(open(gendata_config['outpath'], 'w', newline=''))
         writer = csv.writer(csvfile, delimiter=',')
 
@@ -69,9 +68,10 @@ def run():
             it += 1
             print(f'Row {it}')
             try:
-                solver_params = {name: row[name] for name in gendata_config['solver_cols']}
-                solution = summoner.get_solution(solver_params)
-                writer.writerow((*row, *solution))
+                with cpsolver as summoner:
+                    solver_params = [row[name] for name in gendata_config['solver_cols']]
+                    solution = summoner.get_solution(*solver_params)
+                    writer.writerow((*row, *solution))
             except CPSolverSolutionSummonerError as err:
                 logging.error(
                     "CPSolverSolutionSummoner raised exception while "
 
diff --git a/FCRgendata/src/FCRGenData/data_aggregator.py b/FCRgendata/src/FCRGenData/data_aggregator.py
index 6891278..7bd5f63 100644
--- a/FCRgendata/src/FCRGenData/data_aggregator.py
+++ b/FCRgendata/src/FCRGenData/data_aggregator.py
@@ -3,6 +3,8 @@ import logging
 from pathlib import Path
 from typing import Union, Optional, Dict, List, Tuple, Mapping, Generator
 
+import numpy as np
+
 from FCRGenData.interpolator import InterpolatedDataStream
 from FCRGenData.rawDataReader import RowCSVReader, IRowDataProvider
 
@@ -132,11 +134,17 @@ class DataAggregator:
                         break
                 else:
                     if value_info['colname'] == value_name:
-                        filtered_row[value_name] = rowdict[value_info['colname']]
+                        filtered_row[value_name] = rowdict[value_name]
                         break
                 if value_name in filtered_row:
                     break
             if value_name not in filtered_row:
                 raise KeyError("Value specified in aggregated_values not found among provided datasources")
 
+        # Convert values to pythonic
+        for key, value in filtered_row.items():
+            if any(map(lambda numpy_type: isinstance(value, numpy_type),
+                       [np.float64, np.float32, np.int16, np.uint32])):
+                filtered_row[key] = value.item()
+
         yield filtered_row
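The conversion loop added to the aggregator strips NumPy scalar types off interpolated values before a row reaches the csv writer and the solver; every NumPy scalar supports .item(), which returns the equivalent built-in Python object. Checking against np.generic, the common base class of all NumPy scalar types, is a compact alternative to the hand-picked dtype list (illustrative, not the project's code):

import numpy as np

row = {'cpu': np.float32(0.25), 'hits': np.int16(7), 'host': 'node-1'}

# isinstance(value, np.generic) matches any NumPy scalar; .item() converts
# it to the corresponding built-in type (float, int, ...).
converted = {key: value.item() if isinstance(value, np.generic) else value
             for key, value in row.items()}

assert type(converted['cpu']) is float and type(converted['hits']) is int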
diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py
index e06d445..aac3319 100644
--- a/FCRgendata/src/FCRGenData/interpolator.py
+++ b/FCRgendata/src/FCRGenData/interpolator.py
@@ -1,8 +1,8 @@
 import datetime as dt
-from typing import Dict, Generator
 from numbers import Number
+from typing import Dict, Generator, Optional
+
 from scipy import interpolate
-import itertools
 
 from FCRGenData.rawDataReader import IRowDataProvider
 
@@ -47,6 +47,9 @@ class InterpolatedDataStream:
     performance issues.
     '''
 
+    _last_ts: dt.datetime
+    _t0: dt.datetime
+
     @staticmethod
     def __ts_embedding(t1: dt.datetime, t2: dt.datetime) -> int:
         return (t1 - t2).seconds
 
@@ -54,57 +57,72 @@
     def __init__(self,
                  reader: IRowDataProvider,
                  timestamp_generator: Generator[dt.datetime, None, None],
-                 datasource=None,
-                 ):
-
-        gen = iter(timestamp_generator)
-        self.t0 = next(gen)
-        self.ts_gen = itertools.chain([self.t0], gen)
-        # trick so that we can look ahead for first elem of generator
-
-        self.last_ts = self.t0
-        self.ts_col_name = reader.timestamp_column_name
-        ts_col_embed = reader.column_names.index(self.ts_col_name)
+                 datasource=None):
+
+        self._gen = timestamp_generator
+        self._t0 = self._last_ts = reader.peek_t0()
+        previous_timestamp: Optional[dt.datetime] = None
+
+        self._ts_col_name = reader.timestamp_column_name
 
         col_types = reader.columns
-        self.intp_col_embed = {cname: i for i, cname in enumerate(col_types) if issubclass(col_types[cname], Number)}
+        self.intp_col_embed = [colname for colname, coltype in col_types.items() if issubclass(coltype, Number)]
+        self.intp_col_str = [colname for colname, coltype in col_types.items() if issubclass(coltype, str)]
 
         # col name -> (timestamp embedding, given value)
-        self.raw_data = {cname: ([], []) for cname in self.intp_col_embed}
-        row_generator = reader.row_generator_annotated()
+        self.raw_data = {colname: ([], []) for colname in self.intp_col_embed}
+        self._str_interpolants = {colname: [] for colname in self.intp_col_str}
 
         # populate self.inp_data
-        for row in row_generator:
-            for cname, col_idx in self.intp_col_embed.items():
-                if row[col_idx] is None:
+        for row in reader.row_generator_annotated():
+
+            row_ts = row[self._ts_col_name]
+            if previous_timestamp:
+                tdelta_embed = self.__ts_embedding(row_ts, previous_timestamp)
+            else:
+                tdelta_embed = 0
+
+            previous_timestamp = row_ts
+            for colname in self.intp_col_embed:
+                if row[colname] is None:
                     continue
-                row_ts = row[ts_col_embed]
-                self.last_ts = max(self.last_ts, row_ts)
-                tdelta_embed = self.__ts_embedding(row_ts, self.t0)
-                intp_t = self.raw_data[cname]
-                intp_t[0].append(tdelta_embed)
-                intp_t[1].append(row[col_idx])
-
+                self.raw_data[colname][0].append(tdelta_embed)
+                self.raw_data[colname][1].append(row[colname])
+
+            for colname_str in self.intp_col_str:
+                if row[colname_str] is None:
+                    continue
+
+                row_ts = row[self._ts_col_name]
+                self._str_interpolants[colname_str].append((row_ts, row[colname_str]))
 
         if any(map(lambda v: len(v[0]) == len(v[1]) == 0, self.raw_data.values())):
             raise EmptyReaderExcpetion()
 
         # interpolate aggregated data
-        self.interpolants = dict()
-        for cname, cdata in self.raw_data.items():
-            self.interpolants[cname] = interpolate.interp1d(x=cdata[0], y=cdata[1], kind='linear')
+        self.interpolants = {}
+        for colname, cdata in self.raw_data.items():
+            self.interpolants[colname] = interpolate.interp1d(x=cdata[0], y=cdata[1], kind='linear')
 
     def __iter__(self):
         return self
 
     def __next__(self) -> Dict:
-        ts = next(self.ts_gen)
-        if ts > self.last_ts:
+        ts = next(self._gen)
+        if ts > self._last_ts:
             raise StopIteration
 
-        row_dict = {self.ts_col_name: ts}
-        ts_embed = self.__ts_embedding(ts, self.t0)
+        row_dict = {self._ts_col_name: ts}
+        ts_embed = self.__ts_embedding(ts, self._t0)
+
+        for colname, interpolant in self.interpolants.items():
+            row_dict[colname] = interpolant(ts_embed).flat[0]
 
-        for cname in self.interpolants:
-            row_dict[cname] = self.interpolants[cname](ts_embed).flat[0]
+        for colname, interpolant in self._str_interpolants.items():
+            timestamp, value = interpolant[0]
+            while ts > timestamp and len(interpolant) > 1:
+                interpolant.pop(0)
+                timestamp, value = interpolant[0]
+            row_dict[colname] = value
 
         return row_dict
 
diff --git a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
index 01f7aed..4bb1b2e 100644
--- a/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
+++ b/FCRgendata/src/FCRGenData/rawDataReader/row_data_reader.py
@@ -65,7 +65,7 @@ class IRowDataProvider(abc.ABC):
         pass
 
     @abc.abstractmethod
-    def row_generator(self) -> Generator[Iterable[type], None, None]:
+    def row_generator(self) -> Generator[List[any], None, None]:
         """Generator over raw data, provides rows of data in increasing order (by timestamp)
         Returns list with values (order same as in column_names).
         """
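Two details of the rewritten interpolator are worth spelling out. Numeric columns go through scipy's interp1d over integer second-offsets, so the offsets used to build each interpolant and the offsets used at lookup time must be measured from the same origin (note that the construction loop above embeds rows against previous_timestamp while __next__ embeds against _t0); and string columns are treated as a step function that carries the last seen value forward. A standalone sketch of both behaviours, assuming a single shared origin t0 (not the project's class):

import datetime as dt

from scipy import interpolate

t0 = dt.datetime(2021, 3, 7)
# Offsets from t0 in seconds; timedelta.total_seconds() avoids the
# day-wrapping that timedelta.seconds introduces for gaps over 24 hours.
seconds = [0, 60, 120]
cpu = [0.0, 1.0, 0.0]                  # numeric column: linear interpolation
state = [(0, 'idle'), (90, 'busy')]    # string column: step function

cpu_at = interpolate.interp1d(x=seconds, y=cpu, kind='linear')
assert float(cpu_at(30)) == 0.5        # halfway between the first two nodes

def state_at(sec: int) -> str:
    # carry forward the most recent value whose offset is <= sec
    current = state[0][1]
    for offset, value in state:
        if offset <= sec:
            current = value
    return current

assert state_at(30) == 'idle' and state_at(100) == 'busy'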
diff --git a/FCRgendata/src/FCRGenData/solv_summoner.py b/FCRgendata/src/FCRGenData/solv_summoner.py
index 939a9f9..a26b434 100644
--- a/FCRgendata/src/FCRGenData/solv_summoner.py
+++ b/FCRgendata/src/FCRGenData/solv_summoner.py
@@ -77,10 +77,11 @@ class CPSolverSolutionSummoner(abc.ABC):
     _camelModelFilePath: Path
     _solutionFilePath: Path
     _FCRcpxml_path: Path
-    _FCRcpxml: ElementTree
+    _FCRcpxml: ElementTree  # TODO VERIFY
     _nodeCandidatesFilePath: Path
 
-    _xml_fillings: Dict[str, Element]
+    _args_names: List[str]
+    _xml_fillings: Dict[str, Element]  # TODO check
     _request_part: Dict
 
     def __init__(self,
@@ -90,11 +91,13 @@
 
         self._solver_host = self._config['cpSolverHost']
         self._camelModelFilePath = Path(self._config['request']['camelModelFilePath'])
-        self._solutionFilePath = Path(f"{self._config['request']['camelModelFilePath'][:-4]}-solution.xmi")
+        # self._solutionFilePath = Path(f"{self._config['request']['camelModelFilePath'][:-4]}-solution.xmi").name
+        self._solutionFilePath = self._camelModelFilePath.parent.joinpath(f"{self._camelModelFilePath.name}-solution.xmi")
         self._FCRcpxml_path = Path(self._config['request']['cpProblemFilePath'])
         self._FCRcpxml = etree.parse(str(self._FCRcpxml_path))
         self._nodeCandidatesFilePath = Path(self._config['request']['nodeCandidatesFilePath'])
 
+        self._args_names = args_names
         self._xml_fillings = {}
         for name in args_names:
             self._xml_fillings[name] = self._FCRcpxml.find(
@@ -102,7 +105,7 @@
                 f"cpMetrics[@id='{name}']")[0]
 
         self._request_part = self._config['request']
 
     @abc.abstractmethod
-    def get_solution(self, avg_rsp_time: float) -> List[int]:
+    def get_solution(self, *params_list) -> List[int]:
         pass
 
@@ -121,10 +124,12 @@ class LocalCPSolverSolutionSummoner(CPSolverSolutionSummoner):
         return False
 
     @lru_cache(maxsize=CACHE_SIZE)
-    def get_solution(self, solver_params: Dict) -> List[int]:  # TODO fix
+    def get_solution(self, *params_list) -> List[int]:  # TODO fix
         """ returns solution as list of int """
-        for arg_name, arg_loc in self._xml_fillings:
-            arg_loc.set('value', str(solver_params[arg_name]))
+
+        for arg_name, arg_value in zip(self._args_names, params_list):
+            arg_loc = self._xml_fillings[arg_name]
+            arg_loc.set('value', str(arg_value))
 
         self._FCRcpxml.write(self.tempfile.name,
                              xml_declaration=True,
@@ -180,11 +185,13 @@ class DockerCPSolverSolutionSummoner(CPSolverSolutionSummoner):
         return False
 
     @lru_cache(maxsize=CACHE_SIZE)
-    def get_solution(self, solver_params: Dict) -> List[int]:
+    def get_solution(self, *params_list) -> List[int]:
         """ returns solution as list of int """
-        for arg_name, arg_loc in self._xml_fillings:
-            arg_loc.set('value', str(solver_params[arg_name]))
+        for arg_name, arg_value in zip(self._args_names, params_list):
+            arg_loc = self._xml_fillings[arg_name]
+            arg_loc.set('value', str(arg_value))
+
         self._FCRcpxml.write(self.tempfile.name, xml_declaration=True, encoding="ASCII")
 
         docker_put(self.__container, src=Path(self.tempfile.name), dst=Path('/tmp'))
 
diff --git a/FCRgendata/tests/iterpolator/mock_row_provider.py b/FCRgendata/tests/iterpolator/mock_row_provider.py
index f48afdc..388f365 100644
--- a/FCRgendata/tests/iterpolator/mock_row_provider.py
+++ b/FCRgendata/tests/iterpolator/mock_row_provider.py
@@ -14,11 +14,11 @@ class MockRowDataProvider(IRowDataProvider):
 
     def __init__(self, data: MockData, max_time_difference: dt.timedelta):
-        super().__init__(timestamp_column_name=data['ts_col_name'],
+        super().__init__(timestamp_column_name=data['_ts_col_name'],
                          max_time_difference=max_time_difference)
         self.data = data['data']
         self.headers = data['headers']
-        self.ts_col_name = data['ts_col_name']
+        self.ts_col_name = data['_ts_col_name']
         self.types = data['types']
 
     @property
-- GitLab

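Widening get_solution() to *params_list is not just cosmetic: both implementations are wrapped in @lru_cache, which keys its cache on the tuple of call arguments and therefore requires every argument to be hashable. The previous signature took a Dict, and dicts are unhashable, so the very first cached call would have failed. In isolation (a sketch, with a stand-in for the solver round trip):

from functools import lru_cache

@lru_cache(maxsize=32)
def solve(*params):
    # stand-in for an expensive CP-solver round trip
    return [int(p) for p in params]

assert solve(1.0, 2.0) == [1, 2]   # computed once
assert solve(1.0, 2.0) == [1, 2]   # answered from the cache

cached = lru_cache(maxsize=32)(lambda params: params)
try:
    cached({'avg_rsp_time': 1.0})
except TypeError as exc:
    print(f'dict arguments cannot be cached: {exc}')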
From bcff3d1b30ec583ccf5c8affa1799c4050a8f328 Mon Sep 17 00:00:00 2001
From: jkk <jk394387@students.mimuw.edu.pl>
Date: Sun, 7 Mar 2021 22:58:16 +0100
Subject: [PATCH 10/10] test fixes

---
 FCRgendata/src/FCRGenData/interpolator.py         | 11 ++++++++---
 FCRgendata/tests/iterpolator/mock_row_provider.py |  8 ++++----
 FCRgendata/tests/iterpolator/test_interpolator.py |  6 +-----
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/FCRgendata/src/FCRGenData/interpolator.py b/FCRgendata/src/FCRGenData/interpolator.py
index aac3319..b80bd38 100644
--- a/FCRgendata/src/FCRGenData/interpolator.py
+++ b/FCRgendata/src/FCRGenData/interpolator.py
@@ -47,8 +47,8 @@ class InterpolatedDataStream:
     performance issues.
     '''
 
-    _last_ts: dt.datetime
-    _t0: dt.datetime
+    _last_ts: Optional[dt.datetime]
+    _t0: Optional[dt.datetime]
 
     @staticmethod
     def __ts_embedding(t1: dt.datetime, t2: dt.datetime) -> int:
@@ -60,7 +60,7 @@ class InterpolatedDataStream:
                  datasource=None):
 
         self._gen = timestamp_generator
-        self._t0 = self._last_ts = reader.peek_t0()
+        self._t0 = self._last_ts = None
         previous_timestamp: Optional[dt.datetime] = None
 
         self._ts_col_name = reader.timestamp_column_name
@@ -109,6 +109,11 @@ class InterpolatedDataStream:
 
     def __next__(self) -> Dict:
         ts = next(self._gen)
+        if self._t0 is None:
+            self._t0 = ts
+        if self._last_ts is None:
+            self._last_ts = ts
+
         if ts > self._last_ts:
             raise StopIteration
 
diff --git a/FCRgendata/tests/iterpolator/mock_row_provider.py b/FCRgendata/tests/iterpolator/mock_row_provider.py
index 388f365..3000671 100644
--- a/FCRgendata/tests/iterpolator/mock_row_provider.py
+++ b/FCRgendata/tests/iterpolator/mock_row_provider.py
@@ -14,11 +14,11 @@ class MockRowDataProvider(IRowDataProvider):
 
     def __init__(self, data: MockData, max_time_difference: dt.timedelta):
-        super().__init__(timestamp_column_name=data['_ts_col_name'],
+        super().__init__(timestamp_column_name=data['ts_col_name'],
                          max_time_difference=max_time_difference)
         self.data = data['data']
         self.headers = data['headers']
-        self.ts_col_name = data['_ts_col_name']
+        self.ts_col_name = data['ts_col_name']
         self.types = data['types']
 
     @property
@@ -30,7 +30,7 @@ class MockRowDataProvider(IRowDataProvider):
         return self.data[0][ts_col_idx]
 
     @property
-    def column_names(self) -> Tuple[str]:
+    def column_names(self) -> List[str]:
         return self.headers
 
     @property
@@ -38,7 +38,7 @@ class MockRowDataProvider(IRowDataProvider):
         return {cname: ctype for cname, ctype in zip(self.headers, self.types)}
 
     def row_generator_annotated(self) -> Generator[Dict[str, any], None, None]:
-        return map(lambda row: dict(zip(self.headers, row)), self.data)
+        yield from map(lambda row: dict(zip(self.headers, row)), self.data)
 
     def row_generator(self) -> Generator[List[type], None, None]:
         yield from self.data
 
diff --git a/FCRgendata/tests/iterpolator/test_interpolator.py b/FCRgendata/tests/iterpolator/test_interpolator.py
index 84e9f85..42ad5f6 100644
--- a/FCRgendata/tests/iterpolator/test_interpolator.py
+++ b/FCRgendata/tests/iterpolator/test_interpolator.py
@@ -90,12 +90,8 @@ def test_no_data_but_timestamps():
         i = 0
         while True:
             yield t0 + dt.timedelta(seconds=i)
-    try:
+    with pytest.raises(EmptyReaderExcpetion):
         interp = InterpolatedDataStream(reader=rowProv, timestamp_generator=ts_gen())
-    except EmptyReaderExcpetion:
-        assert True
-    else:
-        assert False
 
 
 @pytest.mark.interpolation
-- GitLab
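The final test fix swaps the try/except/else construction for pytest.raises, which asserts both directions at once: the test fails if the block raises a different exception, and fails if it raises nothing. The same pattern in a self-contained form (the exception class and factory below are stand-ins, not the project's code):

import pytest

class EmptyReaderError(Exception):
    """Stand-in for the interpolator's empty-reader exception."""

def build_stream(rows):
    if not rows:
        raise EmptyReaderError('reader produced no rows')
    return iter(rows)

def test_empty_input_raises():
    # pytest.raises fails the test unless EmptyReaderError is raised inside.
    with pytest.raises(EmptyReaderError):
        build_stream([])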