From 6773d1af828de4143f6bbf31887942d5900e5c50 Mon Sep 17 00:00:00 2001
From: Jakub Kowalski
Date: Fri, 18 Jun 2021 01:39:07 +0200
Subject: [PATCH 1/2] Influx mock added

---
 DataGenerator/Dockerfile                      |  3 +-
 DataGenerator/setup.cfg                       |  2 +-
 DataGenerator/src/DataGenerator/config.py     |  2 +-
 DataGenerator/src/DataGenerator/models.py     | 40 --------
 .../src/DataGenerator/solution_cache.py       |  2 +-
 .../src/DataGeneratorService/config.py        |  4 +-
 .../src/DataGeneratorService/cptask_cache.py  | 32 +++----
 .../src/DataGeneratorService/fastapi_app.py   | 31 ++++---
 .../src/DataGeneratorService/models.py        | 92 ++++++++++++++++---
 .../DataGeneratorService/remote_summoner.py   | 14 ++-
 .../src/DataGeneratorService/tasks.py         |  5 +-
 .../src/DataReading/influx_data_provider.py   |  4 +-
 DataGenerator/src/DataReading/interpolator.py |  4 +-
 testing-docker-compose.yml                    |  1 +
 testing-generator-docker-compose.yml          |  6 +-
 tests/MockInflux/MockInflux/config.py         |  2 +-
 tests/test.py                                 |  7 +-
 17 files changed, 151 insertions(+), 100 deletions(-)

diff --git a/DataGenerator/Dockerfile b/DataGenerator/Dockerfile
index 781dfd6..d1382d4 100644
--- a/DataGenerator/Dockerfile
+++ b/DataGenerator/Dockerfile
@@ -1,5 +1,5 @@
 FROM openjdk:slim
-COPY --from=python:3.8-slim / /
+COPY --from=python:3.8 / /
 
 RUN apt update -y
 RUN apt install git -y
@@ -11,4 +11,5 @@ WORKDIR /app/
 RUN pip install -r requirements.txt
 # Feature multi-step: remove git etc
 COPY src /app/
+RUN mkdir /data
 ENTRYPOINT ["uvicorn", "DataGeneratorService.fastapi_app:app", "--port", "80", "--host", "0.0.0.0"]
\ No newline at end of file
diff --git a/DataGenerator/setup.cfg b/DataGenerator/setup.cfg
index 56d61d1..1ddf036 100644
--- a/DataGenerator/setup.cfg
+++ b/DataGenerator/setup.cfg
@@ -1,6 +1,6 @@
 [metadata]
 name = DataGenerator
-version = 0.1.3
+version = 0.1.4
 description = DataGeneration Tools for CPSolver
 long_description = file: readme.md
 classifiers =
diff --git a/DataGenerator/src/DataGenerator/config.py b/DataGenerator/src/DataGenerator/config.py
index 4f57fa2..3cbbca8 100644
--- a/DataGenerator/src/DataGenerator/config.py
+++ b/DataGenerator/src/DataGenerator/config.py
@@ -1,7 +1,7 @@
 import datetime as dt
 import os
 
-CPSOLVER_URL = os.getenv("CPSOLVER_ENDPOINT")
+CPSOLVER_URL = os.getenv("CPSOLVER_ENDPOINT", "cpsolver")
 CPSOLVER_TIMEOUT = int(os.getenv("CPSOLVER_TIMEOUT", "300"))
 
 MODELDB_HOST: str = os.getenv("MODELDB_HOST", "modeldb")
diff --git a/DataGenerator/src/DataGenerator/models.py b/DataGenerator/src/DataGenerator/models.py
index 62c49a5..b416d22 100644
--- a/DataGenerator/src/DataGenerator/models.py
+++ b/DataGenerator/src/DataGenerator/models.py
@@ -18,43 +18,3 @@ class CPSolution(BaseModel):
 
     variables: Dict[str, str]
     configuration: Dict[str, str]
-
-
-class CPSolverWatermark(BaseModel):
-    """CP-solver watermark data model.
-
-    Used in direct communication with CP-solver
-    """
-    user: str
-    system: str
-    date: dt.datetime
-    uuid: str
-
-
-class CPSolverNotificationResult(BaseModel):
-    """CP-solver notification result data model."""
-    status: str
-    errorCode: str
-    errorDescription: str
-
-
-class CPSolverNotification(BaseModel):
-    """Cp-solver notification data model.
-
-    Received from cp-solver when problem solution is created (or some error has occured).
-    """
-    applicationId: str
-    cdoResourcePath: str
-    result: CPSolverNotificationResult
-    watermark: CPSolverWatermark
-
-
-class CPSolverRequest(BaseModel):
-    """CP-solver request data model.
-
-    Used in communicatio to cp-solver, to schedule problem solving.
- """ - applicationId: str - cdoModelsPath: str - notificationURI: str - timeLimit: Optional[int] = None - watermark: CPSolverWatermark diff --git a/DataGenerator/src/DataGenerator/solution_cache.py b/DataGenerator/src/DataGenerator/solution_cache.py index 223d105..e1413f4 100644 --- a/DataGenerator/src/DataGenerator/solution_cache.py +++ b/DataGenerator/src/DataGenerator/solution_cache.py @@ -40,7 +40,7 @@ class SolutionCache: solution if found. """ solution = None - query_result = self._collection.find_one({'query_json': query}) + query_result = self._collection.find_one({'query_json': query.json()}) if query_result is not None: solution = CPSolution.parse_raw(query_result['solution_json']) diff --git a/DataGenerator/src/DataGeneratorService/config.py b/DataGenerator/src/DataGeneratorService/config.py index 6f67722..eea1672 100644 --- a/DataGenerator/src/DataGeneratorService/config.py +++ b/DataGenerator/src/DataGeneratorService/config.py @@ -18,4 +18,6 @@ MAX_PARALLEL_TASKS = int(os.environ.get("MAX_PARALLEL_TASKS", "3")) DEFAULT_DELTA = dt.timedelta(seconds=int(os.getenv("DEFAULT_DELTA"))) if os.getenv("DEFAULT_DELTA") \ else dt.timedelta(seconds=30) -LOGLEVEL = os.environ.get("LOGLEVEL", "DEBUG") \ No newline at end of file +LOGLEVEL = os.environ.get("LOGLEVEL", "DEBUG") +MOCK_INFLUX = os.environ.get("MOCK_INFLUX") == "True" +MOCK_CSV_PATH = os.environ.get("MOCK_CSV_PATH") \ No newline at end of file diff --git a/DataGenerator/src/DataGeneratorService/cptask_cache.py b/DataGenerator/src/DataGeneratorService/cptask_cache.py index 1d63042..cc4a03c 100644 --- a/DataGenerator/src/DataGeneratorService/cptask_cache.py +++ b/DataGenerator/src/DataGeneratorService/cptask_cache.py @@ -1,13 +1,12 @@ import datetime as dt import logging import uuid as uuid -from typing import Optional, List, Tuple, Dict, Union +from typing import Optional, List, Dict, Union import pymongo from DataGeneratorService.config import MODELDB_HOST, MODELDB_PORT -from DataGeneratorService.models import CPSolverTaskInfo, CPSolverJobInfo, CPSolution, CPQuery, DataGenerationInfo, \ - State +from DataGeneratorService.models import CPSolverTaskInfo, CPSolverJobInfo, CPSolution, CPQuery, DataGenerationInfo DEFAULT_TABLE_NAME: str = 'generator_cptasks' logger = logging.getLogger(__name__) @@ -49,7 +48,7 @@ class CPTaskCache: remainingJobs=jobAmount, dataGenerationInfo=data_generation_info) collection = self._db[f'generatorjobs'] - collection.insert_one(job.dict()) + collection.insert_one(job.to_mongo()) return job def create_task(self, @@ -75,8 +74,9 @@ class CPTaskCache: taskId=str(uuid.uuid4()), jobId=jobId, predictions=predictions, - timestamp=timestamp) - collection.insert_one(task_info.json()) + timestamp=timestamp, + state="scheduled") + collection.insert_one(task_info.to_mongo()) return task_info def retrieve_task(self, taskId: str) -> CPSolverTaskInfo: @@ -98,7 +98,7 @@ class CPTaskCache: if res is None: raise KeyError(f'Task with task_id: {taskId} was not found in db.') else: - return CPSolverTaskInfo.parse_raw(res) + return CPSolverTaskInfo.from_mongo(res) def retrieve_job(self, jobId: str) -> CPSolverJobInfo: """ @@ -119,7 +119,7 @@ class CPTaskCache: if res is None: raise KeyError(f'Job with job_id: {jobId} was not found in db.') else: - return CPSolverJobInfo.parse_raw(res) + return CPSolverJobInfo.from_mongo(res) def retrieve_undone_tasks(self, jobId: str, @@ -144,13 +144,13 @@ class CPTaskCache: res = [] for raw_task in cursor: - res.append(CPSolverTaskInfo.parse_raw(raw_task)) + 
+            res.append(CPSolverTaskInfo.from_mongo(raw_task))
 
         return res
 
     def update_task_state(self,
                           taskId: str,
-                          new_state: State) -> None:
+                          new_state: str) -> None:
         """
         Updates task state.
 
         Args:
             taskId: task uuid.
@@ -159,7 +159,7 @@ class CPTaskCache:
             new_state: new task state.
         """
         collection = self._db[f'generatortasks']
-        collection.find_one_and_update(filter={"taskId": taskId}, update={'$set': {'state', new_state}})
+        collection.find_one_and_update({"taskId": taskId}, {'$set': {'state': new_state}})
 
     def add_solution(self,
                      taskId: str,
@@ -177,10 +177,10 @@ class CPTaskCache:
         collection.remove(query={
             "taskId": taskId
         })
-        task_info = CPSolverTaskInfo.parse_raw(task_info_raw)
+        task_info = CPSolverTaskInfo.from_mongo(task_info_raw)
         task_info.solution = solution
-        task_info.state = State.done
-        collection.insert_one(task_info.dict())
+        task_info.state = "done"
+        collection.insert_one(task_info.to_mongo())
 
     def update_job(self, jobId: str) -> bool:
         """
@@ -194,7 +194,7 @@ class CPTaskCache:
         """
         collection = self._db[f'generatorjobs']
         collection.find_one_and_update({'jobId': jobId}, {'$inc': {'remainingJobs': -1}})
-        job_info = CPSolverJobInfo.parse_raw(self._db[f'generatorjobs'].find_one({'jobId': jobId}))
+        job_info = CPSolverJobInfo.from_mongo(self._db[f'generatorjobs'].find_one({'jobId': jobId}))
         return job_info.remainingJobs == 0
 
     def is_job_ok(self, jobId: str) -> bool:
@@ -234,7 +234,7 @@ class CPTaskCache:
         """
         collection = self._db[f'generatortasks']
         tasks_raw = collection.find({'jobId': jobId})
-        tasks = list(map(CPSolverTaskInfo.parse_raw, tasks_raw))
+        tasks = list(map(CPSolverTaskInfo.from_mongo, tasks_raw))
         tasks.sort(key=lambda task: task.query.it)
 
         return tasks
diff --git a/DataGenerator/src/DataGeneratorService/fastapi_app.py b/DataGenerator/src/DataGeneratorService/fastapi_app.py
index 2e3db21..e8a5312 100644
--- a/DataGenerator/src/DataGeneratorService/fastapi_app.py
+++ b/DataGenerator/src/DataGeneratorService/fastapi_app.py
@@ -1,6 +1,7 @@
 import datetime as dt
 import logging
 import tempfile
+import traceback
 from pathlib import Path
 from typing import List, Tuple
 
@@ -10,14 +11,15 @@ from fastapi import FastAPI
 from fastapi import Response, status, BackgroundTasks
 from lxml import etree
 
-from DataGenerator.models import CPSolverNotification, CPSolution
+from DataGenerator.models import CPSolution
 from DataGenerator.solution_cache import CPQuery, SolutionCache
 from DataGeneratorService.config import INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USERNAME, INFLUXDB_PASSWORD, \
-    INFLUXDB_DBNAME, MAX_PARALLEL_TASKS, LOGLEVEL
+    INFLUXDB_DBNAME, MAX_PARALLEL_TASKS, LOGLEVEL, MOCK_INFLUX, MOCK_CSV_PATH
 from DataGeneratorService.cptask_cache import CPTaskCache
-from DataGeneratorService.models import DataGenerationInfo, DataGenerationResult, JobResult
+from DataGeneratorService.models import DataGenerationInfo, DataGenerationResult, JobResult, CPSolverNotification
 from DataGeneratorService.tasks import collect, run_task
 from DataReading.aggregator import _timestamp_generator
+from DataReading.csv_reader import CSVReader
 from DataReading.influx_data_provider import InfluxDataProvider
 from DataReading.interpolator import InterpolatedDataStream
 
@@ -33,7 +35,7 @@ logger.setLevel(level=LOGLEVEL)
 logger.addHandler(logging.StreamHandler())
 
 
-def generate_data(data_generation_info: DataGenerationInfo) -> None:
+async def generate_data(data_generation_info: DataGenerationInfo) -> None:
     """Retrieves data from InfluxDB, interpolates it and splits to separate queries, sent sequentially to the
     cpsolver.
@@ -53,10 +55,14 @@ async def generate_data(data_generation_info: DataGenerationInfo) -> None:
             'path_dataset': tempdir
         }
 
-        data_provider = InfluxDataProvider(applicationId=data_generation_info.applicationId,
-                                           startTimestamp=data_generation_info.startTimestamp,
-                                           finishTimestamp=data_generation_info.finishTimestamp,
-                                           influxConfigs=configs)
+        if MOCK_INFLUX:
+            data_provider = CSVReader(MOCK_CSV_PATH)
+        else:
+            data_provider = InfluxDataProvider(applicationId=data_generation_info.applicationId,
+                                               startTimestamp=data_generation_info.startTimestamp,
+                                               finishTimestamp=data_generation_info.finishTimestamp,
+                                               influxConfigs=configs)
+
         # data interpolation
         interpolated_data = InterpolatedDataStream(data_provider,
                                                    _timestamp_generator(data_provider.peek_t0(),
@@ -100,11 +106,12 @@ async def generate_data(data_generation_info: DataGenerationInfo) -> None:
     except Exception as exc:
         try:
             logger.error(
-                f'Error occurred during preparation of data generation {exc}. '
+                f'Error occurred during preparation of data generation {exc}.'
                 f'Replying to {data_generation_info.returnEndpoint}')
+            logger.debug(traceback.format_exc())
             requests.post(data_generation_info.returnEndpoint, DataGenerationResult(dataGenerationInfo=data_generation_info,
                                                                                     result=JobResult.error,
-                                                                                    error_msg=str(exc)))
+                                                                                    error_msg=str(exc)).json().encode('ascii'))
         except Exception as exc:
             logger.error(
                 f'Unable to send fail message to the return endpoint {data_generation_info.returnEndpoint}: {exc}')
@@ -126,13 +133,13 @@ async def generate_data(data_generation_info: DataGenerationInfo) -> None:
             }
         }
     })
-def gen_data(data_generation_info: DataGenerationInfo, background_tasks: BackgroundTasks):
+async def gen_data(data_generation_info: DataGenerationInfo, background_tasks: BackgroundTasks):
     background_tasks.add_task(generate_data, data_generation_info)
     return Response(status_code=status.HTTP_202_ACCEPTED)
 
 
 @app.post('/ack/{taskId}')
-def ack_notify(taskId: str, notification: CPSolverNotification):
+async def ack_notify(taskId: str, notification: CPSolverNotification):
     task_info = task_cache.retrieve_task(taskId=taskId)
     job_info = task_cache.retrieve_job(task_info.jobId)
diff --git a/DataGenerator/src/DataGeneratorService/models.py b/DataGenerator/src/DataGeneratorService/models.py
index 33b54d2..9a64a76 100644
--- a/DataGenerator/src/DataGeneratorService/models.py
+++ b/DataGenerator/src/DataGeneratorService/models.py
@@ -2,18 +2,47 @@ import datetime as dt
 from enum import Enum
 from typing import Optional, Dict, Union
 
-from DataGenerator.models import CPQuery, CPSolution
 from pydantic import validator
 from pydantic.main import BaseModel
 
+from DataGenerator.models import CPQuery, CPSolution
 from .config import DEFAULT_DELTA
 
 
-class State(Enum):
-    """Possible task states."""
-    scheduled = "scheduled"
-    pending = "pending"
-    done = "done"
+class MongoModel(BaseModel):
+
+    def to_mongo(self):
+        def _format(dump: Dict):
+            dumped = {}
+            for key, value in dump.items():
+                if isinstance(value, dt.datetime):
+                    dumped[key] = value.isoformat()
+                elif isinstance(value, dt.timedelta):
+                    dumped[key] = value.total_seconds()
+                elif isinstance(value, Dict):
+                    dumped[key] = _format(value)
+                else:
+                    dumped[key] = value
+            return dumped
+
+        return _format(self.dict())
+
+    @classmethod
+    def from_mongo(cls, raw: Dict):
+        def _format(dump: Dict):
+            dumped = {}
+            for key, value in dump.items():
+                if isinstance(value, dt.datetime):
+                    dumped[key] = dt.datetime.fromisoformat(value)
+                elif isinstance(value, dt.timedelta):
+                    dumped[key] = dt.timedelta(seconds=value)
+                elif isinstance(value, Dict):
+                    dumped[key] = _format(value)
+                else:
+                    dumped[key] = value
+            return dumped
+
+        return cls(**_format(raw))
 
 
 class JobResult(Enum):
@@ -22,7 +51,7 @@ class JobResult(Enum):
     error = "error"
 
 
-class DataGenerationInfo(BaseModel):
+class DataGenerationInfo(MongoModel):
     """Data generation job related information."""
     applicationId: str
     trainRatio: float  # [0;1] range
@@ -42,14 +71,14 @@ class DataGenerationInfo(BaseModel):
         return v
 
 
-class DataGenerationResult(BaseModel):
+class DataGenerationResult(MongoModel):
     """Data generation job status"""
     dataGenerationInfo: DataGenerationInfo
     result: JobResult
     error_msg: Optional[str] = None
 
 
-class CPSolverJobInfo(BaseModel):
+class CPSolverJobInfo(MongoModel):
     """Data generation job information.
 
     Scheduled by external source, consists of many tasks.
@@ -60,7 +89,7 @@ class CPSolverJobInfo(BaseModel):
     errorInfo: Optional[str] = None
 
 
-class CPSolverTaskInfo(BaseModel):
+class CPSolverTaskInfo(MongoModel):
     """Data model about single request to the CPSolver.
 
     Stores both query and solution (if already generated)."""
@@ -68,6 +97,47 @@ class CPSolverTaskInfo(BaseModel):
     solution: Optional[CPSolution]
     jobId: str
     taskId: str
-    state: State = State.scheduled
+    state: str  # TODO replace with enum
     timestamp: dt.datetime
     predictions: Dict[str, Union[str, float, int]]
+
+
+class CPSolverWatermark(MongoModel):
+    """CP-solver watermark data model.
+
+    Used in direct communication with CP-solver
+    """
+    user: str
+    system: str
+    date: dt.datetime
+    uuid: str
+
+
+class CPSolverNotificationResult(MongoModel):
+    """CP-solver notification result data model."""
+    status: str
+    errorCode: str
+    errorDescription: str
+
+
+class CPSolverNotification(MongoModel):
+    """CP-solver notification data model.
+
+    Received from cp-solver when problem solution is created (or some error has occurred).
+    """
+    applicationId: str
+    cdoResourcePath: str
+    result: CPSolverNotificationResult
+    watermark: CPSolverWatermark
+
+
+class CPSolverRequest(MongoModel):
+    """CP-solver request data model.
+
+    Used in communication to cp-solver, to schedule problem solving.
+ """ + applicationId: str + cdoModelsPath: str + notificationURI: str + timeLimit: Optional[int] = None + watermark: CPSolverWatermark diff --git a/DataGenerator/src/DataGeneratorService/remote_summoner.py b/DataGenerator/src/DataGeneratorService/remote_summoner.py index 9878746..8f7d446 100644 --- a/DataGenerator/src/DataGeneratorService/remote_summoner.py +++ b/DataGenerator/src/DataGeneratorService/remote_summoner.py @@ -1,18 +1,21 @@ import datetime as dt import itertools +import logging import tempfile import uuid from pathlib import Path import requests +from cdoAdapter import CDOAdapter from lxml import etree from DataGenerator.config import CPSOLVER_URL, WATERMARK_USER, WATERMARK_SYSTEM, CPSOLVER_TIMEOUT -from DataGenerator.models import CPSolverRequest, CPSolverWatermark from DataGenerator.solution_cache import CPQuery, SolutionCache from DataGenerator.summoners.solv_summoner import CPSolverSolutionSummonerError from DataGeneratorService.config import HOSTNAME -from cdoAdapter import CDOAdapter +from DataGeneratorService.models import CPSolverRequest, CPSolverWatermark + +logger = logging.getLogger(__name__) class RemoteCPSolverSolutionSummoner: @@ -48,7 +51,8 @@ class RemoteCPSolverSolutionSummoner: # Retrieve cpproblem from cdo and modify it with custom problem data with tempfile.TemporaryDirectory() as cpproblem_template_tempdir: - cpproblem_template_path = Path(cpproblem_template_tempdir.name).joinpath('cpproblem_template.xmi') + cpproblem_template_path = Path(cpproblem_template_tempdir).joinpath('cpproblem_template.xmi') + logger.info(f'Retrieving {cdo_path} to {cpproblem_template_path}') if not CDOAdapter.exportResourceToFile(cdo_path, cpproblem_template_path): raise KeyError(f"Problem not found in cdo: {cpproblem_template_path}") @@ -58,6 +62,7 @@ class RemoteCPSolverSolutionSummoner: problem_local_path = Path(cpproblem_tempfile.name) problem_cdo_path = f'generator_tmp_cpproblem/{task_id}' # Upload task + logger.info(f'Uploading {cpproblem_template_path} to {problem_cdo_path}') if not CDOAdapter.importResourceFromFile(problem_local_path, problem_cdo_path): raise KeyError(f"Unable to send problem to cdo: {problem_cdo_path}") @@ -77,6 +82,7 @@ class RemoteCPSolverSolutionSummoner: watermark=cpwatermark ) # Upload to cpsolver + logger.info(f'Sending request to the CPSolver: {cprequest.json()}') response = requests.post(url=f'{CPSOLVER_URL}/constraintProblemSolution', data=cprequest.json(), timeout=CPSOLVER_TIMEOUT, @@ -99,11 +105,11 @@ class RemoteCPSolverSolutionSummoner: """ cpproblem_xml = etree.parse(str(cpproblem_template_path)) - for name, value in itertools.chain(query.parameters.items(), query.configuration.items()): cpproblem_xml.find(f"cpMetrics[@id='{name}']")[0].set('value', str(value)) output_problem = tempfile.NamedTemporaryFile() + logger.debug(f'Creating CPProblem {output_problem.name} with query: {query}') cpproblem_xml.write(output_problem.name, xml_declaration=True, encoding="ASCII") diff --git a/DataGenerator/src/DataGeneratorService/tasks.py b/DataGenerator/src/DataGeneratorService/tasks.py index 8c52d70..d2ea4bb 100644 --- a/DataGenerator/src/DataGeneratorService/tasks.py +++ b/DataGenerator/src/DataGeneratorService/tasks.py @@ -5,9 +5,10 @@ from pathlib import Path import requests from modeldb import ModelDBClient +from DataGenerator.models import CPSolution from DataGenerator.solution_cache import SolutionCache from DataGeneratorService.cptask_cache import CPTaskCache -from DataGeneratorService.models import CPSolution, CPSolverJobInfo, State, 
-from DataGeneratorService.models import CPSolution, CPSolverJobInfo, State, JobResult
+from DataGeneratorService.models import CPSolverJobInfo, JobResult
 from DataGeneratorService.models import DataGenerationResult
 from DataGeneratorService.remote_summoner import RemoteCPSolverSolutionSummoner
@@ -35,7 +36,7 @@ def run_task(job_info: CPSolverJobInfo,
     if cached_solution is not None:
         ack_task(solution=cached_solution, task_id=task.taskId, job_id=job_info.jobId)
     else:
-        task_cache.update_task_state(task.taskId, State.pending)
+        task_cache.update_task_state(task.taskId, "pending")
         summoner.ask_solver(applicationId=task.query.applicationId,
                             cdo_path=job_info.dataGenerationInfo.cdoProblemPath,
                             query=task.query,
diff --git a/DataGenerator/src/DataReading/influx_data_provider.py b/DataGenerator/src/DataReading/influx_data_provider.py
index d323186..7e79989 100644
--- a/DataGenerator/src/DataReading/influx_data_provider.py
+++ b/DataGenerator/src/DataReading/influx_data_provider.py
@@ -42,8 +42,8 @@ class InfluxDataProvider(IRowDataProvider):
     def __init__(self,
                  applicationId: str,
                  influxConfigs: Dict[str, str],
-                 startTimestamp: Optional[dt.datetime],
-                 finishTimestamp: Optional[dt.datetime],
+                 startTimestamp: Optional[dt.datetime] = None,
+                 finishTimestamp: Optional[dt.datetime] = None,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
diff --git a/DataGenerator/src/DataReading/interpolator.py b/DataGenerator/src/DataReading/interpolator.py
index 8a13602..6fde5c3 100644
--- a/DataGenerator/src/DataReading/interpolator.py
+++ b/DataGenerator/src/DataReading/interpolator.py
@@ -6,7 +6,7 @@ from typing import Dict, Generator, Optional, List
 from DataReading.abstractreader import IRowDataProvider
 
 
-class EmptyReaderExcpetion(Exception):
+class EmptyReaderException(Exception):
     """Raised when reader has no values in it."""
 
 
@@ -105,7 +105,7 @@ class InterpolatedDataStream:
             self._str_interpolants[colname_str].append((row_ts, row[colname_str]))
 
         if any(map(lambda v: len(v[0]) == len(v[1]) == 0, self.raw_data.values())):
-            raise EmptyReaderExcpetion
+            raise EmptyReaderException
 
         # interpolate aggregated data
         self.interpolants = {}
diff --git a/testing-docker-compose.yml b/testing-docker-compose.yml
index 481236a..224bf2c 100644
--- a/testing-docker-compose.yml
+++ b/testing-docker-compose.yml
@@ -145,6 +145,7 @@ services:
       dockerfile: Dockerfile
     volumes:
       - ./javaLibrary:/javaLibrary
+      - ./config:/config
     env_file:
      - config/testing-sl-solver-properties.env
     ports:
diff --git a/testing-generator-docker-compose.yml b/testing-generator-docker-compose.yml
index b19eb16..017007c 100644
--- a/testing-generator-docker-compose.yml
+++ b/testing-generator-docker-compose.yml
@@ -71,7 +71,7 @@ services:
     environment:
       ACTIVEMQ_HOST: activemq
       CSV_INPUT_PATH: /input
-      APPLICATION_NAME: demo
+      APPLICATION_NAME: demofcr
 
   influxdb:
     build:
@@ -101,6 +101,8 @@ services:
       dockerfile: Dockerfile
     volumes:
       - ./javaLibrary:/javaLibrary
+      - ./config:/config
+      - ./tests/data_input:/input
     env_file:
       - config/testing-sl-solver-properties.env
     ports:
@@ -109,6+111,8 @@ services:
       - modeldb
     environment:
      - WORKERS_PER_CORE=3 # FastApi requirements
+     - MOCK_INFLUX=True
+     - MOCK_CSV_PATH=/input/test.csv
 
 
   listener:
diff --git a/tests/MockInflux/MockInflux/config.py b/tests/MockInflux/MockInflux/config.py
index 40f9388..5f067d9 100644
--- a/tests/MockInflux/MockInflux/config.py
+++ b/tests/MockInflux/MockInflux/config.py
@@ -1,6 +1,6 @@
 import os
 
-APPLICATION_NAME = os.environ.get("APPLICATION_NAME", "demo")
+APPLICATION_NAME = os.environ.get("APPLICATION_NAME", "demofcr")
 
 ACTIVEMQ_HOSTNAME = os.environ.get("ACTIVEMQ_HOSTNAME", "activemq")
os.environ.get("ACTIVEMQ_HOSTNAME", "activemq") ACTIVEMQ_PORT = int(os.environ.get("ACTIVEMQ_PORT", "61613")) diff --git a/tests/test.py b/tests/test.py index 313c9df..0a89f4e 100644 --- a/tests/test.py +++ b/tests/test.py @@ -13,14 +13,13 @@ from cdoAdapter import CDOAdapter def run_generator(): cpproblem = Path("FCRclear-CP.xmi") - CDOAdapter.importResourceFromFile(cpproblem, "demo-fcr") - + CDOAdapter.importResourceFromFile(cpproblem, "demofcr") params = { "applicationId": "demo", "trainRatio": 0.9, - "cdoProblemPath": "demo-fcr", - "returnEndpoint": "http://127.0.0.1:8081", + "cdoProblemPath": "demofcr", + "returnEndpoint": "http://listener:8081/", # "startTimestamp": "2021-06-17T12:21:39.643Z", # "finishTimestamp": "2021-06-17T12:21:39.643Z", "delta": 30, -- GitLab From b559baa57cc5c183f7e8023862cb2131273bc743 Mon Sep 17 00:00:00 2001 From: Jakub Kowalski Date: Fri, 18 Jun 2021 01:40:54 +0200 Subject: [PATCH 2/2] Merged env vars --- DataGenerator/src/DataGeneratorService/config.py | 3 +-- DataGenerator/src/DataGeneratorService/fastapi_app.py | 6 +++--- testing-generator-docker-compose.yml | 3 +-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/DataGenerator/src/DataGeneratorService/config.py b/DataGenerator/src/DataGeneratorService/config.py index eea1672..cc00c6e 100644 --- a/DataGenerator/src/DataGeneratorService/config.py +++ b/DataGenerator/src/DataGeneratorService/config.py @@ -19,5 +19,4 @@ DEFAULT_DELTA = dt.timedelta(seconds=int(os.getenv("DEFAULT_DELTA"))) if os.gete else dt.timedelta(seconds=30) LOGLEVEL = os.environ.get("LOGLEVEL", "DEBUG") -MOCK_INFLUX = os.environ.get("MOCK_INFLUX") == "True" -MOCK_CSV_PATH = os.environ.get("MOCK_CSV_PATH") \ No newline at end of file +MOCK_INFLUX_CSV_PATH = os.environ.get("MOCK_INFLUX_CSV_PATH") \ No newline at end of file diff --git a/DataGenerator/src/DataGeneratorService/fastapi_app.py b/DataGenerator/src/DataGeneratorService/fastapi_app.py index e8a5312..6045b61 100644 --- a/DataGenerator/src/DataGeneratorService/fastapi_app.py +++ b/DataGenerator/src/DataGeneratorService/fastapi_app.py @@ -14,7 +14,7 @@ from lxml import etree from DataGenerator.models import CPSolution from DataGenerator.solution_cache import CPQuery, SolutionCache from DataGeneratorService.config import INFLUXDB_HOSTNAME, INFLUXDB_PORT, INFLUXDB_USERNAME, INFLUXDB_PASSWORD, \ - INFLUXDB_DBNAME, MAX_PARALLEL_TASKS, LOGLEVEL, MOCK_INFLUX, MOCK_CSV_PATH + INFLUXDB_DBNAME, MAX_PARALLEL_TASKS, LOGLEVEL, MOCK_INFLUX_CSV_PATH from DataGeneratorService.cptask_cache import CPTaskCache from DataGeneratorService.models import DataGenerationInfo, DataGenerationResult, JobResult, CPSolverNotification from DataGeneratorService.tasks import collect, run_task @@ -55,8 +55,8 @@ async def generate_data(data_generation_info: DataGenerationInfo) -> None: 'path_dataset': tempdir } - if MOCK_INFLUX: - data_provider = CSVReader(MOCK_CSV_PATH) + if MOCK_INFLUX_CSV_PATH is not None: + data_provider = CSVReader(MOCK_INFLUX_CSV_PATH) else: data_provider = InfluxDataProvider(applicationId=data_generation_info.applicationId, startTimestamp=data_generation_info.startTimestamp, diff --git a/testing-generator-docker-compose.yml b/testing-generator-docker-compose.yml index 017007c..1bc21cd 100644 --- a/testing-generator-docker-compose.yml +++ b/testing-generator-docker-compose.yml @@ -111,8 +111,7 @@ services: - modeldb environment: - WORKERS_PER_CORE=3 # FastApi requirements - - MOCK_INFLUX=True - - MOCK_CSV_PATH=/input/test.csv + - MOCK_INFLUX_CSV_PATH=/input/test.csv listener: -- 
GitLab
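
Note on the MongoModel helper introduced in PATCH 1/2: to_mongo() flattens datetimes to
ISO strings and timedeltas to seconds before the document is stored, and from_mongo()
hands the raw document back to pydantic, which re-parses those primitives into the
declared field types. A minimal round-trip sketch, assuming the DataGeneratorService
package is importable; the Example model is hypothetical, not part of the patch:

    import datetime as dt

    from DataGeneratorService.models import MongoModel

    class Example(MongoModel):
        name: str
        created: dt.datetime
        delta: dt.timedelta

    doc = Example(name="demo",
                  created=dt.datetime(2021, 6, 18, 1, 39),
                  delta=dt.timedelta(seconds=30))
    raw = doc.to_mongo()
    # raw == {'name': 'demo', 'created': '2021-06-18T01:39:00', 'delta': 30.0}
    restored = Example.from_mongo(raw)  # pydantic re-parses the ISO string and the seconds value
    assert restored == doc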
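The patch drops the State enum in favour of plain strings and leaves a
"# TODO replace with enum" on CPSolverTaskInfo.state. One way to close that TODO
without breaking the string comparisons used in tasks.py and cptask_cache.py would be
a str-backed enum; the sketch below is a suggestion under that assumption, not part of
the series:

    from enum import Enum

    class State(str, Enum):
        scheduled = "scheduled"
        pending = "pending"
        done = "done"

    # Members are real str instances, so they compare equal to the literals used in the
    # patch and are stored as plain strings when the model dict is inserted into MongoDB.
    assert State.pending == "pending"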
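The update_task_state fix in cptask_cache.py is easy to miss: {'state', new_state} is a
Python set literal, not a dict, so pymongo rejects it as an update document. A quick
check of the corrected call, using mongomock as a hypothetical stand-in for a live
MongoDB (pip install mongomock):

    import mongomock

    collection = mongomock.MongoClient().db.generatortasks
    collection.insert_one({"taskId": "t1", "state": "scheduled"})
    # The fixed form: an update document mapping '$set' to a dict, not a set.
    collection.find_one_and_update({"taskId": "t1"}, {"$set": {"state": "pending"}})
    assert collection.find_one({"taskId": "t1"})["state"] == "pending"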