Commit 11c8f454 authored by Fabien Viale's avatar Fabien Viale
Browse files

Global variables and generic information

 - define properties files which allow to define global variables and generic info
 - Adapt StaxJobFactory to handle these global values
 - add relevant tests in TestStaxJobFactory
parent 12b2132a
# GLOBAL GENERIC INFORMATION
# This file can be used to define generic information which will be affected to all workflows
# e.g.
# MY_GLOBAL_INFO=some_value
\ No newline at end of file
# GLOBAL VARIABLES
# This file can be used to define variables which will be affected to all workflows
# e.g.
# MY_GLOBAL_VAR=some_value
\ No newline at end of file
......@@ -381,6 +381,16 @@ pa.scheduler.db.recovery.load.jobs.batch_size=100
# Batch size to fetch parent tasks'results in a merge task
pa.scheduler.db.fetch.batch_size=50
#-------------------------------------------------------
#------- VARIABLES & GENERIC INFO PROPERTIES ---------
#-------------------------------------------------------
# file containing variables which can be used in all workflows
pa.scheduler.global.variables.configuration=config/scheduler/global_variables.properties
# file containing generic information which can be used in all workflows
pa.scheduler.global.generic.info.configuration=config/scheduler/global_generic_info.properties
#-------------------------------------------------------
#---------- EMAIL NOTIFICATION PROPERTIES ------------
#-------------------------------------------------------
......
......@@ -378,8 +378,8 @@ public class SchedulerClientTest extends AbstractRestFuncTestCase {
JobId jobId = submitJob(job, client);
JobId jobId1 = client.reSubmit(jobId, Collections.emptyMap(), Collections.emptyMap());
String jobContent = client.getJobContent(jobId);
String jobContent1 = client.getJobContent(jobId1);
String jobContent = client.getJobContent(jobId).replaceAll("\\s+", "");
String jobContent1 = client.getJobContent(jobId1).replaceAll("\\s+", "");
assertEquals(jobContent, jobContent1);
}
......@@ -475,7 +475,7 @@ public class SchedulerClientTest extends AbstractRestFuncTestCase {
}
@Test
public void testReSubmitJobMargeVar() throws Exception {
public void testReSubmitJobMergeVar() throws Exception {
ISchedulerClient client = clientInstance();
Job job = nodeClientJob("/functionaltests/descriptors/dataspace_client_node_push_delete.groovy",
"/functionaltests/descriptors/dataspace_client_node_fork.groovy",
......@@ -501,7 +501,7 @@ public class SchedulerClientTest extends AbstractRestFuncTestCase {
}
@Test
public void testReSubmitJobMargeInfo() throws Exception {
public void testReSubmitJobMergeInfo() throws Exception {
ISchedulerClient client = clientInstance();
Job job = nodeClientJob("/functionaltests/descriptors/dataspace_client_node_push_delete.groovy",
"/functionaltests/descriptors/dataspace_client_node_fork.groovy",
......
......@@ -45,6 +45,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.xml.stream.XMLInputFactory;
......@@ -94,6 +95,7 @@ import org.ow2.proactive.topology.descriptor.ThresholdProximityDescriptor;
import org.ow2.proactive.topology.descriptor.TopologyDescriptor;
import org.ow2.proactive.utils.Tools;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
......@@ -129,6 +131,12 @@ public class StaxJobFactory extends JobFactory {
private GetJobContentGenerator getJobContentFactory = new GetJobContentGenerator();
@VisibleForTesting
static Map<String, JobVariable> globalVariables = null;
@VisibleForTesting
static Map<String, String> globalGenericInformation = null;
/**
* Create a new instance of StaxJobFactory.
*/
......@@ -386,13 +394,13 @@ public class StaxJobFactory extends JobFactory {
* the first tag that define the real type of job.
*
* @param cursorJob the streamReader with the cursor on the job element.
* @param replacementVariables job submission variables map taking priority on those defined in the xml
* @param replacementGenericInfos job submission generic infos map taking priority on those defined in the xml
* @param submittedJobVariables job submission variables map taking priority on those defined in the xml
* @param submittedGenericInfos job submission generic infos map taking priority on those defined in the xml
* @param jobContent contains xml representation of this job
* @throws JobCreationException if an exception occurs during job creation.
*/
private Job createAndFillJob(XMLStreamReader cursorJob, Map<String, String> replacementVariables,
Map<String, String> replacementGenericInfos, String jobContent) throws JobCreationException {
private Job createAndFillJob(XMLStreamReader cursorJob, Map<String, String> submittedJobVariables,
Map<String, String> submittedGenericInfos, String jobContent) throws JobCreationException {
// A temporary job
Job commonPropertiesHolder = new Job() {
......@@ -422,18 +430,30 @@ public class StaxJobFactory extends JobFactory {
try {
int eventType;
// Start by adding to the temporary job, the job submission variables.
if (replacementVariables != null) {
// Start by adding to the temporary job, the global variables (with internal references enabled)
commonPropertiesHolder.getVariables()
.putAll(replaceVariablesInJobVariablesMap(getConfiguredGlobalJobVariables(),
getConfiguredGlobalJobVariablesAsReplacementMap()));
commonPropertiesHolder.addGenericInformations(getResolvedGenericInformations(getConfiguredGlobalGenericInfo(),
commonPropertiesHolder.getVariablesAsReplacementMap()));
// Then add job submission variables, which will override eventually global variables
if (submittedJobVariables != null) {
commonPropertiesHolder.getVariables()
.putAll(submittedJobVariables.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
this::newJobVariable)));
// enable referencing of global variables by submitted variables
commonPropertiesHolder.getVariables()
.putAll(replacementVariables.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
this::newJobVariable)));
.putAll(replaceVariablesInJobVariablesMap(commonPropertiesHolder.getVariables(),
commonPropertiesHolder.getVariablesAsReplacementMap()));
}
// Then add job submission generic information, resolved using job submission and global variables
if (submittedGenericInfos != null) {
commonPropertiesHolder.addGenericInformations(getResolvedGenericInformations(submittedGenericInfos,
commonPropertiesHolder.getVariablesAsReplacementMap()));
}
// Then add job submission generic informations, resolved using job submission variables
if (replacementGenericInfos != null)
commonPropertiesHolder.addGenericInformations(getResolvedGenericInformations(replacementGenericInfos,
replacementVariables));
// Continue to fill the temporary job with xml elements
while (cursorJob.hasNext()) {
......@@ -448,7 +468,7 @@ public class StaxJobFactory extends JobFactory {
Map<String, JobVariable> unresolvedJobVariablesMap = createUnresolvedJobVariables(cursorJob);
commonPropertiesHolder.getUnresolvedVariables().putAll(unresolvedJobVariablesMap);
Map<String, JobVariable> jobVariablesMap = replaceVariablesInJobVariablesMap(unresolvedJobVariablesMap,
replacementVariables);
submittedJobVariables);
commonPropertiesHolder.getVariables().putAll(jobVariablesMap);
} else if (XMLTags.COMMON_GENERIC_INFORMATION.matches(current)) {
......@@ -458,8 +478,11 @@ public class StaxJobFactory extends JobFactory {
Map<String, String> resolvedGenericInformationsDefinedInWorkflow = getResolvedGenericInformations(unresolvedGenericInformationsDefinedInWorkflow,
resolvedJobVariables);
// Then add/replace the resolved generic infos in the xml with the ones specified at job submission
Map<String, String> submittedGenericInformations = commonPropertiesHolder.getGenericInformation();
resolvedGenericInformationsDefinedInWorkflow.putAll(submittedGenericInformations);
if (submittedGenericInfos != null) {
Map<String, String> submittedGenericInformations = getResolvedGenericInformations(submittedGenericInfos,
commonPropertiesHolder.getVariablesAsReplacementMap());
resolvedGenericInformationsDefinedInWorkflow.putAll(submittedGenericInformations);
}
// Update the temporary job
commonPropertiesHolder.setGenericInformation(resolvedGenericInformationsDefinedInWorkflow);
......@@ -537,6 +560,58 @@ public class StaxJobFactory extends JobFactory {
}
private static synchronized Map<String, JobVariable> getConfiguredGlobalJobVariables() {
if (globalVariables == null) {
globalVariables = new LinkedHashMap<>();
String path = PASchedulerProperties.getAbsolutePath(PASchedulerProperties.GLOBAL_VARIABLES_CONFIGURATION.getValueAsString());
logger.info("Loading global variables from file: " + path);
Properties props = new Properties();
try (InputStream fis = new FileInputStream(path)) {
props.load(fis);
} catch (IOException e) {
logger.error("Global Variable Configuration file: " + path + " cannot be read.", e);
return globalVariables;
}
Map<String, String> propertyMap = (Map) props;
for (Map.Entry<String, String> var : propertyMap.entrySet()) {
globalVariables.put(var.getKey(), new JobVariable(var.getKey(), var.getValue()));
}
}
return new LinkedHashMap<>(globalVariables);
}
private static synchronized Map<String, String> getConfiguredGlobalJobVariablesAsReplacementMap() {
if (globalVariables == null) {
// initialize the global variable map
getConfiguredGlobalJobVariables();
}
Map<String, String> replacementVariables = new LinkedHashMap<>(globalVariables.size());
for (JobVariable variable : globalVariables.values()) {
replacementVariables.put(variable.getName(), variable.getValue());
}
return replacementVariables;
}
private static synchronized Map<String, String> getConfiguredGlobalGenericInfo() {
if (globalGenericInformation == null) {
globalGenericInformation = new LinkedHashMap<>();
String path = PASchedulerProperties.getAbsolutePath(PASchedulerProperties.GLOBAL_GENERIC_INFO_CONFIGURATION.getValueAsString());
logger.info("Loading global generic information from file: " + path);
Properties props = new Properties();
try (InputStream fis = new FileInputStream(path)) {
props.load(fis);
} catch (IOException e) {
logger.error("Global Generic Info Configuration file: " + path + " cannot be read.", e);
return globalGenericInformation;
}
Map<String, String> propertyMap = (Map) props;
for (Map.Entry<String, String> var : propertyMap.entrySet()) {
globalGenericInformation.put(var.getKey(), var.getValue());
}
}
return new LinkedHashMap<>(globalGenericInformation);
}
private void handleJobAttributes(Job commonPropertiesHolder, Map<String, String> delayedJobAttributes)
throws JobCreationException {
for (Map.Entry<String, String> delayedAttribute : delayedJobAttributes.entrySet()) {
......@@ -592,7 +667,8 @@ public class StaxJobFactory extends JobFactory {
*/
private Map<String, JobVariable> createUnresolvedJobVariables(XMLStreamReader cursorVariables)
throws JobCreationException {
HashMap<String, JobVariable> unresolvedVariablesMap = new LinkedHashMap<>();
// The following initializaion is to enable overridding of global variables by workflow variables
Map<String, JobVariable> unresolvedVariablesMap = getConfiguredGlobalJobVariables();
try {
int eventType;
while (cursorVariables.hasNext()) {
......@@ -802,9 +878,10 @@ public class StaxJobFactory extends JobFactory {
* @param cursorInfo the streamReader with the cursor on the 'ELEMENT_COMMON_GENERIC_INFORMATION' tag.
* @return the list of generic information as a hashMap.
*/
private HashMap<String, String> getUnresolvedGenericInformations(XMLStreamReader cursorInfo)
private Map<String, String> getUnresolvedGenericInformations(XMLStreamReader cursorInfo)
throws JobCreationException {
HashMap<String, String> infos = new HashMap<>();
// The following initializaion is to enable overridding of global generic info by workflow gi
Map<String, String> infos = getConfiguredGlobalGenericInfo();
try {
int eventType;
while (cursorInfo.hasNext()) {
......
......@@ -130,9 +130,9 @@ public enum XMLTags {
/**
* pattern that matches for open tag for provided tag name.
* e.g.: <code>String.format(CLOSE_TAG_PATTERN, XMLTags.VARIABLES</code> seeks
* for the string like this: &lt;/variables&gt;, &lt;/ variables&gt;, &lt; / variables &gt;, etc.
* for the string like this: &lt;/variables&gt;, &lt;variables/&gt; &lt;/ variables&gt;, &lt; / variables &gt;, etc.
*/
public static final String CLOSE_TAG_PATTERN = "<[ ]*/[ ]*%s[ ]*>";
public static final String CLOSE_TAG_PATTERN = "<[ ]*/[ ]*%s[ ]*>|<[ ]*%s[ ]*/[ ]*>";
private String xmlName;
......@@ -154,7 +154,7 @@ public enum XMLTags {
}
public String getCloseTagPattern() {
return String.format(CLOSE_TAG_PATTERN, this.xmlName);
return String.format(CLOSE_TAG_PATTERN, this.xmlName, this.xmlName);
}
private static Map<String, XMLTags> namesToEnum = null;
......
......@@ -380,6 +380,20 @@ public enum PASchedulerProperties implements PACommonProperties {
SCHEDULER_DB_FETCH_TASK_RESULTS_BATCH_SIZE("pa.scheduler.db.fetch.batch_size", PropertyType.INTEGER, "50"),
/* ***************************************************************** */
/* ************** VARIABLES & GENERIC INFO PROPERTIES ************** */
/* ***************************************************************** */
GLOBAL_VARIABLES_CONFIGURATION(
"pa.scheduler.global.variables.configuration",
PropertyType.STRING,
"config/scheduler/global_variables.properties"),
GLOBAL_GENERIC_INFO_CONFIGURATION(
"pa.scheduler.global.generic.info.configuration",
PropertyType.STRING,
"config/scheduler/global_generic_info.properties"),
/* ***************************************************************** */
/* ***************** EMAIL NOTIFICATION PROPERTIES ***************** */
/* ***************************************************************** */
......
......@@ -26,15 +26,18 @@
package org.ow2.proactive.scheduler.common.job.factories;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
......@@ -72,6 +75,8 @@ public class TestStaxJobFactory {
private static URI jobDescriptorWithUnresolvedGenericInfoAndVariables;
private static URI jobDescriptorWithGlobalVariablesAndGenericInfo;
private static URI jobDescriptorAttrDefGenericInformationXmlElement;
private static URI jobDescriptorAttrDefParameterXmlElement;
......@@ -100,6 +105,9 @@ public class TestStaxJobFactory {
jobDescriptorWithUnresolvedGenericInfoAndVariables = TestStaxJobFactory.class.getResource("job_with_unresolved_generic_info_and_variables.xml")
.toURI();
jobDescriptorWithGlobalVariablesAndGenericInfo = TestStaxJobFactory.class.getResource("/org/ow2/proactive/scheduler/common/job/factories/job_with_global_variables_and_gi.xml")
.toURI();
jobDescriptorAttrDefGenericInformationXmlElement = TestStaxJobFactory.class.getResource("job_attr_def_generic_information_xml_element.xml")
.toURI();
......@@ -118,6 +126,14 @@ public class TestStaxJobFactory {
@Before
public void setJobFactory() {
factory = (StaxJobFactory) JobFactory.getFactory(JOB_FACTORY_IMPL);
factory.globalVariables = new LinkedHashMap<>();
factory.globalGenericInformation = new LinkedHashMap<>();
}
@After
public void cleanGlobals() {
factory.globalVariables = new LinkedHashMap<>();
factory.globalGenericInformation = new LinkedHashMap<>();
}
@Test
......@@ -136,6 +152,87 @@ public class TestStaxJobFactory {
factory.createJob(jobDescriptorWithEmptyMetadata);
}
@Test
public void testCreateJobWithNoVariablesShouldReferenceGlobalVariablesAndGenericInfo() throws Exception {
factory.globalVariables.put("globalVar", new JobVariable("globalVar", "globalValue"));
factory.globalGenericInformation.put("globalGI", "globalGIValue");
Job testScriptJob = factory.createJob(jobDescriptorNoVariablesUri);
assertNotNull(testScriptJob.getVariables().get("globalVar"));
assertEquals("globalValue", testScriptJob.getVariables().get("globalVar").getValue());
assertEquals("globalGIValue", testScriptJob.getGenericInformation().get("globalGI"));
}
@Test
public void testCreateJobWithVariablesAndGenericInfoShouldReferenceGlobalVariables() throws Exception {
factory.globalVariables.put("referenced_global_var",
new JobVariable("referenced_global_var", "global_var_value"));
Job testScriptJob = factory.createJob(jobDescriptorWithGlobalVariablesAndGenericInfo);
assertNotNull(testScriptJob.getVariables().get("job_var_referencing_global_var"));
assertEquals("global_var_value", testScriptJob.getVariables().get("job_var_referencing_global_var").getValue());
assertEquals("global_var_value", testScriptJob.getGenericInformation().get("gen_info_referencing_global_var"));
// other vars and gi in the workflow should be present
assertNotNull(testScriptJob.getVariables().get("job_var"));
assertNotNull(testScriptJob.getGenericInformation().get("gen_info"));
}
@Test
public void testCreateJobWithVariablesAndGenericInfoShouldOverrideGlobalVariablesAndGenericInfo() throws Exception {
factory.globalVariables.put("global_var", new JobVariable("global_var", "global_var_value"));
factory.globalGenericInformation.put("global_gi", "global_gi_value");
Job testScriptJob = factory.createJob(jobDescriptorWithGlobalVariablesAndGenericInfo);
assertNotNull(testScriptJob.getVariables().get("global_var"));
assertEquals("global_var_overridden_by_xml", testScriptJob.getVariables().get("global_var").getValue());
assertEquals("global_gi_overridden_by_xml", testScriptJob.getGenericInformation().get("global_gi"));
// other vars and gi in the workflow should be present
assertNotNull(testScriptJob.getVariables().get("job_var"));
assertNotNull(testScriptJob.getGenericInformation().get("gen_info"));
}
@Test
public void
testCreateJobWithVariablesAndGenericInfoAndGlobalOnesShouldBeOverriddenBySubmittedVariablesAndGenericInfo()
throws Exception {
factory.globalVariables.put("global_var", new JobVariable("global_var", "global_var_value"));
factory.globalGenericInformation.put("global_gi", "global_gi_value");
Job testScriptJob = factory.createJob(jobDescriptorWithGlobalVariablesAndGenericInfo,
ImmutableMap.of("global_var", "submitted_var_value"),
ImmutableMap.of("global_gi", "submitted_gi_value"));
assertNotNull(testScriptJob.getVariables().get("global_var"));
assertEquals("submitted_var_value", testScriptJob.getVariables().get("global_var").getValue());
assertEquals("submitted_gi_value", testScriptJob.getGenericInformation().get("global_gi"));
// other vars and gi in the workflow should be present
assertNotNull(testScriptJob.getVariables().get("job_var"));
assertNotNull(testScriptJob.getGenericInformation().get("gen_info"));
}
@Test
public void testCreateJobWithSubmittedVariablesAndGenericInfoShouldReferenceGlobalVariables() throws Exception {
factory.globalVariables.put("global_var", new JobVariable("global_var", "global_var_value"));
factory.globalGenericInformation.put("global_gi", "global_gi_value");
Job testScriptJob = factory.createJob(jobDescriptorNoVariablesUri,
ImmutableMap.of("submitted_var", "${global_var}"),
ImmutableMap.of("submitted_gi", "${global_var}"));
assertNotNull(testScriptJob.getVariables().get("global_var"));
assertEquals("global_var_value", testScriptJob.getVariables().get("global_var").getValue());
assertNotNull(testScriptJob.getVariables().get("submitted_var"));
assertEquals("global_var_value", testScriptJob.getVariables().get("submitted_var").getValue());
assertEquals("global_gi_value", testScriptJob.getGenericInformation().get("global_gi"));
assertEquals("global_var_value", testScriptJob.getGenericInformation().get("submitted_gi"));
}
@Test
public void testCreateJobWithSubmittedVariablesAndGenericInfoShouldOverrideGlobalVariablesAndGenericInfo()
throws Exception {
factory.globalVariables.put("global_var", new JobVariable("global_var", "global_var_value"));
factory.globalGenericInformation.put("global_gi", "global_gi_value");
Job testScriptJob = factory.createJob(jobDescriptorNoVariablesUri,
ImmutableMap.of("global_var", "submitted_var_value"),
ImmutableMap.of("global_gi", "submitted_gi_value"));
assertNotNull(testScriptJob.getVariables().get("global_var"));
assertEquals("submitted_var_value", testScriptJob.getVariables().get("global_var").getValue());
assertEquals("submitted_gi_value", testScriptJob.getGenericInformation().get("global_gi"));
}
@Test
public void testCreateJobShouldUseVariableMapToReplaceJobNameVariable() throws Exception {
Map<String, String> variablesMap = Maps.newHashMap();
......
<?xml version="1.0" encoding="UTF-8"?>
<job xmlns="urn:proactive:jobdescriptor:dev" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:proactive:jobdescriptor:dev ../../../src/org/ow2/proactive/scheduler/common/xml/schemas/jobdescriptor/dev/schedulerjob.xsd"
name="NoJobVariables" onTaskError="continueJobExecution" priority="normal">
<variables>
<variable name="job_var" value="job_var_value" />
<variable name="global_var" value="global_var_overridden_by_xml" />
<variable name="job_var_referencing_global_var" value="${referenced_global_var}" />
</variables>
<description>NoVariablesTestJob</description>
<genericInformation>
<info name="gen_info" value="gen_info_value"/>
<info name="global_gi" value="global_gi_overridden_by_xml"/>
<info name="gen_info_referencing_global_var" value="${referenced_global_var}"/>
</genericInformation>
<taskFlow>
<task name="Linux_Bash_Task">
<description>
<![CDATA[ The simplest task, ran by a bash engine. ]]>
</description>
<scriptExecutable>
<script>
<code language="bash">
<![CDATA[
ls -la
]]>
</code>
</script>
</scriptExecutable>
</task>
</taskFlow>
</job>
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment