Commit 24f1cb3f authored by Fabien Viale's avatar Fabien Viale
Browse files

Generic Info REMOVE_DELAY to remove a job after a configurable time

- Add functional test for housekeeping

- Also, fix issue with ServerJobAndTaskLogs while removing precious logs (access to SchedulerSpacesSupport from a non-active object context).
- Fix minor issues appearing in the code
parent 4acab84c
......@@ -41,8 +41,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.node.Node;
import org.objectweb.proactive.core.node.NodeFactory;
import org.objectweb.proactive.core.node.NodeImpl;
import org.objectweb.proactive.extensions.dataspaces.api.PADataSpaces;
import org.objectweb.proactive.extensions.dataspaces.core.BaseScratchSpaceConfiguration;
......@@ -426,7 +424,7 @@ public class DataSpaceServiceStarter implements Serializable {
throw new IllegalStateException("DataSpace service is not started");
}
try {
DataSpacesNodes.closeNodeConfig(NodeFactory.getDefaultNode());
DataSpacesNodes.closeNodeConfig(schedulerNode);
namingServiceDeployer.terminate();
} catch (Throwable t) {
}
......
......@@ -764,7 +764,7 @@ class LiveJobs {
job.terminate();
jlogger.debug(job.getId(), "terminated");
jobs.remove(job.getId());
terminationData.addJobToTerminate(job.getId());
terminationData.addJobToTerminate(job.getId(), job.getGenericInformation());
}
// Update database
......@@ -971,7 +971,7 @@ class LiveJobs {
// terminating job
job.terminate();
jlogger.debug(job.getId(), "terminated");
terminationData.addJobToTerminate(job.getId());
terminationData.addJobToTerminate(job.getId(), job.getGenericInformation());
jobs.remove(job.getId());
}
......@@ -1044,7 +1044,7 @@ class LiveJobs {
JobId jobId = jobData.job.getId();
jobs.remove(jobId);
terminationData.addJobToTerminate(jobId);
terminationData.addJobToTerminate(jobId, jobData.job.getGenericInformation());
InternalJob job = jobData.job;
......@@ -1103,7 +1103,7 @@ class LiveJobs {
JobId jobId = jobData.job.getId();
jobs.remove(jobId);
terminationData.addJobToTerminate(jobId);
terminationData.addJobToTerminate(jobId, jobData.job.getGenericInformation());
InternalJob job = jobData.job;
......
......@@ -77,6 +77,7 @@ import javax.security.auth.Subject;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.EndActive;
import org.objectweb.proactive.InitActive;
import org.objectweb.proactive.RunActive;
import org.objectweb.proactive.Service;
......@@ -178,7 +179,7 @@ import org.ow2.proactive.utils.Tools;
* @since ProActive Scheduling 0.9
*/
@ActiveObject
public class SchedulerFrontend implements InitActive, Scheduler, RunActive, SchedulerSpaceInterface {
public class SchedulerFrontend implements InitActive, Scheduler, RunActive, EndActive, SchedulerSpaceInterface {
/**
* Delay to wait for between getting a job result and removing the job
......@@ -1710,4 +1711,9 @@ public class SchedulerFrontend implements InitActive, Scheduler, RunActive, Sche
}
}
/**
 * End-of-activity hook of this active object: releases the helper active objects
 * owned by the frontend.
 *
 * @param body the body passed to the hook; not used by this implementation
 */
@Override
public void endActivity(Body body) {
// Stop the active ServerJobAndTaskLogs instance (no-op if none was ever created).
ServerJobAndTaskLogs.terminateActiveInstance();
// Then stop the authentication object.
// NOTE(review): the 'true' flag presumably requests immediate termination - confirm against the ProActive API.
PAActiveObject.terminateActiveObject(authentication, true);
}
}
......@@ -107,38 +107,40 @@ public class SchedulerSpacesSupport {
*/
public void registerUserSpace(String username) {
if (this.userGlobalSpaces.get(username) == null) {
DataSpacesFileObject userSpace;
String userSpaceName = SchedulerConstants.USERSPACE_NAME + "_" + username;
if (!PASchedulerProperties.DATASPACE_DEFAULTUSER_URL.isSet()) {
logger.warn("URL of the root USER space is not set, cannot create a USER space for " + username);
return;
}
String localpath = PASchedulerProperties.DATASPACE_DEFAULTUSER_LOCALPATH.getValueAsStringOrNull();
String hostname = PASchedulerProperties.DATASPACE_DEFAULTUSER_HOSTNAME.getValueAsStringOrNull();
try {
DataSpaceServiceStarter.getDataSpaceServiceStarter()
.createSpaceWithUserNameSubfolder(username,
SchedulerConstants.SCHEDULER_DATASPACE_APPLICATION_ID,
userSpaceName,
PASchedulerProperties.DATASPACE_DEFAULTUSER_URL.getValueAsString(),
localpath,
hostname,
false,
true);
// immediately retrieve the User Space
userSpace = PADataSpaces.resolveOutput(userSpaceName);
logger.info("USER space for user " + username + " is at " + userSpace.getAllRealURIs());
} catch (Exception e) {
logger.warn("", e);
return;
synchronized (this) {
DataSpacesFileObject userSpace;
String userSpaceName = SchedulerConstants.USERSPACE_NAME + "_" + username;
if (!PASchedulerProperties.DATASPACE_DEFAULTUSER_URL.isSet()) {
logger.warn("URL of the root USER space is not set, cannot create a USER space for " + username);
return;
}
String localpath = PASchedulerProperties.DATASPACE_DEFAULTUSER_LOCALPATH.getValueAsStringOrNull();
String hostname = PASchedulerProperties.DATASPACE_DEFAULTUSER_HOSTNAME.getValueAsStringOrNull();
try {
DataSpaceServiceStarter.getDataSpaceServiceStarter()
.createSpaceWithUserNameSubfolder(username,
SchedulerConstants.SCHEDULER_DATASPACE_APPLICATION_ID,
userSpaceName,
PASchedulerProperties.DATASPACE_DEFAULTUSER_URL.getValueAsString(),
localpath,
hostname,
false,
true);
// immediately retrieve the User Space
userSpace = PADataSpaces.resolveOutput(userSpaceName);
logger.info("USER space for user " + username + " is at " + userSpace.getAllRealURIs());
// register the user GlobalSpace to the frontend state
this.userGlobalSpaces.put(username, userSpace);
} catch (Exception e) {
logger.error("", e);
return;
}
}
// register the user GlobalSpace to the frontend state
this.userGlobalSpaces.put(username, userSpace);
}
}
......
......@@ -30,6 +30,7 @@ import java.security.KeyException;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
......@@ -63,7 +64,6 @@ import org.ow2.proactive.scheduler.descriptor.EligibleTaskDescriptor;
import org.ow2.proactive.scheduler.job.InternalJob;
import org.ow2.proactive.scheduler.job.JobInfoImpl;
import org.ow2.proactive.scheduler.policy.Policy;
import org.ow2.proactive.scheduler.synchronization.AOSynchronization;
import org.ow2.proactive.scheduler.synchronization.SynchronizationInternal;
import org.ow2.proactive.scheduler.task.TaskInfoImpl;
import org.ow2.proactive.scheduler.task.TaskLauncher;
......@@ -73,6 +73,7 @@ import org.ow2.proactive.scheduler.util.JobLogger;
import org.ow2.proactive.scheduler.util.ServerJobAndTaskLogs;
import org.ow2.proactive.scheduler.util.TaskLogger;
import org.ow2.proactive.utils.NodeSet;
import org.ow2.proactive.utils.Tools;
import it.sauronsoftware.cron4j.Scheduler;
......@@ -91,6 +92,8 @@ public class SchedulingService {
static final long SCHEDULER_REMOVED_JOB_DELAY = PASchedulerProperties.SCHEDULER_REMOVED_JOB_DELAY.getValueAsInt() *
1000;
static final String GENERIC_INFO_REMOVE_DELAY = "REMOVE_DELAY";
public static final String SCHEDULING_SERVICE_RECOVER_TASKS_STATE_FINISHED = "SchedulingService::recoverTasksState finished";
public static final int SCHEDULER_KILL_DELAY = PASchedulerProperties.SCHEDULER_KILL_DELAY.getValueAsInt();
......@@ -572,6 +575,7 @@ public class SchedulingService {
boolean shouldRemoveFromDb = PASchedulerProperties.JOB_REMOVE_FROM_DB.getValueAsBoolean();
if (tempJobs.size() == 1) {
logger.info("Job " + jobId + " will be removed at " + new Date(at));
infrastructure.getDBManager().scheduleJobForRemoval(tempJobs.get(0).getJobInfo().getJobId(),
at,
shouldRemoveFromDb);
......@@ -949,14 +953,27 @@ public class SchedulingService {
}
}
void terminateJobHandling(final JobId jobId) {
void terminateJobHandling(final JobId jobId, final Map<String, String> jobGenericInfo) {
try {
listenJobLogsSupport.cleanLoggers(jobId);
jlogger.close(jobId);
// auto remove
long removeDelay = Long.MAX_VALUE;
if (SchedulingService.SCHEDULER_AUTO_REMOVED_JOB_DELAY > 0) {
long timeToRemove = System.currentTimeMillis() + SchedulingService.SCHEDULER_AUTO_REMOVED_JOB_DELAY;
removeDelay = SchedulingService.SCHEDULER_AUTO_REMOVED_JOB_DELAY;
}
if (jobGenericInfo != null && jobGenericInfo.containsKey(GENERIC_INFO_REMOVE_DELAY)) {
try {
removeDelay = Tools.formatDate(jobGenericInfo.get(GENERIC_INFO_REMOVE_DELAY));
} catch (Exception e) {
logger.error("Error when parsing generic information " + GENERIC_INFO_REMOVE_DELAY + " for job " +
jobId, e);
}
}
// auto remove
if (removeDelay < Long.MAX_VALUE) {
long timeToRemove = System.currentTimeMillis() + removeDelay;
scheduleJobRemove(jobId, timeToRemove);
}
} catch (Throwable t) {
......@@ -981,22 +998,36 @@ public class SchedulingService {
if (SCHEDULER_REMOVED_JOB_DELAY > 0 || SCHEDULER_AUTO_REMOVED_JOB_DELAY > 0) {
logger.debug("Removing non-managed jobs");
// Note : by default, no finished jobs are recovered from the database, the following code
// will not be executed
for (InternalJob job : recoveredState.getFinishedJobs()) {
//re-set job removed delay (if job result has been sent to user)
long toWait = 0;
long toWait = Long.MAX_VALUE;
if (job.isToBeRemoved()) {
toWait = SCHEDULER_REMOVED_JOB_DELAY *
SCHEDULER_AUTO_REMOVED_JOB_DELAY == 0 ? SCHEDULER_REMOVED_JOB_DELAY +
SCHEDULER_AUTO_REMOVED_JOB_DELAY
SCHEDULER_AUTO_REMOVED_JOB_DELAY == 0 ? Long.MAX_VALUE
: Math.min(SCHEDULER_REMOVED_JOB_DELAY,
SCHEDULER_AUTO_REMOVED_JOB_DELAY);
} else {
toWait = SCHEDULER_AUTO_REMOVED_JOB_DELAY;
if (SCHEDULER_AUTO_REMOVED_JOB_DELAY > 0) {
toWait = SCHEDULER_AUTO_REMOVED_JOB_DELAY;
}
if (job.getGenericInformation() != null &&
job.getGenericInformation().containsKey(GENERIC_INFO_REMOVE_DELAY)) {
try {
toWait = Tools.formatDate(job.getGenericInformation().get(GENERIC_INFO_REMOVE_DELAY));
} catch (Exception e) {
logger.error("Error when parsing generic information " + GENERIC_INFO_REMOVE_DELAY +
" for job " + job.getId(), e);
}
}
}
if (toWait > 0) {
scheduleJobRemove(job.getId(), System.currentTimeMillis() + toWait);
jlogger.debug(job.getId(), "will be removed in " + (SCHEDULER_REMOVED_JOB_DELAY / 1000) + "sec");
if (toWait < Long.MAX_VALUE) {
long removalDate = System.currentTimeMillis() + toWait;
if (removalDate < job.getScheduledTimeForRemoval()) {
scheduleJobRemove(job.getId(), removalDate);
jlogger.debug(job.getId(), "will be removed at " + new Date(removalDate));
}
}
}
}
......@@ -1132,7 +1163,7 @@ public class SchedulingService {
getListener().jobStateUpdated(owner,
new NotificationData<>(SchedulerEvent.JOB_REMOVE_FINISHED,
new JobInfoImpl(jobId, owner)));
ServerJobAndTaskLogs.getInstance().remove(jobId, owner);
ServerJobAndTaskLogs.getActiveInstance().remove(jobId, owner);
logger.debug("HOUSEKEEPING sent JOB_REMOVE_FINISHED notification for job " + jobId);
}
......
......@@ -116,6 +116,8 @@ final class TerminationData {
private final Set<JobId> jobsToTerminate;
private final Map<JobId, Map<String, String>> jobsToTerminateGenericInformation;
private final Map<TaskIdWrapper, TaskTerminationData> tasksToTerminate;
private final Map<TaskIdWrapper, TaskRestartData> tasksToRestart;
......@@ -123,23 +125,27 @@ final class TerminationData {
private final InternalTaskParentFinder internalTaskParentFinder;
static final TerminationData EMPTY = new TerminationData(Collections.emptySet(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());
static TerminationData newTerminationData() {
return new TerminationData(new HashSet<>(), new HashMap<>(), new HashMap<>());
return new TerminationData(new HashSet<>(), new HashMap<>(), new HashMap<>(), new HashMap<>());
}
private TerminationData(Set<JobId> jobsToTerminate, Map<TaskIdWrapper, TaskTerminationData> tasksToTerminate,
Map<TaskIdWrapper, TaskRestartData> tasksToRestart) {
Map<TaskIdWrapper, TaskRestartData> tasksToRestart,
Map<JobId, Map<String, String>> jobsToTerminateGenericInformation) {
this.jobsToTerminate = jobsToTerminate;
this.tasksToTerminate = tasksToTerminate;
this.jobsToTerminateGenericInformation = jobsToTerminateGenericInformation;
this.tasksToRestart = tasksToRestart;
this.internalTaskParentFinder = InternalTaskParentFinder.getInstance();
}
void addJobToTerminate(JobId jobId) {
void addJobToTerminate(JobId jobId, Map<String, String> jobGenericInfo) {
jobsToTerminate.add(jobId);
jobsToTerminateGenericInformation.put(jobId, jobGenericInfo);
}
void addTaskData(InternalJob jobData, RunningTaskData taskData, TerminationStatus terminationStatus,
......@@ -180,7 +186,7 @@ final class TerminationData {
private void terminateJobs(final SchedulingService service) {
for (JobId jobId : jobsToTerminate) {
service.terminateJobHandling(jobId);
service.terminateJobHandling(jobId, jobsToTerminateGenericInformation.get(jobId));
}
}
......
......@@ -40,6 +40,7 @@ import java.util.Set;
import org.apache.commons.lang3.Validate;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.extensions.dataspaces.api.DataSpacesFileObject;
import org.objectweb.proactive.extensions.dataspaces.exceptions.FileSystemException;
import org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector;
......@@ -66,10 +67,23 @@ public class ServerJobAndTaskLogs {
private SchedulerSpacesSupport spacesSupport = null;
private static ServerJobAndTaskLogs activeInstance = null;
/**
 * Returns the singleton stored in {@code LazyHolder.INSTANCE}.
 *
 * @return the shared {@code ServerJobAndTaskLogs} object
 */
public static ServerJobAndTaskLogs getInstance() {
return LazyHolder.INSTANCE;
}
/**
 * Returns the active-object view of the singleton, creating it on first use by
 * passing {@link #getInstance()} to {@code PAActiveObject.turnActive}.
 * <p>
 * The method is synchronized on the class so that concurrent first callers cannot
 * both observe a {@code null} reference and each create (and leak) their own
 * active object.
 *
 * @return the active instance, or {@code null} if it could not be created
 *         (the failure is logged); callers must be prepared for {@code null}
 */
public static synchronized ServerJobAndTaskLogs getActiveInstance() {
    if (activeInstance == null) {
        try {
            activeInstance = PAActiveObject.turnActive(getInstance());
        } catch (Exception e) {
            // Creation is retried on the next call because the reference stays null.
            logger.error("Could not create ServerJobAndTaskLogs instance", e);
        }
    }
    return activeInstance;
}
static final JobLogger jlogger = JobLogger.getInstance();
public void configure() {
......@@ -89,6 +103,12 @@ public class ServerJobAndTaskLogs {
this.spacesSupport = spacesSupport;
}
/**
 * Terminates the active instance created by {@code getActiveInstance()}, if any,
 * and forgets it so that a later {@code getActiveInstance()} call creates a fresh
 * active object instead of handing out a reference to a terminated one.
 * <p>
 * Does nothing when no active instance exists.
 */
public static void terminateActiveInstance() {
    ServerJobAndTaskLogs toTerminate;
    // Detach the reference under the class lock, but perform the (remote) termination
    // call outside of it so the lock is never held while calling into ProActive.
    synchronized (ServerJobAndTaskLogs.class) {
        toTerminate = activeInstance;
        activeInstance = null;
    }
    if (toTerminate != null) {
        PAActiveObject.terminateActiveObject(toTerminate, true);
    }
}
public String getTaskLog(TaskId id) {
String result = readLog(TaskLogger.getTaskLogRelativePath(id));
return result != null ? result : "Cannot retrieve logs for task " + id;
......
......@@ -35,6 +35,7 @@ import functionaltests.dataspaces.TestDataspaceConcurrentTransfer;
import functionaltests.db.schedulerdb.SchedulerDbManagerConcurrencyTest;
import functionaltests.db.schedulerdb.TestJobRemove;
import functionaltests.db.schedulerdb.TestMultipleTasks;
import functionaltests.housekeeping.TestHouseKeeping;
import functionaltests.job.taskkill.TestJobKilled;
import functionaltests.job.taskkill.TestProcessTreeKiller;
import functionaltests.job.taskkill.TestProcessTreeKillerNonForked;
......@@ -87,7 +88,7 @@ import functionaltests.workflow.complex.TestWorkflowReplicateJobs3;
TestDataspaceConcurrentTransfer.class, TestDataspaceConcurrentKilling.class,
RunningTaskRecoveryWithDownNodeTest.class, RunningTaskRecoveryWithForkedTaskExecutorTest.class,
RunningTaskRecoveryWithInProcessTaskExecutorTest.class,
RunningTaskRecoveryWhenNodesAreReservedInBatchTest.class })
RunningTaskRecoveryWhenNodesAreReservedInBatchTest.class, TestHouseKeeping.class })
/**
* @author ActiveEon Team
......
/*
* ProActive Parallel Suite(TM):
* The Open Source library for parallel and distributed
* Workflows & Scheduling, Orchestration, Cloud Automation
* and Big Data Analysis on Enterprise Grids & Clouds.
*
* Copyright (c) 2007 - 2017 ActiveEon
* Contact: contact@activeeon.com
*
* This library is free software: you can redistribute it and/or
* modify it under the terms of the GNU Affero General Public License
* as published by the Free Software Foundation: version 3 of
* the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If needed, contact us to obtain a release under GPL Version 2 or 3
* or a different license than the AGPL.
*/
package functionaltests.housekeeping;
import java.io.File;
import java.net.URL;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.ow2.proactive.scheduler.common.job.JobId;
import org.ow2.proactive.scheduler.common.job.JobResult;
import functionaltests.utils.SchedulerFunctionalTestWithCustomConfigAndRestart;
import functionaltests.utils.SchedulerTHelper;
/**
* @author ActiveEon Team
* @since 20/06/2020
*/
public class TestHouseKeeping extends SchedulerFunctionalTestWithCustomConfigAndRestart {

    /** Classpath location of the scheduler configuration used by this test. */
    private static final String SCHEDULER_CONFIG = "/functionaltests/config/functionalTSchedulerProperties-houseKeeping.ini";

    /** Maximum time, in milliseconds, to wait for each JOB_REMOVED event. */
    private static final int REMOVAL_TIMEOUT_MS = (60 + 70) * 1000;

    /** Descriptor of the job that carries no removal-related generic information. */
    private static final URL PLAIN_JOB_DESCRIPTOR = TestHouseKeeping.class.getResource("/functionaltests/descriptors/Job_houseKeeping.xml");

    /** Descriptor of the job that declares the REMOVE_DELAY generic information. */
    private static final URL REMOVE_DELAY_JOB_DESCRIPTOR = TestHouseKeeping.class.getResource("/functionaltests/descriptors/Job_houseKeeping_With_GI.xml");

    /**
     * Replaces the shared scheduler helper with one configured from the
     * housekeeping-specific properties file.
     */
    @BeforeClass
    public static void startDedicatedScheduler() throws Exception {
        schedulerHelper.log("Start Scheduler in non-fork mode.");
        String configPath = absolutePathOf(TestHouseKeeping.class.getResource(SCHEDULER_CONFIG));
        schedulerHelper = new SchedulerTHelper(false, configPath);
    }

    /**
     * Submits one job with a REMOVE_DELAY generic information and one without, and
     * checks that the former is removed first while the latter is still retrievable,
     * then that the latter is eventually removed as well.
     */
    @Test(timeout = 900000)
    public void testHouseKeeping() throws Exception {
        JobId removeDelayJobId = schedulerHelper.submitJob(absolutePathOf(REMOVE_DELAY_JOB_DESCRIPTOR));
        JobId plainJobId = schedulerHelper.submitJob(absolutePathOf(PLAIN_JOB_DESCRIPTOR));

        schedulerHelper.waitForEventJobFinished(removeDelayJobId);
        schedulerHelper.waitForEventJobFinished(plainJobId);

        // The job-level delay is one minute and the removal cron fires every minute,
        // so the first removal is expected within the timeout below.
        schedulerHelper.waitForEventJobRemoved(removeDelayJobId, REMOVAL_TIMEOUT_MS);

        // The plain job relies on the scheduler-wide delay (three minutes) and must still exist.
        JobResult plainJobResult = schedulerHelper.getJobResult(plainJobId);
        Assert.assertNotNull("Job " + plainJobId + " should not be removed yet", plainJobResult);

        // Remaining part of the three-minute delay plus one cron period.
        schedulerHelper.waitForEventJobRemoved(plainJobId, REMOVAL_TIMEOUT_MS);
    }

    /** Converts a classpath resource URL into the absolute path of the matching file. */
    private static String absolutePathOf(URL resource) throws Exception {
        return new File(resource.toURI()).getAbsolutePath();
    }
}
......@@ -105,7 +105,7 @@ public class TerminationDataTest extends ProActiveTestClean {
public void testAddJobToTerminate() {
assertThat(terminationData.isEmpty(), is(true));
JobId jobId = new JobIdImpl(666, "readableName");
terminationData.addJobToTerminate(jobId);
terminationData.addJobToTerminate(jobId, null);
assertThat(terminationData.isEmpty(), is(false));
assertThat(terminationData.jobTerminated(jobId), is(true));
}
......@@ -142,9 +142,9 @@ public class TerminationDataTest extends ProActiveTestClean {
@Test
public void testHandleTerminationForJob() throws IOException, ClassNotFoundException {
JobId jobId = new JobIdImpl(666, "readableName");
terminationData.addJobToTerminate(jobId);
terminationData.addJobToTerminate(jobId, null);
terminationData.handleTermination(service);
Mockito.verify(service, Mockito.times(1)).terminateJobHandling(jobId);
Mockito.verify(service, Mockito.times(1)).terminateJobHandling(jobId, null);
}
@Test
......
pa.scheduler.core.automaticremovejobdelay=180
pa.scheduler.core.automaticremovejobcronexpression=* * * * *
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<job xmlns="urn:proactive:jobdescriptor:dev" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:proactive:jobdescriptor:dev ../../../src/org/ow2/proactive/scheduler/common/xml/schemas/jobdescriptor/dev/schedulerjob.xsd"
name="Job_houseKeeping" priority="normal">
<description>Simple Job</description>
<taskFlow>
<task name="task1">
<description>Task 1</description>
<javaExecutable class="org.ow2.proactive.scheduler.examples.EmptyTask"/>
</task>
</taskFlow>
</job>
<?xml version="1.0" encoding="UTF-8"?>
<job xmlns="urn:proactive:jobdescriptor:dev" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:proactive:jobdescriptor:dev ../../../src/org/ow2/proactive/scheduler/common/xml/schemas/jobdescriptor/dev/schedulerjob.xsd"
name="Job_houseKeeping_with_GI" priority="normal">
<description>Simple Job</description>
<genericInformation>
<info name="REMOVE_DELAY" value="1:00"/>
</genericInformation>
<taskFlow>
<task name="task1">
<description>Task 1</description>
<javaExecutable class="org.ow2.proactive.scheduler.examples.EmptyTask"/>
</task>
</taskFlow>
</job>
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment