Commit 34640f59 authored by Lubomir Bulej's avatar Lubomir Bulej

Update comments and style

parent 5168b26d
......@@ -8,16 +8,15 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* Manages executors
* Manages analysis task executors.
*/
class ATEManager {
// we need concurrent for waitForAllToProcessEpoch method
private final ConcurrentMap<Long, AnalysisTaskExecutor> liveExecutors =
new ConcurrentHashMap<Long, AnalysisTaskExecutor>();
private final ConcurrentMap <Long, AnalysisTaskExecutor> liveExecutors = new ConcurrentHashMap <> ();
private final BlockingQueue <AnalysisTaskExecutor> endingExecutors = new LinkedBlockingQueue <> ();
private final BlockingQueue<AnalysisTaskExecutor> endingExecutors =
new LinkedBlockingQueue<AnalysisTaskExecutor>();
/**
* Retrieves executor. Creates new one if it does not exists.
......@@ -47,80 +46,81 @@ class ATEManager {
/**
* Moves executor from live queue to the ending queue
*/
void executorIsEnding(long id) {
AnalysisTaskExecutor removedATE = liveExecutors.remove(id);
void executorIsEnding (final long id) {
final AnalysisTaskExecutor removedTaskExecutor = liveExecutors.remove (id);
try {
endingExecutors.put(removedATE);
} catch (InterruptedException e) {
endingExecutors.put (removedTaskExecutor);
} catch (final InterruptedException e) {
throw new DiSLREServerFatalException (
"Cannot add executor to the ending queue", e);
"Cannot add executor to the ending queue", e
);
}
}
/**
* Changes global epoch in all executors
*/
void globalEpochChange(long newEpoch) {
for(AnalysisTaskExecutor ate : liveExecutors.values()) {
ate.globalEpochChanged(newEpoch);
void globalEpochChange (final long newEpoch) {
for (final AnalysisTaskExecutor ate : liveExecutors.values ()) {
ate.globalEpochChanged (newEpoch);
}
for(AnalysisTaskExecutor ate : endingExecutors) {
ate.globalEpochChanged(newEpoch);
for (final AnalysisTaskExecutor ate : endingExecutors) {
ate.globalEpochChanged (newEpoch);
}
}
/**
* Waits for all executors to process an epoch
*/
void waitForAllToProcessEpoch(long epochToProcess) {
void waitForAllToProcessEpoch (final long epochToProcess) {
try {
for(AnalysisTaskExecutor ate : liveExecutors.values()) {
ate.waitForEpochProcessing(epochToProcess);
for (final AnalysisTaskExecutor ate : liveExecutors.values ()) {
ate.waitForEpochProcessing (epochToProcess);
}
for(AnalysisTaskExecutor ate : endingExecutors) {
ate.waitForEpochProcessing(epochToProcess);
for (final AnalysisTaskExecutor ate : endingExecutors) {
ate.waitForEpochProcessing (epochToProcess);
}
} catch (InterruptedException e) {
throw new DiSLREServerFatalException(
"Interupt occured while waiting for processing of an epoch",
e);
} catch (final InterruptedException e) {
throw new DiSLREServerFatalException (
"Interupt occured while waiting for processing of an epoch", e
);
}
}
/**
* Waits for all executors to end
*/
void waitForAllToEnd() {
try {
for(AnalysisTaskExecutor ate : liveExecutors.values()) {
ate.awaitTermination();
/**
* Waits for all executors to end
*/
void waitForAllToEnd () {
try {
for (final AnalysisTaskExecutor ate : liveExecutors.values ()) {
ate.awaitTermination ();
}
for(AnalysisTaskExecutor ate : endingExecutors) {
ate.awaitTermination();
for (final AnalysisTaskExecutor ate : endingExecutors) {
ate.awaitTermination ();
}
} catch (InterruptedException e) {
throw new DiSLREServerFatalException(
"Interupt occured while waiting for executor termination",
e);
} catch (final InterruptedException e) {
throw new DiSLREServerFatalException (
"Interupt occured while waiting for executor termination", e
);
}
}
/**
* Announces executor end. Can be called concurrently.
*/
void executorEndConcurrentCallback(AnalysisTaskExecutor ate) {
endingExecutors.remove(ate);
void executorEndConcurrentCallback (final AnalysisTaskExecutor ate) {
endingExecutors.remove (ate);
}
}
......@@ -8,7 +8,8 @@ import ch.usi.dag.util.logging.Logger;
// Each thread has dedicated queue where new tasks are submitted.
class AnalysisDispatcher {
private final Logger __log = Logging.getPackageInstance ();
/** The logger to use. */
private final Logger __log;
// Epoch is used during object free event sending. Each task is assigned
// with current epoch number. When free event arrives it increments the
......@@ -17,13 +18,16 @@ class AnalysisDispatcher {
// with
private long __globalEpoch = 0;
private final ATEManager __ateManager = new ATEManager();
private final ATEManager __ateManager = new ATEManager ();
private final ObjectFreeTaskExecutor __objectFreeExecutor;
private final Thread __objectFreeThread;
AnalysisDispatcher(final SHVMContext shvmContext) {
AnalysisDispatcher (final SHVMContext shvmContext) {
__log = shvmContext.log ();
__objectFreeExecutor = new ObjectFreeTaskExecutor (__ateManager, shvmContext);
__objectFreeThread = new Thread (__objectFreeExecutor, "Shadow VM Object Free");
......@@ -32,16 +36,15 @@ class AnalysisDispatcher {
}
/**
* Adds an analysis invocation task to the executor corresponding to the
* given ordering id.
*/
void addTask (
final long orderingId, final List <AnalysisInvocation> invocations
) {
// add task to the executor
// create new task to add
final AnalysisTask at = new AnalysisTask (invocations, __globalEpoch);
// add task
__ateManager.getExecutor (orderingId).addTask (at);
final AnalysisTask task = new AnalysisTask (invocations, __globalEpoch);
__ateManager.getExecutor (orderingId).addTask (task);
}
......@@ -64,11 +67,11 @@ class AnalysisDispatcher {
// called by analysis handler when thread ended on the application vm
void threadEndedEvent (final long threadId) {
// create end of processing analysis task
final AnalysisTask at = new AnalysisTask ();
// Create termination task.
final AnalysisTask terminationTask = new AnalysisTask ();
// send end of processing
__ateManager.getExecutor (threadId).addTask (at);
__ateManager.getExecutor (threadId).addTask (terminationTask);
// update ate manager
__ateManager.executorIsEnding (threadId);
......
......@@ -12,7 +12,7 @@ final class AnalysisTask {
final List <AnalysisInvocation> __invocations;
/** The epoch to which the analysis invocations belong. */
final long __epoch;
private final long __epoch;
/**
......@@ -24,8 +24,8 @@ final class AnalysisTask {
/**
* Initializes this {@link AnalysisTask} with a list of invocations
* associated with the given epoch.
* Initializes this {@link AnalysisTask} with the analysis method
* invocations associated with the given epoch.
*
* @param invocations
* Analysis invocations to perform.
......@@ -44,11 +44,11 @@ final class AnalysisTask {
List <AnalysisInvocation> invocations () {
return __invocations;
public long epoch () {
return __epoch;
}
public long epoch () {
return __epoch;
}
}
......@@ -45,10 +45,11 @@ class AnalysisTaskExecutor {
private final Queue <AnalysisTask> __taskQueue = new LinkedList <> ();
AnalysisTaskExecutor (final ATEManager ateManager) {
AnalysisTaskExecutor (final long orderingId, final ATEManager ateManager) {
__ateManager = ateManager;
__executingThread = new Thread (
new AnalysisThread (this), "Shadow VM Analysis"
new AnalysisThread (this),
String.format ("Shadow VM Analysis (%d)", orderingId)
);
}
......@@ -78,10 +79,10 @@ class AnalysisTaskExecutor {
// note that at the beginning, the new task is added right away
// and executorEpoch will be updated to some meaningful value
AnalysisTask atToProcess = __taskQueue.poll ();
AnalysisTask taskToProcess = __taskQueue.poll ();
// waiting for a task
while (atToProcess == null) {
while (taskToProcess == null) {
// ** no task - epoch updating only **
// update executorEpoch, notifyAll and wait
......@@ -92,29 +93,29 @@ class AnalysisTaskExecutor {
// wait for new task or globalEpoch update
this.wait ();
atToProcess = __taskQueue.poll ();
taskToProcess = __taskQueue.poll ();
}
// ** executor end **
// set proper executorEpoch, notifyAll, notify ATEManager, and
// forward end task
if (atToProcess.isSignalingEnd ()) {
if (taskToProcess.isSignalingEnd ()) {
__executorEpoch = THREAD_SHUTDOWN;
// changed executorEpoch -> according to the rules notifyAll
notifyAll ();
__ateManager.executorEndConcurrentCallback (this);
return atToProcess;
return taskToProcess;
}
// ** normal task **
__executorEpoch = atToProcess.__epoch;
__executorEpoch = taskToProcess.epoch ();
// changed executorEpoch -> according to the rules notifyAll
notifyAll();
return atToProcess;
return taskToProcess;
}
......
......@@ -59,6 +59,10 @@ final class AnalysisThread implements Runnable {
thread.getName (), thread.getId ()
);
}
__log.debug (
"thread '%s (%d)' exitting", thread.getName (), thread.getId ()
);
}
}
......@@ -42,4 +42,5 @@ final class ObjectFreeTask {
long closingEpoch () {
return __closingEpoch;
}
}
......@@ -14,8 +14,6 @@ import ch.usi.dag.util.logging.Logger;
*/
final class ObjectFreeTaskExecutor implements Runnable {
private final Logger __log = Logging.getPackageInstance ();
private final BlockingQueue <ObjectFreeTask> __taskQueue = new LinkedBlockingQueue <> ();
private final ATEManager __ateManager;
......@@ -83,10 +81,11 @@ final class ObjectFreeTaskExecutor implements Runnable {
* to finish the closing epoch, invoke
*/
@Override
public void run() {
public void run () {
final Logger log = __shvmContext.log ();
final Thread thread = Thread.currentThread ();
__log.debug (
log.debug (
"thread '%s (%d)' started", thread.getName (), thread.getId ()
);
......@@ -113,10 +112,14 @@ final class ObjectFreeTaskExecutor implements Runnable {
// in the queue. This usually means that the server has been terminated,
// so we just log the situation and terminate this thread gracefully.
//
__log.debug (
log.debug (
"thread '%s (%d)' interrupted while waiting for a task",
thread.getName (), thread.getId ()
);
}
log.debug (
"thread '%s (%d)' exitting", thread.getName (), thread.getId ()
);
}
}
......@@ -41,10 +41,12 @@ final class AnalysisHandler {
/**
* Handle the RPC request to analysis call.
*
* @param orderingId Total ordering ID
* @param invocationCount Number of method invocations
* @param rawData Input stream with the rest of the data.
* @throws DiSLREServerException
* @param orderingId
* Total ordering ID
* @param invocationCount
* Number of method invocations
* @param rawData
* Input stream with the rest of the data.
*/
void handle (
final long orderingId, final int invocationCount,
......@@ -209,12 +211,14 @@ final class AnalysisHandler {
);
}
void threadEnded(final long threadId) {
__dispatcher.threadEndedEvent(threadId);
void threadEnded (final long threadId) {
__dispatcher.threadEndedEvent (threadId);
}
void objectsFreed(final long[] objFreeIDs) {
__dispatcher.objectsFreedEvent(objFreeIDs);
void objectsFreed (final long [] objectIds) {
__dispatcher.objectsFreedEvent (objectIds);
}
void exit() {
......
/**
* This file is part of disl project
* Author: Vit Kabele <vit@kabele.me>
* Created on the 22/02/2019
*/
package ch.usi.dag.shvm;
import ch.usi.dag.dislreserver.remoteanalysis.RemoteAnalysis;
import ch.usi.dag.dislreserver.shadow.ShadowClassTable;
import ch.usi.dag.dislreserver.shadow.ShadowObject;
import ch.usi.dag.dislreserver.shadow.ShadowObjectTable;
import ch.usi.dag.shvm.AnalysisRegistry.ResolveException;
import ch.usi.dag.util.logging.Logger;
/**
* This class is meant to keep all the information about
* Shadow object and classes, remote analysis methods etc.
* Represents a Shadow VM context. Keeps information about all
* {@link ShadowObject} and {@link RemoteAnalysis} instances, and maintains
* mapping between analysis method identifiers used by the client and the actual
* methods in analysis class instances.
*
* It's the main class that ties together the components
* of SHVM application.
* @author Vit Kabele <vit@kabele.me>
* @author Lubomir Bulej <bulej@d3s.mff.cuni.cz>
*/
public class SHVMContext {
......@@ -46,38 +46,36 @@ public class SHVMContext {
/**
* Gets the {@link ShadowClassTable}
* @return An registered instance of {@link ShadowClassTable}
* @return The {@link ShadowClassTable} associated with this context.
*/
public ShadowClassTable getShadowClassTable()
{
public ShadowClassTable getShadowClassTable () {
return __shadowClassTable;
}
/**
* Gets the {@link ShadowObjectTable}
* @return An registered instance of {@link ShadowObjectTable}
* @return The {@link ShadowObjectTable} associated with this context.
*/
public ShadowObjectTable getShadowObjectTable()
{
public ShadowObjectTable getShadowObjectTable () {
return __shadowObjectTable;
}
/**
* Gets the {@link AnalysisResolver}
* @return An registered instance of {@link AnalysisResolver}
* @return The {@link Logger} associated with this context.
*/
public AnalysisResolver getAnalysisResolver()
{
return __analysisResolver;
public Logger log () {
return __log;
}
/**
* Gets the {@link Logger}
* @return An registered instance of {@link Logger}
*/
public Logger log () {
return __log;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment