Commit aec7d64e authored by Fabien Viale's avatar Fabien Viale
Browse files

Fix dataspace warnings in task logs

Warnings or errors when transferring files using dataspaces used to appear in task logs but these logs disappeared (regression).
This commit fixes the issue.
Addressed reviews
parent 11c8f454
......@@ -38,9 +38,9 @@ import org.ow2.proactive.scheduler.task.executors.TaskExecutor;
public class ProActiveForkedTaskLauncherFactory implements TaskLauncherFactory {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser)
throws Exception {
return new TaskProActiveDataspaces(taskId, namingService, isRunAsUser);
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) throws Exception {
return new TaskProActiveDataspaces(taskId, namingService, isRunAsUser, taskLogger);
}
@Override
......
......@@ -38,9 +38,9 @@ import org.ow2.proactive.scheduler.task.executors.TaskExecutor;
public class ProActiveNonForkedTaskLauncherFactory implements TaskLauncherFactory {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser)
throws Exception {
return new TaskProActiveDataspaces(taskId, namingService, isRunAsUser);
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) throws Exception {
return new TaskProActiveDataspaces(taskId, namingService, isRunAsUser, taskLogger);
}
@Override
......
......@@ -187,7 +187,8 @@ public class TaskLauncher implements InitActive {
DataSpaceNodeConfigurationAgent.lockCacheSpaceCleaning();
dataspaces = factory.createTaskDataspaces(taskId,
initializer.getNamingService(),
executableContainer.isRunAsUser());
executableContainer.isRunAsUser(),
taskLogger);
copyTaskLogsFromUserSpace(taskLogger.createLogFilePath(dataspaces.getScratchFolder()), dataspaces);
......
......@@ -36,8 +36,8 @@ import org.ow2.proactive.scheduler.task.executors.TaskExecutor;
public interface TaskLauncherFactory extends Serializable {
TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser)
throws Exception;
TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) throws Exception;
TaskExecutor createTaskExecutor(File workingDir);
......
......@@ -64,6 +64,7 @@ import org.ow2.proactive.scheduler.common.SchedulerConstants;
import org.ow2.proactive.scheduler.common.task.TaskId;
import org.ow2.proactive.scheduler.common.task.dataspaces.InputSelector;
import org.ow2.proactive.scheduler.common.task.dataspaces.OutputSelector;
import org.ow2.proactive.scheduler.task.TaskLogger;
public class TaskProActiveDataspaces implements TaskDataspaces {
......@@ -98,11 +99,11 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
private SpaceInstanceInfo cacheSpaceInstanceInfo;
private transient StringBuffer clientLogs = new StringBuffer();
private transient ExecutorService executorTransfer = Executors.newFixedThreadPool(getFileTransferThreadPoolSize(),
new NamedThreadFactory("FileTransferThreadPool"));
private transient TaskLogger taskLogger;
/**
* Mainly for testing purposes
*/
......@@ -111,10 +112,16 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
}
public TaskProActiveDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser) throws Exception {
this(taskId, namingService, isRunAsUser, null);
}
public TaskProActiveDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) throws Exception {
this.taskId = taskId;
this.namingService = namingService;
this.runAsUser = isRunAsUser;
this.linuxOS = OperatingSystem.getOperatingSystem() == OperatingSystem.unix;
this.taskLogger = taskLogger;
initDataSpaces();
}
......@@ -375,12 +382,21 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
final String eol = System.lineSeparator();
final boolean hasEol = message.endsWith(eol);
if (level == DataspacesStatusLevel.ERROR) {
this.clientLogs.append("[DATASPACES-ERROR] ").append(message).append(hasEol ? "" : eol);
} else if (level == DataspacesStatusLevel.WARNING) {
this.clientLogs.append("[DATASPACES-WARNING] ").append(message).append(hasEol ? "" : eol);
} else if (level == DataspacesStatusLevel.INFO) {
this.clientLogs.append("[DATASPACES-INFO] ").append(message).append(hasEol ? "" : eol);
if (taskLogger != null) {
switch (level) {
case ERROR:
taskLogger.getErrorSink().print("[DATASPACES-ERROR] " + message + (hasEol ? "" : eol));
taskLogger.getErrorSink().flush();
break;
case WARNING:
taskLogger.getErrorSink().print("[DATASPACES-WARNING] " + message + (hasEol ? "" : eol));
taskLogger.getErrorSink().flush();
break;
case INFO:
taskLogger.getOutputSink().print("[DATASPACES-INFO] " + message + (hasEol ? "" : eol));
taskLogger.getOutputSink().flush();
break;
}
}
}
......@@ -404,85 +420,80 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
@Override
public void copyInputDataToScratch(List<InputSelector> inputSelectors)
throws FileSystemException, InterruptedException {
try {
if (inputSelectors == null) {
logger.debug("Input selector is empty, no file to copy");
return;
}
if (inputSelectors == null) {
logger.debug("Input selector is empty, no file to copy");
return;
}
ArrayList<DataSpacesFileObject> inputSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> outputSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> globalSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> userSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> inputSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> outputSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> globalSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> userSpaceCacheFiles = new ArrayList<>();
findFilesToCopyFromInput(inputSelectors,
inputSpaceFiles,
outputSpaceFiles,
globalSpaceFiles,
userSpaceFiles,
inputSpaceCacheFiles,
outputSpaceCacheFiles,
globalSpaceCacheFiles,
userSpaceCacheFiles);
String inputSpaceUri = virtualResolve(INPUT);
String outputSpaceUri = virtualResolve(OUTPUT);
String globalSpaceUri = virtualResolve(GLOBAL);
String userSpaceUri = virtualResolve(USER);
boolean cacheTransferPresent = !inputSpaceCacheFiles.isEmpty() || !outputSpaceCacheFiles.isEmpty() ||
!globalSpaceCacheFiles.isEmpty() || !userSpaceCacheFiles.isEmpty();
if (cacheTransferPresent && CACHE != null) {
cacheTransferLock.lockInterruptibly();
try {
ArrayList<DataSpacesFileObject> inputSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> outputSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> globalSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> userSpaceFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> inputSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> outputSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> globalSpaceCacheFiles = new ArrayList<>();
ArrayList<DataSpacesFileObject> userSpaceCacheFiles = new ArrayList<>();
findFilesToCopyFromInput(inputSelectors,
inputSpaceFiles,
outputSpaceFiles,
globalSpaceFiles,
userSpaceFiles,
inputSpaceCacheFiles,
outputSpaceCacheFiles,
globalSpaceCacheFiles,
userSpaceCacheFiles);
String inputSpaceUri = virtualResolve(INPUT);
String outputSpaceUri = virtualResolve(OUTPUT);
String globalSpaceUri = virtualResolve(GLOBAL);
String userSpaceUri = virtualResolve(USER);
boolean cacheTransferPresent = !inputSpaceCacheFiles.isEmpty() || !outputSpaceCacheFiles.isEmpty() ||
!globalSpaceCacheFiles.isEmpty() || !userSpaceCacheFiles.isEmpty();
if (cacheTransferPresent && CACHE != null) {
cacheTransferLock.lockInterruptibly();
try {
Map<String, DataSpacesFileObject> filesToCopyToCache = createFolderHierarchySequentially(CACHE,
inputSpaceUri,
inputSpaceCacheFiles,
outputSpaceUri,
outputSpaceCacheFiles,
globalSpaceUri,
globalSpaceCacheFiles,
userSpaceUri,
userSpaceCacheFiles);
long startTime = System.currentTimeMillis();
List<Future<Boolean>> transferFuturesCache = doCopyInputDataToSpace(CACHE, filesToCopyToCache);
handleResultsWhileTransferringFile(transferFuturesCache, "CACHE", startTime);
} finally {
if (cacheTransferPresent) {
cacheTransferLock.unlock();
}
Map<String, DataSpacesFileObject> filesToCopyToCache = createFolderHierarchySequentially(CACHE,
inputSpaceUri,
inputSpaceCacheFiles,
outputSpaceUri,
outputSpaceCacheFiles,
globalSpaceUri,
globalSpaceCacheFiles,
userSpaceUri,
userSpaceCacheFiles);
long startTime = System.currentTimeMillis();
List<Future<Boolean>> transferFuturesCache = doCopyInputDataToSpace(CACHE, filesToCopyToCache);
handleResultsWhileTransferringFile(transferFuturesCache, "CACHE", startTime);
} finally {
if (cacheTransferPresent) {
cacheTransferLock.unlock();
}
} else if (cacheTransferPresent) {
logDataspacesStatus("CACHE dataspace is not available while file transfers to cache were required. Check the Node logs for errors.",
DataspacesStatusLevel.ERROR);
}
} else if (cacheTransferPresent) {
logDataspacesStatus("CACHE dataspace is not available while file transfers to cache were required. Check the Node logs for errors.",
DataspacesStatusLevel.ERROR);
}
Map<String, DataSpacesFileObject> filesToCopyToScratch = createFolderHierarchySequentially(SCRATCH,
inputSpaceUri,
inputSpaceFiles,
outputSpaceUri,
outputSpaceFiles,
globalSpaceUri,
globalSpaceFiles,
userSpaceUri,
userSpaceFiles);
Map<String, DataSpacesFileObject> filesToCopyToScratch = createFolderHierarchySequentially(SCRATCH,
inputSpaceUri,
inputSpaceFiles,
outputSpaceUri,
outputSpaceFiles,
globalSpaceUri,
globalSpaceFiles,
userSpaceUri,
userSpaceFiles);
long startTime = System.currentTimeMillis();
List<Future<Boolean>> transferFuturesScratch = doCopyInputDataToSpace(SCRATCH, filesToCopyToScratch);
long startTime = System.currentTimeMillis();
List<Future<Boolean>> transferFuturesScratch = doCopyInputDataToSpace(SCRATCH, filesToCopyToScratch);
handleResultsWhileTransferringFile(transferFuturesScratch, "LOCAL", startTime);
handleResultsWhileTransferringFile(transferFuturesScratch, "LOCAL", startTime);
} finally {
// display dataspaces error and warns if any
displayDataspacesStatus();
}
}
private Map<String, DataSpacesFileObject> createFolderHierarchySequentially(DataSpacesFileObject space,
......@@ -703,9 +714,8 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
break;
case CacheFromUserSpace:
userResultsCacheFutures.add(findFilesToCopyFromInput(USER, "USER", is, selector));
case none:
default:
//do nothing
break;
}
}
......@@ -849,55 +859,50 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
@Override
public void copyScratchDataToOutput(List<OutputSelector> outputSelectors) throws FileSystemException {
try {
if (outputSelectors == null) {
logger.debug("Output selector is empty, no file to copy");
return;
}
if (outputSelectors == null) {
logger.debug("Output selector is empty, no file to copy");
return;
}
SCRATCH.refresh();
checkOutputSpacesConfigured(outputSelectors);
ArrayList<DataSpacesFileObject> results = new ArrayList<>();
FileSystemException toBeThrown = null;
for (OutputSelector os : outputSelectors) {
org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector selector = new org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector();
selector.setIncludes(os.getOutputFiles().getIncludes());
selector.setExcludes(os.getOutputFiles().getExcludes());
switch (os.getMode()) {
case TransferToOutputSpace:
if (OUTPUT != null) {
toBeThrown = copyScratchDataToOutput(OUTPUT, "OUTPUT", os, selector, results);
}
break;
case TransferToGlobalSpace:
if (GLOBAL != null) {
toBeThrown = copyScratchDataToOutput(GLOBAL, "GLOBAL", os, selector, results);
}
break;
case TransferToUserSpace:
if (USER != null) {
toBeThrown = copyScratchDataToOutput(USER, "USER", os, selector, results);
break;
}
case none:
break;
}
SCRATCH.refresh();
results.clear();
}
checkOutputSpacesConfigured(outputSelectors);
ArrayList<DataSpacesFileObject> results = new ArrayList<>();
FileSystemException toBeThrown = null;
for (OutputSelector os : outputSelectors) {
org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector selector = new org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector();
selector.setIncludes(os.getOutputFiles().getIncludes());
selector.setExcludes(os.getOutputFiles().getExcludes());
if (toBeThrown != null) {
throw toBeThrown;
switch (os.getMode()) {
case TransferToOutputSpace:
if (OUTPUT != null) {
toBeThrown = copyScratchDataToOutput(OUTPUT, "OUTPUT", os, selector, results);
}
break;
case TransferToGlobalSpace:
if (GLOBAL != null) {
toBeThrown = copyScratchDataToOutput(GLOBAL, "GLOBAL", os, selector, results);
}
break;
case TransferToUserSpace:
if (USER != null) {
toBeThrown = copyScratchDataToOutput(USER, "USER", os, selector, results);
}
break;
default:
// do nothing
}
} finally {
// display dataspaces error and warns if any
displayDataspacesStatus();
results.clear();
}
if (toBeThrown != null) {
throw toBeThrown;
}
}
private void checkOutputSpacesConfigured(List<OutputSelector> outputSelectors) {
......@@ -990,16 +995,6 @@ public class TaskProActiveDataspaces implements TaskDataspaces {
return null;
}
/**
* Display the content of the dataspaces status buffer on stderr if non empty.
*/
private void displayDataspacesStatus() {
if (this.clientLogs.length() != 0) {
logger.warn(clientLogs);
this.clientLogs = new StringBuffer();
}
}
private void handleOutput(final DataSpacesFileObject dataspaceDestination, String spaceName,
org.objectweb.proactive.extensions.dataspaces.vfs.selector.FileSelector selector,
List<DataSpacesFileObject> results) throws FileSystemException {
......
......@@ -51,7 +51,8 @@ public class SlowDataspacesTaskLauncherFactory extends ProActiveForkedTaskLaunch
}
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return new SlowDataspaces(taskRunning);
}
......
......@@ -225,8 +225,8 @@ public class TaskLauncherTest extends TaskLauncherTestAbstract {
runTaskLauncher(createLauncherWithInjectedMocks(initializer, new TestTaskLauncherFactory() {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService,
boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return dataspacesMock;
}
}), executableContainer);
......@@ -250,8 +250,8 @@ public class TaskLauncherTest extends TaskLauncherTestAbstract {
runTaskLauncher(createLauncherWithInjectedMocks(initializer, new TestTaskLauncherFactory() {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService,
boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return dataspacesMock;
}
}), executableContainer);
......@@ -273,8 +273,8 @@ public class TaskLauncherTest extends TaskLauncherTestAbstract {
runTaskLauncher(createLauncherWithInjectedMocks(initializer, new TestTaskLauncherFactory() {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService,
boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return dataspacesMock;
}
}), executableContainer);
......
......@@ -64,7 +64,8 @@ public class TestTaskLauncherFactory extends ProActiveForkedTaskLauncherFactory
}
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return dataSpaces;
}
......
......@@ -33,7 +33,6 @@ import org.junit.Test;
import org.objectweb.proactive.ActiveObjectCreationException;
import org.objectweb.proactive.core.node.NodeException;
import org.objectweb.proactive.extensions.dataspaces.core.naming.NamingService;
import org.ow2.proactive.scheduler.common.TaskTerminateNotification;
import org.ow2.proactive.scheduler.common.exception.WalltimeExceededException;
import org.ow2.proactive.scheduler.common.task.TaskId;
import org.ow2.proactive.scheduler.common.task.TaskResult;
......@@ -105,7 +104,8 @@ public class WalltimeTaskLauncherTest extends TaskLauncherTestAbstract {
private class ForkingTaskLauncherFactory extends ProActiveForkedTaskLauncherFactory {
@Override
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser) {
public TaskDataspaces createTaskDataspaces(TaskId taskId, NamingService namingService, boolean isRunAsUser,
TaskLogger taskLogger) {
return new TestTaskLauncherFactory.TaskFileDataspaces();
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment