Unverified Commit 778fcaf1 authored by Fabien Viale's avatar Fabien Viale Committed by GitHub
Browse files

Merge pull request #3978 from fviale/master

SchedulingMethodImpl: improve needed nodes calculation performance
parents 432d8e10 645ff420
......@@ -108,10 +108,6 @@ pa.scheduler.policy=org.ow2.proactive.scheduler.policy.ExtendedSchedulerPolicy
# Defines the maximum number of tasks to be scheduled in each scheduling loop.
pa.scheduler.policy.nbtaskperloop=10
# if set to true, the scheduling loop will partition the task list according to the amount of free nodes.
# Enabling it can cause performance issues
pa.scheduler.policy.use.free.nodes=false
# Path of the license properties file
pa.scheduler.license.policy.configuration=config/scheduler/license.properties
......
......@@ -61,9 +61,6 @@ public enum PASchedulerProperties implements PACommonProperties {
/** Defines the maximum number of tasks to be scheduled in each scheduling loop (not used any more). */
SCHEDULER_POLICY_NBTASKPERLOOP("pa.scheduler.policy.nbtaskperloop", PropertyType.INTEGER, "10"),
/** If set to true, the scheduling loop will partition the task list according to the amount of free nodes. Enabling it can cause performance issues */
SCHEDULER_POLICY_USE_FREE_NODES("pa.scheduler.policy.use.free.nodes", PropertyType.BOOLEAN, "false"),
/** Path of the license properties file. */
SCHEDULER_LICENSE_POLICY_CONFIGURATION("pa.scheduler.license.policy.configuration", PropertyType.STRING),
......
......@@ -201,13 +201,15 @@ public final class SchedulingMethodImpl implements SchedulingMethod {
}
private void updateNeededNodes() {
updateNeededNodes(Collections.EMPTY_LIST);
updateNeededNodes(0);
}
private void updateNeededNodes(Collection<? extends TaskDescriptor> eligibleByPolicyTasks) {
private int computeNeededNodes(Collection<? extends TaskDescriptor> eligibleByPolicyTasks) {
// Needed nodes
final int neededNodes = eligibleByPolicyTasks.stream().mapToInt(TaskDescriptor::getNumberOfNodesNeeded).sum();
return eligibleByPolicyTasks.stream().mapToInt(TaskDescriptor::getNumberOfNodesNeeded).sum();
}
private void updateNeededNodes(int neededNodes) {
// for statistics used in RM portal
getRMProxiesManager().getRmProxy().setNeededNodes(neededNodes);
......@@ -235,7 +237,7 @@ public final class SchedulingMethodImpl implements SchedulingMethod {
//if there is no free resources, stop it right now without starting any task
if (freeResources.isEmpty()) {
updateNeededNodes(fullListOfTaskRetrievedFromPolicy);
updateNeededNodes(computeNeededNodes(fullListOfTaskRetrievedFromPolicy));
return 0;
}
......@@ -314,160 +316,138 @@ public final class SchedulingMethodImpl implements SchedulingMethod {
}
private int selectAndStartTasks(Policy currentPolicy, Map<JobId, JobDescriptor> jobMap, Set<String> freeResources,
LinkedList<EligibleTaskDescriptor> fullListOfTaskRetrievedFromPolicy) {
LinkedList<EligibleTaskDescriptor> tasksRetrievedFromPolicy) {
int numberOfTaskStarted = 0;
VariableBatchSizeIterator progressiveIterator = new VariableBatchSizeIterator(fullListOfTaskRetrievedFromPolicy);
Set<EligibleTaskDescriptor> rest = new HashSet<>(fullListOfTaskRetrievedFromPolicy);
while (progressiveIterator.hasMoreElements() && !freeResources.isEmpty()) {
schedulingMainLoopTimingLogger.start("updateVariablesForTasksToSchedule");
LinkedList<EligibleTaskDescriptor> taskRetrievedFromPolicy = new LinkedList<>(progressiveIterator.getNextElements(PASchedulerProperties.SCHEDULER_POLICY_USE_FREE_NODES.getValueAsBoolean() ? freeResources.size()
: Integer.MAX_VALUE));
if (logger.isDebugEnabled()) {
loggingEligibleTasksDetails(fullListOfTaskRetrievedFromPolicy, taskRetrievedFromPolicy);
}
int neededNodes = computeNeededNodes(tasksRetrievedFromPolicy);
schedulingMainLoopTimingLogger.start("updateVariablesForTasksToSchedule");
if (logger.isDebugEnabled()) {
loggingEligibleTasksDetails(tasksRetrievedFromPolicy);
}
updateVariablesForTasksToSchedule(taskRetrievedFromPolicy);
updateVariablesForTasksToSchedule(tasksRetrievedFromPolicy);
schedulingMainLoopTimingLogger.end("updateVariablesForTasksToSchedule");
schedulingMainLoopTimingLogger.end("updateVariablesForTasksToSchedule");
schedulingMainLoopTimingLogger.start("loadAndInit");
schedulingMainLoopTimingLogger.start("loadAndInit");
for (Iterator<EligibleTaskDescriptor> iterator = taskRetrievedFromPolicy.iterator(); iterator.hasNext();) {
EligibleTaskDescriptorImpl taskDescriptor = (EligibleTaskDescriptorImpl) iterator.next();
// load and Initialize the executable container
InternalTask internalTask = taskDescriptor.getInternal();
try {
loadAndInit(internalTask);
} catch (Exception e) {
handleLoadExecutableContainerError(internalTask, iterator, e);
}
for (Iterator<EligibleTaskDescriptor> iterator = tasksRetrievedFromPolicy.iterator(); iterator.hasNext();) {
EligibleTaskDescriptorImpl taskDescriptor = (EligibleTaskDescriptorImpl) iterator.next();
// load and Initialize the executable container
InternalTask internalTask = taskDescriptor.getInternal();
try {
loadAndInit(internalTask);
} catch (Exception e) {
handleLoadExecutableContainerError(internalTask, iterator, e);
}
}
schedulingMainLoopTimingLogger.end("loadAndInit");
while (!taskRetrievedFromPolicy.isEmpty()) {
if (freeResources.isEmpty()) {
break;
}
schedulingMainLoopTimingLogger.end("loadAndInit");
//get the next compatible tasks from the whole returned policy tasks
LinkedList<EligibleTaskDescriptor> tasksToSchedule = new LinkedList<>();
int neededResourcesNumber = 0;
while (!tasksRetrievedFromPolicy.isEmpty() && !freeResources.isEmpty()) {
schedulingMainLoopTimingLogger.start("getNextcompatibleTasks");
//get the next compatible tasks from the whole returned policy tasks
LinkedList<EligibleTaskDescriptor> tasksToSchedule = new LinkedList<>();
int neededResourcesNumber = 0;
while (!taskRetrievedFromPolicy.isEmpty() && neededResourcesNumber == 0) {
//the loop will search for next compatible task until it find something
neededResourcesNumber = getNextcompatibleTasks(jobMap, taskRetrievedFromPolicy, tasksToSchedule);
}
schedulingMainLoopTimingLogger.start("getNextcompatibleTasks");
schedulingMainLoopTimingLogger.end("getNextcompatibleTasks");
while (!tasksRetrievedFromPolicy.isEmpty() && neededResourcesNumber == 0) {
//the loop will search for next compatible task until it find something
neededResourcesNumber = getNextcompatibleTasks(jobMap, tasksRetrievedFromPolicy, tasksToSchedule);
}
if (logger.isDebugEnabled()) {
logger.debug("tasksToSchedule : " + tasksToSchedule);
}
schedulingMainLoopTimingLogger.end("getNextcompatibleTasks");
logger.debug("required number of nodes : " + neededResourcesNumber);
if (neededResourcesNumber == 0 || tasksToSchedule.isEmpty()) {
break;
}
if (logger.isDebugEnabled()) {
logger.debug("tasksToSchedule : " + tasksToSchedule);
}
schedulingMainLoopTimingLogger.start("getRMNodes");
NodeSet nodeSet = getRMNodes(jobMap, neededResourcesNumber, tasksToSchedule, freeResources);
schedulingMainLoopTimingLogger.end("getRMNodes");
logger.debug("required number of nodes : " + neededResourcesNumber);
if (neededResourcesNumber == 0 || tasksToSchedule.isEmpty()) {
break;
}
if (nodeSet != null) {
freeResources.removeAll(nodeSet.getAllNodesUrls());
}
schedulingMainLoopTimingLogger.start("getRMNodes");
NodeSet nodeSet = getRMNodes(jobMap, neededResourcesNumber, tasksToSchedule, freeResources);
schedulingMainLoopTimingLogger.end("getRMNodes");
//start selected tasks
Node node = null;
InternalJob currentJob = null;
try {
while (nodeSet != null && !nodeSet.isEmpty()) {
EligibleTaskDescriptor taskDescriptor = tasksToSchedule.removeFirst();
currentJob = ((JobDescriptorImpl) jobMap.get(taskDescriptor.getJobId())).getInternal();
InternalTask internalTask = ((EligibleTaskDescriptorImpl) taskDescriptor).getInternal();
if (nodeSet != null) {
freeResources.removeAll(nodeSet.getAllNodesUrls());
}
if (currentPolicy.isTaskExecutable(nodeSet, taskDescriptor)) {
//create launcher and try to start the task
node = nodeSet.get(0);
//start selected tasks
Node node = null;
InternalJob currentJob = null;
try {
while (nodeSet != null && !nodeSet.isEmpty()) {
EligibleTaskDescriptor taskDescriptor = tasksToSchedule.removeFirst();
currentJob = ((JobDescriptorImpl) jobMap.get(taskDescriptor.getJobId())).getInternal();
InternalTask internalTask = ((EligibleTaskDescriptorImpl) taskDescriptor).getInternal();
schedulingMainLoopTimingLogger.start("createExecution");
if (currentPolicy.isTaskExecutable(nodeSet, taskDescriptor)) {
//create launcher and try to start the task
node = nodeSet.get(0);
if (createExecution(nodeSet, node, currentJob, internalTask, taskDescriptor)) {
rest.remove(taskDescriptor);
numberOfTaskStarted++;
}
schedulingMainLoopTimingLogger.end("createExecution");
schedulingMainLoopTimingLogger.start("createExecution");
if (createExecution(nodeSet, node, currentJob, internalTask, taskDescriptor)) {
neededNodes -= taskDescriptor.getNumberOfNodesNeeded();
numberOfTaskStarted++;
}
schedulingMainLoopTimingLogger.end("createExecution");
//if every task that should be launched have been removed
if (tasksToSchedule.isEmpty()) {
//get back unused nodes to the RManager
if (!nodeSet.isEmpty()) {
schedulingMainLoopTimingLogger.start("releaseNodes");
releaseNodes(currentJob, nodeSet);
freeResources.addAll(nodeSet.getAllNodesUrls());
schedulingMainLoopTimingLogger.end("releaseNodes");
}
//and leave the loop
break;
}
}
} catch (ActiveObjectCreationException e1) {
//Something goes wrong with the active object creation (createLauncher)
logger.warn("An exception occured while creating the task launcher.", e1);
//so try to get back every remaining nodes to the resource manager
try {
releaseNodes(currentJob, nodeSet);
freeResources.addAll(nodeSet.getAllNodesUrls());
} catch (Exception e2) {
logger.info("Unable to get back the nodeSet to the RM", e2);
}
if (--activeObjectCreationRetryTimeNumber == 0) {
//if every task that should be launched have been removed
if (tasksToSchedule.isEmpty()) {
//get back unused nodes to the RManager
if (!nodeSet.isEmpty()) {
schedulingMainLoopTimingLogger.start("releaseNodes");
releaseNodes(currentJob, nodeSet);
freeResources.addAll(nodeSet.getAllNodesUrls());
schedulingMainLoopTimingLogger.end("releaseNodes");
}
//and leave the loop
break;
}
} catch (Exception e1) {
//if we are here, it is that something append while launching the current task.
logger.warn("An exception occured while starting task.", e1);
//so try to get back every remaining nodes to the resource manager
try {
releaseNodes(currentJob, nodeSet);
freeResources.addAll(nodeSet.getAllNodesUrls());
} catch (Exception e2) {
logger.info("Unable to get back the nodeSet to the RM", e2);
}
}
}
if (freeResources.isEmpty()) {
break;
}
if (activeObjectCreationRetryTimeNumber == 0) {
break;
} catch (ActiveObjectCreationException e1) {
//Something goes wrong with the active object creation (createLauncher)
logger.warn("An exception occured while creating the task launcher.", e1);
//so try to get back every remaining nodes to the resource manager
try {
releaseNodes(currentJob, nodeSet);
freeResources.addAll(nodeSet.getAllNodesUrls());
} catch (Exception e2) {
logger.info("Unable to get back the nodeSet to the RM", e2);
}
if (--activeObjectCreationRetryTimeNumber == 0) {
break;
}
} catch (Exception e1) {
//if we are here, it is that something append while launching the current task.
logger.warn("An exception occured while starting task.", e1);
//so try to get back every remaining nodes to the resource manager
try {
releaseNodes(currentJob, nodeSet);
freeResources.addAll(nodeSet.getAllNodesUrls());
} catch (Exception e2) {
logger.info("Unable to get back the nodeSet to the RM", e2);
}
}
}
// number of nodes needed to start all pending tasks
updateNeededNodes(rest);
updateNeededNodes(neededNodes);
return numberOfTaskStarted;
}
private void loggingEligibleTasksDetails(LinkedList<EligibleTaskDescriptor> fullListOfTaskRetrievedFromPolicy,
LinkedList<EligibleTaskDescriptor> taskRetrievedFromPolicy) {
logger.debug("full list of eligible tasks: " +
(fullListOfTaskRetrievedFromPolicy.size() < 5 ? fullListOfTaskRetrievedFromPolicy
: fullListOfTaskRetrievedFromPolicy.size()));
logger.debug("working list of eligible tasks: " +
private void loggingEligibleTasksDetails(LinkedList<EligibleTaskDescriptor> taskRetrievedFromPolicy) {
logger.debug("list of eligible tasks: " +
(taskRetrievedFromPolicy.size() < 5 ? taskRetrievedFromPolicy : taskRetrievedFromPolicy.size()));
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment