Commit aae4d5ef authored by Lubomir Bulej's avatar Lubomir Bulej

Initial style cleanups for Shadow VM

parent 5ad37be0
......@@ -2,103 +2,116 @@ package ch.usi.dag.shvm;
import java.util.List;
import ch.usi.dag.util.logging.Logger;
// Each thread has dedicated queue where new tasks are submitted.
class AnalysisDispatcher {
private final Logger __log = Logging.getPackageInstance ();
// Epoch is used during object free event sending. Each task is assigned
// with current epoch number. When free event arrives it increments the
// epoch and adds task for object free thread. The thread has to wait until
// all threads processed all tasks from this epoch and then can start
// with
private long globalEpoch = 0;
private long __globalEpoch = 0;
private final ATEManager ateManager = new ATEManager();
private final ATEManager __ateManager = new ATEManager();
private final ObjectFreeTaskExecutor oftExec;
private final ObjectFreeTaskExecutor __objectFreeExecutor;
AnalysisDispatcher(final SHVMContext shvmContext) {
super();
private final Thread __objectFreeThread;
oftExec =
new ObjectFreeTaskExecutor(ateManager, shvmContext);
AnalysisDispatcher(final SHVMContext shvmContext) {
__objectFreeExecutor = new ObjectFreeTaskExecutor (__ateManager, shvmContext);
__objectFreeThread = new Thread (__objectFreeExecutor, "Shadow VM Object Free");
// start object free thread
oftExec.start();
__objectFreeThread.start ();
}
void addTask(long orderingID,
List<AnalysisInvocation> invocations) {
void addTask (
final long orderingId, final List <AnalysisInvocation> invocations
) {
// add task to the executor
// create new task to add
AnalysisTask at = new AnalysisTask(invocations, globalEpoch);
final AnalysisTask at = new AnalysisTask (invocations, __globalEpoch);
// add task
ateManager.getExecutor(orderingID).addTask(at);
__ateManager.getExecutor (orderingId).addTask (at);
}
void objectsFreedEvent(long[] objFreeIDs) {
void objectsFreedEvent (final long [] objectIds) {
// create object free task
ObjectFreeTask oft = new ObjectFreeTask(objFreeIDs, globalEpoch);
final ObjectFreeTask oft = new ObjectFreeTask (objectIds, __globalEpoch);
// send event
oftExec.addTask(oft);
__objectFreeExecutor.addTask (oft);
// start new epoch
++globalEpoch;
__globalEpoch++;
// notify all analysis executors about epoch change
// this is important for AnalysisTaskExecutors that do not have tasks
// from new epoch
ateManager.globalEpochChange(globalEpoch);
__ateManager.globalEpochChange (__globalEpoch);
}
// called by analysis handler when thread ended on the application vm
void threadEndedEvent(long threadId) {
// called by analysis handler when thread ended on the application vm
void threadEndedEvent (final long threadId) {
// create end of processing analysis task
AnalysisTask at = new AnalysisTask();
final AnalysisTask at = new AnalysisTask ();
// send end of processing
ateManager.getExecutor(threadId).addTask(at);
__ateManager.getExecutor (threadId).addTask (at);
// update ate manager
ateManager.executorIsEnding(threadId);
__ateManager.executorIsEnding (threadId);
}
void exit() {
void exit() {
// create end of processing analysis task
AnalysisTask at = new AnalysisTask();
final AnalysisTask at = new AnalysisTask ();
// broadcast end of processing to all
for(AnalysisTaskExecutor ate : ateManager.getAllLiveExecutors()) {
ate.addTask(at);
for (final AnalysisTaskExecutor ate : __ateManager.getAllLiveExecutors ()) {
ate.addTask (at);
}
// NOTE: we are not updating the executor is ending state because
// whole shvm vm is ending
// wait for analysis threads
ateManager.waitForAllToEnd();
__ateManager.waitForAllToEnd ();
// wait for free thread
try {
// signal end
oftExec.addTask(new ObjectFreeTask());
__objectFreeExecutor.addTask (new ObjectFreeTask ());
// wait for end
oftExec.join();
} catch (InterruptedException e) {
throw new DiSLREServerFatalException (
"Interrupted while waiting on obj free thread to finish",
e);
__objectFreeThread.join ();
} catch (final InterruptedException e) {
//
// The thread has been interrupted while waiting for the object free
// thread to finish. This usually means that the server has been
// terminated, so we just log the situation and exit gracefully.
//
__log.debug (
"thread '%s' interrupted while waiting for thread '%s'",
__threadInfo (Thread.currentThread ()), __threadInfo (__objectFreeThread)
);
}
}
private static final String __threadInfo (final Thread thread) {
return String.format ("%s (%d)", thread.getName (), thread.getId ());
}
}
......@@ -2,55 +2,53 @@ package ch.usi.dag.shvm;
import java.util.List;
/**
* Holds unprocessed task data for some thread
* Holds unprocessed task data for some thread.
*/
class AnalysisTask {
final class AnalysisTask {
/**
* Used as mark at the end of queue.
*/
private boolean signalsEnd = false;
/** Analysis invocations to be performed. */
final List <AnalysisInvocation> __invocations;
/**
* List of invocations in this task
*/
List<AnalysisInvocation> invocations;
/** The epoch to which the analysis invocations belong. */
final long __epoch;
/**
* The epoch determined by objectFree events
*/
long epoch;
/**
* Constructed task signals end of the processing
* Initializes this {@link AnalysisTask} to signal end of processing.
*/
AnalysisTask() {
signalsEnd = true;
AnalysisTask () {
this (null, -1);
}
/**
* Construct non empty task.
* Initializes this {@link AnalysisTask} with a list of invocations
* associated with the given epoch.
*
* @param invocations List of analysis invocations
* @param invocations
* Analysis invocations to perform.
* @param epoch
*/
AnalysisTask(List<AnalysisInvocation> invocations, long epoch) {
super();
this.invocations = invocations;
this.epoch = epoch;
AnalysisTask (final List <AnalysisInvocation> invocations, final long epoch) {
__invocations = invocations;
__epoch = epoch;
}
boolean isSignalingEnd() {
return signalsEnd;
boolean isSignalingEnd () {
return __invocations == null;
}
List<AnalysisInvocation> getInvocations() {
return invocations;
List <AnalysisInvocation> invocations () {
return __invocations;
}
public long getEpoch() {
return epoch;
public long epoch () {
return __epoch;
}
}
......@@ -23,9 +23,9 @@ class AnalysisTaskExecutor {
// NOTE: no epoch should have this value
private static final long THREAD_SHUTDOWN = -1;
private final ATEManager ateManager;
private final ATEManager __ateManager;
private final AnalysisThread executingThread;
private final Thread __executingThread;
// !! RULES !!
// Lock on "this" is protecting globalEpoch, executorEpoch, and taskQueue.
......@@ -38,35 +38,38 @@ class AnalysisTaskExecutor {
// globalEpoch and taskQueue changes, but it would require more
// sophisticated locking also.
private long globalEpoch = 0;
private long __globalEpoch = 0;
private long executorEpoch = 0;
private final Queue<AnalysisTask> taskQueue;
private long __executorEpoch = 0;
AnalysisTaskExecutor(ATEManager ateManager) {
super();
this.ateManager = ateManager;
this.taskQueue = new LinkedList<AnalysisTask>();
this.executingThread = new AnalysisThread(this);
private final Queue <AnalysisTask> __taskQueue = new LinkedList <> ();
AnalysisTaskExecutor (final ATEManager ateManager) {
__ateManager = ateManager;
__executingThread = new Thread (
new AnalysisThread (this), "Shadow VM Analysis"
);
}
synchronized void addTask(AnalysisTask at) {
taskQueue.add(at);
synchronized void addTask (final AnalysisTask at) {
__taskQueue.add (at);
// changed taskQueue -> according to the rules notifyAll
this.notifyAll();
notifyAll ();
// start thread if it is not started
// we cannot start the thread in the constructor because it has
// pointer to "this"
// we could have some init function but this is exposing better API
if (!executingThread.isAlive()) {
executingThread.start();
if (!__executingThread.isAlive ()) {
__executingThread.start ();
}
}
synchronized AnalysisTask getTask() throws InterruptedException {
synchronized AnalysisTask getTask () throws InterruptedException {
// executor thread is driving whole executor from this method
// - the state of the executor epoch is updated here
// - tasks are requested here
......@@ -75,63 +78,61 @@ class AnalysisTaskExecutor {
// note that at the beginning, the new task is added right away
// and executorEpoch will be updated to some meaningful value
AnalysisTask atToProcess = taskQueue.poll();
AnalysisTask atToProcess = __taskQueue.poll ();
// waiting for a task
while (atToProcess == null) {
// ** no task - epoch updating only **
// update executorEpoch, notifyAll and wait
executorEpoch = globalEpoch;
__executorEpoch = __globalEpoch;
// changed executorEpoch -> according to the rules notifyAll
this.notifyAll();
notifyAll ();
// wait for new task or globalEpoch update
this.wait();
this.wait ();
atToProcess = taskQueue.poll();
atToProcess = __taskQueue.poll ();
}
// ** executor end **
// set proper executorEpoch, notifyAll, notify ATEManager, and
// forward end task
if (atToProcess.isSignalingEnd()) {
executorEpoch = THREAD_SHUTDOWN;
if (atToProcess.isSignalingEnd ()) {
__executorEpoch = THREAD_SHUTDOWN;
// changed executorEpoch -> according to the rules notifyAll
this.notifyAll();
ateManager.executorEndConcurrentCallback(this);
notifyAll ();
__ateManager.executorEndConcurrentCallback (this);
return atToProcess;
}
// ** normal task **
executorEpoch = atToProcess.epoch;
__executorEpoch = atToProcess.__epoch;
// changed executorEpoch -> according to the rules notifyAll
this.notifyAll();
notifyAll();
return atToProcess;
}
// works with executorEpoch -> synchronized
synchronized void globalEpochChanged(long newEpoch) {
globalEpoch = newEpoch;
// works with executorEpoch -> synchronized
synchronized void globalEpochChanged (final long newEpoch) {
__globalEpoch = newEpoch;
// changed globalEpoch -> according to the rules notifyAll
this.notifyAll();
notifyAll ();
}
// works with executorEpoch -> synchronized
synchronized void waitForEpochProcessing(long epochToProcess)
throws InterruptedException {
// works with executorEpoch -> synchronized
synchronized void waitForEpochProcessing (
final long epochToProcess
) throws InterruptedException {
while (true) {
// epoch was reached or executor thread is shutting down
if (executorEpoch > epochToProcess
|| executorEpoch == THREAD_SHUTDOWN) {
if (__executorEpoch > epochToProcess || __executorEpoch == THREAD_SHUTDOWN) {
return;
}
......@@ -140,8 +141,9 @@ class AnalysisTaskExecutor {
}
}
// await for executor to finish all jobs
void awaitTermination() throws InterruptedException {
executingThread.join();
void awaitTermination () throws InterruptedException {
__executingThread.join ();
}
}
package ch.usi.dag.shvm;
import ch.usi.dag.util.logging.Logger;
/**
* Thread processing analysis tasks
* Thread processing analysis tasks.
*/
class AnalysisThread extends Thread {
final class AnalysisThread implements Runnable {
private final Logger __log = Logging.getPackageInstance ();
private final AnalysisTaskExecutor __taskExecutor;
private final AnalysisTaskExecutor taskExecutor;
AnalysisThread(AnalysisTaskExecutor taskHolder) {
this.taskExecutor = taskHolder;
AnalysisThread (final AnalysisTaskExecutor taskExecutor) {
__taskExecutor = taskExecutor;
}
public void run() {
try {
@Override
public void run () {
final Thread thread = Thread.currentThread ();
// get task to process
AnalysisTask at = taskExecutor.getTask();
__log.debug (
"thread '%s (%d)' started", thread.getName (), thread.getId ()
);
while(! at.isSignalingEnd()) {
try {
// Wait for the first task to process.
AnalysisTask task = __taskExecutor.getTask ();
while (!task.isSignalingEnd ()) {
// invoke all methods in this task
for(AnalysisInvocation ai : at.getInvocations()) {
ai.invoke();
for (final AnalysisInvocation ai : task.invocations ()) {
try {
ai.invoke();
} catch (final Exception e) {
// Report failures in analysis invocations but keep going.
__log.error (e, "exception in analysis %s: %s\n",
ai.toString (), e.getMessage ()
);
}
}
// get task to process
at = taskExecutor.getTask();
// Wait for the next task to process.
task = __taskExecutor.getTask ();
}
} catch (InterruptedException e) {
throw new DiSLREServerFatalException (
"Object free thread interupted while waiting on task", e);
} catch (final InterruptedException e) {
//
// The thread has been interrupted while waiting for a task to arrive
// in the queue. This usually means that the server has been terminated,
// so we just log the situation and terminate this thread gracefully.
//
__log.debug (
"thread '%s (%d)' interrupted while waiting for a task",
thread.getName (), thread.getId ()
);
}
}
......
package ch.usi.dag.shvm;
class ObjectFreeTask {
final class ObjectFreeTask {
private final long [] __objectIds;
private final long __closingEpoch;
private boolean signalsEnd = false;
private long[] objFreeIDs;
private long closingEpoch;
/**
* Constructed task signals end of the processing
* Initializes this {@link ObjectFreeTask} to signal end of processing.
*/
ObjectFreeTask() {
signalsEnd = true;
ObjectFreeTask () {
this (null, -1);
}
ObjectFreeTask(long[] objFreeIDs, long closingEpoch) {
super();
this.objFreeIDs = objFreeIDs;
this.closingEpoch = closingEpoch;
/**
* Initializes this {@link ObjectFreeTask} with an array of object
* identifiers to be freed, marking the end of the given epoch.
*
* @param objectIds
* Object ids corresponding to garbage-collected objects.
* @param closingEpoch
*/
ObjectFreeTask (final long [] objectIds, final long closingEpoch) {
__objectIds = objectIds;
__closingEpoch = closingEpoch;
}
boolean isSignalingEnd() {
return signalsEnd;
boolean isSignalingEnd () {
return __objectIds == null;
}
long[] getObjFreeIDs() {
return objFreeIDs;
long [] objectIds () {
return __objectIds;
}
long getClosingEpoch() {
return closingEpoch;
long closingEpoch () {
return __closingEpoch;
}
}
package ch.usi.dag.shvm;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import ch.usi.dag.dislreserver.remoteanalysis.RemoteAnalysis;
import ch.usi.dag.dislreserver.shadow.ShadowObject;
import ch.usi.dag.dislreserver.shadow.ShadowObjectTable;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import ch.usi.dag.util.logging.Logger;
/**
* The executor for {@link ObjectFreeTask} tasks.
*/
class ObjectFreeTaskExecutor extends Thread {
final class ObjectFreeTaskExecutor implements Runnable {
private final ATEManager ateManager;
private final Logger __log = Logging.getPackageInstance ();
private final BlockingQueue<ObjectFreeTask> taskQueue = new LinkedBlockingQueue<ObjectFreeTask>();
private final BlockingQueue <ObjectFreeTask> __taskQueue = new LinkedBlockingQueue <> ();
/**
* Current shvm VM context.
*/
private final ATEManager __ateManager;
/** Current Shadow VM context. */
private final SHVMContext __shvmContext;
......@@ -30,59 +30,49 @@ class ObjectFreeTaskExecutor extends Thread {
* @param ateManager
* @param shvmContext Shadow VM context.
*/
ObjectFreeTaskExecutor(
final ATEManager ateManager,
final SHVMContext shvmContext
)
{
super();
this.ateManager = ateManager;
ObjectFreeTaskExecutor (
final ATEManager ateManager, final SHVMContext shvmContext
) {
__ateManager = ateManager;
__shvmContext = shvmContext;
}
/**
* Enqueue new task to execute.
* Enqueues a new task to be executed.
*
* @param oft
* @param task The task.
*/
void addTask(ObjectFreeTask oft) {
taskQueue.add(oft);
void addTask (final ObjectFreeTask task) {
__taskQueue.add (task);
}
private void invokeObjectFreeAnalysisHandlers(long objectFreeID) {
// TODO free events should be sent to analysis that sees the shvm object
private void invokeObjectFreeAnalysisHandlers (final long objectId) {
//
// Convert object id to shadow object, notify all analyzes about the
// object being released, and then release it.
//
// TODO YZ: Object-free events should only be sent to analyzes that
// have seen the shadow object.
//
final ShadowObjectTable shadowObjectTable = __shvmContext.getShadowObjectTable ();
final ShadowObject object = shadowObjectTable.get (objectId);
// retrieve shvm object
ShadowObject obj = shadowObjectTable.get(objectFreeID);
// get all analysis objects
Set<RemoteAnalysis> raSet = __shvmContext.getAnalysisResolver ().getAllAnalyses();
// invoke object free
for (RemoteAnalysis ra : raSet) {
for (final RemoteAnalysis ra : __shvmContext.getAnalysisResolver ().getAllAnalyses ()) {
try {
ra.objectFree(obj);
ra.objectFree (object);
} catch (final Exception e) {
// report error during analysis invocation
System.err.format (
"DiSL-RE: exception in analysis %s.objectFree(): %s\n",
ra.getClass ().getName (), e.getMessage ()
} catch (final Throwable t) {
// Report failures in analysis invocations but keep going.
__log.error (
t, "exception in analysis %s.objectFree(): %s\n",
ra.getClass ().getName (), t.getMessage ()
);
final Throwable cause = e.getCause ();
if (cause != null) {
cause.printStackTrace (System.err);
}
}
}
// release shvm object
shadowObjectTable.freeShadowObject(obj);
shadowObjectTable.freeShadowObject (object);
}
......@@ -92,30 +82,41 @@ class ObjectFreeTaskExecutor extends Thread {
* Dequeue single {@link ObjectFreeTask} wait for all the analysis executors
* to finish the closing epoch, invoke
*/
@Override
public void run() {
final Thread thread = Thread.currentThread ();
try {
ObjectFreeTask oft = taskQueue.take();
__log.debug (