Commit 347a61ff authored by cmathieu's avatar cmathieu
Browse files

Undoing 13841


git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/proactive/scheduling/tags/Scheduling_1.0.1@13845 28e8926c-6b08-0410-baaa-805c5e19b8d6
parent 787c0a44
......@@ -526,8 +526,6 @@
<include name="org/ow2/proactive/scheduler/common/util/logforwarder/LogForwardingService.java" />
<include name="org/ow2/proactive/scheduler/util/classloading/TaskClassServer.java" />
<include name="org/ow2/proactive/scheduler/util/classloading/TaskClassLoader.java" />
<include name="org/ow2/proactive/scheduler/util/classloading/*.java" />
<include name="org/ow2/proactive/scheduler/util/classloading/resourcecache/*.java" />
<include name="org/ow2/proactive/scheduler/common/job/JobPriority.java" />
<include name="org/ow2/proactive/scheduler/common/job/TaskFlowJob.java" />
<include name="org/ow2/proactive/scheduler/common/job/ProActiveJob.java" />
......
......@@ -12,11 +12,6 @@
<proactive base="root" relpath="${proactive.home}">
<configuration>
<jvmarg value='-Dproactive.http.jetty.xml="${proactive.home}/config/deployment/jetty.xml"'/>
<jvmarg value='-Djava.system.class.loader=org.ow2.proactive.scheduler.util.classloading.ComputeNodeClassLoader' />
<jvmarg value='-Djavax.xml.xpath.XPathFactory:http://java.sun.com/jaxp/xpath/dom=com.sun.org.apache.xpath.internal.jaxp.XPathFactoryImpl'/>
<!--
<jvmarg value="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000" />
-->
<applicationClasspath>
<!-- Commented dependencies are referenced through the Jar-Index of Scheduler jars -->
<!-- Script engines must be explicitly in application classpath -->
......
......@@ -708,8 +708,9 @@ public class SchedulerCore implements UserSchedulerInterface_, AdminMethodsInter
logger_dev.debug("Load and Initialize the executable container for task '" +
internalTask.getId() + "'");
ExecutableContainerInitializer eci = new ExecutableContainerInitializer();
eci.setClassServer(getTaskClassServer(currentJob.getId()));
if (InternalJavaTask.class.isAssignableFrom(internalTask.getClass())) {
eci.setClassServer(getTaskClassServer(currentJob.getId()));
}
internalTask.getExecutableContainer().init(eci);
node = nodeSet.get(0);
......
......@@ -54,7 +54,6 @@ import org.ow2.proactive.scheduler.common.exception.ExecutableCreationException;
import org.ow2.proactive.scheduler.common.task.executable.Executable;
import org.ow2.proactive.scheduler.common.task.executable.JavaExecutable;
import org.ow2.proactive.scheduler.common.task.util.BigString;
import org.ow2.proactive.scheduler.util.classloading.ComputeNodeClassLoader;
import org.ow2.proactive.scheduler.util.classloading.TaskClassLoader;
import org.ow2.proactive.scheduler.util.classloading.TaskClassServer;
......@@ -122,11 +121,9 @@ public class JavaExecutableContainer implements ExecutableContainer {
// Instanciate the actual executable
try {
TaskClassLoader tcl = new TaskClassLoader(this.getClass().getClassLoader(), this.classServer);
// Add the task class loader to the system classloader
ComputeNodeClassLoader cncl = (ComputeNodeClassLoader) ClassLoader.getSystemClassLoader();
cncl.setTaskClassLoader(tcl);
Class<?> userExecutableClass = cncl.loadClass(this.userExecutableClassName);
// the tcl becomes the context classloader
Thread.currentThread().setContextClassLoader(tcl);
Class<?> userExecutableClass = tcl.loadClass(this.userExecutableClassName);
userExecutable = (JavaExecutable) userExecutableClass.newInstance();
Map<String, String> tmp = new HashMap<String, String>();
for (Entry<String, BigString> e : this.args.entrySet()) {
......@@ -134,7 +131,7 @@ public class JavaExecutableContainer implements ExecutableContainer {
}
userExecutable.setArgs(tmp);
} catch (Throwable e) {
throw new ExecutableCreationException("Unable to instanciate JavaExecutable : ", e);
throw new ExecutableCreationException("Unable to instanciate JavaExecutable : " + e);
}
}
return userExecutable;
......
......@@ -223,11 +223,6 @@ public class ForkedJavaTaskLauncher extends JavaTaskLauncher {
!"".equals(forkEnvironment.getJVMParameters())) {
command.append(" " + forkEnvironment.getJVMParameters() + " ");
}
command
.append("-Djava.system.class.loader=org.ow2.proactive.scheduler.util.classloading.ComputeNodeClassLoader ");
command
.append("-Djavax.xml.xpath.XPathFactory:http://java.sun.com/jaxp/xpath/dom=com.sun.org.apache.xpath.internal.jaxp.XPathFactoryImpl ");
}
private void setClasspath(StringBuffer command) {
......
......@@ -64,7 +64,6 @@ import org.ow2.proactive.scheduler.common.util.logforwarder.util.LoggingOutputSt
import org.ow2.proactive.scheduler.task.ExecutableContainer;
import org.ow2.proactive.scheduler.task.KillTask;
import org.ow2.proactive.scheduler.util.SchedulerDevLoggers;
import org.ow2.proactive.scheduler.util.classloading.ComputeNodeClassLoader;
import org.ow2.proactive.scripting.Script;
import org.ow2.proactive.scripting.ScriptHandler;
import org.ow2.proactive.scripting.ScriptLoader;
......@@ -207,10 +206,6 @@ public abstract class TaskLauncher implements InitActive {
core.terminate(taskId);
}
this.currentExecutable = null;
// Kill reset the current task class loader
ComputeNodeClassLoader cncl = (ComputeNodeClassLoader) ClassLoader.getSystemClassLoader();
cncl.setTaskClassLoader(null);
}
/**
......
package org.ow2.proactive.scheduler.util.classloading;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReference;
/** The system class loader of compute nodes
*
* Computes node must have a dedicated class loader for each task; the task
* class loader. Unfortunately, due to ProActive restrictions, it is not possible
* to set the task class loader as a thread context class loader (ProActive would not
* be able to generate the stub classes).
*
* Therefore, a custom system class loader must be set. It is known as the
* {@link ComputeNodeClassLoader}. It inherits from the URLClassLoader and
* will load the class from the VM classpath. In addition, if the task class
* loader is set, the {@link ComputeNodeClassLoader} use it is a class is not
* available in the classpath.
*
* When a task finish, the task class loader is unset. It will be garbage collected
* and the classes will be unloaded.
*/
public class ComputeNodeClassLoader extends URLClassLoader {
/** The curent task class loader, can be null */
private AtomicReference<ClassLoader> tcl;
final private ClassLoader parent;
public ComputeNodeClassLoader(ClassLoader parent) {
super(new URL[] {}, parent);
this.parent = parent;
// Add the classpath
final String s = System.getProperty("java.class.path");
final File[] path = (s == null) ? new File[0] : getClassPath(s);
final URL[] urls = (s == null) ? new URL[0] : pathToURLs(path);
for (URL url : urls) {
super.addURL(url);
}
this.tcl = new AtomicReference<ClassLoader>(null);
}
/** Set the current classloader */
public void setTaskClassLoader(ClassLoader tcl) {
this.tcl.set(tcl);
}
/* Get from the resource from the parent class loader. If the resource
* is not found then try to use the task class loader (if set)
*/
@Override
public URL getResource(String name) {
URL url = null;
url = super.getResource(name);
if (url == null) {
ClassLoader cl = tcl.get();
if (cl != null) {
url = cl.getResource(name);
}
}
return url;
}
/* Get from the resource from the parent class loader. If the resource
* is not found then try to use the task class loader (if set)
*/
@Override
public InputStream getResourceAsStream(String name) {
InputStream is = null;
is = super.getResourceAsStream(name);
if (is == null) {
ClassLoader cl = tcl.get();
if (cl != null) {
is = cl.getResourceAsStream(name);
}
}
return is;
}
/* Merge the results of the parent and the task class loaders. */
@Override
public Enumeration<URL> getResources(String name) throws IOException {
Enumeration<URL> parent = super.getResources(name);
Enumeration<URL> task = null;
ClassLoader cl = tcl.get();
if (cl != null) {
task = cl.getResources(name);
}
Enumeration<URL>[] array;
array = (task == null ? new Enumeration[] { parent } : new Enumeration[] { parent, task });
return new CNEnumeration<URL>(array);
}
/*
*
*
*/
protected Class<?> findClass(final String name) throws ClassNotFoundException {
Class<?> clazz = null;
try {
// Search in the classpath
clazz = super.findClass(name);
} catch (ClassNotFoundException e1) {
try {
// Search in the parent class loader (AppClassLoader -> Extensions -> Bootstrap)
clazz = parent.loadClass(name);
} catch (ClassNotFoundException e2) {
// If set, search in the task class loader
ClassLoader cl = tcl.get();
if (cl != null) {
try {
clazz = cl.loadClass(name);
} catch (ClassNotFoundException e3) {
// Houston, we have a problem: Class Not Found
}
}
}
}
if (clazz == null) {
throw new ClassNotFoundException("Class not found " + name);
}
return clazz;
}
/* Override the ClassLoader.loadClass() method to use our own search algorithm */
@Override
protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
Class<?> clazz = null;
// DO NOT load ourself. ComputeNodeClassLoader must only be loaded by the AppClassLoader
if (!name.equals("org.ow2.proactive.scheduler.util.classloading.ComputeNodeClassLoader")) {
try {
// First, check if the class has already been loaded
clazz = findLoadedClass(name);
if (clazz == null) {
clazz = this.findClass(name);
}
} catch (ClassNotFoundException e) {
// Miam miam miam
}
}
if (clazz == null) { // It's not an "else"
clazz = super.loadClass(name, resolve);
}
return clazz;
}
/*
* getClassPath, getFileURL and pathToURLs come from Open JDK 6
*/
private static File[] getClassPath(String cp) {
File[] path;
if (cp != null) {
int count = 0, maxCount = 1;
int pos = 0, lastPos = 0;
// Count the number of separators first
while ((pos = cp.indexOf(File.pathSeparator, lastPos)) != -1) {
maxCount++;
lastPos = pos + 1;
}
path = new File[maxCount];
lastPos = pos = 0;
// Now scan for each path component
while ((pos = cp.indexOf(File.pathSeparator, lastPos)) != -1) {
if (pos - lastPos > 0) {
path[count++] = new File(cp.substring(lastPos, pos));
} else {
// empty path component translates to "."
path[count++] = new File(".");
}
lastPos = pos + 1;
}
// Make sure we include the last path component
if (lastPos < cp.length()) {
path[count++] = new File(cp.substring(lastPos));
} else {
path[count++] = new File(".");
}
// Trim array to correct size
if (count != maxCount) {
File[] tmp = new File[count];
System.arraycopy(path, 0, tmp, 0, count);
path = tmp;
}
} else {
path = new File[0];
}
return path;
}
private static URL[] pathToURLs(File[] path) {
URL[] urls = new URL[path.length];
for (int i = 0; i < path.length; i++) {
urls[i] = getFileURL(path[i]);
}
return urls;
}
private static URL getFileURL(File file) {
try {
file = file.getCanonicalFile();
} catch (IOException e) {
}
try {
return file.toURI().toURL();
} catch (MalformedURLException e) {
// Should never happen since we specify the protocol...
throw new InternalError();
}
}
static public class CNEnumeration<E> implements Enumeration<E> {
final private LinkedList<E> elements = new LinkedList<E>();
private int index = 0;
public CNEnumeration(Enumeration<E>[] enums) {
for (Enumeration<E> en : enums) {
while (en.hasMoreElements()) {
E e = en.nextElement();
elements.add(e);
}
}
}
public boolean hasMoreElements() {
return index < elements.size();
}
public E nextElement() {
if (!hasMoreElements()) {
throw new NoSuchElementException();
}
return (E) elements.get(index++);
}
public void addElement(E e) {
elements.add(e);
}
}
}
......@@ -31,21 +31,13 @@
*/
package org.ow2.proactive.scheduler.util.classloading;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicLong;
import org.ow2.proactive.scheduler.util.classloading.ComputeNodeClassLoader.CNEnumeration;
import org.ow2.proactive.scheduler.util.classloading.resourcecache.Handler;
import org.apache.log4j.Logger;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.ow2.proactive.scheduler.util.SchedulerDevLoggers;
/**
* This classLoader is used on compute nodes provided by the resource manager to instantiate
* This classLoader is used on nodes provided by the resource manager to instantiate
* the executable. If a class is not found locally, then this class is asked to the
* taskClassCerver associated to this TaskClassLoader.
*
......@@ -55,28 +47,11 @@ import org.ow2.proactive.scheduler.util.classloading.resourcecache.Handler;
*/
public class TaskClassLoader extends ClassLoader {
/* Used to detect recursion.
*
* A HahShet is attached to each thread, before calling the parent
* class loader we put an identifier into the set. If the identifier
* is already present in the set then we MUST NOT delegate the call
* to the parent.
*
* One set is used for each operation. They could probably be merged into
* a single set.
*/
private final ThreadLocal<HashSet<String>> findClass;
private final ThreadLocal<HashSet<String>> getResource;
private final ThreadLocal<HashSet<String>> getResourceAsStream;
/* An unique resource identifier to enable collaboration between the task
* class loader and the protocol handler
*/
private final AtomicLong counter = new AtomicLong();
public static final Logger logger_dev = ProActiveLogger.getLogger(SchedulerDevLoggers.CORE);
/** The associated classserver on the scheduler core side */
// Can be null if no classpath has been set for the job
final private TaskClassServer remoteServer;
private TaskClassServer remoteServer;
/**
* Create a new classloader.
......@@ -85,157 +60,50 @@ public class TaskClassLoader extends ClassLoader {
*/
public TaskClassLoader(ClassLoader parent, TaskClassServer remoteServer) {
super(parent);
this.remoteServer = remoteServer;
this.findClass = new ThreadLocal<HashSet<String>>();
this.getResource = new ThreadLocal<HashSet<String>>();
this.getResourceAsStream = new ThreadLocal<HashSet<String>>();
}
/* Overrides ClassLoader.loadClass to always delagate the call to findClass */
@Override
public synchronized Class<?> loadClass(String className) throws ClassNotFoundException {
/* (non-Javadoc)
* @see java.lang.ClassLoader#loadClass(java.lang.String)
*/
public Class<?> loadClass(String className) throws ClassNotFoundException {
return this.findClass(className);
}
/*
* Try to download the class from the remote class server.
* If the class is not remotely available, then the parent class loader
* is used.
*
* It would be faster to try the parent class loader first (avoid 1RTT to load
* a class available in the classpath), but I'm not sure it will not break everything.
* Needs more investigations.
/* (non-Javadoc)
* @see java.lang.ClassLoader#findClass(java.lang.String)
*/
protected Class<?> findClass(String className) throws ClassNotFoundException {
public Class<?> findClass(String className) throws ClassNotFoundException {
logger_dev.debug("Looking for class " + className);
Class<?> res = this.findLoadedClass(className);
if (res == null) {
if (remoteServer != null) {
// The remote call should be try-catched to be safe in case of
// network failure. But it is currently impossible
byte[] classBytes = this.remoteServer.getClassBytes(className);
if (classBytes != null && classBytes.length != 0) {
res = this.defineClass(className, classBytes, 0, classBytes.length);
}
}
}
// Ask to the parent (ComputeNodeClassLoader) and avoid infinite recursion
if (res == null) {
if (findClass.get() == null) {
findClass.set(new HashSet<String>());
}
if (findClass.get().add(className)) {
res = super.loadClass(className, true);
findClass.get().remove(className);
}
}
if (res == null) {
throw new ClassNotFoundException("Class not found: " + className);
}
return res;
}
/*
* Since getResource() is usually followed by URL.openConnection(), the
* resource is proactively downloaded (and cached) from the remote class server.
* This behavior avoid 1 RTT.
*
* A custom protocol handler is used. It caches the downloaded class on the
* local runtime. So, URL.openConnection() does not perform any remote operation.
*
* The caching algorithm could be improved to avoid multiple download of the same
* resource. The current implemenation is trivial but safe.
*/
@Override
public URL getResource(String name) {
URL url = null;
InputStream is = this.getResourceAsStream(name);
if (is != null) {
Long id = counter.getAndIncrement();
Handler.cache.addResource(id.toString(), is);
try {
url = new URL(Handler.scheme, "unused", id.toString());
} catch (MalformedURLException e) {
// Miam miam miam, cannot happen
}
}
if (url == null) {
if (getResource.get() == null) {
getResource.set(new HashSet<String>());
}
if (getResource.get().add(name)) {
url = super.getResource(name);
getResource.get().remove(name);
}
}
return url;
}
/*
* The same algorithm than findClass is used. Ask to the remote class server first,
* then use the parent class loader.
*/
@Override
public InputStream getResourceAsStream(String name) {
if (name == null)
return null;
InputStream is = null;
// The remote class server expects class name, not a path.
String className = name.replace('/', '.');
className = className.replaceAll("\\.class$", "");
if (this.remoteServer != null) {
// It is not clear if getClassByte returns null or throws an Exception
if (res != null) {
// seeked class is already loaded. Return it.
logger_dev.info("Class " + className + " was already loaded");
} else {
// try parent
try {
byte[] buf = this.remoteServer.getClassBytes(className);
if (buf != null) {
is = new ByteArrayInputStream(buf);
}
res = this.getParent().loadClass(className);
logger_dev.debug("Found class " + className + " locally");
} catch (ClassNotFoundException e) {
if (remoteServer != null) {
// tries remote TaskClassServer...
logger_dev.debug("Ask for class " + className + " to the remote TaskClassServer");
byte[] classBytes = this.remoteServer.getClassBytes(className);
if (classBytes == null || classBytes.length == 0) {
logger_dev.debug("Did not find " + className);
throw new ClassNotFoundException(className);
} else {
logger_dev.debug("Found " + className);
res = this.defineClass(className, classBytes, 0, classBytes.length);
}
} else {
// no remote classserver available...