diff --git a/FCRgendata/Dockerfile b/FCRgendata/Dockerfile index 43d3deddf1377cbe38dedd1623ff7883d37f0962..1c86250911114916e9f08149568f270f38225cd3 100644 --- a/FCRgendata/Dockerfile +++ b/FCRgendata/Dockerfile @@ -9,4 +9,4 @@ COPY ./src /app/ COPY utility-generator-jar-with-dependencies.jar /app/ ENV CLASSPATH /app/utility-generator-jar-with-dependencies.jar -ENTRYPOINT ["uvicorn", "FCRgendata.fastapi_app:app"] \ No newline at end of file +ENTRYPOINT ["uvicorn", "FCRgendata.fastapi_app:app", "--port", "80", "--host", "0.0.0.0"] \ No newline at end of file diff --git a/FCRgendata/src/FCRgendata/cptask_cache.py b/FCRgendata/src/FCRgendata/cptask_cache.py index aa8b0efe730fae134d40cfaea09e44701057c83a..183809767b2c91fd1c245bc7f1b9aa8009c22913 100644 --- a/FCRgendata/src/FCRgendata/cptask_cache.py +++ b/FCRgendata/src/FCRgendata/cptask_cache.py @@ -8,7 +8,7 @@ import pymongo from FCRgendata.config import DEFAULT_MONGODB_URI from FCRgendata.models import CPSolverTaskInfo, CPSolverJobInfo, CPSolution, CPQuery, DataGenerationInfo -DEFAULT_TABLE_NAME: str = 'cptasks' +DEFAULT_TABLE_NAME: str = 'generator_cptasks' class CPTaskCache: diff --git a/FCRgendata/src/FCRgendata/fastapi_app.py b/FCRgendata/src/FCRgendata/fastapi_app.py index 0aac014ddb94adc3e56a65a3da3a8c377b20d829..48224d90e42d7526a5f452c24422bc0539cb0038 100644 --- a/FCRgendata/src/FCRgendata/fastapi_app.py +++ b/FCRgendata/src/FCRgendata/fastapi_app.py @@ -32,43 +32,46 @@ def generate_data(data_generation_info: DataGenerationInfo) -> None: Args: data_generation_info: DataModel containing all information needed to generate data. """ - configs = { - 'hostname': INFLUXDB_HOSTNAME, - 'port': INFLUXDB_PORT, - 'username': INFLUXDB_USERNAME, - 'password': INFLUXDB_PASSWORD, - 'dbname': INFLUXDB_DBNAME - } - data_provider = InfluxDataProvider(applicationId=data_generation_info.applicationId, - startTimestamp=data_generation_info.startTimestamp, - finishTimestamp=data_generation_info.finishTimestamp, - influxConfigs=configs) - # data interpolation - interpolated_data = InterpolatedDataStream(data_provider, - _timestamp_generator(data_provider.peek_t0(), - data_generation_info.delta), - data_provider.column_names) - - queries: List[CPQuery] = [] - it = 0 - for row in interpolated_data: - parameters = {key: str(value) for key, value in row} - query = CPQuery(applicationId=data_generation_info.applicationId, - parameters=parameters, - configuration={}, - it=it) - it += 1 - queries.append(query) - - job_info = task_cache.create_job(jobAmount=len(queries), - data_generation_info=data_generation_info) - - summoner = RemoteCPSolverSolutionSummoner(solution_cache=solution_cache, - task_cache=task_cache) - for query in queries: - summoner.get_solution(jobId=job_info.jobId, - cdo_cpproblem_path=data_generation_info.cdoProblemPath, - query=query) + with tempfile.TemporaryDirectory() as tempdir: + configs = { + 'hostname': INFLUXDB_HOSTNAME, + 'port': INFLUXDB_PORT, + 'username': INFLUXDB_USERNAME, + 'password': INFLUXDB_PASSWORD, + 'dbname': INFLUXDB_DBNAME, + 'path_dataset': tempdir + + } + data_provider = InfluxDataProvider(applicationId=data_generation_info.applicationId, + startTimestamp=data_generation_info.startTimestamp, + finishTimestamp=data_generation_info.finishTimestamp, + influxConfigs=configs) + # data interpolation + interpolated_data = InterpolatedDataStream(data_provider, + _timestamp_generator(data_provider.peek_t0(), + data_generation_info.delta), + data_provider.column_names) + + queries: List[CPQuery] = [] + it = 0 + for row in 
interpolated_data: + parameters = {key: str(value) for key, value in row} + query = CPQuery(applicationId=data_generation_info.applicationId, + parameters=parameters, + configuration={}, + it=it) + it += 1 + queries.append(query) + + job_info = task_cache.create_job(jobAmount=len(queries), + data_generation_info=data_generation_info) + + summoner = RemoteCPSolverSolutionSummoner(solution_cache=solution_cache, + task_cache=task_cache) + for query in queries: + summoner.get_solution(jobId=job_info.jobId, + cdo_cpproblem_path=data_generation_info.cdoProblemPath, + query=query) @app.post('/generate', diff --git a/FCRgendata/src/FCRgendata/rawDataReader/influx_data_provider.py b/FCRgendata/src/FCRgendata/rawDataReader/influx_data_provider.py index 76f4426ce9502293094ec9e7e6582e03e06806f3..e74f0f1d3792bd56729bbcaca8118ab281c0a532 100644 --- a/FCRgendata/src/FCRgendata/rawDataReader/influx_data_provider.py +++ b/FCRgendata/src/FCRgendata/rawDataReader/influx_data_provider.py @@ -49,16 +49,23 @@ class InfluxDataProvider(IRowDataProvider): self._finishTimestamp = finishTimestamp if startTimestamp is not None: - startTimestamp_sql = startTimestamp.strftime('%Y-%m-%d %H:%M:%S') + # Workaround because DatasetMaker requires time period in MySql time format + # and it must be 1 word -> we are formatting it to seconds since now :) + time_difference = dt.datetime.now() - startTimestamp + startTimestamp_sql = f'{round(time_difference.total_seconds())}s' else: startTimestamp_sql = None datasetmaker = DatasetMaker(applicationId, startTimestamp_sql, influxConfigs) - headers, data = datasetmaker.getData() + raw_data = datasetmaker.getData() + headers = raw_data[0] + data = raw_data[1] time_columns = _match_columns(headers, TIME_COLUMN_NAMES) timestamp_columns = _match_columns(headers, self._timestamp_column_names) - + if len(timestamp_columns) == 0: + if 'time' in time_columns: + timestamp_columns.append('time') assert len( timestamp_columns) == 1, f'Cannot specify timestamp column, found column names: {self._headers}' self._timestamp_column_name = timestamp_columns[0] diff --git a/FCRgendata/src/FCRgendata/solution_cache.py b/FCRgendata/src/FCRgendata/solution_cache.py index e896303361b55d933bc38ac923509dfa94277b57..1c9c080874aa5730a85f53c07899356caa48e7f6 100644 --- a/FCRgendata/src/FCRgendata/solution_cache.py +++ b/FCRgendata/src/FCRgendata/solution_cache.py @@ -6,7 +6,7 @@ import pymongo from FCRgendata.config import DEFAULT_MONGODB_URI from FCRgendata.models import CPQuery, CPSolution -DEFAULT_TABLE_NAME: str = 'cpcache' +DEFAULT_TABLE_NAME: str = 'generator_cpcache' class SolutionCache: diff --git a/controller/README.md b/controller/README.md new file mode 100644 index 0000000000000000000000000000000000000000..696a25e93ed91b635962ff82444dc46f8df26c8b --- /dev/null +++ b/controller/README.md @@ -0,0 +1,11 @@ +# Controller +The controller application controls the AS Solver workflow and allows communication with it via REST. +The requests should have a JSON content with appropriate fields filled. 
+The required fields are described in /src/models directory + +# Rest end-points to communicate with our Solver + +- [POST] /train - Train the network with models extracted from CDO +- [POST] /trainFromFile - Train the network with models extracted from files +- [POST] /solveCP - Solve the constraint problem with models extracted from CDO +- [POST] /solveCPFromFile - Solve the constraint problem with models extracted from files \ No newline at end of file diff --git a/controller/src/cdoAdapter/cdoAdapter.py b/controller/src/cdoAdapter/cdoAdapter.py index 2e1fb149019db4f0dd5bbbad2a454de2e65db060..16f4c19b421df6b0e7fa41278f6f942f3506e3f9 100644 --- a/controller/src/cdoAdapter/cdoAdapter.py +++ b/controller/src/cdoAdapter/cdoAdapter.py @@ -1,8 +1,10 @@ -from typing import Dict +import os +import logging from numbers import Number -from jnius import autoclass from pathlib import Path -import os +from typing import Dict + +from jnius import autoclass, JavaException CDOAdapterClass = autoclass("eu.melodic.upperware.utilitygenerator.cdo.CDOAdapter") ArrayList = autoclass("java.util.ArrayList") @@ -91,3 +93,32 @@ class CDOAdapter: cp_file_path.absolute().as_posix(), node_candidates_file_path.absolute().as_posix(), application_id, notification_uri, request_uuid, esb_url) + + @staticmethod + def notifySolutionNotApplied(application_id: str, + notification_uri: str, + uuid: str) -> None: + """Notifies the ESB that the solution has not been applied + + Args: + application_id: unique id of the application + notification_uri: uri of the ESB at which a notification will be sent + uuid: request uuid which will be passed in the notification to ESB + """ + try: + esb_url = os.environ['esb.url'] + CDOAdapterClass.notifySolutionNotApplied(application_id, notification_uri, uuid, esb_url) + except (KeyError, JavaException) as e: + logging.error(f"Failed to notify ESB about not applying the solution.") + + @staticmethod + def createCacheKey(cdo_cp_path: str) -> str: + """Calculates and returns the cache key for node candidates based on the constraint problem CDO path + + Args: + cdo_cp_path: path in the CDO for the constraint problem + + Returns: + The calculated cache key + """ + return CDOAdapterClass.createCacheKey(cdo_cp_path) diff --git a/controller/src/cdoAdapter/java-src/CDOAdapter.java b/controller/src/cdoAdapter/java-src/CDOAdapter.java index 0d1e21e1c315efa9521f1420c03b656dc660fb3e..32726dd500a556ac2563c709e74a2b8d0978004d 100644 --- a/controller/src/cdoAdapter/java-src/CDOAdapter.java +++ b/controller/src/cdoAdapter/java-src/CDOAdapter.java @@ -118,14 +118,14 @@ public class CDOAdapter { .findFirst(); } - private static void notifySolutionProduced(String camelModelID, String notificationUri, String uuid, String esbUrl) { + public static void notifySolutionProduced(String camelModelID, String notificationUri, String uuid, String esbUrl) { log.info("Sending solution available notification"); NotificationResult result = prepareSuccessNotificationResult(); ConstraintProblemSolutionNotificationRequest notification = prepareNotification(camelModelID, result, uuid); sendNotification(notification, notificationUri, esbUrl); } - private static void notifySolutionNotApplied(String camelModelID, String notificationUri, String uuid, String esbUrl) { + public static void notifySolutionNotApplied(String camelModelID, String notificationUri, String uuid, String esbUrl) { log.info("Sending solution NOT available notification"); NotificationResult result = prepareErrorNotificationResult("Solution was not generated."); 
ConstraintProblemSolutionNotificationRequest notification = prepareNotification(camelModelID, result, uuid); @@ -184,6 +184,11 @@ public class CDOAdapter { return VariableValueDTOFactory.createElement(name, value); } + // Creates and returns a Node Candidates Cache key based on the cdoResourcePath + public String createCacheKey(String cdoResourcePath) { + return cdoResourcePath.substring(cdoResourcePath.indexOf("/") + 1); + } + // This method saves the given solution to CDO and notifies ESB that a solution has been produced. public static void saveSolutionToCDO(List solution, String cpCDOPath, String camelModelFilePath, String cpFilePath, String nodeCandidatesFilePath, String applicationId, diff --git a/controller/src/controller_application.py b/controller/src/controller_application.py new file mode 100644 index 0000000000000000000000000000000000000000..84d5bddc41b60d970b552dd5a052194f8708ea54 --- /dev/null +++ b/controller/src/controller_application.py @@ -0,0 +1,98 @@ +import logging + +from fastapi import FastAPI, BackgroundTasks, HTTPException + +from .cdoAdapter.cdoAdapter import CDOAdapter +from .models.solveModels import SolveFromFileInfo, SolveInfo +from .models.trainModels import TrainInfo, TrainFromFileInfo, DataGeneratedInfo, NetworkTrainedInfo +from .solving_controller import SolvingController +from .training_controller import TrainingController + +logging.basicConfig(level=logging.INFO) +app = FastAPI() + + +@app.post('/train') +async def train(train_info: TrainInfo, + background_tasks: BackgroundTasks): + """ + Train the network with models extracted from CDO + """ + logging.info(f'Received train request for application id: {train_info.application_id}') + train_from_file_info = None + try: + train_from_file_info = TrainingController.transformToTrainFromFileInfo(train_info) + except KeyError: + raise HTTPException(status_code=404, + detail="For this application id it couldn't download resources from CDO or Memcache") + + background_tasks.add_task(TrainingController.TrainfromFile, train_from_file_info=train_from_file_info) + return "OK" + + +@app.post('/trainFromFile') +async def trainFromFile(train_from_file_info: TrainFromFileInfo, + background_tasks: BackgroundTasks): + """ + Train the network with models extracted from files + """ + logging.info(f'Received train from file request for application id: {train_from_file_info.application_id}') + background_tasks.add_task(TrainingController.TrainfromFile, train_from_file_info=train_from_file_info) + return "OK" + + +@app.post('/dataGenerated') +async def dataGenerated(data_generated_info: DataGeneratedInfo, + background_tasks: BackgroundTasks): + """ + Endpoint for data generator to signal that data generation for a given application has finished + """ + logging.info( + f'Received notification that data generation has finished for application id: {data_generated_info.application_id}') + background_tasks.add_task(TrainingController.requestNetworkTraining, data_generated_info=data_generated_info) + return "OK" + + +@app.post('/networkTrained') +async def networkTrained(network_trained_info: NetworkTrainedInfo, + background_tasks: BackgroundTasks): + """ + Endpoint for supervised gym to signal that network training for a given application has finished + """ + logging.info( + f'Received notification that network training has finished for application id: {network_trained_info.application_id}') + background_tasks.add_task(TrainingController.checkTrainingEffects, network_trained_info=network_trained_info) + return "OK" + + 
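+# Illustrative request body for the /constraintProblemSolution endpoint defined below.
+# This is only a sketch based on the SolveInfo fields the controller reads
+# (application_id, notification_uri, request_uuid, cp_CDO_path); the authoritative
+# field list is the SolveInfo model in /src/models. All example values are placeholders:
+# {
+#     "application_id": "demo-app",
+#     "notification_uri": "http://esb.example.org/notify",
+#     "request_uuid": "123e4567-e89b-12d3-a456-426614174000",
+#     "cp_CDO_path": "demo-app/constraint-problem"
+# }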
+@app.post('/constraintProblemSolution') +async def constraintProblemSolution(solve_info: SolveInfo): + """ + Predict configuration using a previously trained network with data extracted from Influx. + """ + logging.info(f'Received predict configuration request for application id: {solve_info.application_id}') + prediction = {} + try: + prediction = SolvingController.solve(solve_info=solve_info) + except Exception as e: + logging.error(f"{solve_info.application_id}: Error occurred: {e}") + CDOAdapter.notifySolutionNotApplied(solve_info.application_id, solve_info.notification_uri, solve_info.request_uuid) + raise + return prediction + + +@app.post('/constraintProblemSolutionFromFile') +async def constraintProblemSolutionFromFile(solve_from_file_info: SolveFromFileInfo): + """ + Predict configuration using a previously trained network with data extracted from file. + """ + logging.info( + f'Received predict configuration from file request for application id: {solve_from_file_info.application_id}') + prediction = {} + try: + prediction = SolvingController.solveFromFile(solve_from_file_info=solve_from_file_info) + except Exception as e: + logging.error(f"{solve_from_file_info.application_id}: Error occurred: {e}") + CDOAdapter.notifySolutionNotApplied(solve_from_file_info.application_id, solve_from_file_info.notification_uri, solve_from_file_info.request_uuid) + raise + return prediction diff --git a/controller/src/solving_controller.py b/controller/src/solving_controller.py new file mode 100644 index 0000000000000000000000000000000000000000..02587b1b2717c14f2c24a7d5b46b6acf46ef23fe --- /dev/null +++ b/controller/src/solving_controller.py @@ -0,0 +1,230 @@ +import json +import logging +import os +from pathlib import Path +from typing import Dict, List, Union +from numbers import Number +from uuid import uuid1 + +import requests +from fastapi import HTTPException +from modeldb.ModelDBClient import ModelDBClient, TrainingStatus +from jnius import JavaException +from .cdoAdapter.cdoAdapter import CDOAdapter +from .models.solveModels import SolveInfo, SolveFromFileInfo + + +class SolvingController: + """ + With the use of the Supervised Learning Gym, it generates a new configuration and saves it to CDO + """ + + @staticmethod + def _loadInputFromInflux(solve_info: SolveInfo) -> Dict[str, List[Union[float, int]]]: + """Loads metrics and predictions from Influx. + + Args: + solve_info: information passed in the request to the controller + + Returns: + Dictionary containing lists of metrics to be used by the supervised gym to predict configuration. + """ + logging.info(f'{solve_info.application_id}: Obtain data from Influx for input to network') + + # TODO get data from data generator + """ + data_generator_host = os.environ['DATA_GENERATOR_HOST'] + data_generator_port = os.environ['DATA_GENERATOR_PORT'] + r = requests.post(f'http://{data_generator_host}:{data_generator_port}/someEndpointName', json={ + }) + if r.status_code >= 400: + raise HTTPException(r.status_code, + f"Error obtaining metrics and predictions from Data Generator, as it returned: {r.text}") + return r.json() + """ + + def mockNetworkInput() -> Dict[str, List[Union[float, str, int]]]: + """ Prepares mock Genom input for a network. Will be deleted. 
+ """ + input_labels = ['SimulationElapsedTimeRawMetric', 'SimulationLeftNumberRawMetric', 'NotFinishedOnTime', + 'EstimatedRemainingTime', 'WillFinishTooSoon', 'ETPercentileRawMetric', + 'SimulationLeftNumberRawMetricPrediction', 'NotFinishedOnTimePrediction', + 'EstimatedRemainingTimePrediction', 'WillFinishTooSoonPrediction', + 'ETPercentileRawMetricPrediction', 'MinimumCoresPrediction', 'MinimumCoresReal', + 'MinimumCoresContextPrediction', 'MinimumCoresContextReal', 'ActWorkerCardinality', + 'ActWorkerCores'] + network_input = {} + for label in input_labels: + values = [500 for _ in range(60)] + network_input[label] = values + return network_input + + return mockNetworkInput() + + @staticmethod + def _saveSolutionToCDO(application_id, + solution: Dict[str, Number], + proc_finished_uri, + request_uuid: str, + cp_CDO_path: str) -> None: + """Saves the provided solution to CDO and notifies the ESB that the solution was generated. + + Args: + application_id: unique id of the application + solution: configuration generated by the supervised gym to be saved to CDO + proc_finished_uri: uri of the ESB at which a notification will be sent + request_uuid: request uuid which will be passed in the notification to ESB + cp_CDO_path: path to the constraint problem in CDO to which the solution should be saved + + Raises: + HTTPException: when it could not download the necessary files or it failed to save the solution to CDO. + """ + logging.info(f"{application_id}: Saving generated solution to CDO.") + + # Downloading necessary files from ModelDB and CDO + uuid = uuid1() + cp_path = Path(f'CP-{application_id}-{uuid}') + camel_path = Path(f'camel-model-{application_id}-{uuid}') + nodes_path = Path(f'node-candidates-{application_id}-{uuid}') + try: + client = ModelDBClient() + client.exportCamelModel(application_id, camel_path) + client.exportNodeCandidates(application_id, nodes_path) + CDOAdapter.exportResourceToFile(cp_CDO_path, cp_path) + except KeyError: + cp_path.unlink(missing_ok=True) + camel_path.unlink(missing_ok=True) + nodes_path.unlink(missing_ok=True) + logging.error( + f'{application_id}: Could not export files from model DB because at least one of them is not there') + raise HTTPException(404, + f'Did not find all the necessary files in Model DB for application id: {application_id}') + + # Utilizing the CDO adapter to save the solution to CDO and send a notification to ESB + try: + CDOAdapter.saveSolutionToCDO(solution=solution, + cp_CDO_path=cp_CDO_path, + camel_model_file_path=camel_path, + cp_file_path=cp_path, + node_candidates_file_path=nodes_path, + application_id=application_id, + notification_uri=proc_finished_uri, + request_uuid=request_uuid) + except KeyError: + logging.error(f"{application_id}: Failed to save the solution to CDO, as the esb.url environmental variable has not been set") + raise HTTPException(404, + f'Failed to save the solution to CDO for application id: {application_id}') + except JavaException as e: + logging.error(f"{application_id}: Failed to save the solution to CDO, because of the error: {e}") + raise HTTPException(404, + f'Failed to save the solution to CDO for application id: {application_id}') + finally: + cp_path.unlink(missing_ok=True) + camel_path.unlink(missing_ok=True) + nodes_path.unlink(missing_ok=True) + + @staticmethod + def _sendPredictionRequest(application_id: str, + input_data: Dict[str, List[Union[float, int]]]) -> Dict[str, Number]: + """Sends a request to the supervised gym to predict a configuration. 
+ + Args: + application_id: unique id of the application + input_data: dictionary containing lists of metrics to be used by the supervised gym to predict configuration + + Returns: + Configuration calculated by the supervised gym + + Raises: + HTTPException: when the supervised gym returns a response with an error code + """ + logging.info(f"{application_id}: Requesting configuration prediction from the supervised gym") + gym_host = os.environ['SUPERVISED_GYM_HOST'] + gym_port = os.environ['SUPERVISED_GYM_PORT'] + + r = requests.post(f'http://{gym_host}:{gym_port}/predictConfiguration', json={ + "application_id": application_id, + "network_input": input_data + }) + + if r.status_code >= 400: + raise HTTPException(r.status_code, + f"Error obtaining a prediction from Supervised Gym, as it returned: {r.text}") + return r.json() + + @staticmethod + def _calculateSolution(application_id: str, + proc_finished_uri: str, + input_data: Dict[str, List[Union[float, int]]], + request_uuid: str, + cp_CDO_path: str) -> Dict[str, Number]: + """Checks if there is a model trained for this application id. Then calculates and saves the solution to CDO. + + Args: + application_id: unique id of the application + proc_finished_uri: uri of the ESB at which a notification will be sent + input_data: dictionary containing lists of metrics to be used by the supervised gym to predict configuration + request_uuid: request uuid which will be passed in the notification to ESB + cp_CDO_path: path to the constraint problem in CDO to which the solution should be saved + + Returns: + Configuration calculated by the supervised gym + + Raises: + HTTPException: when the network was not trained or it could not calculate and save the solution to CDO + """ + try: + document = ModelDBClient().loadTrainingStatus(application_id=application_id) + if document["status"] != TrainingStatus.SUCCESS: + raise HTTPException(404, "The network hasn't been trained successfully yet") + except KeyError: + raise HTTPException(404, "The network hasn't been trained successfully yet") + + solution = SolvingController._sendPredictionRequest(application_id, input_data) + SolvingController._saveSolutionToCDO(application_id, solution, proc_finished_uri, request_uuid, cp_CDO_path) + return solution + + @staticmethod + def solveFromFile(solve_from_file_info: SolveFromFileInfo) -> Dict[str, Number]: + """Calculate a configuration extracting model input from file. + + Args: + solve_from_file_info: information passed in the request to the controller + + Returns: + Configuration calculated by the supervised gym + + Raises: + HTTPException: when the input file was not found or it failed to calculate the solution + """ + input_data = {} + try: + with open(solve_from_file_info.input_file_path, mode="r") as input_file: + input_data = json.load(input_file) + except FileNotFoundError: + raise HTTPException(404, "Could not find the input file") + return SolvingController._calculateSolution(solve_from_file_info.application_id, + solve_from_file_info.notification_uri, + input_data, + solve_from_file_info.request_uuid, + solve_from_file_info.cp_CDO_path) + + @staticmethod + def solve(solve_info: SolveInfo) -> Dict[str, Number]: + """Calculate a configuration extracting model input from Influx. 
+ + Args: + solve_info: information passed in the request to the controller + + Returns: + Configuration calculated by the supervised gym + + Raises: + HTTPException: when it failed to download data from Influx or could not calculate the solution + """ + input_data = SolvingController._loadInputFromInflux(solve_info) + return SolvingController._calculateSolution(solve_info.application_id, + solve_info.notification_uri, + input_data, + solve_info.request_uuid, + solve_info.cp_CDO_path) diff --git a/controller/src/training_controller.py b/controller/src/training_controller.py new file mode 100644 index 0000000000000000000000000000000000000000..f6a2cae8422780e760526fa0ce14368a01d14e0b --- /dev/null +++ b/controller/src/training_controller.py @@ -0,0 +1,396 @@ +import logging +import os +from pathlib import Path +from uuid import uuid1 + +import requests +from jnius import JavaException +from modeldb.ModelDBClient import ModelDBClient, TrainingStatus + +from .cdoAdapter.cdoAdapter import CDOAdapter +from .models.trainModels import TrainInfo, TrainFromFileInfo, DataGeneratedInfo, NetworkTrainedInfo +from .nodeCandidatesCacheAdapter.nodeCandidatesCacheAdapter import NodeCandidatesCacheAdapter +from .utilityEvaluator.UtilityEvaluator import UtilityEvaluator + + +class TrainingController: + """ + With the use of the Training Data Generator and Supervised Learning Gym, + it trains the network. + """ + + @staticmethod + async def TrainfromFile(train_from_file_info: TrainFromFileInfo) -> None: + """Schedules network training while extracting models from provided files. + + Args: + train_from_file_info: training information passed in the request to the controller + """ + try: + if train_from_file_info.force_retrain is False: + TrainingController._checkTheTrainingStatus(train_from_file_info.application_id) + + cp_cdo_path = train_from_file_info.application_id + "-SLSolver-" + str(uuid1()) + TrainingController._saveTrainInfoToModelDBandCDO(train_from_file_info.application_id, + Path(train_from_file_info.cp_file_path), + Path(train_from_file_info.camel_model_file_path), + Path(train_from_file_info.node_candidates_file_path), + cp_cdo_path) + + TrainingController._sendRequestForDataGeneration(train_from_file_info.application_id, + train_from_file_info.proc_finished_url, + train_from_file_info.max_utility_error, + cp_cdo_path) + + except WrongStatusForTrainingException: + TrainingController._notifyTrainingFailed( + train_from_file_info.application_id, + train_from_file_info.proc_finished_url, + "Cannot handle the train request because model is well trained or training is already underway.\n" + + "Forcing retraining the network can be done by setting force_retrain to True.\n" + + "But be wary that forcing retraining when another training is in progress may lead to undefined behaviour.") + return + except (FileNotFoundError, CDOException): + TrainingController._notifyTrainingFailed( + train_from_file_info.application_id, + train_from_file_info.proc_finished_url, + "Failed importing files from file system to Model DB and CDO") + return + except Exception as e: + logging.error(f"{train_from_file_info.application_id}: Error occurred: {e}") + TrainingController._notifyTrainingFailed( + train_from_file_info.application_id, + train_from_file_info.proc_finished_url, + "Failed the training due to unknown error.") + raise + + @staticmethod + def requestNetworkTraining(data_generated_info: DataGeneratedInfo) -> None: + """Schedules network training by the supervised gym. 
+ + Args: + data_generated_info: information passed in the request to the controller + """ + client = ModelDBClient() + document = client.loadTrainingStatus(application_id=data_generated_info.application_id) + proc_finished_url = document["proc_finished_url"] + + try: + if data_generated_info.status == "successful": + TrainingController._sendRequestForNetworkTraining(data_generated_info.application_id) + else: + TrainingController._notifyTrainingFailed(data_generated_info.application_id, proc_finished_url, + "Error occurred during training data generation") + except Exception as e: + logging.error(f"{data_generated_info.application_id}: Error occurred: {e}") + TrainingController._notifyTrainingFailed(data_generated_info.application_id, + proc_finished_url, + "Error occurred while requesting network training.") + raise + + @staticmethod + async def checkTrainingEffects(network_trained_info: NetworkTrainedInfo) -> None: + """Checks whether the network has trained well enough. + + Args: + network_trained_info: information passed in the request to the controller + """ + client = ModelDBClient() + document = client.loadTrainingStatus(application_id=network_trained_info.application_id) + proc_finished_url = document["proc_finished_url"] + + try: + if network_trained_info.status == "successful": + if UtilityEvaluator.checkUtilityFuntionMAE(network_trained_info): + TrainingController._notifyTrainingSuccess(network_trained_info.application_id, + proc_finished_url, + "Network has successfully trained") + else: + TrainingController._notifyTrainingFailed(network_trained_info.application_id, + proc_finished_url, + "Network did not pass the max utility error threshold test.") + + else: + TrainingController._notifyTrainingFailed(network_trained_info.application_id, + proc_finished_url, + "Error occurred during network training") + except Exception as e: + logging.error(f"{network_trained_info.application_id}: Error occurred: {e}") + TrainingController._notifyTrainingFailed(network_trained_info.application_id, + proc_finished_url, + "Error occurred while checking training effects") + raise + + @staticmethod + def transformToTrainFromFileInfo(train_info: TrainInfo) -> TrainFromFileInfo: + """Downloads models to files transforming the given train information to TrainFromFileInfo + + Args: + train_info: training information describing from where to extract necessary models + + Returns: + Train from file information describing file paths at which the models are stored. 
+ + Raises: + KeyError: when the controller could not download files from the given paths + """ + cp_path = Path(f'CP-{train_info.application_id}') + camel_path = Path(f'camel-model-{train_info.application_id}') + nodes_path = Path(f'node-candidates-{train_info.application_id}') + + try: + TrainingController._downloadModelsToFileSystem(train_info, cp_path, camel_path, nodes_path) + except (CDOException, KeyError): + TrainingController._notifyTrainingFailed( + train_info.application_id, + train_info.proc_finished_url, + "Failed while downloading files from CDO and Memcache") + raise KeyError + + train_from_file_info = TrainFromFileInfo(application_id=train_info.application_id, + cp_file_path=cp_path.absolute().as_posix(), + camel_model_file_path=camel_path.absolute().as_posix(), + node_candidates_file_path=nodes_path.absolute().as_posix(), + proc_finished_url=train_info.proc_finished_url, + max_utility_error=train_info.max_utility_error, + force_retrain=train_info.force_retrain) + return train_from_file_info + + @staticmethod + def _checkTheTrainingStatus(application_id: str) -> None: + """Checks if the training status allows for a network to be trained for this application_id + + Args: + application_id: unique id of the application + + Raises: + WrongStatusForTrainingException: when the model for this application id is either trained or undergoing training + """ + client = ModelDBClient() + try: + train_status = client.loadTrainingStatus(application_id=application_id)["status"] + except KeyError: + return # It means this is the first time we want to train application of this id, so its fine + + if train_status is not TrainingStatus.FAILED: + # There is a trained network for this application_id or another train request is underway + raise WrongStatusForTrainingException + + @staticmethod + def _saveTrainInfoToModelDBandCDO(application_id: str, + cp_path: Path, + camel_path: Path, + nodes_path: Path, + cp_cdo_path: str) -> None: + """Imports the models to the Model DB and CDO databases to be later used by other services. 
+ + Args: + application_id: unique id of the application + cp_path: file path at which the constraint problem is stored + camel_path: file path at which the camel model is stored + nodes_path: file path at which the node candidates are stored + cp_cdo_path: path in CDO under which the constraint problem will be stored + + Raises: + FileNotFoundError: when it could not import files to model DB + CDOException: when it failed to save the constraint problem to CDO under the key cp_cdo_path + """ + logging.info( + f'{application_id}: Saving CP, camel model and node candidates to modelDB and CDO') + client = ModelDBClient() + try: + client.importCP(application_id, cp_path) + client.importCamelModel(application_id, camel_path) + client.importNodeCandidates(application_id, nodes_path) + if CDOAdapter.importResourceFromFile(cp_path, cp_cdo_path) is False: + raise CDOException + except FileNotFoundError: + logging.error( + f'{application_id}: Could not import files to model DB because at least one of them does not exist: ' + f'{cp_path.absolute().as_posix()} {camel_path.absolute().as_posix()} {nodes_path.absolute().as_posix()}') + raise + finally: + cp_path.unlink(missing_ok=True) + camel_path.unlink(missing_ok=True) + nodes_path.unlink(missing_ok=True) + + @staticmethod + def _downloadModelsToFileSystem(train_info: TrainInfo, + cp_path: Path, + camel_path: Path, + nodes_path: Path) -> None: + """Downloads the models from CDO and Memcache into the filesystem at specified paths + + Args: + train_info: training information describing from where to extract necessary models + cp_path: file path at which the constraint problem will be stored + camel_path: file path at which the camel model will be stored + nodes_path: file path at which the node candidates will be stored + + Raises: + CDOException: when there was an error with downloading recources from CDO + KeyError: when it could not download the node candidates from Memcache + """ + logging.info( + f'{train_info.application_id}: Downloading Constraint Problem, Camel model and Node Candidates') + download_successful = True + try: + if not CDOAdapter.exportResourceToFile(train_info.cp_cdo_path, cp_path): + download_successful = False + logging.error( + f'Could not download Constraint Problem from CDO for the given key: {train_info.cp_cdo_path}') + + if not CDOAdapter.exportResourceToFile(train_info.camel_model_cdo_path, camel_path): + download_successful = False + logging.error( + f'Could not download Camel Model from CDO for the given key: {train_info.camel_model_cdo_path}') + except JavaException: + logging.error('Error occurred while downloading resources from CDO') + download_successful = False + + if download_successful is False: + cp_path.unlink(missing_ok=True) + camel_path.unlink(missing_ok=True) + raise CDOException + + cache_key = CDOAdapter.createCacheKey(train_info.cp_cdo_path) + if not NodeCandidatesCacheAdapter.exportNodeCandidatesToFile(cache_key, + nodes_path): + logging.error(f'Could not download Node Candidates from Memcache for the given key: {cache_key}') + cp_path.unlink(missing_ok=True) + camel_path.unlink(missing_ok=True) + nodes_path.unlink(missing_ok=True) + raise KeyError(cache_key) + + + @staticmethod + def _sendRequestForNetworkTraining(application_id: str) -> None: + """Sends a request to the supervised gym to begin model training for this application id + + Args: + application_id: unique id of the application + """ + logging.info(f'{application_id}: Ordering supervised gym to start network training') + + gym_host = 
os.environ['SUPERVISED_GYM_HOST'] + gym_port = os.environ['SUPERVISED_GYM_PORT'] + controller_host = os.environ['CONTROLLER_HOST'] + controller_port = os.environ['CONTROLLER_PORT'] + client = ModelDBClient() + document = client.loadTrainingStatus(application_id=application_id) + proc_finished_url = document["proc_finished_url"] + + r = requests.post(f'http://{gym_host}:{gym_port}/scheduleTraining', json={ + "application_id": application_id, + "proc_finished_uri": f'http://{controller_host}:{controller_port}/networkTrained'}) + + if r.status_code >= 400: + TrainingController._notifyTrainingFailed( + application_id, + proc_finished_url, + f'Supervised gym returned error response: {r.text} while requesting network training') + return + + client.updateTrainingStatus(application_id=application_id, + status=TrainingStatus.IN_NETWORK_TRAINING) + + @staticmethod + def _sendRequestForDataGeneration(application_id: str, + proc_finished_url: str, + max_utility_error: float, + cp_cdo_path: str) -> None: + """Sends a request to the data generator to begin generating training data for this application id + + Args: + application_id: unique id of the application + proc_finished_url: url which waits for notification after the training is finished + max_utility_error: maximal MAE error of the predicted configurations to deem a trained network successful + cp_cdo_path: path in CDO which contains the constraint problem which will be used by the data generator + + Raises: + KeyError: when one of the required environment variables was not set + """ + logging.info( + f'{application_id}: Ordering data generator to start training data generation') + + data_generator_host = os.environ['DATA_GENERATOR_HOST'] + data_generator_port = os.environ['DATA_GENERATOR_PORT'] + controller_host = os.environ['CONTROLLER_HOST'] + controller_port = os.environ['CONTROLLER_PORT'] + train_to_test_ratio = float(os.environ.get('TRAIN_TO_TEST_RATIO', 0.85)) + timestamp_difference = float(os.environ['TIMESTAMP_DIFFERENCE']) + r = requests.post(f"http://{data_generator_host}:{data_generator_port}/generate", json={ + "applicationId": application_id, + "trainRatio": train_to_test_ratio, + "cdoProblemPath": cp_cdo_path, + "returnEndpoint": f"http://{controller_host}:{controller_port}/dataGenerated", + "delta": timestamp_difference + }) + + if r.status_code >= 400: + TrainingController._notifyTrainingFailed( + application_id, + proc_finished_url, + f'Data generator returned error response: {r.text} while requesting training data generation') + return + + client = ModelDBClient() + client.updateTrainingStatus(application_id=application_id, + status=TrainingStatus.IN_DATA_GENERATION, + proc_finished_url=proc_finished_url) + client.updateNetworkInfo(application_id=application_id, + max_utility_error=max_utility_error) + + + @staticmethod + def _notifyTrainingFailed(application_id: str, + proc_finished_url: str, + msg: str) -> None: + """Notifies at the given url that the training has failed. 
+ + Args: + application_id: unique id of the application + proc_finished_url: url at which the notification will be sent + msg: message describing the network status + """ + logging.error(f'{application_id}: {msg}') + ModelDBClient().updateTrainingStatus(application_id=application_id, + status=TrainingStatus.FAILED) + if proc_finished_url != "": + requests.post(proc_finished_url, json={ + "application_id": application_id, + "status": "failed" + }) + return + + @staticmethod + def _notifyTrainingSuccess(application_id: str, + proc_finished_url: str, + msg: str) -> None: + """Notifies at the given url that the training has successfully finished. + + Args: + application_id: unique id of the application + proc_finished_url: url at which the notification will be sent + msg: message describing the network status + """ + logging.info(f'{application_id}: {msg}') + ModelDBClient().updateTrainingStatus(application_id=application_id, + status=TrainingStatus.SUCCESS) + requests.post(proc_finished_url, json={ + "application_id": application_id, + "status": "successful" + }) + return + + +class WrongStatusForTrainingException(Exception): + """Signifies that the network training status is improper to perform training + """ + pass + + +class CDOException(Exception): + """Signifies that something went wrong while communicating with CDO + """ + pass diff --git a/controller/src/utilityEvaluator/UtilityEvaluator.py b/controller/src/utilityEvaluator/UtilityEvaluator.py new file mode 100644 index 0000000000000000000000000000000000000000..e6e04671614df535af2406f38c6bdf52fed741c6 --- /dev/null +++ b/controller/src/utilityEvaluator/UtilityEvaluator.py @@ -0,0 +1,378 @@ +import logging +import os +from numbers import Number +from pathlib import Path +from typing import List, Dict, Union, Tuple +from fastapi import HTTPException + +import torch +from modeldb.ModelDBClient import ModelDBClient + +from controller.src.utilityEvaluator.UtilityGenerator import UtilityGenerator +from .dataloader.csv_dataloader import Dataloader +from ..models.trainModels import NetworkTrainedInfo +from ..solving_controller import SolvingController + + +class UtilityEvaluator: + """ + Evaluator which uses UtilityGenerator to calculate mean absolute error between + utility function values of predicted and target configurations. 
+ """ + + __metrics_labels_and_positions: Dict[str, int] + __target_labels: List[str] + __utility_generator: UtilityGenerator + + def __init__(self, + metrics_labels_and_positions: Dict[str, int], + target_labels: List[str], + cp_model_file_path: str, + camel_model_file_path: str, + node_candidates_file_path: str): + """ + Args: + metrics_labels_and_positions: dictionary specifying the names of the metrics and their positions in the test file + target_labels: names of the configuration columns in the test file + cp_model_file_path: path to the file containing the constraint problem + camel_model_file_path: path to the file containing the camel model + node_candidates_file_path: path to the file containing the node candidates + """ + + self.__metrics_labels_and_positions = metrics_labels_and_positions + self.__target_labels = target_labels + self.__utility_generator = UtilityGenerator(cp_model_path=cp_model_file_path, + camel_model_path=camel_model_file_path, + node_candidates_path=node_candidates_file_path, + metric_names=list(metrics_labels_and_positions.keys())) + + def evaluateUtilityValueDifference(self, + x: torch.Tensor, + preds: torch.Tensor, + target: torch.Tensor) -> float: + """Calculates the difference in utility values of the given configurations + + Args: + x: input tensor to the network + preds: predicted configuration + target: optimal configuration + + Returns: + difference in utility values of the given configurations + """ + assert preds.shape == target.shape, "Target and prediction tensors are of different shape." + target_utility_value = self.__utility_generator.evaluate(configuration=self._get_configuration(target), + metrics=self._get_metrics(x)) + preds_utility_value = self.__utility_generator.evaluate(configuration=self._get_configuration(preds), + metrics=self._get_metrics(x)) + return abs(target_utility_value - preds_utility_value) + + def _get_metrics(self, x: torch.Tensor) -> Dict[str, int]: + """Prepare a dictionary describing metrics based on the given tensor and self.__metrics_labels_and_positions + + Args: + x: input tensor to the network + + Returns: + dictionary with pairs (metric_name: metric_value) + """ + last_row_of_x = x[-1] + metrics: Dict[str, int] = {} + for label, position in self.__metrics_labels_and_positions.items(): + metrics[label] = last_row_of_x[position].item() + return metrics + + def _get_configuration(self, solve: torch.Tensor) -> Dict[str, int]: + """Prepare a dictionary describing the configuration based on the given tensor and self.__target_labels + + Args: + solve: tensor contatining the configuration + + Returns: + dictionary with pairs (variable_name: variable_value) + """ + configuration: Dict[str, int] = {} + for position, label in enumerate(self.__target_labels): + configuration[label] = solve[position].item() + return configuration + + @staticmethod + def _getCsvHeaders(test_data_file_path: Path) -> List[str]: + """ Extract the column headers of the training data + + Args: + test_data_file_path: path to the file containing the test data + + Returns: + Column headers of the test data + """ + csv_headers = [] + with test_data_file_path.open(mode='r') as test_data_file: + header = test_data_file.readline().rstrip('\n') # read only first line with column headers + csv_headers = header.split(',') + return csv_headers + + @staticmethod + def _getInputDataFromTensor(seq: torch.tensor, + input_columns: List[str], + csv_headers: List[str]) -> Dict[str, List[Union[float, int]]]: + """Transforms tensor data into input data for supervised 
gym to predict configuration + + Args: + seq: sequence of metrics and predictions + input_columns: names of the input columns + csv_headers: column headers of the test data + + Returns: + input data for supervised gym + """ + input_data = {} + for col in input_columns: + input_data[col] = [] + for line in seq: + for position in range(len(line)): + col = csv_headers[position] + value = (line[position].item()) + input_data[col].append(value) + return input_data + + @staticmethod + def _getPredictionAsTensor(prediction: Dict[str, Number], + target_labels: List[str], + input_columns: List[str], + csv_headers: List[str]) -> torch.tensor: + """Transforms the prediction produced by the supervised gym to a tensor + + Args: + prediction: configuration prediction produced by the supervised gym + target_labels: names of the configuration columns in the test file + input_columns: names of the input columns + csv_headers: column headers of the test data + + Returns: + Tensor created from the configuration prediction + """ + prediction_list = [0 for _ in range(len(target_labels))] + for key in prediction.keys(): + index = csv_headers.index(key) - len(input_columns) + prediction_list[index] = prediction[key] + return torch.IntTensor(prediction_list) + + @staticmethod + def _getNetworkInfo(client: ModelDBClient, + network_trained_info: NetworkTrainedInfo) -> Tuple[int, List[str], float, int]: + """Loads network information from model DB and the environmental variables + + Args: + client: Model DB Client + network_trained_info: information pass to the controller in the request + + Returns: + prediction step, list of input column names, maximal utility error and the sequence length of input + """ + pred_step = 0 + input_columns = [] + max_utility_error = 0.0 + seq_len = 0 + try: + pred_step = int(os.environ['PREDICTION_HORIZON']) + document = client.loadNetworkInfo(network_trained_info.application_id) + input_columns = document["input_columns"] + max_utility_error = document["max_utility_error"] + seq_len = document["input_sequence_len"] + except (KeyError, ValueError): + logging.error("Could not load network information from Model DB") + raise + + return pred_step, input_columns, max_utility_error, seq_len + + @staticmethod + def _downloadFilesFromModelDB(client: ModelDBClient, + application_id: str, + cp_path: Path, + camel_path: Path, + nodes_path: Path, + test_data_file_path) -> None: + """Exports necessary files from ModelDB to the file system. 
+ + Args: + client: Model DB client + application_id: unique id of the application + cp_path: file path at which the constraint problem will be stored + camel_path: file path at which the camel model will be stored + nodes_path: file path at which the node candidates will be stored + test_data_file_path: file path at which the test data will be stored + + Raises: + KeyError: when it could find files in ModelDB for the given application id + """ + try: + client.exportCP(application_id, cp_path) + client.exportCamelModel(application_id, camel_path) + client.exportNodeCandidates(application_id, nodes_path) + client.exportTestDataToFile(application_id, test_data_file_path) + except KeyError: + logging.error( + f'Could not export files from model DB because at least one of them is not there for application id: {application_id}') + raise + + @staticmethod + def _parseCsvHeader(test_data_file_path: Path) -> Tuple[List[str], List[int], int, List[int]]: + """Parses the header of the test data file to extract necessary information + + Args: + test_data_file_path: path to file which describes the test data + + Returns: + training data column names, indexes of columns to be used, index of the group column, indexes of columns with predictions + """ + csv_headers = UtilityEvaluator._getCsvHeaders(test_data_file_path) + usecols = list(range(len(csv_headers))) + try: + time_col = csv_headers.index("time") + usecols.pop(time_col) + csv_headers.pop(time_col) + except ValueError: + pass # Ignore as apparently there was no "time" column in this data + + experiment_id_col = 0 + try: + group_id_name = os.environ.get("GROUP_ID", "experimentId") + experiment_id_col = csv_headers.index(group_id_name) + except ValueError: + logging.error("The test data does not have an experimentId column") + raise + + csv_headers.pop(experiment_id_col) + prediction_cols = [i for i, header in enumerate(csv_headers) if "PREDICTION" in header.upper()] + + return csv_headers, usecols, experiment_id_col, prediction_cols + + @staticmethod + def _generateMetricLabelsAndPositions(csv_headers: List[str], + prediction_cols: List[int], + input_columns: List[str]) -> Dict[str, int]: + """Generates a dictionary with metric names and their positions in the test data + + Args: + csv_headers: headers of test data columns + prediction_cols: indexes of columns containing predictions + input_columns: names of the columns describing the input to a network + + Returns: + dictionary with metric names and their positions in the test data + """ + metrics_labels_and_positions = {} + for i, header in enumerate(csv_headers): + if i not in prediction_cols and i < len(input_columns): + metrics_labels_and_positions[header] = i + return metrics_labels_and_positions + + def _calculateMeanUtilityError(self, + dataloader: Dataloader, + input_columns: List[str], + target_labels: List[str], + csv_headers: List[str], + network_trained_info: NetworkTrainedInfo) -> float: + """Calculates the Mean Average Utility Error by comparing utility values of configurations + described in the test data file with the ones predicted by supervised gym's trained model. 
+ + Args: + dataloader: Dataloader object for loading sequences of data from the test data file + input_columns: names of the columns which are the input to the supervised gym's model + target_labels: names of the columns which describe the optimal configuration + csv_headers: names of the test data column headers + network_trained_info: information about network training received by the controller in the request + + Returns: + Calculated Mean Average Utility Error + + Raises: + HTTPException: when the supervised gym returned an error message after asking it to predict a configuration + """ + utility_error_sum = 0 + try: + for i in range(dataloader.get_size()): + seq, target = dataloader.getitem(i) + input_data = UtilityEvaluator._getInputDataFromTensor(seq, input_columns, csv_headers) + + prediction = SolvingController._sendPredictionRequest(network_trained_info.application_id, input_data) + prediction_as_tensor = UtilityEvaluator._getPredictionAsTensor(prediction, target_labels, + input_columns, csv_headers) + + # if the prediction tensor and target tensor are the same then we know the utility error will be 0 + # so we don't have to evaluate those configurations + if not torch.eq(prediction_as_tensor, target).all(): + utility_error_sum += self.evaluateUtilityValueDifference(seq, prediction_as_tensor, target) + + except HTTPException as e: + logging.error(f"The supervised gym did not predict a configuration for one of the sequences returning: {e}") + raise + return float(utility_error_sum) / dataloader.get_size() + + @staticmethod + def checkUtilityFuntionMAE(network_trained_info: NetworkTrainedInfo) -> bool: + """Checks whether the network has trained well enough. + + Args: + network_trained_info: information about network training received by the controller in the request + + Returns: + True if the network can be deemed as trained well or False otherwise + """ + logging.info(f'Checking if the network has trained for application id: {network_trained_info.application_id}') + test_data_file_path = Path(f"test-data-{network_trained_info.application_id}-utility-check") + cp_path = Path(f'CP-{network_trained_info.application_id}-utility-check') + camel_path = Path(f'camel-model-{network_trained_info.application_id}-utility-check') + nodes_path = Path(f'node-candidates-{network_trained_info.application_id}-utility-check') + + try: + client = ModelDBClient() + pred_step, input_columns, max_utility_error, seq_len = UtilityEvaluator._getNetworkInfo(client, network_trained_info) + UtilityEvaluator._downloadFilesFromModelDB(client, network_trained_info.application_id, + cp_path, + camel_path, + nodes_path, + test_data_file_path) + csv_headers, usecols, experiment_id_col, prediction_cols = UtilityEvaluator._parseCsvHeader( + test_data_file_path) + metrics_labels_and_positions = UtilityEvaluator._generateMetricLabelsAndPositions(csv_headers, + prediction_cols, + input_columns) + target_labels = csv_headers[len(input_columns):] + + dataloader = Dataloader(seq_len=seq_len, + pred_step=pred_step, + file=test_data_file_path, + usecols=usecols, + experiment_id_col=experiment_id_col, + x_y_split=len(input_columns), + x_predictions_cols=prediction_cols) + evaluator = UtilityEvaluator(metrics_labels_and_positions=metrics_labels_and_positions, + target_labels=target_labels, + cp_model_file_path=cp_path.absolute().as_posix(), + camel_model_file_path=camel_path.absolute().as_posix(), + node_candidates_file_path=nodes_path.absolute().as_posix()) + mean_utility_error = 
evaluator._calculateMeanUtilityError(dataloader, input_columns, + target_labels, csv_headers, + network_trained_info) + + if mean_utility_error > max_utility_error: + logging.info( + f'The trained model does NOT predict within error threshold for application id: {network_trained_info.application_id}\n' + + f'Mean utility error: {mean_utility_error}, Error threshold: {max_utility_error}') + return False + + except (KeyError, HTTPException): + logging.error(f'An error occurred while checking the utility function MAE.') + return False + finally: + test_data_file_path.unlink(missing_ok=True) + cp_path.unlink(missing_ok=True) + camel_path.unlink(missing_ok=True) + nodes_path.unlink(missing_ok=True) + + logging.info( + f'Successfully validated that trained model predicts within error threshold for application id: {network_trained_info.application_id}\n' + + f'Mean utility error: {mean_utility_error}, Error threshold: {max_utility_error}') + return True diff --git a/controller/src/utilityEvaluator/UtilityGenerator.py b/controller/src/utilityEvaluator/UtilityGenerator.py new file mode 100644 index 0000000000000000000000000000000000000000..7ef00bb80b00a8ceb091fd0b6e19f46a093e7eb9 --- /dev/null +++ b/controller/src/utilityEvaluator/UtilityGenerator.py @@ -0,0 +1,107 @@ +import logging +from os import unlink +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import List, Dict +from xml.etree.ElementTree import ElementTree, Element + +from jnius import autoclass +from lxml import etree + +ASUtilityGeneratorApplication = autoclass("eu.melodic.upperware.utilitygenerator.ASUtilityGeneratorApplication") +IntVariableValueDTO = autoclass("eu.melodic.upperware.utilitygenerator.cdo.cp_model.DTO.IntVariableValueDTO") +ArrayList = autoclass("java.util.ArrayList") + + +class UtilityGenerator: + """ + Class used for utilizing the "utility generator" java library to calculate the utility function value + for given constraint problem and camel model. + + Requires java library with utility generator (and its dependencies) and ASUtilityGeneratorApplication from java-src. + The .jar containing the library should be included in the classpath declared at the beginning of this file. + """ + _cp_model: ElementTree + _xml_fillings: Dict[str, Element] + + _cp_model_path: str + _camel_model_path: str + _node_candidates_path: str + + def __init__(self, + cp_model_path: str, + camel_model_path: str, + node_candidates_path: str, + metric_names: List[str]): + """ + Args: + cp_model_path: path to the file containing the constraint problem model + camel_model_path: path to the file containing the camel model + node_candidates_path: path to the file containing node candidates + metric_names: names of the metrics that might be changed (exactly as they appear in the cp model file) + """ + + self._cp_model_path = Path(cp_model_path) + self._cp_model = etree.parse(str(Path(self._cp_model_path))) + self._camel_model_path = camel_model_path + self._node_candidates_path = node_candidates_path + + self._xml_fillings = {} + for name in metric_names: + xml_filling = self._cp_model.find( + f"cpMetrics[@id='{name}']") + if xml_filling is not None: + self._xml_fillings[name] = xml_filling[0] + else: + logging.warning(f"Received metric name which was not found in the CP: {name}. Ommiting it.") + + def _add_metrics(self, + filename: str, + metrics: Dict[str, int]) -> None: + """Adds metrics to the constraint problem model. 
+ + Args: + filename: name of the file containing the constraint problem model + metrics: dictionary with pairs (arg_name, arg_value) describing the metrics to be added. If metrics are + empty, then no value wil be changed + """ + for arg_name, arg_value in metrics.items(): + if arg_name not in self._xml_fillings: + logging.warning(f"Received metric name which was not found in the CP: {arg_name}. Ommiting it.") + continue + arg_loc = self._xml_fillings[arg_name] + value_type = arg_loc.get(r'{http://www.w3.org/2001/XMLSchema-instance}type') + if value_type == 'types:IntegerValueUpperware': + arg_loc.set('value', str(int(arg_value))) + else: + arg_loc.set('value', str(arg_value)) + + self._cp_model.write(filename, + xml_declaration=True, + encoding="ASCII") + + def evaluate(self, + configuration: Dict[str, int], + metrics: Dict[str, int]) -> float: + """Creates java objects based on the parameters and a tmeporary file with an updated contraint problem model. + Then it calculates the utility function value using the java's ASUtilityGeneratorApplication. + + Args: + configuration: dictionary with pairs (arg_name, arg_value) describing the configuration + metrics: dictionary with pairs (arg_name, arg_value) describing the metrics + Returns: + the utility function value for the given parameters + """ + utility_value = 0.0 + with NamedTemporaryFile(delete=False) as tempfile: + + self._add_metrics(filename=tempfile.name, metrics=metrics) + variable_list = ArrayList() + for (name, value) in configuration.items(): + variable_list.add(IntVariableValueDTO(name, round(value))) + + utility_generator = ASUtilityGeneratorApplication(self._camel_model_path, tempfile.name, + self._node_candidates_path) + utility_value = utility_generator.evaluate(variable_list) + + return utility_value diff --git a/controller/src/utilityEvaluator/dataloader/csv_dataloader.py b/controller/src/utilityEvaluator/dataloader/csv_dataloader.py new file mode 100644 index 0000000000000000000000000000000000000000..c64c31d8d98296948235068dab6753c5dc9f7be7 --- /dev/null +++ b/controller/src/utilityEvaluator/dataloader/csv_dataloader.py @@ -0,0 +1,203 @@ +from pathlib import Path +from typing import List + +import numpy as np +import torch + + +class Dataloader: + """ + Based on a .csv file, it maps a sequence of metrics and predictions + gathered at consecutive timestamps to an ideal configuration and enables + getting those (metrics, configuration) pairs by their index. + Both the sequence length and the prediction step are passed as parameters. + + The .csv file can be divided into 3 parts: + - X part - columns containing metrics and predictions, which will be the input to neural networks + - Y part - columns containing the ideal configuration, which will be the desired output of neural networks + - Experiment Id column - column defining the experiment to which a given row corresponds to. + + All the rows with the same experiment id must be grouped together. + The timestamp difference between continuous rows in a given one experiment should always be the same. + The difference in time or amount of rows in different experiments is not relevant. + """ + + def __init__(self, + seq_len: int, + pred_step: int, + file: Path, + usecols: List[int], + experiment_id_col: int, + x_y_split: int, + x_predictions_cols: List[int], + delimiter=',', + skip_header=1 + ): + """ + Args: + seq_len: number of following records returned as a sequence. + pred_step: distance between last record in sequence and correctly predicted row. 
+            file: path to the .csv file containing the necessary data
+            usecols: ids of columns to be read from the csv (it must include experiment_id_col). Other columns will be dismissed.
+            experiment_id_col: id of the column (after dismissing unused columns) defining the experiment
+                to which a given row corresponds. This column will be dismissed.
+                All the rows with the same experiment id must be grouped together.
+            x_y_split: id of the column (after dismissing unused columns) which is the border
+                between metrics and predictions (x) and the desired configuration (y).
+                Cols [0, x_y_split-1] hold the values describing the input to a network.
+                Cols [x_y_split, end-1] hold the values describing the desired output from a network.
+            x_predictions_cols: ids of columns (after dismissing unused columns) that contain some prediction value.
+                They will be pushed by pred_step rows.
+            delimiter: delimiter between values in the .csv file
+            skip_header: number of rows to skip at the beginning of the .csv file
+        """
+
+        if seq_len < 0:
+            raise ValueError(f"seq_len can't be negative")
+
+        if pred_step < 0:
+            raise ValueError(f"pred_step can't be negative")
+
+        self.__file = file
+        self.__seq_len = seq_len
+        self.__x_y_split = x_y_split
+        self.__usecols = usecols
+        self.__experiment_id_col = experiment_id_col
+        self.__x_predictions_cols = x_predictions_cols
+        self.__delimiter = delimiter
+        self.__skip_header = skip_header
+
+        self.__preproces_data(seq_len, pred_step)
+
+    def get_size(self) -> int:
+        """ Returns the size of the dataset.
+        Returns:
+            the size of the dataset
+        """
+        return self.__size
+
+    def __load_series(self, file: Path) -> np.array:
+        """Loads data from the .csv file and splits it so that each experiment is separated from the others.
+
+        Args:
+            file: path to the file containing the data
+
+        Returns:
+            array of data where each element describes the data of a different experiment
+        """
+        series = np.genfromtxt(
+            file.as_posix(),
+            delimiter=self.__delimiter,
+            skip_header=self.__skip_header,
+            usecols=self.__usecols,
+            dtype=np.float32
+        )
+        split_idxes = []  # rows at which a new experiment begins
+        current_experiment_id = series[0][self.__experiment_id_col]
+        for i in range(1, series.shape[0]):
+            if series[i][self.__experiment_id_col] != current_experiment_id:
+                split_idxes.append(i)
+                current_experiment_id = series[i][self.__experiment_id_col]
+
+        series = np.delete(series, self.__experiment_id_col, 1)  # deleting the column with experiment_id
+        series = np.vsplit(series, split_idxes)  # splitting the data into experiments
+        return series
+
+    def __preproces_data(self,
+                         seq_len: int,
+                         pred_step: int) -> None:
+        """Prepares the data.
+        First it loads and splits the data into experiments.
+        Then, within each experiment, it pushes/cuts rows according to the seq_len and pred_step parameters.
+        Lastly, it calculates idx_experiment_map and idx_experiment_idx_map so that each (sequence, configuration) pair
+        can be accessed using a numerical index (in the method getitem(idx)).
+
+        Args:
+            seq_len: length of the sequence required by the supervised gym's model
+            pred_step: how many data rows into the future the model is predicting
+        """
+        series = self.__load_series(self.__file)
+        sizes = np.zeros(len(series), dtype=np.int32)
+        self.__x = [None] * len(series)
+        self.__y = [None] * len(series)
+
+        i = 0
+        while i < len(series):
+            x, y = np.hsplit(series[i], [self.__x_y_split])
+            x_cols = np.hsplit(x, x.shape[1])
+
+            x_cols = self.__push_cols(x_cols, self.__x_predictions_cols, pred_step)
+            x = np.hstack(x_cols)
+            y = y[seq_len + pred_step - 1:]  # make it fit with x sequences
+
+            self.__x[i] = x
+            self.__y[i] = torch.from_numpy(y)
+            sizes[i] = y.shape[0]
+
+            if sizes[i] <= 0:
+                sizes = np.delete(sizes, i)
+                self.__x.pop(i)
+                self.__y.pop(i)
+                i -= 1
+
+            i += 1
+
+        cumsizes = np.cumsum(sizes, dtype=np.int32)
+        self.__size = cumsizes[-1].item()
+        self.__idx_experiment_map = np.searchsorted(cumsizes, np.arange(self.__size) + 1).astype(np.int32)
+        self.__idx_experiment_idx_map = np.zeros(self.__size).astype(np.int32)
+
+        self.__idx_experiment_idx_map[0] = 0
+        cnt = 1
+        for i in range(1, len(self.__idx_experiment_map)):
+            if self.__idx_experiment_map[i] != self.__idx_experiment_map[i - 1]:
+                cnt = 0
+            self.__idx_experiment_idx_map[i] = cnt
+            cnt += 1
+
+    def __push_cols(self,
+                    cols: np.array,
+                    push_idx: List[int],
+                    d: int) -> np.array:
+        """Pushes the cols whose indexes are in push_idx forward by d rows, while the other cols get cut by d rows.
+
+        Args:
+            cols: columns of data to be modified
+            push_idx: indexes of columns that need to be pushed
+            d: the number of rows to push by
+
+        Returns:
+            modified columns of data
+        """
+
+        if d == 0:
+            return cols
+
+        new_cols = [None] * len(cols)
+
+        for i, _ in enumerate(cols):
+            if i in push_idx:
+                new_cols[i] = cols[i][d:]
+            else:
+                new_cols[i] = cols[i][:-d]
+
+        return new_cols
+
+    def getitem(self, idx: int) -> Tuple[np.ndarray, torch.Tensor]:
+        """Gets the data sequence of the specified index.
+
+        Args:
+            idx: index of the data sequence to be extracted
+
+        Returns:
+            tuple (seq, conf) where:
+            - seq is the sequence of metrics and predictions (input to neural networks)
+            - conf is the ideal configuration based on the sequence (desired output from neural networks)
+        """
+
+        f = self.__idx_experiment_map[idx]
+        f_idx = self.__idx_experiment_idx_map[idx]
+        x = self.__x[f][f_idx:f_idx + self.__seq_len]
+        y = self.__y[f][f_idx]
+
+        return (x, y)
diff --git a/controller/src/utilityEvaluator/java-src/ASUtilityGeneratorApplication.java b/controller/src/utilityEvaluator/java-src/ASUtilityGeneratorApplication.java
new file mode 100644
index 0000000000000000000000000000000000000000..cd5a8f308a992c6fd636ad06940927bd1b362a7c
--- /dev/null
+++ b/controller/src/utilityEvaluator/java-src/ASUtilityGeneratorApplication.java
@@ -0,0 +1,35 @@
+package eu.melodic.upperware.utilitygenerator;
+
+import eu.melodic.cache.NodeCandidates;
+import eu.melodic.cache.impl.FilecacheService;
+import eu.melodic.upperware.penaltycalculator.PenaltyFunctionProperties;
+import eu.melodic.upperware.utilitygenerator.cdo.cp_model.DTO.VariableValueDTO;
+import eu.melodic.upperware.utilitygenerator.properties.UtilityGeneratorProperties;
+import eu.paasage.upperware.security.authapi.properties.MelodicSecurityProperties;
+import eu.paasage.upperware.security.authapi.token.JWTServiceImpl;
+
+import java.util.Collection;
+
+public class ASUtilityGeneratorApplication {
+    private UtilityGeneratorApplication utilityGeneratorApplication;
+
+    public ASUtilityGeneratorApplication(String camelModelFilePath, String cpModelFilePath, String NODE_CANDIDATES_FILE_PATH) {
+        utilityGeneratorApplication = createUtilityGeneratorApplication(camelModelFilePath, cpModelFilePath, NODE_CANDIDATES_FILE_PATH);
+    }
+
+    public double evaluate(Collection solution) {
+        return this.utilityGeneratorApplication.evaluate(solution);
+    }
+
+    private static UtilityGeneratorApplication createUtilityGeneratorApplication(String camelModelFilePath, String cpModelFilePath, String NODE_CANDIDATES_FILE_PATH) {
+        boolean readFromFile = true;
+        NodeCandidates nodeCandidates = new FilecacheService().load(NODE_CANDIDATES_FILE_PATH);
+        UtilityGeneratorProperties utilityGeneratorProperties = new UtilityGeneratorProperties();
+        utilityGeneratorProperties.setUtilityGenerator(new UtilityGeneratorProperties.UtilityGenerator());
+        utilityGeneratorProperties.getUtilityGenerator().setDlmsControllerUrl("");
+        MelodicSecurityProperties melodicSecurityProperties = new MelodicSecurityProperties();
+        JWTServiceImpl jWTServiceImpl = new JWTServiceImpl(melodicSecurityProperties);
+        PenaltyFunctionProperties penaltyFunctionProperties = new PenaltyFunctionProperties();
+        return new UtilityGeneratorApplication(camelModelFilePath, cpModelFilePath, readFromFile, nodeCandidates, utilityGeneratorProperties, melodicSecurityProperties, jWTServiceImpl, penaltyFunctionProperties);
+    }
+}
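
Reviewer note: below is a minimal usage sketch of the two new Python classes, based only on the signatures introduced in this diff. The import paths, file names, metric names, variable names, and column indices are illustrative assumptions and do not come from the repository.

# Hedged sketch -- all literal values and module paths below are assumed for illustration.
from pathlib import Path

from utilityEvaluator.UtilityGenerator import UtilityGenerator           # assumed import path
from utilityEvaluator.dataloader.csv_dataloader import Dataloader        # assumed import path

# Evaluate a candidate configuration against a constraint problem and camel model.
generator = UtilityGenerator(cp_model_path='cp_model.xmi',               # assumed file names
                             camel_model_path='camel_model.xmi',
                             node_candidates_path='node_candidates.cache',
                             metric_names=['AvgResponseTime'])           # assumed metric id in the CP model
utility = generator.evaluate(configuration={'WorkerCardinality': 3},     # assumed variable name
                             metrics={'AvgResponseTime': 120})

# Build (sequence, ideal configuration) pairs from an experiments csv.
dataset = Dataloader(seq_len=5, pred_step=1,
                     file=Path('experiments.csv'),                       # assumed csv layout
                     usecols=[0, 1, 2, 3, 4],
                     experiment_id_col=0,
                     x_y_split=3,
                     x_predictions_cols=[2])
for idx in range(dataset.get_size()):
    sequence, ideal_configuration = dataset.getitem(idx)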