Commit 302acf91 authored by Oliver Stahl's avatar Oliver Stahl
Browse files

Added PoC/demo implementation of facade

parent ee321959
Pipeline #18945 passed with stages
in 12 minutes and 49 seconds
......@@ -126,6 +126,23 @@
<artifactId>jwt-commons</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.19.2</version>
</dependency>
</dependencies>
......
......@@ -15,6 +15,8 @@ import eu.melodic.upperware.utilitygenerator.cdo.cp_model.DTO.VariableDTO;
import eu.melodic.upperware.utilitygenerator.cdo.cp_model.DTO.VariableValueDTO;
import eu.melodic.upperware.utilitygenerator.cdo.cp_model.MetricsConverter;
import eu.melodic.upperware.utilitygenerator.evaluator.template_function_evaluator_utils.TemplateNodeCandidatesConverter;
import eu.melodic.upperware.utilitygenerator.facade.PMFacade;
import eu.melodic.upperware.utilitygenerator.facade.PMFacadeImpl;
import eu.melodic.upperware.utilitygenerator.node_candidates.NodeCandidatesConverter;
import eu.melodic.upperware.utilitygenerator.utility_function.ArgumentConverter;
import eu.melodic.upperware.utilitygenerator.utility_function.UtilityFunction;
......@@ -41,6 +43,7 @@ public class UtilityFunctionEvaluator {
private Collection<VariableDTO> variablesFromConstraintProblem;
private NodeCandidates nodeCandidates;
private Collection<ArgumentConverter> converters;
private final PMFacade facade;
public UtilityFunctionEvaluator(String camelModelFilePath, String cpModelFilePath, boolean readFromFile, NodeCandidates nodeCandidates,
MelodicSecurityProperties melodicSecurityProperties, JWTService jwtService) {
......@@ -65,6 +68,7 @@ public class UtilityFunctionEvaluator {
fromCamelModelExtractor.endWorkWithCamelModel();
constraintProblemExtractor.endWorkWithCPModel();
facade = new PMFacadeImpl();
}
public UtilityFunctionEvaluator(String camelModelFilePath, String cpModelFilePath, boolean readFromFile, NodeCandidates nodeCandidates,
......@@ -90,6 +94,7 @@ public class UtilityFunctionEvaluator {
fromCamelModelExtractor.endWorkWithCamelModel();
constraintProblemExtractor.endWorkWithCPModel();
facade = new PMFacadeImpl();
}
public UtilityFunctionEvaluator(String cpModelFilePath, NodeCandidates nodeCandidates, List<Map.Entry<TemplateProvider.AvailableTemplates, Double>> utilityComponents) {
......@@ -108,6 +113,7 @@ public class UtilityFunctionEvaluator {
this.function = new UtilityFunction(utilityFormula, new ArrayList<>());
constraintProblemExtractor.endWorkWithCPModel();
facade = new PMFacadeImpl();
}
private Collection<ArgumentConverter> createConverters(FromCamelModelExtractor fromCamelModelExtractor, String formula, NodeCandidatesConverter nodeCandidatesConverter,
......@@ -137,6 +143,10 @@ public class UtilityFunctionEvaluator {
return 0;
}
// TODO filter for dependent metrics here
//Double predictionResult = facade.callPmPrediction(solution);
// TODO what to do with the result?
Collection<Argument> allArguments = this.converters.stream()
.map(converter -> converter.convertToArguments(solution, newConfiguration))
.flatMap(Collection::stream)
......
package eu.melodic.upperware.utilitygenerator.facade;
import com.google.protobuf.GeneratedMessageV3;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.codehaus.plexus.util.StringUtils;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
@Slf4j
public abstract class AbstractFacade {
private final String jmsUrl;
private int idCounter = 0;
protected AbstractFacade() {
jmsUrl = initializeJmsServerUrl();
}
protected String generateNextId() {
return "" + ++idCounter;
}
protected void sendMessage(Message message, Destination destination, Session session) throws JMSException {
MessageProducer producer = session.createProducer(destination);
producer.send(message);
producer.close();
}
protected Message receiveMessage(Destination destination, Session session) throws JMSException {
MessageConsumer consumer = session.createConsumer(destination);
return consumer.receive();
}
protected byte[] getBytesFromMessage(BytesMessage message) throws JMSException {
long length = message.getBodyLength();
byte[] bytes = new byte[(int) length];
message.readBytes(bytes);
return bytes;
}
protected BytesMessage createBytesMessage(GeneratedMessageV3 bean, Session session) throws JMSException {
BytesMessage message = session.createBytesMessage();
message.writeBytes(bean.toByteArray());
return message;
}
protected Topic createTopic(String name, Session session) throws JMSException {
return session.createTopic(name);
}
protected Queue createQueue(String name, Session session) throws JMSException {
return session.createQueue(name);
}
protected Connection initializeConnection() throws JMSException {
ConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(jmsUrl);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
return connection;
}
protected Session initializeSession(Connection connection) throws JMSException {
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected void closeSession(Session session) {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
log.error("Closing JMS session failed: {}", e.getMessage(), e);
}
}
}
protected void closeConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
log.error("Closing JMS connection failed: {}", e.getMessage(), e);
}
}
}
private String initializeJmsServerUrl() {
String jmsUrl = System.getProperties().getProperty("jmsUrl");
if (StringUtils.isBlank(jmsUrl)) {
throw new IllegalArgumentException("No URL for JMS server set.");
}
return jmsUrl;
}
protected void prepareBuilder(GeneratedMessageV3.Builder builder, Map<String, Object> data) {
Class<? extends GeneratedMessageV3.Builder> builderClass = builder.getClass();
for(Map.Entry<String, Object> entry : data.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
try {
String setterName = "set" + key.substring(0, 1).toUpperCase() + key.substring(1);
for(Method m : builderClass.getMethods()) {
if(m.getName().equalsIgnoreCase(setterName) && m.getParameterTypes().length == 1) {
Class<?> para = m.getParameterTypes()[0];
Method method = builderClass.getDeclaredMethod(setterName, para);
log.info("!!!!!!!! Invoking " + builderClass.getSimpleName() + "." + method.getName() + "("+ value +")");
method.invoke(builder, value);
break;
}
}
}
catch(NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
log.error(e.getMessage(), e);
}
}
}
}
package eu.melodic.upperware.utilitygenerator.facade;
import eu.melodic.upperware.utilitygenerator.cdo.cp_model.DTO.VariableValueDTO;
import java.util.Collection;
public interface PMFacade {
Double callPmPrediction(Collection<VariableValueDTO> solution);
}
package eu.melodic.upperware.utilitygenerator.facade;
import eu.melodic.upperware.utilitygenerator.cdo.cp_model.DTO.VariableValueDTO;
import lombok.extern.slf4j.Slf4j;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class PMFacadeImpl extends AbstractFacade implements PMFacade {
private static final String PM_PREDICT_REQUEST_TOPIC = "performance_module_predict";
private static final String PM_PREDICT_REPLY_TOPIC = "performance_module_predict_feedback";
private static final String APPLICATION_PROPERTY_NAME = "application";
private static final String TARGET_PROPERTY_NAME = "target";
private static final String FEATURES_PROPERTY_NAME = "features";
private static final String VARIANT_PROPERTY_NAME = "variant";
private static final String SENDER_ID_PROPERTY_NAME = "sender_id";
public PMFacadeImpl() {
}
@Override
public Double callPmPrediction(Collection<VariableValueDTO> solution) {
Connection connection = null;
Session session = null;
try {
connection = initializeConnection();
session = initializeSession(connection);
String id = generateNextId();
Map<String, Number> features = new HashMap<>();
for(VariableValueDTO dto : solution) {
features.put(dto.getName(), dto.getValue());
}
Message requestMessage = session.createObjectMessage();
requestMessage.setStringProperty(APPLICATION_PROPERTY_NAME, "default_application");
requestMessage.setStringProperty(TARGET_PROPERTY_NAME, "whatever");
requestMessage.setObjectProperty(FEATURES_PROPERTY_NAME, features);
requestMessage.setStringProperty(VARIANT_PROPERTY_NAME, "vm");
requestMessage.setStringProperty(SENDER_ID_PROPERTY_NAME, id);
Topic requestTopic = createTopic(PM_PREDICT_REQUEST_TOPIC, session);
Queue requestQueue = createQueue(PM_PREDICT_REQUEST_TOPIC, session);
sendMessage(requestMessage, requestQueue, session);
Topic replyTopic = createTopic(PM_PREDICT_REPLY_TOPIC, session);
Queue replyQueue = createQueue(PM_PREDICT_REPLY_TOPIC, session);
Message replyMessage = receiveMessage(replyQueue, session);
String replyId = replyMessage.getStringProperty(SENDER_ID_PROPERTY_NAME);
if(id.equals(replyId)) {
String test = replyMessage.getStringProperty(APPLICATION_PROPERTY_NAME);
return 0d;
// return Float.valueOf(replyMessage.getFloatProperty("value")).doubleValue();
}
else {
// TODO go around
}
} catch (JMSException e) {
log.error("JMS communication failed: {}", e.getMessage(), e);
} finally {
closeSession(session);
closeConnection(connection);
}
// TODO what to return in this case?
return null;
}
public static void main(String[] args) {
PMFacadeImpl pmFacade = new PMFacadeImpl();
Collection<VariableValueDTO> solution = new ArrayList<>();
solution.add(new VariableValueDTO("test-long", 100L));
solution.add(new VariableValueDTO("test-float", 1.1f));
solution.add(new VariableValueDTO("test-int", -50));
Double result = pmFacade.callPmPrediction(solution);
log.info(result == null ? null : result.toString());
}
}
package eu.melodic.upperware.utilitygenerator.facade;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.melodic.upperware.utilitygenerator.facade.test.CallPmPrediction;
import eu.melodic.upperware.utilitygenerator.facade.test.CallPmPredictionResponse;
import lombok.extern.slf4j.Slf4j;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class PMSideDemoFacade extends AbstractFacade {
private static final String PM_PREDICT_REQUEST_TOPIC = "performance_module_predict";
private static final String PM_PREDICT_REPLY_TOPIC = "performance_module_predict_feedback";
public PMSideDemoFacade() {
}
public void call() {
Connection connection = null;
Session session = null;
try {
connection = initializeConnection();
session = initializeSession(connection);
log.info("!!!!!!!! Setting up PM side...");
log.info("!!!!!!!! Listening to queue: " + PM_PREDICT_REQUEST_TOPIC);
Queue requestQueue = createQueue(PM_PREDICT_REQUEST_TOPIC, session);
BytesMessage requestMessage = (BytesMessage) receiveMessage(requestQueue, session);
CallPmPrediction pmPrediction = CallPmPrediction.parseFrom(getBytesFromMessage(requestMessage));
log.info("!!!!!!!! Message received:\n" + pmPrediction);
log.info("!!!!!!!! Enter PM code -------------------------------------------------------------------------------------------");
int sum = pmPrediction.getDispatcherCores() + pmPrediction.getDispatcherRam() + pmPrediction.getWorkerCores() + pmPrediction.getWorkerRam();
double result = sum * pmPrediction.getWorkerMulti();
log.info("!!!!!!!! Calculated value = " + result);
log.info("!!!!!!!! Exit PM code -------------------------------------------------------------------------------------------");
CallPmPredictionResponse.Builder pmPredictionResponse = CallPmPredictionResponse.newBuilder();
Map<String, Object> data = new HashMap<>();
data.put("utilityValue", result);
data.put("senderID", pmPrediction.getSenderID());
prepareBuilder(pmPredictionResponse, data);
CallPmPredictionResponse callPmPredictionResponse = pmPredictionResponse.build();
BytesMessage replyMessage = createBytesMessage(callPmPredictionResponse, session);
log.info("!!!!!!!! Sending to queue: " + PM_PREDICT_REPLY_TOPIC);
log.info("!!!!!!!! Message: \n" + callPmPredictionResponse);
Queue replyQueue = createQueue(PM_PREDICT_REPLY_TOPIC, session);
sendMessage(replyMessage, replyQueue, session);
log.info("!!!!!!!! Message sent");
}
catch(InvalidProtocolBufferException e) {
log.error(e.getMessage(), e);
}
catch (JMSException e) {
log.error("JMS communication failed: {}", e.getMessage(), e);
}
finally {
closeSession(session);
closeConnection(connection);
}
}
public static void main(String[] args) {
PMSideDemoFacade pmFacade = new PMSideDemoFacade();
pmFacade.call();
}
}
package eu.melodic.upperware.utilitygenerator.facade;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.melodic.upperware.utilitygenerator.facade.test.CallPmPrediction;
import eu.melodic.upperware.utilitygenerator.facade.test.CallPmPredictionResponse;
import lombok.extern.slf4j.Slf4j;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class UGSideDemoFacadeImpl extends AbstractFacade {
private static final String PM_PREDICT_REQUEST_TOPIC = "performance_module_predict";
private static final String PM_PREDICT_REPLY_TOPIC = "performance_module_predict_feedback";
public UGSideDemoFacadeImpl() {
}
public Double callPmPrediction(Map<String, Object> data) {
Connection connection = null;
Session session = null;
try {
connection = initializeConnection();
session = initializeSession(connection);
log.info("!!!!!!!! Setting up UG side...");
String id = generateNextId();
data.put("senderID", id);
CallPmPrediction.Builder builder = CallPmPrediction.newBuilder();
prepareBuilder(builder, data);
CallPmPrediction callPmPrediction = builder.build();
Message requestMessage = createBytesMessage(callPmPrediction, session);
log.info("!!!!!!!! Sending to queue: " + PM_PREDICT_REQUEST_TOPIC);
log.info("!!!!!!!! Message: \n" + callPmPrediction);
Queue requestQueue = createQueue(PM_PREDICT_REQUEST_TOPIC, session);
sendMessage(requestMessage, requestQueue, session);
log.info("!!!!!!!! Message sent");
log.info("!!!!!!!! Listening to queue: " + PM_PREDICT_REPLY_TOPIC);
Queue replyQueue = createQueue(PM_PREDICT_REPLY_TOPIC, session);
BytesMessage replyByteMessage = (BytesMessage) receiveMessage(replyQueue, session);
CallPmPredictionResponse pmPredictionResponse = CallPmPredictionResponse.parseFrom(getBytesFromMessage(replyByteMessage));
log.info("!!!!!!!! Message received:\n" + pmPredictionResponse);
String replyId = pmPredictionResponse.getSenderID();
if(id.equals(replyId)) {
return pmPredictionResponse.getUtilityValue();
}
else {
// TODO go around
}
}
catch(InvalidProtocolBufferException e) {
log.error(e.getMessage(), e);
}
catch (JMSException e) {
log.error("JMS communication failed: {}", e.getMessage(), e);
}
finally {
closeSession(session);
closeConnection(connection);
}
return null;
}
public static void main(String[] args) {
UGSideDemoFacadeImpl pmFacade = new UGSideDemoFacadeImpl();
Map<String, Object> data = new HashMap<>();
data.put("dispatcherCores", 2);
data.put("dispatcherRam", 4);
data.put("dispatcherLocation", "Somewhere");
data.put("workerCores", 4);
data.put("workerRam", 8);
data.put("workerLocation", "Somewhere else");
data.put("workerMulti", 100);
Double result = pmFacade.callPmPrediction(data);
try {
Thread.sleep(1000);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
syntax = "proto3";
package eu.melodic.upperware.utilitygenerator.facade.test;
option java_package = "eu.melodic.upperware.utilitygenerator.facade.test";
option optimize_for = SPEED;
option java_multiple_files = true;
message CallPmPrediction {
int32 dispatcherCores = 1;
int32 dispatcherRam = 2;
string dispatcherLocation = 3;
int32 workerCores = 4;
int32 workerRam = 5;
string workerLocation = 6;
int32 workerMulti = 7;
string senderID = 8; // facade sender ID
};
message Config {
int32 cores = 1;
int32 ram = 2;
string location = 3;
}
message CallPmPrediction2 {
Config dispatcher = 1;
Config worker = 2;
int32 workerMulti = 3;
string senderID = 4; // facade sender ID
}
message CallPmPredictionResponse {
double utilityValue = 1;
// int32 utilityValue = 1;
string senderID = 2; // facade sender ID
};
\ No newline at end of file
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: facade.proto
package eu.melodic.upperware.utilitygenerator.facade.test;
public interface CallPmPrediction2OrBuilder extends
// @@protoc_insertion_point(interface_extends:eu.melodic.upperware.utilitygenerator.facade.test.CallPmPrediction2)
com.google.protobuf.MessageOrBuilder {
/**
* <code>.eu.melodic.upperware.utilitygenerator.facade.test.Config dispatcher = 1;</code>
* @return Whether the dispatcher field is set.
*/
boolean hasDispatcher();
/**
* <code>.eu.melodic.upperware.utilitygenerator.facade.test.Config dispatcher = 1;</code>
* @return The dispatcher.
*/
Config getDispatcher();
/**
* <code>.eu.melodic.upperware.utilitygenerator.facade.test.Config dispatcher = 1;</code>