Unverified Commit a8cb8ed1 authored by oanaBiancaSchiopu's avatar oanaBiancaSchiopu Committed by GitHub
Browse files

add-variables-and-values-to-signals (#3981)



* include-variables-and-values-to-signals

* add-validation-endpoint-for-signal-variables

* add-test-comments

* remove-debug-log

* change-return-type-of-waitFor-method

* change-outputvariables-name

* refactor-output-variables
Co-authored-by: default avatarOana Schiopu <oana-schiopu@activeeon.com>
parent f10118f8
......@@ -160,7 +160,7 @@ configure(javaSubprojects) {
testCompile 'org.hamcrest:hamcrest-junit:2.0.0.0'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile 'ObjectFaker:ObjectFaker:0.2'
testCompile 'ObjectFaker:ObjectFaker:0.4'
testCompile 'com.jayway.awaitility:awaitility:1.6.0'
testCompile 'org.apache.commons:commons-lang3:3.4'
......
......@@ -494,7 +494,7 @@ pa.scheduler.synchronization.db=data/synchronization
#-------------------------------------------------------
# name of the channel of workflow signals (STOP, CONTINUE, etc)
pa.scheduler.signals.channel=PA_SIGNALS_CHANNEL
pa.scheduler.signals.channel=PA_SIGNALS_CHANNEL_
#-------------------------------------------------------
#---------------- PORTAL PROPERTIES ------------------
......
......@@ -2351,7 +2351,27 @@ public interface SchedulerRestInterface {
@Consumes(value = MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
Set<String> addJobSignal(@HeaderParam("sessionid") String sessionId, @QueryParam("signal") String signal,
@PathParam("jobid") String jobId) throws RestException;
@PathParam("jobid") String jobId, Map<String, String> updatedVariables) throws RestException;
/**
*
* Validate the signal's output values
*
* @param sessionId
* current session
* @param jobId
* id of the job
* @param signal
* signal name to validate
*
* @return the result of signal validation
*/
@POST
@Path("job/{jobid}/signals/validate")
@Consumes(value = MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
JobValidationData validateJobSignal(@HeaderParam("sessionid") String sessionId, @QueryParam("signal") String signal,
@PathParam("jobid") String jobId, Map<String, String> updatedVariables) throws RestException;
/**
*
......
......@@ -78,6 +78,8 @@ public class JobInfoData implements java.io.Serializable {
private Set<String> signals;
private Map<String, Map<String, List<JobVariable>>> detailedSignals;
private Map<String, String> visualizationConnectionStrings;
private Map<String, String> visualizationIcons;
......@@ -264,6 +266,14 @@ public class JobInfoData implements java.io.Serializable {
this.signals = signals;
}
public Map<String, Map<String, List<JobVariable>>> getDetailedSignals() {
return detailedSignals;
}
public void setDetailedSignals(Map<String, Map<String, List<JobVariable>>> detailedSignals) {
this.detailedSignals = detailedSignals;
}
public Map<String, String> getVisualizationConnectionStrings() {
return visualizationConnectionStrings;
}
......@@ -313,8 +323,8 @@ public class JobInfoData implements java.io.Serializable {
", numberOfFailedTasks=" + numberOfFailedTasks + ", numberOfFaultyTasks=" + numberOfFaultyTasks +
", numberOfInErrorTasks=" + numberOfInErrorTasks + ", priority=" + priority + ", jobOwner='" + jobOwner +
"', projectName='" + projectName + "', toBeRemoved=" + toBeRemoved + ", genericInformation=" +
genericInformation + ", variables=" + variables + ", signals=" + signals + ", attachedServices=" +
attachedServices + '}';
genericInformation + ", variables=" + variables + ", signals=" + signals + ", detailedSignals=" +
detailedSignals + ", attachedServices=" + attachedServices + '}';
}
}
......@@ -61,6 +61,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
......@@ -110,6 +111,7 @@ import org.ow2.proactive.scheduler.common.job.JobPriority;
import org.ow2.proactive.scheduler.common.job.JobResult;
import org.ow2.proactive.scheduler.common.job.JobState;
import org.ow2.proactive.scheduler.common.job.JobStatus;
import org.ow2.proactive.scheduler.common.job.JobVariable;
import org.ow2.proactive.scheduler.common.job.TaskFlowJob;
import org.ow2.proactive.scheduler.common.job.factories.Job2XMLTransformer;
import org.ow2.proactive.scheduler.common.task.Task;
......@@ -139,6 +141,7 @@ import org.ow2.proactive_grid_cloud_portal.scheduler.dto.JobInfoData;
import org.ow2.proactive_grid_cloud_portal.scheduler.dto.JobResultData;
import org.ow2.proactive_grid_cloud_portal.scheduler.dto.JobStateData;
import org.ow2.proactive_grid_cloud_portal.scheduler.dto.JobUsageData;
import org.ow2.proactive_grid_cloud_portal.scheduler.dto.JobValidationData;
import org.ow2.proactive_grid_cloud_portal.scheduler.dto.RestPage;
import org.ow2.proactive_grid_cloud_portal.scheduler.dto.SchedulerStatusData;
import org.ow2.proactive_grid_cloud_portal.scheduler.dto.SchedulerUserData;
......@@ -1300,6 +1303,7 @@ public class SchedulerClient extends ClientBase implements ISchedulerClient {
jobInfoImpl.setVariables(jobInfoData.getVariables());
jobInfoImpl.setDetailedVariables(jobInfoData.getDetailedVariables());
jobInfoImpl.setSignals(jobInfoData.getSignals());
jobInfoImpl.setDetailedSignals(jobInfoData.getDetailedSignals());
jobInfoImpl.setVisualizationConnectionStrings(jobInfoData.getVisualizationConnectionStrings());
jobInfoImpl.setVisualizationIcons(jobInfoData.getVisualizationIcons());
jobInfoImpl.setAttachedServices(jobInfoData.getAttachedServices());
......@@ -1515,17 +1519,60 @@ public class SchedulerClient extends ClientBase implements ISchedulerClient {
}
@Override
public Set<String> addJobSignal(String jobId, String signal)
public Set<String> addJobSignal(String jobId, String signal, Map<String, String> updatedVariables)
throws NotConnectedException, UnknownJobException, PermissionException, SignalApiException {
Set<String> result = new HashSet<>();
try {
result = restApi().addJobSignal(sid, jobId, signal);
result = restApi().addJobSignal(sid, signal, jobId, updatedVariables);
} catch (Exception e) {
throwSAEorUJEOrNCEOrPE(e);
}
return result;
}
@Override
public List<JobVariable> validateJobSignal(String jobId, String signal, Map<String, String> updatedVariables)
throws NotConnectedException, UnknownJobException, PermissionException, SignalApiException {
List<JobVariable> jobVariables = new LinkedList<>();
try {
JobValidationData data = restApi().validateJobSignal(sid, signal, jobId, updatedVariables);
data.getUpdatedVariables().forEach((k, v) -> jobVariables.add(new JobVariable(k, v)));
data.getUpdatedAdvanced()
.forEach((k, v) -> jobVariables.stream()
.filter(jobVariable -> jobVariable.getName().equals(k))
.findFirst()
.get()
.setAdvanced(v));
data.getUpdatedHidden()
.forEach((k, v) -> jobVariables.stream()
.filter(jobVariable -> jobVariable.getName().equals(k))
.findFirst()
.get()
.setHidden(v));
data.getUpdatedDescriptions()
.forEach((k, v) -> jobVariables.stream()
.filter(jobVariable -> jobVariable.getName().equals(k))
.findFirst()
.get()
.setDescription(v));
data.getUpdatedGroups()
.forEach((k, v) -> jobVariables.stream()
.filter(jobVariable -> jobVariable.getName().equals(k))
.findFirst()
.get()
.setGroup(v));
data.getUpdatedModels()
.forEach((k, v) -> jobVariables.stream()
.filter(jobVariable -> jobVariable.getName().equals(k))
.findFirst()
.get()
.setModel(v));
} catch (Exception e) {
throwSAEorUJEOrNCEOrPE(e);
}
return jobVariables;
}
private org.apache.http.impl.client.HttpClientBuilder getHttpClientBuilder()
throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
SSLContextBuilder builder = new SSLContextBuilder();
......
......@@ -84,6 +84,7 @@ public class DataUtility {
impl.setVariables(d.getVariables());
impl.setDetailedVariables(d.getDetailedVariables());
impl.setSignals(d.getSignals());
impl.setDetailedSignals(d.getDetailedSignals());
impl.setVisualizationConnectionStrings(d.getVisualizationConnectionStrings());
impl.setVisualizationIcons(d.getVisualizationIcons());
impl.setAttachedServices(d.getAttachedServices());
......
......@@ -84,6 +84,8 @@ public class JobInfoImpl implements JobInfo {
private Set<String> signals;
private Map<String, Map<String, List<JobVariable>>> detailedSignals;
private Map<String, String> visualizationConnectionStrings;
private Map<String, String> visualizationIcons;
......@@ -316,6 +318,16 @@ public class JobInfoImpl implements JobInfo {
this.signals = signals;
}
@Override
public Map<String, Map<String, List<JobVariable>>> getDetailedSignals() {
return (detailedSignals == null) ? new LinkedHashMap<>() : detailedSignals;
}
@Override
public void setDetailedSignals(Map<String, Map<String, List<JobVariable>>> detailedSignals) {
this.detailedSignals = detailedSignals;
}
@Override
public Map<String, String> getVisualizationConnectionStrings() {
return visualizationConnectionStrings;
......
......@@ -149,6 +149,47 @@ public class SchedulerClientTest extends AbstractRestFuncTestCase {
Assert.assertTrue(jobInfo.getAttachedServices().containsKey(12));
}
@Test(timeout = MAX_WAIT_TIME)
public void testSignals() throws Throwable {
ISchedulerClient client = clientInstance();
Job job = nodeClientJob("/functionaltests/descriptors/register_service_with_signals.groovy", null, null);
//submit job with 'ready signal' and add variables
JobId jobId = submitJob(job, client);
JobInfo jobInfo;
//wait until 'ready_my_signal' signal is sent
do {
jobInfo = client.getJobInfo(jobId.toString());
Assert.assertNotNull(jobInfo);
Thread.sleep(1000);
} while (!jobInfo.getSignals().contains("ready_my_signal"));
Map<String, String> outpuVariables = new HashMap<>();
outpuVariables.put("name", "15");
// validate the job variables
List<JobVariable> jobVariables = client.validateJobSignal(jobId.value(), "my_signal", outpuVariables);
// add 'my_signal' signal
Set<String> signals = client.addJobSignal(jobId.value(), "my_signal", outpuVariables);
// wait until the job is finished
client.waitForJob(jobId.toString(), TimeUnit.MINUTES.toMillis(5));
Assert.assertFalse(signals.contains("ready_my_signal"));
Assert.assertTrue(signals.contains("my_signal"));
JobVariable nameJobVariable = jobVariables.stream()
.filter(jobVariable -> jobVariable.getName().equals("name"))
.findFirst()
.get();
Assert.assertNotNull(nameJobVariable);
Assert.assertEquals("15", nameJobVariable.getValue());
Assert.assertEquals("PA:Integer", nameJobVariable.getModel());
JobVariable secondJobVariable = jobVariables.stream()
.filter(jobVariable -> jobVariable.getName().equals("second"))
.findFirst()
.get();
Assert.assertNotNull(secondJobVariable);
Assert.assertEquals("15", secondJobVariable.getValue());
Assert.assertEquals("PA:Integer", secondJobVariable.getModel());
}
@Test(timeout = MAX_WAIT_TIME)
public void testDisconnect() throws Exception {
ISchedulerClient client = clientInstance();
......
import org.ow2.proactive.scheduler.common.job.JobVariable
List <JobVariable> variables = new java.util.ArrayList<JobVariable>()
variables.add(new JobVariable("name", "15", "PA:Integer"))
variables.add(new JobVariable("second", '${name}', "PA:Integer"))
signalapi.readyForSignal("my_signal", variables)
// Wait until one signal among those specified is received
outputParameters = signalapi.waitFor("my_signal")
println "Output parameters = " + outputParameters
\ No newline at end of file
......@@ -254,10 +254,30 @@ public class SchedulerStateRest implements SchedulerRestInterface {
}
@Override
public Set<String> addJobSignal(String sessionId, String signal, String jobId) throws RestException {
public Set<String> addJobSignal(String sessionId, String signal, String jobId, Map<String, String> updatedVariables)
throws RestException {
try {
Scheduler s = checkAccess(sessionId, "/scheduler/jobs/" + jobId);
return s.addJobSignal(jobId, signal, updatedVariables);
} catch (JobCreationException e) {
throw new JobCreationRestException(e);
} catch (SchedulerException e) {
throw RestException.wrapExceptionToRest(e);
}
}
@Override
public JobValidationData validateJobSignal(String sessionId, String signal, String jobId,
Map<String, String> updatedVariables) throws RestException {
JobValidationData data = new JobValidationData();
try {
Scheduler s = checkAccess(sessionId, "/scheduler/jobs/" + jobId);
return s.addJobSignal(jobId, signal);
List<JobVariable> jobVariables = s.validateJobSignal(jobId, signal, updatedVariables);
ValidationUtil.fillUpdatedVariables(jobVariables, data);
return data;
} catch (JobValidationException e) {
ValidationUtil.setJobValidationDataErrorMessage(data, e);
return data;
} catch (SchedulerException e) {
throw RestException.wrapExceptionToRest(e);
}
......
......@@ -67,6 +67,7 @@ import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ow2.proactive.scheduler.common.Scheduler;
......@@ -80,11 +81,19 @@ import org.ow2.proactive.scheduler.common.job.factories.FlowError;
import org.ow2.proactive.scheduler.common.job.factories.JobFactory;
import org.ow2.proactive.scheduler.common.task.Task;
import org.ow2.proactive.scheduler.common.task.TaskVariable;
import org.ow2.proactive.scheduler.synchronization.SynchronizationInternal;
import org.ow2.proactive_grid_cloud_portal.scheduler.dto.JobValidationData;
public class ValidationUtil {
/**
* Attributes used for the signal api
*/
private SynchronizationInternal publicStore;
private static final String SIGNAL_ORIGINATOR = "scheduler";
public static JobValidationData validateJobDescriptor(File jobDescFile, Map<String, String> jobVariables,
Scheduler scheduler, SchedulerSpaceInterface space, String sessionId) {
return validateJob(jobDescFile.getAbsolutePath(), jobVariables, scheduler, space, sessionId);
......@@ -106,31 +115,35 @@ public class ValidationUtil {
}
} catch (JobCreationException e) {
data.setTaskName(e.getTaskName());
data.setErrorMessage(e.getMessage());
data.setStackTrace(getStackTrace(e));
if (e.getUpdatedVariables() != null) {
data.setUpdatedVariables(e.getUpdatedVariables());
}
if (e.getUpdatedModels() != null) {
data.setUpdatedModels(e.getUpdatedModels());
}
if (e.getUpdatedDescriptions() != null) {
data.setUpdatedDescriptions(e.getUpdatedDescriptions());
}
if (e.getUpdatedGroups() != null) {
data.setUpdatedGroups(e.getUpdatedGroups());
}
if (e.getUpdatedAdvanced() != null) {
data.setUpdatedAdvanced(e.getUpdatedAdvanced());
}
if (e.getUpdatedHidden() != null) {
data.setUpdatedHidden(e.getUpdatedHidden());
}
setJobValidationDataErrorMessage(data, e);
}
return data;
}
public static void setJobValidationDataErrorMessage(JobValidationData data, JobCreationException e) {
data.setErrorMessage(e.getMessage());
data.setStackTrace(getStackTrace(e));
if (e.getUpdatedVariables() != null) {
data.setUpdatedVariables(e.getUpdatedVariables());
}
if (e.getUpdatedModels() != null) {
data.setUpdatedModels(e.getUpdatedModels());
}
if (e.getUpdatedDescriptions() != null) {
data.setUpdatedDescriptions(e.getUpdatedDescriptions());
}
if (e.getUpdatedGroups() != null) {
data.setUpdatedGroups(e.getUpdatedGroups());
}
if (e.getUpdatedAdvanced() != null) {
data.setUpdatedAdvanced(e.getUpdatedAdvanced());
}
if (e.getUpdatedHidden() != null) {
data.setUpdatedHidden(e.getUpdatedHidden());
}
}
private static void fillUpdatedVariables(TaskFlowJob job, JobValidationData data) {
HashMap<String, String> updatedVariables = new HashMap<>();
HashMap<String, String> updatedModels = new HashMap<>();
......@@ -164,6 +177,30 @@ public class ValidationUtil {
data.setUpdatedHidden(updatedHidden);
}
public static void fillUpdatedVariables(List<JobVariable> jobVariables, JobValidationData data) {
HashMap<String, String> updatedVariables = new HashMap<>();
HashMap<String, String> updatedModels = new HashMap<>();
HashMap<String, String> updatedDescriptions = new HashMap<>();
HashMap<String, String> updatedGroups = new HashMap<>();
HashMap<String, Boolean> updatedAdvanced = new HashMap<>();
HashMap<String, Boolean> updatedHidden = new HashMap<>();
for (JobVariable jobVariable : jobVariables) {
updatedVariables.put(jobVariable.getName(), jobVariable.getValue());
updatedModels.put(jobVariable.getName(), jobVariable.getModel());
updatedDescriptions.put(jobVariable.getName(), jobVariable.getDescription());
updatedGroups.put(jobVariable.getName(), jobVariable.getGroup());
updatedAdvanced.put(jobVariable.getName(), jobVariable.isAdvanced());
updatedHidden.put(jobVariable.getName(), jobVariable.isHidden());
}
data.setUpdatedVariables(updatedVariables);
data.setUpdatedModels(updatedModels);
data.setUpdatedDescriptions(updatedDescriptions);
data.setUpdatedGroups(updatedGroups);
data.setUpdatedAdvanced(updatedAdvanced);
data.setUpdatedHidden(updatedHidden);
data.setValid(true);
}
private static void validateJob(TaskFlowJob job, JobValidationData data) {
ArrayList<Task> tasks = job.getTasks();
if (tasks.isEmpty()) {
......
......@@ -50,6 +50,7 @@ import org.ow2.proactive.scheduler.common.SchedulerEventListener;
import org.ow2.proactive.scheduler.common.SortSpecifierContainer;
import org.ow2.proactive.scheduler.common.TaskDescriptor;
import org.ow2.proactive.scheduler.common.exception.JobCreationException;
import org.ow2.proactive.scheduler.common.exception.JobValidationException;
import org.ow2.proactive.scheduler.common.exception.NotConnectedException;
import org.ow2.proactive.scheduler.common.exception.PermissionException;
import org.ow2.proactive.scheduler.common.exception.SchedulerException;
......@@ -61,6 +62,7 @@ import org.ow2.proactive.scheduler.common.job.JobId;
import org.ow2.proactive.scheduler.common.job.JobInfo;
import org.ow2.proactive.scheduler.common.job.JobResult;
import org.ow2.proactive.scheduler.common.job.JobState;
import org.ow2.proactive.scheduler.common.job.JobVariable;
import org.ow2.proactive.scheduler.common.job.TaskFlowJob;
import org.ow2.proactive.scheduler.common.task.Task;
import org.ow2.proactive.scheduler.common.task.TaskId;
......@@ -816,9 +818,17 @@ public class RestSmartProxyImpl extends AbstractSmartProxy<RestJobTrackerImpl>
}
@Override
public Set<String> addJobSignal(String jobId, String signal)
throws UnknownJobException, NotConnectedException, PermissionException, SignalApiException {
return (_getScheduler()).addJobSignal(jobId, signal);
public Set<String> addJobSignal(String jobId, String signal, Map<String, String> updatedVariables)
throws UnknownJobException, NotConnectedException, PermissionException, SignalApiException,
JobValidationException {
return (_getScheduler()).addJobSignal(jobId, signal, updatedVariables);
}
@Override
public List<JobVariable> validateJobSignal(String jobId, String signal, Map<String, String> updatedVariables)
throws UnknownJobException, NotConnectedException, PermissionException, SignalApiException,
JobValidationException {
return (_getScheduler()).validateJobSignal(jobId, signal, updatedVariables);
}
}
......@@ -37,6 +37,7 @@ import org.ow2.proactive.authentication.UserData;
import org.ow2.proactive.db.SortParameter;
import org.ow2.proactive.scheduler.common.exception.JobAlreadyFinishedException;
import org.ow2.proactive.scheduler.common.exception.JobCreationException;
import org.ow2.proactive.scheduler.common.exception.JobValidationException;
import org.ow2.proactive.scheduler.common.exception.NotConnectedException;
import org.ow2.proactive.scheduler.common.exception.PermissionException;
import org.ow2.proactive.scheduler.common.exception.SchedulerException;
......@@ -49,6 +50,7 @@ import org.ow2.proactive.scheduler.common.job.JobInfo;
import org.ow2.proactive.scheduler.common.job.JobPriority;
import org.ow2.proactive.scheduler.common.job.JobResult;
import org.ow2.proactive.scheduler.common.job.JobState;
import org.ow2.proactive.scheduler.common.job.JobVariable;
import org.ow2.proactive.scheduler.common.task.TaskId;
import org.ow2.proactive.scheduler.common.task.TaskResult;
import org.ow2.proactive.scheduler.common.task.TaskState;
......@@ -105,6 +107,7 @@ public interface Scheduler extends SchedulerUsage, ThirdPartyCredentials {
*
* @param jobId id of the job
* @param signal signal name
* @param updatedVariables the updated variables of the signal
* @return the set of job signals including the added signal
* @throws NotConnectedException
* if you are not authenticated.
......@@ -114,9 +117,34 @@ public interface Scheduler extends SchedulerUsage, ThirdPartyCredentials {
* if you can't access to this particular job.
* @throws SignalApiException
* errors related to the signal api
* @throws SignalApiException
* if the given updated variables are not compatible with the job input values
*/
Set<String> addJobSignal(String jobId, String signal, Map<String, String> updatedVariables)
throws NotConnectedException, UnknownJobException, PermissionException, SignalApiException,
JobValidationException;
/**
* Validate the given signal's updated variables
*
* @param jobId id of the job
* @param signal signal name
* @param updatedVariables the updated variables of the signal
* @return the list of signal input values
* @throws NotConnectedException
* if you are not authenticated.
* @throws UnknownJobException
* if the job does not exist.
* @throws PermissionException
* if you can't access to this particular job.
* @throws SignalApiException
* errors related to the signal api
* @throws SignalApiException
* if the given updated variables are not compatible with the job input values
*/
Set<String> addJobSignal(String jobId, String signal)
throws NotConnectedException, UnknownJobException, PermissionException, SignalApiException;
List<JobVariable> validateJobSignal(String jobId, String signal, Map<String, String> updatedVariables)
throws NotConnectedException, UnknownJobException, PermissionException, SignalApiException,
JobValidationException;
/**
* Returns the USER DataSpace URIs associated with the current user
......
......@@ -276,6 +276,17 @@ public interface JobInfo extends Serializable {
*/
void setSignals(Set<String> signals);
/**
* Returns the map of signals and input variables
* @return signals map
*/
Map<String, Map<String, List<JobVariable>>> getDetailedSignals();
/**
* Sets the map of job signals and input variables
*/
void setDetailedSignals(Map<String, Map<String, List<JobVariable>>> detailedSignals);
/**
* Return the list of tasks names with precious results
* @return list of task names
......
......@@ -67,6 +67,10 @@ public class JobVariable implements Serializable {
//Empty constructor
}
public JobVariable(String name) {
this(name, null, null);
}
public JobVariable(String name, String value) {
this(name, value, null);
}
......