Commit dde94b42 authored by Fabien Viale's avatar Fabien Viale
Browse files

Fix in-error time incorrect values

InError time needs to be reset after a job or task is no longer in-error

Added corresponding checks in existing functional tests
parent f09f566b
......@@ -733,6 +733,7 @@ class LiveJobs {
} else {
//remove the parent tasks results if task finished with no error
task.removeParentTasksResults();
task.setInErrorTime(-1);
}
terminateTask(jobData, task, errorOccurred, result, terminationData);
......@@ -777,6 +778,9 @@ class LiveJobs {
job.setStatus(JobStatus.IN_ERROR);
} else if (job.getStatus().equals(JobStatus.IN_ERROR) && job.getNumberOfInErrorTasks() == 0) {
job.setStatus(JobStatus.RUNNING);
job.setInErrorTime(-1);
} else if (job.getNumberOfInErrorTasks() == 0) {
job.setInErrorTime(-1);
}
}
......@@ -822,6 +826,13 @@ class LiveJobs {
terminationData.addJobToTerminate(job.getId(), job.getGenericInformation(), job.getCredentials());
}
task.setInErrorTime(-1);
boolean jobUpdated = false;
if (job.getNumberOfInErrorTasks() == 0) {
job.setInErrorTime(-1);
jobUpdated = true;
}
// Update database
if (taskResult.getAction() != null) {
dbManager.updateAfterWorkflowTaskFinished(job, changesInfo, taskResult);
......@@ -844,6 +855,11 @@ class LiveJobs {
new JobInfoImpl((JobInfoImpl) job.getJobInfo())));
listener.jobUpdatedFullData(job);
} else if (jobUpdated) {
listener.jobStateUpdated(job.getOwner(),
new NotificationData<JobInfo>(SchedulerEvent.JOB_UPDATED,
new JobInfoImpl((JobInfoImpl) job.getJobInfo())));
listener.jobUpdatedFullData(job);
}
return terminationData;
......
......@@ -108,13 +108,13 @@ import com.google.common.collect.Lists;
@NamedQuery(name = "updateJobDataSetJobToBeRemoved", query = "update JobData set toBeRemoved = :toBeRemoved, lastUpdatedTime = :lastUpdatedTime where id = :jobId"),
@NamedQuery(name = "updateJobDataPriority", query = "update JobData set priority = :priority, lastUpdatedTime = :lastUpdatedTime where id = :jobId"),
@NamedQuery(name = "updateJobDataAfterTaskFinished", query = "update JobData set status = :status, " +
"finishedTime = :finishedTime, numberOfPendingTasks = :numberOfPendingTasks, " +
"finishedTime = :finishedTime, inErrorTime= :inErrorTime, numberOfPendingTasks = :numberOfPendingTasks, " +
"numberOfFinishedTasks = :numberOfFinishedTasks, " +
"numberOfRunningTasks = :numberOfRunningTasks, " +
"numberOfFailedTasks = :numberOfFailedTasks, numberOfFaultyTasks = :numberOfFaultyTasks, " +
"numberOfInErrorTasks = :numberOfInErrorTasks, lastUpdatedTime = :lastUpdatedTime, resultMap = :resultMap, preciousTasks = :preciousTasks where id = :jobId"),
@NamedQuery(name = "updateJobDataAfterWorkflowTaskFinished", query = "update JobData set status = :status, " +
"finishedTime = :finishedTime, numberOfPendingTasks = :numberOfPendingTasks, " +
"finishedTime = :finishedTime, inErrorTime = :inErrorTime, numberOfPendingTasks = :numberOfPendingTasks, " +
"numberOfFinishedTasks = :numberOfFinishedTasks, " +
"numberOfRunningTasks = :numberOfRunningTasks, totalNumberOfTasks =:totalNumberOfTasks, " +
"numberOfFailedTasks = :numberOfFailedTasks, numberOfFaultyTasks = :numberOfFaultyTasks, " +
......
......@@ -1222,6 +1222,7 @@ public class SchedulerDBManager {
session.getNamedQuery("updateJobDataAfterWorkflowTaskFinished")
.setParameter("status", jobInfo.getStatus())
.setParameter("finishedTime", jobInfo.getFinishedTime())
.setParameter("inErrorTime", jobInfo.getInErrorTime())
.setParameter("numberOfPendingTasks", jobInfo.getNumberOfPendingTasks())
.setParameter("numberOfFinishedTasks", jobInfo.getNumberOfFinishedTasks())
.setParameter("numberOfRunningTasks", jobInfo.getNumberOfRunningTasks())
......@@ -1309,6 +1310,7 @@ public class SchedulerDBManager {
updateJob = session.getNamedQuery("updateJobDataAfterTaskFinished")
.setParameter("status", jobInfo.getStatus())
.setParameter("finishedTime", jobInfo.getFinishedTime())
.setParameter("inErrorTime", jobInfo.getInErrorTime())
.setParameter("numberOfPendingTasks", jobInfo.getNumberOfPendingTasks())
.setParameter("numberOfFinishedTasks", jobInfo.getNumberOfFinishedTasks())
.setParameter("numberOfRunningTasks", jobInfo.getNumberOfRunningTasks())
......@@ -1395,6 +1397,7 @@ public class SchedulerDBManager {
int result = session.getNamedQuery("updateJobDataAfterTaskFinished")
.setParameter("status", jobInfo.getStatus())
.setParameter("finishedTime", jobInfo.getFinishedTime())
.setParameter("inErrorTime", jobInfo.getInErrorTime())
.setParameter("numberOfPendingTasks", jobInfo.getNumberOfPendingTasks())
.setParameter("numberOfFinishedTasks", jobInfo.getNumberOfFinishedTasks())
.setParameter("numberOfRunningTasks", jobInfo.getNumberOfRunningTasks())
......@@ -1673,6 +1676,7 @@ public class SchedulerDBManager {
session.getNamedQuery("updateJobDataAfterTaskFinished")
.setParameter("status", jobInfo.getStatus())
.setParameter("finishedTime", jobInfo.getFinishedTime())
.setParameter("inErrorTime", jobInfo.getInErrorTime())
.setParameter("numberOfPendingTasks", jobInfo.getNumberOfPendingTasks())
.setParameter("numberOfFinishedTasks", jobInfo.getNumberOfFinishedTasks())
.setParameter("numberOfRunningTasks", jobInfo.getNumberOfRunningTasks())
......
......@@ -150,25 +150,31 @@ public class TestErrorAndFailure extends SchedulerFunctionalTestNoRestart {
@Test
public void testPauseJobErrorPolicyRestartAllInErrorTasks() throws Throwable {
testRestartInError(false, false);
testRestartInError(false, false, false);
}
@Test
public void testPauseJobErrorPolicyRestartInErrorTask() throws Throwable {
testRestartInError(true, false);
testRestartInError(true, false, false);
}
@Test
public void testPauseTaskErrorPolicyRestartAllInErrorTasks() throws Throwable {
testRestartInError(false, true);
testRestartInError(false, true, false);
}
@Test
public void testPauseTaskErrorPolicyRestartInErrorTask() throws Throwable {
testRestartInError(true, true);
testRestartInError(true, true, false);
}
private void testRestartInError(boolean restartSingleTask, boolean pauseTaskPolicy) throws Throwable {
@Test
public void testPauseTaskErrorMarkAsFinishedTask() throws Throwable {
testRestartInError(true, true, true);
}
private void testRestartInError(boolean restartSingleTask, boolean pauseTaskPolicy, boolean markAsFinished)
throws Throwable {
TaskFlowJob submittedJob = new TaskFlowJob();
submittedJob.setName(this.getClass().getSimpleName() +
(pauseTaskPolicy ? "_pauseTaskPolicy" : "_pauseJobPolicy"));
......@@ -203,6 +209,7 @@ public class TestErrorAndFailure extends SchedulerFunctionalTestNoRestart {
JobState jobState = schedulerHelper.getSchedulerInterface().getJobState(id);
Assert.assertEquals(pauseTaskPolicy ? JobStatus.IN_ERROR : JobStatus.PAUSED, jobState.getStatus());
Assert.assertEquals(1, jobState.getNumberOfInErrorTasks());
Assert.assertNotEquals(-1, jobState.getInErrorTime());
List<TaskResult> taskResults = schedulerHelper.getSchedulerInterface()
.getTaskResultAllIncarnations(jInfo.getJobId(), "errorTask");
Assert.assertNotNull(taskResults);
......@@ -211,7 +218,9 @@ public class TestErrorAndFailure extends SchedulerFunctionalTestNoRestart {
Assert.assertNotNull(taskLogs);
Assert.assertTrue(taskLogs.contains("some logs"));
FileUtils.writeStringToFile(groovyScriptFile, ok_groovy_script, StandardCharsets.UTF_8.toString());
if (restartSingleTask) {
if (markAsFinished) {
schedulerHelper.getSchedulerInterface().finishInErrorTask(id.toString(), errorTaskName);
} else if (restartSingleTask) {
schedulerHelper.getSchedulerInterface().restartInErrorTask(id.toString(), errorTaskName);
} else {
schedulerHelper.restartAllInErrorTasks(id.toString());
......@@ -219,7 +228,15 @@ public class TestErrorAndFailure extends SchedulerFunctionalTestNoRestart {
if (!pauseTaskPolicy) {
schedulerHelper.getSchedulerInterface().resumeJob(id);
}
schedulerHelper.waitForEventTaskFinished(id, errorTaskName);
if (markAsFinished) {
schedulerHelper.waitForEventTaskInErrorToFinished(id, errorTaskName);
schedulerHelper.waitForEventJobUpdated(id);
} else {
schedulerHelper.waitForEventTaskFinished(id, errorTaskName);
}
jobState = schedulerHelper.getSchedulerInterface().getJobState(id);
Assert.assertEquals(-1, jobState.getInErrorTime());
schedulerHelper.waitForEventTaskRunning(id, "okTask");
JobInfo jobInfo = schedulerHelper.getSchedulerInterface().getJobInfo(id.toString());
Assert.assertTrue(jobInfo.getStatus() == JobStatus.RUNNING || jobInfo.getStatus() == JobStatus.STALLED);
......@@ -232,6 +249,7 @@ public class TestErrorAndFailure extends SchedulerFunctionalTestNoRestart {
Assert.assertEquals(0, jobState.getNumberOfInErrorTasks());
Assert.assertEquals(0, jobState.getNumberOfRunningTasks());
Assert.assertEquals(2, jobState.getNumberOfFinishedTasks());
Assert.assertEquals(-1, jobState.getInErrorTime());
groovyScriptFile.delete();
}
......
......@@ -874,6 +874,39 @@ public class SchedulerTHelper {
return getSchedulerMonitorsHandler().waitForEventJob(SchedulerEvent.JOB_REMOVE_FINISHED, id, timeout);
}
/**
* Wait for a job updated event.
* If corresponding event has been already thrown by scheduler, returns immediately
* with JobInfo object associated to event, otherwise wait for event reception.
*
* @param id job identifier, for which event is waited for.
* @return JobInfo event's associated object.
*/
public JobInfo waitForEventJobUpdated(JobId id) {
try {
return waitForEventJobUpdated(id, 0);
} catch (ProActiveTimeoutException e) {
//unreachable block, 0 means infinite, no timeout
//log sthing ?
return null;
}
}
/**
* Wait for a job updated event.
* If corresponding event has been already thrown by scheduler, returns immediately
* with JobInfo object associated to event, otherwise wait for reception
* of the corresponding event.
*
* @param id job identifier, for which event is waited for.
* @param timeout max waiting time in milliseconds.
* @return JobInfo event's associated object.
* @throws ProActiveTimeoutException if timeout is reached.
*/
public JobInfo waitForEventJobUpdated(JobId id, long timeout) {
return getSchedulerMonitorsHandler().waitForEventJob(SchedulerEvent.JOB_UPDATED, id, timeout);
}
/**
* Wait for a task passing from pending to running.
* If corresponding event has been already thrown by scheduler, returns immediately
......@@ -972,6 +1005,26 @@ public class SchedulerTHelper {
}
}
/**
* Wait for a in-error task to terminate after a call to markAsFinishedAndResume.
* If corresponding event has been already thrown by scheduler, returns immediately
* with TaskInfo object associated to event, otherwise wait for reception
* of the corresponding event.
*
* @param jobId job identifier, for which task belongs.
* @param taskName for which event is waited for.
* @return TaskInfo event's associated object.
*/
public TaskInfo waitForEventTaskInErrorToFinished(JobId jobId, String taskName) {
try {
return waitForEventTaskInErrorToFinished(jobId, taskName, 0);
} catch (ProActiveTimeoutException e) {
//unreachable block, 0 means infinite, no timeout
//log sthing ?
return null;
}
}
/**
* Wait for a task failed that reaches in-error state.
* If corresponding event has been already thrown by scheduler, returns immediately
......@@ -988,6 +1041,25 @@ public class SchedulerTHelper {
return getSchedulerMonitorsHandler().waitForEventTask(SchedulerEvent.TASK_IN_ERROR, jobId, taskName, timeout);
}
/**
* Wait for a in-error task to terminate after a call to markAsFinishedAndResume.
* If corresponding event has been already thrown by scheduler, returns immediately
* with TaskInfo object associated to event, otherwise wait for reception
* of the corresponding event.
*
* @param jobId job identifier, for which task belongs.
* @param taskName for which event is waited for.
* @param timeout max waiting time in milliseconds.
* @return TaskInfo event's associated object.
* @throws ProActiveTimeoutException if timeout is reached.
*/
public TaskInfo waitForEventTaskInErrorToFinished(JobId jobId, String taskName, long timeout) {
return getSchedulerMonitorsHandler().waitForEventTask(SchedulerEvent.TASK_IN_ERROR_TO_FINISHED,
jobId,
taskName,
timeout);
}
/**
* Wait for a task passing from running to finished.
* If corresponding event has been already thrown by scheduler, returns immediately
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment