Commit 6fd0b74d authored by cdelbe's avatar cdelbe
Browse files

PICKED FROM TRUNK 15156

SCHEDULING-529 : add export variable for native tasks. Fix SCHEDULING-530

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/proactive/scheduling/branches/Scheduling_2.0.x@15161 28e8926c-6b08-0410-baaa-805c5e19b8d6
parent 03c18c9f
......@@ -41,31 +41,52 @@ package org.ow2.proactive.scripting;
public class Exporter {
/** Name of the java property that contains the names of currently exported properties */
public final static String PROPAGATED_PROPERTIES_VAR_NAME = "pa.scheduler.propagated.properties.names";
public final static String EXPORTED_PROPERTIES_VAR_NAME = "pa.scheduler.exported.properties.names";
public final static String EXPORTED_VARS_VAR_SEPARATOR = "%";
public final static String VARS_VAR_SEPARATOR = "%";
/**
* This method allows to export a java property, i.e. make this property available for all dependent tasks.
* @param key the name of the exported property.
* This method allows to propagate a java property, i.e. make this property available for all dependent tasks.
* @param key the name of the propagated property.
* @throws IllegalArgumentException if the property key is not set.
*/
public static void propagateProperty(String key) {
checkPropertyName(key);
// an exception is thrown key is not valid
String allPropagatedVars = System.getProperty(PROPAGATED_PROPERTIES_VAR_NAME);
if (allPropagatedVars == null) {
System.setProperty(PROPAGATED_PROPERTIES_VAR_NAME, "");
allPropagatedVars = "";
}
System.setProperty(PROPAGATED_PROPERTIES_VAR_NAME, allPropagatedVars + VARS_VAR_SEPARATOR + key);
}
/**
* This method allows to export a java property, i.e. make this property available in native and
* forked java task. For native task, the property is exported
* @param key
* @throws IllegalArgumentException if the property key is not set.
*/
public static void exportProperty(String key) {
checkPropertyName(key);
// an exception is thrown key is not valid
String allExportedVars = System.getProperty(EXPORTED_PROPERTIES_VAR_NAME);
if (allExportedVars == null) {
System.setProperty(EXPORTED_PROPERTIES_VAR_NAME, "");
allExportedVars = "";
}
System.setProperty(EXPORTED_PROPERTIES_VAR_NAME, allExportedVars + VARS_VAR_SEPARATOR + key);
}
private static void checkPropertyName(String key) {
if (System.getProperty(key) == null) {
throw new IllegalArgumentException(key + " is not set as Java Property");
} else if (key.length() > 255) {
throw new IllegalArgumentException(key +
" name is too long (exported property name length must be less than 256).");
} else if (key.contains(EXPORTED_VARS_VAR_SEPARATOR)) {
throw new IllegalArgumentException(key + " cannot contain character " +
EXPORTED_VARS_VAR_SEPARATOR);
} else {
String allExportedVars = System.getProperty(EXPORTED_PROPERTIES_VAR_NAME);
if (allExportedVars == null) {
System.setProperty(EXPORTED_PROPERTIES_VAR_NAME, "");
allExportedVars = "";
}
System.setProperty(EXPORTED_PROPERTIES_VAR_NAME, allExportedVars + EXPORTED_VARS_VAR_SEPARATOR +
key);
" name is too long (propagated property name length must be less than 256).");
} else if (key.contains(VARS_VAR_SEPARATOR)) {
throw new IllegalArgumentException(key + " cannot contain character " + VARS_VAR_SEPARATOR);
}
}
}
......@@ -106,11 +106,11 @@ public interface TaskResult extends Serializable {
public String getTextualDescription();
/**
* Returns a map containing all the java properties that has been exported by this task
* Returns a map containing all the java properties that has been propagated by this task
* (by the executable or by scripts).
* @see org.ow2.proactive.scripting.Exporter
* @return a map containing all the java properties that has been exported by this task,
* null if no property has been exported
* @return a map containing all the java properties that has been propagated by this task,
* null if no property has been propagated
*/
public Map<String, String> getExportedProperties();
public Map<String, String> getPropagatedProperties();
}
......@@ -34,6 +34,7 @@
*/
package org.ow2.proactive.scheduler.core;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
......@@ -1281,6 +1282,14 @@ public class SchedulerCore implements UserSchedulerInterface_, AdminMethodsInter
}
}
try {
File.createTempFile("MK_CORE_", ((res.getPropagatedProperties() != null) ? "" +
res.getPropagatedProperties().size() : "null"));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//store this task result in the job result.
((JobResultImpl) job.getJobResult()).addTaskResult(descriptor.getName(), res, descriptor
.isPreciousResult());
......
/*
* ################################################################
*
* ProActive: The Java(TM) library for Parallel, Distributed,
* Concurrent computing with Security and Mobility
*
* Copyright (C) 1997-2009 INRIA/University of
* Nice-Sophia Antipolis/ActiveEon
* Contact: proactive@ow2.org
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; version 3 of
* the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA
*
* If needed, contact us to obtain a release under GPL Version 2.
*
* Initial developer(s): The ProActive Team
* http://proactive.inria.fr/team_members.htm
* Contributor(s):
*
* ################################################################
* $$ACTIVEEON_INITIAL_DEV$$
*/
package org.ow2.proactive.scheduler.examples;
import java.io.Serializable;
import org.ow2.proactive.scheduler.common.task.TaskResult;
import org.ow2.proactive.scheduler.common.task.executable.JavaExecutable;
/**
* Do nothing...
*/
public class EmptyTask extends JavaExecutable {
/** */
@Override
public Serializable execute(TaskResult... results) {
System.out.println("Do nothing...");
return "Nothing";
}
}
......@@ -43,6 +43,7 @@ import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.log4j.Logger;
import org.objectweb.proactive.ActiveObjectCreationException;
......@@ -52,12 +53,14 @@ import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.ow2.proactive.scheduler.common.exception.UserException;
import org.ow2.proactive.scheduler.common.task.TaskResult;
import org.ow2.proactive.scheduler.common.task.executable.Executable;
import org.ow2.proactive.scheduler.common.task.util.BigString;
import org.ow2.proactive.scheduler.exception.RunningProcessException;
import org.ow2.proactive.scheduler.exception.StartProcessException;
import org.ow2.proactive.scheduler.task.launcher.TaskLauncher.SchedulerVars;
import org.ow2.proactive.scheduler.util.SchedulerDevLoggers;
import org.ow2.proactive.scheduler.util.process.ProcessTreeKiller;
import org.ow2.proactive.scheduler.util.process.ThreadReader;
import org.ow2.proactive.scripting.Exporter;
import org.ow2.proactive.scripting.GenerationScript;
import org.ow2.proactive.scripting.ScriptHandler;
import org.ow2.proactive.scripting.ScriptLoader;
......@@ -217,14 +220,36 @@ public class NativeExecutable extends Executable {
taskEnvVariables.put(SchedulerVars.JAVAENV_TASK_NAME_VARNAME.toString(), System
.getProperty(SchedulerVars.JAVAENV_TASK_NAME_VARNAME.toString()));
// exported properties
String allVars = System.getProperty(Exporter.EXPORTED_PROPERTIES_VAR_NAME);
Map<String, String> taskExportedProperties = null;
if (allVars != null) {
StringTokenizer parser = new StringTokenizer(allVars, Exporter.VARS_VAR_SEPARATOR);
taskExportedProperties = new Hashtable<String, String>(parser.countTokens());
while (parser.hasMoreTokens()) {
String key = parser.nextToken();
String value = System.getProperty(key);
if (value != null) {
logger_dev.debug("Value of exported property " + key + " is " + value);
taskExportedProperties.put(key, value);
} else {
logger_dev.warn("Exported property " + key + " is not set !");
}
}
System.clearProperty(Exporter.EXPORTED_PROPERTIES_VAR_NAME);
}
// current env
Map<String, String> systemEnvVariables = System.getenv();
int i = 0;
int nbVars = taskEnvVariables.size() + systemEnvVariables.size() +
((taskExportedProperties != null) ? taskExportedProperties.size() : 0);
if (nodesFiles != null) {
envVarsTab = new String[taskEnvVariables.size() + systemEnvVariables.size() + 3];
envVarsTab = new String[nbVars + 3];
envVarsTab[i++] = CORE_FILE_ENV + "=" + nodesFiles.getAbsolutePath();
} else {
envVarsTab = new String[taskEnvVariables.size() + systemEnvVariables.size() + 2];
envVarsTab = new String[nbVars + 2];
}
envVarsTab[i++] = CORE_NB + "=" + coresNumber;
......@@ -233,14 +258,23 @@ public class NativeExecutable extends Executable {
for (Map.Entry<String, String> entry : taskEnvVariables.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
envVarsTab[i++] = ("" + name + "=" + value).toUpperCase().replace('.', '_');
envVarsTab[i++] = convertJavaenvNameToSysenvName(name) + "=" + value;
}
//after we add to the returnTab the system environment variables
for (Map.Entry<String, String> entry : systemEnvVariables.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
envVarsTab[i++] = "" + name + "=" + value;
envVarsTab[i++] = name + "=" + value;
}
//then we add exported properties
if (taskExportedProperties != null) {
for (Map.Entry<String, String> entry : taskExportedProperties.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
envVarsTab[i++] = convertJavaenvNameToSysenvName(name) + "=" + value;
}
}
//then the cookie used by ProcessTreeKiller
......@@ -319,4 +353,13 @@ public class NativeExecutable extends Executable {
return command;
}
/**
* Convert the Java Property name to the System environment name
*
* @param javaPropName the java Property name.
* @return the System environment name.
*/
private static String convertJavaenvNameToSysenvName(String javaPropName) {
return javaPropName.toUpperCase().replace('.', '_');
}
}
......@@ -161,12 +161,12 @@ public class TaskResultImpl implements TaskResult {
@Transient
private long taskDuration = -1;
/** All the properties export through Exporter.exportProperty() */
@Unloadable
/** All the properties propagated through Exporter.propagateProperty() */
// @Unloadable // BUG ? DON't WORK WITH UNLOADABLE
@OneToMany(cascade = javax.persistence.CascadeType.ALL)
@Cascade(CascadeType.ALL)
@LazyCollection(value = LazyCollectionOption.FALSE)
private Map<String, BigString> exportedProperties;
private Map<String, BigString> propagatedProperties;
/** ProActive empty constructor. */
public TaskResultImpl() {
......@@ -184,14 +184,14 @@ public class TaskResultImpl implements TaskResult {
* @param value the result of the task.
* @param output the output of the task.
* @param execDuration the execution duration of the task itself
* @param exportedProperties the map of all properties that has been exported through Exporter.exportProperty
* @param propagatedProperties the map of all properties that has been propagated through Exporter.propagateProperty
*/
public TaskResultImpl(TaskId id, Serializable value, TaskLogs output, long execDuration,
Map<String, BigString> exportedProperties) {
Map<String, BigString> propagatedProperties) {
this(id, output);
this.taskDuration = execDuration;
this.value = value;
this.exportedProperties = exportedProperties;
this.propagatedProperties = propagatedProperties;
try {
this.serializedValue = ObjectToByteConverter.ObjectStream.convert(value);
} catch (IOException e) {
......@@ -206,14 +206,14 @@ public class TaskResultImpl implements TaskResult {
* @param exception the exception that occurred in the task.
* @param output the output of the task.
* @param execDuration the execution duration of the task itself
* @param exportedProperties the map of all properties that has been exported through Exporter.exportProperty
* @param propagatedProperties the map of all properties that has been propagated through Exporter.propagateProperty
*/
public TaskResultImpl(TaskId id, Throwable exception, TaskLogs output, long execDuration,
Map<String, BigString> exportedProperties) {
Map<String, BigString> propagatedProperties) {
this(id, output);
this.taskDuration = execDuration;
this.exception = exception;
this.exportedProperties = exportedProperties;
this.propagatedProperties = propagatedProperties;
try {
this.serializedException = ObjectToByteConverter.ObjectStream.convert(exception);
} catch (IOException e) {
......@@ -519,16 +519,15 @@ public class TaskResultImpl implements TaskResult {
/**
* {@inheritDoc}
*/
public Map<String, String> getExportedProperties() {
// null if no prop has been exported
if (this.exportedProperties == null) {
public Map<String, String> getPropagatedProperties() {
// null if no prop has been propagated
if (this.propagatedProperties == null) {
return null;
} else {
// do not return to user internal type BigString
Map<String, String> convertedProperties = new Hashtable<String, String>(this.exportedProperties
Map<String, String> convertedProperties = new Hashtable<String, String>(this.propagatedProperties
.size());
for (String k : this.exportedProperties.keySet()) {
convertedProperties.put(k, this.exportedProperties.get(k).getValue());
for (String k : this.propagatedProperties.keySet()) {
convertedProperties.put(k, this.propagatedProperties.get(k).getValue());
}
return convertedProperties;
}
......
......@@ -92,7 +92,7 @@ public class JavaTaskLauncher extends TaskLauncher {
copyInputDataToScratch();
// set exported vars
this.setExportedProperties(results);
this.setPropagatedProperties(results);
sample = System.currentTimeMillis();
//launch pre script
......@@ -136,11 +136,11 @@ public class JavaTaskLauncher extends TaskLauncher {
//return result
return new TaskResultImpl(taskId, userResult, this.getLogs(), duration,
retreiveExportedProperties());
retreivePropagatedProperties());
} catch (Throwable ex) {
logger_dev.info("", ex);
// exceptions are always handled at scheduler core level
return new TaskResultImpl(taskId, ex, this.getLogs(), duration, retreiveExportedProperties());
return new TaskResultImpl(taskId, ex, this.getLogs(), duration, retreivePropagatedProperties());
} finally {
terminateDataSpace();
if (core != null) {
......
......@@ -105,7 +105,7 @@ public class NativeTaskLauncher extends TaskLauncher {
copyInputDataToScratch();
// set exported vars
this.setExportedProperties(results);
this.setPropagatedProperties(results);
//get Executable before schedule timer
currentExecutable = executableContainer.getExecutable();
......@@ -159,11 +159,11 @@ public class NativeTaskLauncher extends TaskLauncher {
//return result
return new TaskResultImpl(taskId, userResult, this.getLogs(), duration,
retreiveExportedProperties());
retreivePropagatedProperties());
} catch (Throwable ex) {
logger_dev.info("", ex);
// exceptions are always handled at scheduler core level
return new TaskResultImpl(taskId, ex, this.getLogs(), duration, retreiveExportedProperties());
return new TaskResultImpl(taskId, ex, this.getLogs(), duration, retreivePropagatedProperties());
} finally {
terminateDataSpace();
if (isWallTime()) {
......
......@@ -34,6 +34,8 @@
*/
package org.ow2.proactive.scheduler.task.launcher;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
......@@ -314,6 +316,9 @@ public abstract class TaskLauncher implements InitActive {
.getReadableName());
System.setProperty(SchedulerVars.JAVAENV_TASK_ID_VARNAME.toString(), this.taskId.value());
System.setProperty(SchedulerVars.JAVAENV_TASK_NAME_VARNAME.toString(), this.taskId.getReadableName());
// previously exported and propagated vars must be deleted
System.clearProperty(Exporter.EXPORTED_PROPERTIES_VAR_NAME);
System.clearProperty(Exporter.PROPAGATED_PROPERTIES_VAR_NAME);
}
/**
......@@ -328,52 +333,52 @@ public abstract class TaskLauncher implements InitActive {
/**
* Set as Java Property all the properties that comes with incoming results, i.e.
* properties that have been exported in parent tasks.
* properties that have been propagated in parent tasks.
* @see org.ow2.proactive.scripting.Exporter
*/
protected void setExportedProperties(TaskResult[] incomingResults) {
protected void setPropagatedProperties(TaskResult[] incomingResults) {
for (int i = 0; i < incomingResults.length; i++) {
Map<String, String> properties = incomingResults[i].getExportedProperties();
Map<String, String> properties = incomingResults[i].getPropagatedProperties();
if (properties != null) {
logger_dev.info("Imported properties for task " + this.taskId + " are " + properties);
logger_dev.info("Incoming properties for task " + this.taskId + " are " + properties);
for (String key : properties.keySet()) {
logger_dev.debug("Value of imported property " + key + " is " + properties.get(key));
logger_dev.debug("Value of Incoming property " + key + " is " + properties.get(key));
System.setProperty(key, properties.get(key));
}
} else {
logger_dev.info(" No imported properties for task " + this.taskId);
logger_dev.info("No Incoming properties for task " + this.taskId);
}
}
}
/**
* Extract name and value of all the properties that have been exported during the execution
* Extract name and value of all the properties that have been propagated during the execution
* of this task launcher (on scripts and executable).
* @see org.ow2.proactive.scripting.Exporter
* @return a map that contains [name->value] of all exported properties.
* @return a map that contains [name->value] of all propagated properties.
*/
protected Map<String, BigString> retreiveExportedProperties() {
// get all names of exported vars
String allVars = System.getProperty(Exporter.EXPORTED_PROPERTIES_VAR_NAME);
protected Map<String, BigString> retreivePropagatedProperties() {
// get all names of propagated vars
String allVars = System.getProperty(Exporter.PROPAGATED_PROPERTIES_VAR_NAME);
if (allVars != null) {
logger_dev.info("Exported properties for task " + this.taskId + " are : " + allVars);
StringTokenizer parser = new StringTokenizer(allVars, Exporter.EXPORTED_VARS_VAR_SEPARATOR);
logger_dev.info("Propagated properties for task " + this.taskId + " are : " + allVars);
StringTokenizer parser = new StringTokenizer(allVars, Exporter.VARS_VAR_SEPARATOR);
Map<String, BigString> exportedVars = new Hashtable<String, BigString>();
while (parser.hasMoreTokens()) {
String key = parser.nextToken();
String value = System.getProperty(key);
if (value != null) {
logger_dev.debug("Value of exported property " + key + " is " + value);
logger_dev.debug("Value of Propagated property " + key + " is " + value);
exportedVars.put(key, new BigString(value));
System.clearProperty(key);
} else {
logger_dev.warn("Exported property " + key + " is not set !");
logger_dev.warn("Propagated property " + key + " is not set !");
}
}
System.clearProperty(Exporter.EXPORTED_PROPERTIES_VAR_NAME);
System.clearProperty(Exporter.PROPAGATED_PROPERTIES_VAR_NAME);
return exportedVars;
} else {
logger_dev.info("No exported properties for task " + this.taskId);
logger_dev.info("No Propagated properties for task " + this.taskId);
return null;
}
}
......
......@@ -61,12 +61,18 @@ public class TestExportVars extends FunctionalTest {
JobResult res = SchedulerTHelper.getJobResult(id);
for (String i : res.getAllResults().keySet()) {
System.out.println("==>" + i);
System.out.println("====> Output " + i + " : " + res.getResult(i).getOutput().getAllLogs(true));
}
String taskid = "task1";
TaskResult r = res.getResult(taskid);
Map<String, String> exVal = r.getExportedProperties();
Map<String, String> exVal = r.getPropagatedProperties();
System.out.println("+++++++++++++");
for (String k : r.getPropagatedProperties().keySet()) {
System.out.println("+++++++++++++" + k);
}
Assert.assertTrue(exVal != null);
Assert.assertTrue(exVal.get("key1").equals("value1"));
Assert.assertTrue(exVal.get("key2").equals("value2"));
......@@ -80,6 +86,15 @@ public class TestExportVars extends FunctionalTest {
r = res.getResult(taskid);
// exception in post script evaluation
Assert.assertTrue(r.hadException());
// nothing to test for task4
taskid = "task5";
r = res.getResult(taskid);
// exception in post script evaluation
System.out.println("***************************************" + r.value());
Assert.assertEquals(0, (Integer) r.value());
}
}
......@@ -2,6 +2,9 @@
<job xmlns="urn:proactive:jobdescriptor:dev" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:proactive:jobdescriptor:dev ../../../src/org/ow2/proactive/scheduler/common/xml/schemas/jobdescriptor/dev/schedulerjob.xsd"
name="job_export_var" cancelJobOnError="false" priority="normal">
<variables>
<variable name="WORK_DIR" value="${pa.scheduler.home}/classes/schedulerTests/functionaltests/executables"/>
</variables>
<description>Export variables</description>
<taskFlow>
<task name="task1" preciousResult="true">
......@@ -14,7 +17,7 @@
print("Setting system property key1 = " + java.lang.System.getProperty("key1") + "\n");
java.lang.System.setProperty("key2", "value2");
print("Setting system property key2 = " + java.lang.System.getProperty("key2") + "\n");
Exporter.exportProperty("key1");
Exporter.propagateProperty("key1");
</code>
</script>
</pre>
......@@ -23,7 +26,7 @@
<script>
<code language="javascript">
importClass(org.ow2.proactive.scripting.Exporter);
Exporter.exportProperty("key2");
Exporter.propagateProperty("key2");
</code>
</script>
</post>
......@@ -57,7 +60,7 @@
<script>
<code language="javascript">
importClass(org.ow2.proactive.scripting.Exporter);
Exporter.exportProperty("prohibited%name");
Exporter.propagateProperty("prohibited%name");
</code>
</script>
</pre>
......@@ -65,5 +68,41 @@
</task>
<task name="task4" preciousResult="true">
<pre>
<script>
<code language="javascript">
importClass(org.ow2.proactive.scripting.Exporter);
java.lang.System.setProperty("user.var.1", ".User Value 1.");
Exporter.propagateProperty("user.var.1");
</code>
</script>
</pre>
<javaExecutable class="org.ow2.proactive.scheduler.examples.EmptyTask"/>
</task>
<task name="task5" preciousResult="true">
<depends>
<task ref="task4"/>
</depends>
<pre>
<script>
<code language="javascript">
importClass(org.ow2.proactive.scripting.Exporter);
java.lang.System.setProperty("user.var.2", ".User Value 2.");
Exporter.exportProperty("user.var.2");
Exporter.exportProperty("user.var.1");
</code>
</script>
</pre>
<nativeExecutable>