Commit 2b9c2de8 authored by Alessio Carenini's avatar Alessio Carenini
Browse files

Moved postStart method to Choreography top level entity

parent 1195f668
......@@ -15,37 +15,14 @@
*/
package org.apache.brooklyn.entity.webapp.chorevolution;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.namespace.QName;
import javax.xml.soap.MessageFactory;
import javax.xml.soap.SOAPBody;
import javax.xml.soap.SOAPBodyElement;
import javax.xml.soap.SOAPConnection;
import javax.xml.soap.SOAPConnectionFactory;
import javax.xml.soap.SOAPElement;
import javax.xml.soap.SOAPEnvelope;
import javax.xml.soap.SOAPException;
import javax.xml.soap.SOAPHeader;
import javax.xml.soap.SOAPMessage;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import org.apache.brooklyn.api.entity.Application;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.mgmt.Task;
......@@ -57,6 +34,7 @@ import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import org.apache.brooklyn.core.entity.trait.Startable;
import org.apache.brooklyn.core.feed.ConfigToAttributes;
import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
import org.apache.brooklyn.entity.proxy.LoadBalancer;
import org.apache.brooklyn.entity.webapp.ControlledDynamicWebAppClusterImpl;
import org.apache.brooklyn.util.collections.MutableList;
......@@ -78,7 +56,7 @@ public class ControlledDynamicChoreographyClusterImpl extends ControlledDynamicW
@Override
public void start(Collection<? extends Location> locations) {
ConfigToAttributes.apply(this, DEPENDENCIES_SPEC);
ConfigToAttributes.apply(this, CHOR_SPEC);
ConfigToAttributes.apply(this, CD_SPEC);
ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
List<Entity> childrenToStart=null;
......@@ -125,14 +103,9 @@ public class ControlledDynamicChoreographyClusterImpl extends ControlledDynamicW
throw Exceptions.propagate(e);
} finally {
connectSensors();
configureNetwork();
}
}
public void postStart() {
configureNetwork();
}
public void deployProcesses() throws InterruptedException, ExecutionException {
Map<String, String> cd_specs = getConfig(CD_SPEC);
Iterable<TomcatOdeServer> targets = Iterables.filter(getCluster().getChildren(), TomcatOdeServer.class);
......@@ -143,6 +116,17 @@ public class ControlledDynamicChoreographyClusterImpl extends ControlledDynamicW
}
private String findService(String service_name) {
String res=null;
Application app = getApplication();
Collection<Entity> clusters = app.getChildren();
for (Entity e:clusters) {
List<Map<String, Object>> specs = e.getConfig(CHOR_SPEC);
}
return res;
}
static <T> Task<T> whenServiceUp(final Entity target, final TaskAdaptable<T> task, String name) {
return Tasks.<T>builder().displayName(name).dynamic(true).body(new Callable<T>() {
@Override
......@@ -180,72 +164,6 @@ public class ControlledDynamicChoreographyClusterImpl extends ControlledDynamicW
}).build();
}
public void configureNetwork() {
int nThreads=10;
Map<String, Map<String, String>> deps = getAttribute(DEPENDENCIES_SPEC);
Map<String, String> coordination_delegates = getAttribute(CD_SPEC);
executor = Executors.newFixedThreadPool(nThreads);
List<Callable<AtomicInteger>> tasks = new ArrayList<Callable<AtomicInteger>>();
String tomcat_base_url="http://"+getController().getAttribute(HOSTNAME)+":"+getController().getAttribute(HTTP_PORT)+"/";
String ode_base_url="http://"+getController().getAttribute(HOSTNAME)+":"+getController().getAttribute(HTTP_PORT)+"/ode/processes/";
for (String source: deps.keySet()) {
Map<String, String> service_dependencies = deps.get(source);
for (String dest_role: service_dependencies.keySet()) {
String s=null;
String d=null;
String dest_name=service_dependencies.get(dest_role);
ArrayList<String> urls=new ArrayList<>();
if (coordination_delegates.keySet().contains(source))
s=ode_base_url+source;
else
s=tomcat_base_url+source;
if (coordination_delegates.keySet().contains(dest_name))
s=ode_base_url+dest_name;
else
s=tomcat_base_url+dest_name;
urls.add(d);
log.info("Setting dependency {} (URL: {}) for service {} (URL: {})", new Object[]{dest_name, d, source, s});
tasks.add(createInvocationTask(s, dest_role, dest_name, urls));
}
log.info("Configuring choreography...");
try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
e.printStackTrace();
}
log.info("Choreography configured");
}
}
//Adds count to the sum and returns the reference of the sum as the result
private Callable<AtomicInteger> createInvocationTask(String sourceName, String partnerRole, String partnerName, List<String> partnerEndpoints) {
final String se=sourceName;
final String pr=partnerRole;
final String pn=partnerName;
final List<String> endpoints=partnerEndpoints;
Callable<AtomicInteger> clientPlanCall=new Callable<AtomicInteger>(){
public AtomicInteger call() {
try {
log.info("Service "+se+": setting "+pn+" to "+endpoints);
sendContext(se, pr, pn, endpoints);
} catch (Exception e) {
e.printStackTrace();
return new AtomicInteger(1);
}
return new AtomicInteger(0);
}
};
return clientPlanCall;
}
@Override
public Integer resize(Integer desiredSize) {
......@@ -253,94 +171,4 @@ public class ControlledDynamicChoreographyClusterImpl extends ControlledDynamicW
return getCluster().resize(desiredSize);
}
private void sendContext(String serviceEndpoint, String partnerRole, String partnerName,List<String> partnerEndpoints) throws ContextNotSentException, UnsupportedOperationException, SOAPException, XMLStreamException, IOException {
SOAPConnectionFactory sfc = SOAPConnectionFactory.newInstance();
SOAPConnection connection = sfc.createConnection();
MessageFactory mf = MessageFactory.newInstance();
SOAPMessage sm = mf.createMessage();
SOAPEnvelope envelope = sm.getSOAPPart().getEnvelope();
String namespace = parseNamespace(serviceEndpoint);
envelope.addNamespaceDeclaration("pre", namespace);
SOAPHeader sh = sm.getSOAPHeader();
SOAPBody sb = sm.getSOAPBody();
sh.detachNode();
QName bodyName = new QName("setInvocationAddress");
SOAPBodyElement bodyElement = sb.addBodyElement(bodyName);
bodyElement.setPrefix("pre");
QName role = new QName("arg0");
SOAPElement quotation1 = bodyElement.addChildElement(role);
quotation1.addTextNode(partnerRole);
QName name = new QName("arg1");
SOAPElement quotation2 = bodyElement.addChildElement(name);
quotation2.addTextNode(partnerName);
for (String partnerEndpoint : partnerEndpoints) {
QName address = new QName("arg2");
SOAPElement quotation3 = bodyElement.addChildElement(address);
quotation3.addTextNode(partnerEndpoint);
}
if (serviceEndpoint.trim().endsWith("/"))
serviceEndpoint = serviceEndpoint.substring(0, serviceEndpoint.length() - 1);
URL endpoint = new URL(serviceEndpoint);
// @SuppressWarnings("unused")
// this.printSOAPMessage(sm);
connection.call(sm, endpoint);
// this.printSOAPMessage(msg);
}
private String parseNamespace(final String endpoint) throws XMLStreamException, IOException {
final String wsdl = getWsdl(endpoint);
final URL url = new URL(wsdl);
final InputStreamReader streamReader = new InputStreamReader(url.openStream());
final BufferedReader wsdlInputStream = new BufferedReader(streamReader);
final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
final XMLEventReader reader = xmlInputFactory.createXMLEventReader(wsdlInputStream);
String elementName, namespace = "";
XMLEvent event;
StartElement element;
while (reader.hasNext()) {
event = reader.nextEvent();
if (event.isStartElement()) {
element = event.asStartElement();
elementName = element.getName().getLocalPart();
if ("import".equals(elementName)) {
final QName qname = new QName("namespace"); // NOPMD
namespace = element.getAttributeByName(qname).getValue();
break;
}
}
}
reader.close();
return namespace;
}
private String getWsdl(final String endpoint) {
String slashLess;
if (endpoint.endsWith("/")) {
slashLess = endpoint.substring(0, endpoint.length() - 1);
} else {
slashLess = endpoint;
}
return slashLess + "?wsdl";
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment