Unverified Commit 4ba3ecd8 authored by Fabien Viale's avatar Fabien Viale Committed by GitHub
Browse files

Merge pull request #3759 from fviale/master

Fix dataspace warnings in task logs
parents 589a7fd0 aec7d64e
......@@ -38,9 +38,9 @@ import org.ow2.proactive.scheduler.task.executors.TaskExecutor;
public class ProActiveForkedTaskLauncherFactory implements TaskLauncherFactory {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser)
throws Exception {
return new TaskProActiveDataspaces(taskId, namingService, isRunAsUser);
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) throws Exception {
return new TaskProActiveDataspaces(taskId, namingService, isRunAsUser, taskLogger);
}
@Override
......
......@@ -38,9 +38,9 @@ import org.ow2.proactive.scheduler.task.executors.TaskExecutor;
public class ProActiveNonForkedTaskLauncherFactory implements TaskLauncherFactory {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser)
throws Exception {
return new TaskProActiveDataspaces(taskId, namingService, isRunAsUser);
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) throws Exception {
return new TaskProActiveDataspaces(taskId, namingService, isRunAsUser, taskLogger);
}
@Override
......
......@@ -187,7 +187,8 @@ public class TaskLauncher implements InitActive {
DataSpaceNodeConfigurationAgent.lockCacheSpaceCleaning();
dataspaces = factory.createTaskDataspaces(taskId,
initializer.getNamingService(),
executableContainer.isRunAsUser());
executableContainer.isRunAsUser(),
taskLogger);
copyTaskLogsFromUserSpace(taskLogger.createLogFilePath(dataspaces.getScratchFolder()), dataspaces);
......
......@@ -36,8 +36,8 @@ import org.ow2.proactive.scheduler.task.executors.TaskExecutor;
public interface TaskLauncherFactory extends Serializable {
TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser)
throws Exception;
TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) throws Exception;
TaskExecutor createTaskExecutor(File workingDir);
......
......@@ -64,6 +64,7 @@ import org.ow2.proactive.scheduler.common.SchedulerConstants;
import org.ow2.proactive.scheduler.common.task.TaskId;
import org.ow2.proactive.scheduler.common.task.dataspaces.InputSelector;
import org.ow2.proactive.scheduler.common.task.dataspaces.OutputSelector;
import org.ow2.proactive.scheduler.task.TaskLogger;
public class TaskProActiveDataspaces implements TaskDataspaces {
......@@ -98,11 +99,11 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
private SpaceInstanceInfo cacheSpaceInstanceInfo;
private transient StringBuffer clientLogs = new StringBuffer();
private transient ExecutorService executorTransfer = Executors.newFixedThreadPool(getFileTransferThreadPoolSize(),
new NamedThreadFactory("FileTransferThreadPool"));
private transient TaskLogger taskLogger;
/**
* Mainly for testing purposes
*/
......@@ -111,10 +112,16 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
}
public TaskProActiveDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser) throws Exception {
this(taskId, namingService, isRunAsUser, null);
}
public TaskProActiveDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) throws Exception {
this.taskId = taskId;
this.namingService = namingService;
this.runAsUser = isRunAsUser;
this.linuxOS = OperatingSystem.getOperatingSystem() == OperatingSystem.unix;
this.taskLogger = taskLogger;
initDataSpaces();
}
......@@ -375,12 +382,21 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
final String eol = System.lineSeparator();
final boolean hasEol = message.endsWith(eol);
if (level == DataspacesStatusLevel.ERROR) {
this.clientLogs.append("[DATASPACES-ERROR] ").append(message).append(hasEol ? "" : eol);
} else if (level == DataspacesStatusLevel.WARNING) {
this.clientLogs.append("[DATASPACES-WARNING] ").append(message).append(hasEol ? "" : eol);
} else if (level == DataspacesStatusLevel.INFO) {
this.clientLogs.append("[DATASPACES-INFO] ").append(message).append(hasEol ? "" : eol);
if (taskLogger != null) {
switch (level) {
case ERROR:
taskLogger.getErrorSink().print("[DATASPACES-ERROR] " + message + (hasEol ? "" : eol));
taskLogger.getErrorSink().flush();
break;
case WARNING:
taskLogger.getErrorSink().print("[DATASPACES-WARNING] " + message + (hasEol ? "" : eol));
taskLogger.getErrorSink().flush();
break;
case INFO:
taskLogger.getOutputSink().print("[DATASPACES-INFO] " + message + (hasEol ? "" : eol));
taskLogger.getOutputSink().flush();
break;
}
}
}
......@@ -404,85 +420,80 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
@Override
public void copyInputDataToScratch(List<InputSelector> inputSelectors)
throws FileSystemException, InterruptedException {
try {
if (inputSelectors == null) {
logger.debug("Input selector is empty, no file to copy");
return;
}
if (inputSelectors == null) {
logger.debug("Input selector is empty, no file to copy");
return;
}
ArrayList<DataSpacesFileObject> inputSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> outputSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> globalSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> userSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> inputSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> outputSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> globalSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> userSpaceCacheFiles = new ArrayList<>();
findFilesToCopyFromInput(inputSelectors,
inputSpaceFiles,
outputSpaceFiles,
globalSpaceFiles,
userSpaceFiles,
inputSpaceCacheFiles,
outputSpaceCacheFiles,
globalSpaceCacheFiles,
userSpaceCacheFiles);
String inputSpaceUri = virtualResolve(INPUT);
String outputSpaceUri = virtualResolve(OUTPUT);
String globalSpaceUri = virtualResolve(GLOBAL);
String userSpaceUri = virtualResolve(USER);
boolean cacheTransferPresent = !inputSpaceCacheFiles.isEmpty() || !outputSpaceCacheFiles.isEmpty() ||
!globalSpaceCacheFiles.isEmpty() || !userSpaceCacheFiles.isEmpty();
if (cacheTransferPresent && CACHE != null) {
cacheTransferLock.lockInterruptibly();
try {
ArrayList<DataSpacesFileObject> inputSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> outputSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> globalSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> userSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> inputSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> outputSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> globalSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> userSpaceCacheFiles = new ArrayList<>();
findFilesToCopyFromInput(inputSelectors,
inputSpaceFiles,
outputSpaceFiles,
globalSpaceFiles,
userSpaceFiles,
inputSpaceCacheFiles,
outputSpaceCacheFiles,
globalSpaceCacheFiles,
userSpaceCacheFiles);
String inputSpaceUri = virtualResolve(INPUT);
String outputSpaceUri = virtualResolve(OUTPUT);
String globalSpaceUri = virtualResolve(GLOBAL);
String userSpaceUri = virtualResolve(USER);
boolean cacheTransferPresent = !inputSpaceCacheFiles.isEmpty() || !outputSpaceCacheFiles.isEmpty() ||
!globalSpaceCacheFiles.isEmpty() || !userSpaceCacheFiles.isEmpty();
if (cacheTransferPresent && CACHE != null) {
cacheTransferLock.lockInterruptibly();
try {
Map<String, DataSpacesFileObject> filesToCopyToCache = createFolderHierarchySequentially(CACHE,
inputSpaceUri,
inputSpaceCacheFiles,
outputSpaceUri,
outputSpaceCacheFiles,
globalSpaceUri,
globalSpaceCacheFiles,
userSpaceUri,
userSpaceCacheFiles);
long startTime = System.currentTimeMillis();
List<Future<Boolean>> transferFuturesCache = doCopyInputDataToSpace(CACHE, filesToCopyToCache);
handleResultsWhileTransferringFile(transferFuturesCache, "CACHE", startTime);
} finally {
if (cacheTransferPresent) {
cacheTransferLock.unlock();
}
Map<String, DataSpacesFileObject> filesToCopyToCache = createFolderHierarchySequentially(CACHE,
inputSpaceUri,
inputSpaceCacheFiles,
outputSpaceUri,
outputSpaceCacheFiles,
globalSpaceUri,
globalSpaceCacheFiles,
userSpaceUri,
userSpaceCacheFiles);
long startTime = System.currentTimeMillis();
List<Future<Boolean>> transferFuturesCache = doCopyInputDataToSpace(CACHE, filesToCopyToCache);
handleResultsWhileTransferringFile(transferFuturesCache, "CACHE", startTime);
} finally {
if (cacheTransferPresent) {
cacheTransferLock.unlock();
}
} else if (cacheTransferPresent) {
logDataspacesStatus("CACHE dataspace is not available while file transfers to cache were required. Check the Node logs for errors.",
DataspacesStatusLevel.ERROR);
}
} else if (cacheTransferPresent) {
logDataspacesStatus("CACHE dataspace is not available while file transfers to cache were required. Check the Node logs for errors.",
DataspacesStatusLevel.ERROR);
}
Map<String, DataSpacesFileObject> filesToCopyToScratch = createFolderHierarchySequentially(SCRATCH,
inputSpaceUri,
inputSpaceFiles,
outputSpaceUri,
outputSpaceFiles,
globalSpaceUri,
globalSpaceFiles,
userSpaceUri,
userSpaceFiles);
Map<String, DataSpacesFileObject> filesToCopyToScratch = createFolderHierarchySequentially(SCRATCH,
inputSpaceUri,
inputSpaceFiles,
outputSpaceUri,
outputSpaceFiles,
globalSpaceUri,
globalSpaceFiles,
userSpaceUri,
userSpaceFiles);
long startTime = System.currentTimeMillis();
List<Future<Boolean>> transferFuturesScratch = doCopyInputDataToSpace(SCRATCH, filesToCopyToScratch);
long startTime = System.currentTimeMillis();
List<Future<Boolean>> transferFuturesScratch = doCopyInputDataToSpace(SCRATCH, filesToCopyToScratch);
handleResultsWhileTransferringFile(transferFuturesScratch, "LOCAL", startTime);
handleResultsWhileTransferringFile(transferFuturesScratch, "LOCAL", startTime);
} finally {
// display dataspaces error and warns if any
displayDataspacesStatus();
}
}
private Map<String, DataSpacesFileObject> createFolderHierarchySequentially(DataSpacesFileObject space,
......@@ -703,9 +714,8 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
break;
case CacheFromUserSpace:
userResultsCacheFutures.add(findFilesToCopyFromInput(USER, "USER", is, selector));
case none:
default:
//do nothing
break;
}
}
......@@ -849,55 +859,50 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
@Override
public void copyScratchDataToOutput(List<OutputSelector> outputSelectors) throws FileSystemException {
try {
if (outputSelectors == null) {
logger.debug("Output selector is empty, no file to copy");
return;
}
if (outputSelectors == null) {
logger.debug("Output selector is empty, no file to copy");
return;
}
SCRATCH.refresh();
checkOutputSpacesConfigured(outputSelectors);
ArrayList<DataSpacesFileObject> results = new ArrayList<>();
FileSystemException toBeThrown = null;
for (OutputSelector os : outputSelectors) {
org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector selector = new org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector();
selector.setIncludes(os.getOutputFiles().getIncludes());
selector.setExcludes(os.getOutputFiles().getExcludes());
switch (os.getMode()) {
case TransferToOutputSpace:
if (OUTPUT != null) {
toBeThrown = copyScratchDataToOutput(OUTPUT, "OUTPUT", os, selector, results);
}
break;
case TransferToGlobalSpace:
if (GLOBAL != null) {
toBeThrown = copyScratchDataToOutput(GLOBAL, "GLOBAL", os, selector, results);
}
break;
case TransferToUserSpace:
if (USER != null) {
toBeThrown = copyScratchDataToOutput(USER, "USER", os, selector, results);
break;
}
case none:
break;
}
SCRATCH.refresh();
results.clear();
}
checkOutputSpacesConfigured(outputSelectors);
ArrayList<DataSpacesFileObject> results = new ArrayList<>();
FileSystemException toBeThrown = null;
for (OutputSelector os : outputSelectors) {
org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector selector = new org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector();
selector.setIncludes(os.getOutputFiles().getIncludes());
selector.setExcludes(os.getOutputFiles().getExcludes());
if (toBeThrown != null) {
throw toBeThrown;
switch (os.getMode()) {
case TransferToOutputSpace:
if (OUTPUT != null) {
toBeThrown = copyScratchDataToOutput(OUTPUT, "OUTPUT", os, selector, results);
}
break;
case TransferToGlobalSpace:
if (GLOBAL != null) {
toBeThrown = copyScratchDataToOutput(GLOBAL, "GLOBAL", os, selector, results);
}
break;
case TransferToUserSpace:
if (USER != null) {
toBeThrown = copyScratchDataToOutput(USER, "USER", os, selector, results);
}
break;
default:
// do nothing
}
} finally {
// display dataspaces error and warns if any
displayDataspacesStatus();
results.clear();
}
if (toBeThrown != null) {
throw toBeThrown;
}
}
private void checkOutputSpacesConfigured(List<OutputSelector> outputSelectors) {
......@@ -990,16 +995,6 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
return null;
}
/**
* Display the content of the dataspaces status buffer on stderr if non empty.
*/
private void displayDataspacesStatus() {
if (this.clientLogs.length() != 0) {
logger.warn(clientLogs);
this.clientLogs = new StringBuffer();
}
}
private void handleOutput(final DataSpacesFileObject dataspaceDestination, String spaceName,
org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector selector,
List<DataSpacesFileObject> results) throws FileSystemException {
......
......@@ -51,7 +51,8 @@ public class SlowDataspacesTaskLauncherFactory extends ProActiveForkedTaskLaunch
}
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return new SlowDataspaces(taskRunning);
}
......
......@@ -225,8 +225,8 @@ public class TaskLauncherTest extends TaskLauncherTestAbstract {
runTaskLauncher(createLauncherWithInjectedMocks(initializer, new TestTaskLauncherFactory() {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService,
boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return dataspacesMock;
}
}), executableContainer);
......@@ -250,8 +250,8 @@ public class TaskLauncherTest extends TaskLauncherTestAbstract {
runTaskLauncher(createLauncherWithInjectedMocks(initializer, new TestTaskLauncherFactory() {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService,
boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return dataspacesMock;
}
}), executableContainer);
......@@ -273,8 +273,8 @@ public class TaskLauncherTest extends TaskLauncherTestAbstract {
runTaskLauncher(createLauncherWithInjectedMocks(initializer, new TestTaskLauncherFactory() {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService,
boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return dataspacesMock;
}
}), executableContainer);
......
......@@ -64,7 +64,8 @@ public class TestTaskLauncherFactory extends ProActiveForkedTaskLauncherFactory
}
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return dataSpaces;
}
......
......@@ -33,7 +33,6 @@ import org.junit.Test;
import org.objectweb.proactive.ActiveObjectCreationException;
import org.objectweb.proactive.core.node.NodeException;
import org.objectweb.proactive.extensions.dataspaces.core.naming.NamingService;
import org.ow2.proactive.scheduler.common.TaskTerminateNotification;
import org.ow2.proactive.scheduler.common.exception.WalltimeExceededException;
import org.ow2.proactive.scheduler.common.task.TaskId;
import org.ow2.proactive.scheduler.common.task.TaskResult;
......@@ -105,7 +104,8 @@ public class WalltimeTaskLauncherTest extends TaskLauncherTestAbstract {
private class ForkingTaskLauncherFactory extends ProActiveForkedTaskLauncherFactory {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return new TestTaskLauncherFactory.TaskFileDataspaces();
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment