From 968edb67e92add03b36d3421dda80182f4f182ac Mon Sep 17 00:00:00 2001 From: mklkun Date: Mon, 5 Jul 2021 23:58:46 +0200 Subject: [PATCH] Add cleaning ProActive Scheduler jobs mechanism in SAL --- .../org/activeeon/morphemic/PAGateway.java | 23 +++- .../deployment/PASchedulerGateway.java | 101 +++++++++++++++--- 2 files changed, 107 insertions(+), 17 deletions(-) diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java index e5f03958..736d6568 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java @@ -7,6 +7,7 @@ import org.activeeon.morphemic.application.deployment.PASchedulerGateway; import org.activeeon.morphemic.infrastructure.deployment.PAConnectorIaasGateway; import org.activeeon.morphemic.infrastructure.deployment.PAResourceManagerGateway; import org.activeeon.morphemic.model.*; +import org.activeeon.morphemic.model.Job; import org.activeeon.morphemic.service.*; import org.apache.commons.lang3.Validate; import org.json.JSONArray; @@ -15,10 +16,7 @@ import org.ow2.proactive.resourcemanager.common.event.RMNodeEvent; import org.ow2.proactive.resourcemanager.exception.RMException; import org.ow2.proactive.scheduler.common.exception.NotConnectedException; import org.ow2.proactive.scheduler.common.exception.UserException; -import org.ow2.proactive.scheduler.common.job.JobId; -import org.ow2.proactive.scheduler.common.job.JobResult; -import org.ow2.proactive.scheduler.common.job.JobState; -import org.ow2.proactive.scheduler.common.job.TaskFlowJob; +import org.ow2.proactive.scheduler.common.job.*; import org.ow2.proactive.scheduler.common.task.ScriptTask; import org.ow2.proactive.scheduler.common.task.TaskResult; import org.ow2.proactive.scheduler.common.task.TaskVariable; @@ -702,6 +700,23 @@ public class PAGateway { //TODO } + /** + * Kill all active jobs in ProActive Scheduler + */ + public void killAllActivePAJobs() { + List activeJobInfos = schedulerGateway.getActiveJobs(0, 1000); + activeJobInfos.forEach(activeJobInfo -> schedulerGateway.killJob(activeJobInfo.getJobId().value())); + } + + /** + * Remove all jobs from the ProActive Scheduler + */ + public void removeAllPAJobs() { + killAllActivePAJobs(); + schedulerGateway.getJobs(0, 1000).getList() + .forEach(jobInfo -> schedulerGateway.removeJob(jobInfo.getJobId().value())); + } + /** * Stop jobs * @param jobIDs List of job IDs to stop diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/application/deployment/PASchedulerGateway.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/application/deployment/PASchedulerGateway.java index bdc0d256..c49e97e2 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/application/deployment/PASchedulerGateway.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/application/deployment/PASchedulerGateway.java @@ -2,11 +2,13 @@ package org.activeeon.morphemic.application.deployment; import lombok.extern.slf4j.Slf4j; import org.activeeon.morphemic.service.SchedulerConnectionHelper; +import org.ow2.proactive.db.SortOrder; +import org.ow2.proactive.db.SortParameter; +import org.ow2.proactive.scheduler.common.JobFilterCriteria; +import org.ow2.proactive.scheduler.common.JobSortParameter; +import org.ow2.proactive.scheduler.common.Page; import org.ow2.proactive.scheduler.common.exception.*; -import org.ow2.proactive.scheduler.common.job.Job; -import org.ow2.proactive.scheduler.common.job.JobId; -import org.ow2.proactive.scheduler.common.job.JobResult; -import org.ow2.proactive.scheduler.common.job.JobState; +import org.ow2.proactive.scheduler.common.job.*; import org.ow2.proactive.scheduler.common.task.TaskResult; import org.ow2.proactive_grid_cloud_portal.smartproxy.RestSmartProxyImpl; @@ -17,12 +19,17 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; @Slf4j public class PASchedulerGateway { private RestSmartProxyImpl restSmartProxy; + protected static final List> DEFAULT_JOB_SORT_PARAMS = Arrays.asList( + new SortParameter<>(JobSortParameter.STATE, SortOrder.ASC), + new SortParameter<>(JobSortParameter.ID, SortOrder.DESC)); + /** * Construct a gateway to the ProActive Scheduler * @param paUrl ProActive URL (exp: http://try.activeeon.com:8080/) @@ -79,7 +86,7 @@ public class PASchedulerGateway { /** * Submit a ProActive job to the scheduler - * @param xmlFile A ProActive job xml file + * @param xmlFile A ProActive job xml file * @param variables A variables map * @return JobId */ @@ -126,7 +133,7 @@ public class PASchedulerGateway { /** * Wait for a job - * @param jobId A ProActive job ID + * @param jobId A ProActive job ID * @param timeout The waiting timeout * @return The job result */ @@ -164,15 +171,17 @@ public class PASchedulerGateway { } /** - * Get job results map - * @param jobId A list of ProActive jobs ID - * @return The jobs results map + * Kill the job represented by jobId + * @param jobId A ProActive job ID + * @return true if success, false if not. */ public boolean killJob(String jobId) { reconnectIfDisconnected(); boolean result = false; + LOGGER.debug("Killing ProActive job: " + jobId); try { result = restSmartProxy.killJob(jobId); + LOGGER.info("ProActive job " + jobId + " killed successfully."); } catch (NotConnectedException nce) { LOGGER.error("ERROR: Not able to kill the job due to a NotConnectedException: " + nce.toString()); } catch (UnknownJobException uje) { @@ -183,11 +192,77 @@ public class PASchedulerGateway { return result; } + /** + * Delete a job + * @param jobId The ID of the job to delete + * @return true if success, false if the job not yet finished (not removed, kill the job then remove it + */ + public boolean removeJob(String jobId) { + reconnectIfDisconnected(); + boolean result = false; + LOGGER.debug("Removing ProActive job: " + jobId); + try { + result = restSmartProxy.removeJob(jobId); + LOGGER.info("ProActive job " + jobId + " removed successfully."); + } catch (NotConnectedException nce) { + LOGGER.error("ERROR: Not able to remove the job due to a NotConnectedException: " + Arrays.toString(nce.getStackTrace())); + } catch (UnknownJobException uje) { + LOGGER.error("ERROR: Unknown job ID: " + Arrays.toString(uje.getStackTrace())); + } catch (PermissionException pe) { + LOGGER.error("ERROR: Not able to remove the job due to a PermissionException: " + Arrays.toString(pe.getStackTrace())); + } + return result; + } + + /** + * Retrieves a job list of the scheduler. + * + * @param index says to start from this job is + * @param limit max number of jobs to retrieve + * @return jobs list according to all criteria + */ + public Page getJobs(int index, int limit) { + Page jobInfos = null; + LOGGER.debug("Retrieving from ProActive Scheduler the list of " + limit + " active jobs, starting from index " + index); + try { + jobInfos = restSmartProxy.getJobs(index, + limit, + new JobFilterCriteria(false, + true, + true, + true, + true), + DEFAULT_JOB_SORT_PARAMS); + LOGGER.info("List of jobs retrieved: " + jobInfos.toString()); + } catch (NotConnectedException nce) { + LOGGER.error("ERROR: Not able to retrieve jobs due to a NotConnectedException: " + Arrays.toString(nce.getStackTrace())); + } catch (PermissionException pe) { + LOGGER.error("ERROR: Not able to remove the job due to a PermissionException: " + Arrays.toString(pe.getStackTrace())); + } + return jobInfos; + } + + /** + * Retrieves a job list of the scheduler. + * + * @param index says to start from this job is + * @param limit max number of jobs to retrieve + * @return jobs list according to all criteria + */ + public List getActiveJobs(int index, int limit) { + List activeJobInfos = this.getJobs(index, limit).getList() + .stream() + .filter(activeJobInfo -> activeJobInfo.getStatus().isJobAlive()) + .collect(Collectors.toList()); + LOGGER.info("Job list filtered to only active ones: " + activeJobInfos.toString()); + return activeJobInfos; + } + /** * Wait for a task - * @param jobId A ProActive job ID + * @param jobId A ProActive job ID * @param taskName A task name - * @param timeout The waiting timeout + * @param timeout The waiting timeout * @return The task result */ public TaskResult waitForTask(String jobId, String taskName, long timeout) { @@ -211,7 +286,7 @@ public class PASchedulerGateway { /** * Get a task result - * @param jobId A ProActive job ID + * @param jobId A ProActive job ID * @param taskName A task name * @return The task result */ @@ -239,7 +314,7 @@ public class PASchedulerGateway { */ public void connect(String username, String password) { // Connect to the Scheduler API - restSmartProxy = SchedulerConnectionHelper.connect(username,password); + restSmartProxy = SchedulerConnectionHelper.connect(username, password); } /** -- GitLab