Commit 3dc8059a authored by Alessio Carenini's avatar Alessio Carenini
Browse files

Implentation of the Choreography top level entity

parent e48c0128
package org.apache.brooklyn.entity.webapp.chorevolution;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.xml.namespace.QName;
import javax.xml.soap.MessageFactory;
import javax.xml.soap.SOAPBody;
import javax.xml.soap.SOAPBodyElement;
import javax.xml.soap.SOAPConnection;
import javax.xml.soap.SOAPConnectionFactory;
import javax.xml.soap.SOAPElement;
import javax.xml.soap.SOAPEnvelope;
import javax.xml.soap.SOAPException;
import javax.xml.soap.SOAPHeader;
import javax.xml.soap.SOAPMessage;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.core.entity.AbstractApplication;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.entity.webapp.tomcat.Tomcat8Server;
import org.apache.cxf.endpoint.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.chorevolution.datamodel.ChoreographyService;
import eu.chorevolution.datamodel.ConfigurableExistingService;
import eu.chorevolution.datamodel.DeployedService;
import eu.chorevolution.datamodel.ExistingService;
import eu.chorevolution.datamodel.PackageType;
import eu.chorevolution.datamodel.ServiceDependency;
import eu.chorevolution.datamodel.ServiceGroup;
import eu.chorevolution.datamodel.ServiceType;
import eu.chorevolution.datamodel.StatusType;
import eu.chorevolution.datamodel.deployment.CloudNode;
import eu.chorevolution.datamodel.deployment.DeploymentInfo;
import eu.chorevolution.datamodel.deployment.DeploymentLocation;
public class ChoreographyImpl extends AbstractApplication implements Choreography {
private static final Logger log = LoggerFactory.getLogger(ChoreographyImpl.class);
private static final String tenant = null;
@Override
public void init() {
super.init();
setDefaultDisplayName("Choreography ("+getId()+")");
}
public void postStart(Collection<? extends Location> locations) {
eu.chorevolution.datamodel.Choreography chor = getChorSpec();
/*
Client client = ClientBuilder.newClient(config);
WebTarget target = client.target(getBaseURI());
// Get XML
String xmlResponse = target.path("rest").path("todo").request()
.accept(MediaType.TEXT_XML).get(String.class);
*/
}
public eu.chorevolution.datamodel.Choreography getChorSpec() {
eu.chorevolution.datamodel.Choreography c=new eu.chorevolution.datamodel.Choreography();
StatusType status=StatusType.RUNNING;
DeploymentLocation location=new DeploymentLocation();
List<ServiceGroup> service_groups=new ArrayList<ServiceGroup>();
Collection<Entity> clusters = getChildren();
ControlledDynamicChoreographyCluster chor_cluster=null;
List<Map<String, Object>> chor_spec=new ArrayList<Map<String,Object>>();
List<Map<String, Object>> cluster_spec=null;
c.setId(getApplicationId());
c.setLocation(location);
c.setServiceGroups(service_groups);
c.setStatus(status);
for (Entity cluster_node: clusters) {
ServiceGroup sg=new ServiceGroup();
chor_cluster=(ControlledDynamicChoreographyCluster)cluster_node;
String tomcat_base_url="http://"+chor_cluster.getController().getAttribute(ControlledDynamicChoreographyCluster.HOSTNAME)+":"+chor_cluster.getController().getAttribute(ControlledDynamicChoreographyCluster.HTTP_PORT)+"/";
String ode_base_url="http://"+chor_cluster.getController().getAttribute(ControlledDynamicChoreographyCluster.HOSTNAME)+":"+chor_cluster.getController().getAttribute(ControlledDynamicChoreographyCluster.HTTP_PORT)+"/ode/processes/";
cluster_spec= chor_cluster.getConfig(ControlledDynamicChoreographyCluster.CHOR_SPEC);
for (Map<String, Object> service_node:cluster_spec) {
Map<String,Object> service_cfg = (Map<String,Object>)(service_node.get("service"));
String service_name=(String) service_cfg.get("name");
String service_roles=(String) service_cfg.get("roles");
String service_type=(String) service_cfg.get("service_type");
String service_artifact_type=(String) service_cfg.get("artifact_type");
String service_package_type=(String) service_cfg.get("package_type");
String service_package_url=(String) service_cfg.get("package_url");
String service_url=(String) service_cfg.get("url");
Object deps = service_cfg.get("dependencies");
List<Map<String, String>> service_deps=(List<Map<String, String>>)deps;
if (service_type.equals("existingService")) {
if (service_deps!=null) {
ConfigurableExistingService srv=null;
srv=new ConfigurableExistingService();
srv.setName(service_name);
srv.setRoles(Arrays.asList(service_roles));
srv.setUrl(service_url);
srv.setDependencies(create_deps(service_deps));
}
else {
ExistingService srv=null;
srv=new ExistingService();
srv.setName(service_name);
srv.setRoles(Arrays.asList(service_roles));
srv.setUrl(service_url);
}
}
else if (service_type.equals("deployableService")) {
DeployedService srv=null;
srv=new DeployedService();
srv.setName(service_name);
srv.setRoles(Arrays.asList(service_roles));
srv.setPackageType(PackageType.valueOf(service_package_type));
srv.setPackageUrl(service_package_url);
srv.setServiceType(ServiceType.valueOf(service_artifact_type));
srv.setDependencies(create_deps(service_deps));
if (srv.getServiceType().equals(ServiceType.COORDINATION_DELEGATE)) {
srv.setUrl(ode_base_url+srv.getName());
}
else {
srv.setUrl(tomcat_base_url+srv.getName());
}
ArrayList<DeploymentInfo> deploymentInfo = new ArrayList<DeploymentInfo>();
for (Entity e:chor_cluster.getCluster().getMembers()) {
CloudNode nodeinfo;
String srv_tomcat_base_url="http://"+e.getAttribute(Tomcat8Server.HOSTNAME)+":"+e.getAttribute(Tomcat8Server.HTTP_PORT)+"/";
String srv_ode_base_url="http://"+e.getAttribute(Tomcat8Server.HOSTNAME)+":"+e.getAttribute(Tomcat8Server.HTTP_PORT)+"/ode/processes/";
DeploymentInfo d=new DeploymentInfo();
if (srv.getPackageType().equals(PackageType.ODE)) {
d.setEndpoint(srv_ode_base_url+srv.getName());
}
else {
d.setEndpoint(srv_tomcat_base_url+srv.getName());
}
//MachineProvisioningLocation location_props = e.getAttribute(SoftwareProcess.PROVISIONING_LOCATION);
nodeinfo=new CloudNode();
/* TODO find hw data in brooklyn
* nodeinfo.setCpus(cpus);
nodeinfo.setHostname(hostname);
nodeinfo.setImage(image);
nodeinfo.setIp(ip);
nodeinfo.setOs(so);
nodeinfo.setPrivateKey(privateKeyFile);
nodeinfo.setPrivateKeyFile(privateKeyFile);
nodeinfo.setRam(ram);
nodeinfo.setState(state);
nodeinfo.setStorage(storage);
nodeinfo.setUser(user);
nodeinfo.setZone(zone);
*/
d.setNode(nodeinfo);
}
srv.setDeploymentInfo(deploymentInfo);
srv.setInstances(chor_cluster.getCluster().getCurrentSize());
}
}
}
return c;
}
private List<ServiceDependency> create_deps(List<Map<String,String>> deps) {
List<ServiceDependency> result=new ArrayList<ServiceDependency>();
for (Map<String, String> dep:deps) {
for (String service_role: dep.keySet()) {
ServiceDependency sd=new ServiceDependency();
sd.setServiceSpecName(dep.get(service_role));
sd.setServiceSpecRole(service_role);
result.add(sd);
}
}
return result;
}
private Map<String, ChoreographyService> create_map(eu.chorevolution.datamodel.Choreography c) {
Map<String, ChoreographyService> res=new HashMap<String, ChoreographyService>();
for (ServiceGroup sg:c.getServiceGroups()) {
for (ChoreographyService srv:sg.getServices()) {
String url=null;
res.put(srv.getName(), srv);
}
}
return res;
}
public void configureNetwork() {
int nThreads=10;
DeployedService ds=null;
ConfigurableExistingService es=null;
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
List<Callable<AtomicInteger>> tasks = new ArrayList<Callable<AtomicInteger>>();
eu.chorevolution.datamodel.Choreography chor_spec = getChorSpec();
Map<String, ChoreographyService> service_map=create_map(chor_spec);
for (ChoreographyService srv:service_map.values()) {
List<ServiceDependency> service_deps=null;
String source_url=null;
String dest_name=null;
String dest_role=null;
if (srv instanceof ConfigurableExistingService) {
service_deps=((ConfigurableExistingService)srv).getDependencies();
source_url=((ConfigurableExistingService)srv).getUrl();
}
else if (srv instanceof DeployedService) {
service_deps=((DeployedService)srv).getDependencies();
source_url=((DeployedService)srv).getUrl();
}
for (ServiceDependency sd: service_deps) {
ArrayList<String> urls=new ArrayList<>();
dest_name=sd.getServiceSpecName();
dest_role=sd.getServiceSpecRole();
ChoreographyService d = service_map.get(dest_name);
if (d instanceof ExistingService)
urls.add(((ExistingService) d).getUrl());
else if (d instanceof ConfigurableExistingService)
urls.add(((ConfigurableExistingService) d).getUrl());
else if (d instanceof DeployedService)
urls.add(((DeployedService) d).getUrl());
log.info("Setting dependency {} (URL: {}) for service {} (URL: {})", new Object[]{dest_name, d, srv.getName(), source_url});
tasks.add(createInvocationTask(source_url, dest_role, dest_name, urls));
}
}
log.info("Configuring choreography...");
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
e.printStackTrace();
}
log.info("Choreography configured");
}
//Adds count to the sum and returns the reference of the sum as the result
private Callable<AtomicInteger> createInvocationTask(String sourceName, String partnerRole, String partnerName, List<String> partnerEndpoints) {
final String se=sourceName;
final String pr=partnerRole;
final String pn=partnerName;
final List<String> endpoints=partnerEndpoints;
Callable<AtomicInteger> clientPlanCall=new Callable<AtomicInteger>(){
public AtomicInteger call() {
try {
log.info("Service "+se+": setting "+pn+" to "+endpoints);
sendContext(se, pr, pn, endpoints);
} catch (Exception e) {
e.printStackTrace();
return new AtomicInteger(1);
}
return new AtomicInteger(0);
}
};
return clientPlanCall;
}
private void sendContext(String serviceEndpoint, String partnerRole, String partnerName,List<String> partnerEndpoints) throws ContextNotSentException, UnsupportedOperationException, SOAPException, XMLStreamException, IOException {
SOAPConnectionFactory sfc = SOAPConnectionFactory.newInstance();
SOAPConnection connection = sfc.createConnection();
MessageFactory mf = MessageFactory.newInstance();
SOAPMessage sm = mf.createMessage();
SOAPEnvelope envelope = sm.getSOAPPart().getEnvelope();
String namespace = parseNamespace(serviceEndpoint);
envelope.addNamespaceDeclaration("pre", namespace);
SOAPHeader sh = sm.getSOAPHeader();
SOAPBody sb = sm.getSOAPBody();
sh.detachNode();
QName bodyName = new QName("setInvocationAddress");
SOAPBodyElement bodyElement = sb.addBodyElement(bodyName);
bodyElement.setPrefix("pre");
QName role = new QName("arg0");
SOAPElement quotation1 = bodyElement.addChildElement(role);
quotation1.addTextNode(partnerRole);
QName name = new QName("arg1");
SOAPElement quotation2 = bodyElement.addChildElement(name);
quotation2.addTextNode(partnerName);
for (String partnerEndpoint : partnerEndpoints) {
QName address = new QName("arg2");
SOAPElement quotation3 = bodyElement.addChildElement(address);
quotation3.addTextNode(partnerEndpoint);
}
if (serviceEndpoint.trim().endsWith("/"))
serviceEndpoint = serviceEndpoint.substring(0, serviceEndpoint.length() - 1);
URL endpoint = new URL(serviceEndpoint);
// @SuppressWarnings("unused")
// this.printSOAPMessage(sm);
connection.call(sm, endpoint);
// this.printSOAPMessage(msg);
}
private String parseNamespace(final String endpoint) throws XMLStreamException, IOException {
final String wsdl = getWsdl(endpoint);
final URL url = new URL(wsdl);
final InputStreamReader streamReader = new InputStreamReader(url.openStream());
final BufferedReader wsdlInputStream = new BufferedReader(streamReader);
final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
final XMLEventReader reader = xmlInputFactory.createXMLEventReader(wsdlInputStream);
String elementName, namespace = "";
XMLEvent event;
StartElement element;
while (reader.hasNext()) {
event = reader.nextEvent();
if (event.isStartElement()) {
element = event.asStartElement();
elementName = element.getName().getLocalPart();
if ("import".equals(elementName)) {
final QName qname = new QName("namespace"); // NOPMD
namespace = element.getAttributeByName(qname).getValue();
break;
}
}
}
reader.close();
return namespace;
}
private String getWsdl(final String endpoint) {
String slashLess;
if (endpoint.endsWith("/")) {
slashLess = endpoint.substring(0, endpoint.length() - 1);
} else {
slashLess = endpoint;
}
return slashLess + "?wsdl";
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment