Commit 197d34f0 authored by Lukáš Marek's avatar Lukáš Marek

Small fix to the ShadowObjectTable interface for object release

Fixed dispatch test
Object free events now running in parallel (based on epoch) with analysis events
One to one mapping of application VM treads producing events and ShadowVM processing threads (+ special threads for global buffers)
 - we can use std. java thread locals in analysis
parent c8a111a1
package ch.usi.dag.dislreserver.msg.analyze;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import ch.usi.dag.dislreserver.exception.DiSLREServerFatalException;
// NOTE: Task is list of analysis methods from the same thread.
// Dispatches new task when new one is added or when the running one is
// completed so there is at most one task running.
public class AnalysisDispatcher {
ConcurrentMap<Long, AnalysisThreadTasks> threadMap =
new ConcurrentHashMap<Long, AnalysisThreadTasks>();
ExecutorService execSrvc = Executors.newCachedThreadPool();
Object waitObject = new Object();
// NOTE: access to this object should be synchronized
// Holds all unprocessed tasks for some thread. It also dispatches tasks
// as necessary.
private static class AnalysisThreadTasks {
ExecutorService execSrvc;
private boolean inProgress = false;
private Queue<AnalysisTask> tasks = new LinkedList<AnalysisTask>();
public AnalysisThreadTasks(ExecutorService execSrvc) {
this.execSrvc = execSrvc;
}
// Dispatches (adds to the thread pool) new task from dispatch record if
// there is one and can be dispatched.
private void dispatchTask() {
// nothing is currently executed and something can be :)
if(inProgress == false && ! tasks.isEmpty()) {
inProgress = true;
// exec new task using thread pool
// hopefully we can do it from multiple thread see:
// http://stackoverflow.com/questions/1702386/is-threadpoolexecutor-thread-safe
execSrvc.submit(tasks.poll());
}
}
// Adds new task to the record
public synchronized void addTask(AnalysisTask newTaskToAdd) {
tasks.add(newTaskToAdd);
dispatchTask();
}
// Announces the completion of previous task
public synchronized void taskCompleted() {
inProgress = false;
// notify all waiting threads
this.notifyAll();
dispatchTask();
}
// Returns when no task is running and no task can be scheduled
public synchronized void awaitProcessing() throws InterruptedException {
while(true) {
if(inProgress == false && tasks.isEmpty()) {
return;
}
this.wait();
}
}
}
private static class AnalysisTask implements Runnable {
// holds reference to the recored where it was stored for next task
// dispatch
AnalysisThreadTasks att;
List<AnalysisInvocation> invocations;
public AnalysisTask(AnalysisThreadTasks att,
List<AnalysisInvocation> invocations) {
super();
this.att = att;
this.invocations = invocations;
}
public void run() {
// invoke all methods in this task
for(AnalysisInvocation ai : invocations) {
ai.invoke();
}
// this task is completed - dispatch new one if possible
att.taskCompleted();
}
}
public void addTask(long orderingID,
List<AnalysisInvocation> invocations) {
// add task to the dispatch record
AnalysisThreadTasks att = threadMap.get(orderingID);
if(att == null) {
// create new dispatch record
// in the case of concurrent allocations putIfAbsent guarantees only
// one proper value
att = new AnalysisThreadTasks(execSrvc);
AnalysisThreadTasks old = threadMap.putIfAbsent(orderingID, att);
// replace with proper value
if(old != null) {
att = old;
}
}
// create new task to add
AnalysisTask at = new AnalysisTask(att, invocations);
// add task - dispatch, if it is possible
att.addTask(at);
}
public void awaitProcessing() {
try {
// no new thread will be added, no new task will be added
// - only main thread is adding tasks and it will wait here :)
for(AnalysisThreadTasks att : threadMap.values()) {
att.awaitProcessing();
}
}
catch (InterruptedException e) {
throw new DiSLREServerFatalException("Main thread interupted while"
+ " waiting on analysis to complete", e);
}
}
public void exit() {
execSrvc.shutdown();
try {
// wait for termination in the loop
while(! execSrvc.awaitTermination(60, TimeUnit.SECONDS));
}
catch (InterruptedException e) {
throw new DiSLREServerFatalException("Main thread interupted while"
+ " waiting on analysis to complete", e);
}
}
}
......@@ -9,6 +9,7 @@ import java.util.List;
import ch.usi.dag.dislreserver.exception.DiSLREServerException;
import ch.usi.dag.dislreserver.msg.analyze.AnalysisResolver.AnalysisMethodHolder;
import ch.usi.dag.dislreserver.msg.analyze.mtdispatch.AnalysisDispatcher;
import ch.usi.dag.dislreserver.reqdispatch.RequestHandler;
import ch.usi.dag.dislreserver.shadow.ShadowObject;
import ch.usi.dag.dislreserver.shadow.ShadowObjectTable;
......@@ -18,6 +19,9 @@ public final class AnalysisHandler implements RequestHandler {
private AnalysisDispatcher dispatcher = new AnalysisDispatcher ();
public AnalysisDispatcher getDispatcher() {
return dispatcher;
}
public void handle (
final DataInputStream is, final DataOutputStream os, final boolean debug
......@@ -190,16 +194,14 @@ public final class AnalysisHandler implements RequestHandler {
}
public void threadEnded(long threadId) {
// TODO !
dispatcher.threadEndedEvent(threadId);
}
public void awaitProcessing () {
dispatcher.awaitProcessing ();
public void objectsFreed(long[] objFreeIDs) {
dispatcher.objectsFreedEvent(objFreeIDs);
}
public void exit () {
dispatcher.exit ();
public void exit() {
dispatcher.exit();
}
}
package ch.usi.dag.dislreserver.msg.analyze.mtdispatch;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import ch.usi.dag.dislreserver.exception.DiSLREServerFatalException;
/**
* Manages executors
*/
class ATEManager {
protected final Map<Long, AnalysisTaskExecutor> liveExecutors =
new HashMap<Long, AnalysisTaskExecutor>();
protected final BlockingQueue<AnalysisTaskExecutor> endingExecutors =
new LinkedBlockingQueue<AnalysisTaskExecutor>();
/**
* Retrieves executor. Creates new one if it does not exists.
*/
public AnalysisTaskExecutor getExecutor(long id) {
AnalysisTaskExecutor ate = liveExecutors.get(id);
// create new executor
if(ate == null) {
ate = new AnalysisTaskExecutor(this);
liveExecutors.put(id, ate);
}
return ate;
}
/**
* Retrieves all live executors
*/
public Collection<AnalysisTaskExecutor> getAllLiveExecutors() {
// return copy
return new ArrayList<AnalysisTaskExecutor>(liveExecutors.values());
}
/**
* Moves executor from live queue to the ending queue
*/
public void executorIsEnding(long id) {
AnalysisTaskExecutor removedATE = liveExecutors.remove(id);
try {
endingExecutors.put(removedATE);
} catch (InterruptedException e) {
throw new DiSLREServerFatalException(
"Cannot add executor to the ending queue", e);
}
}
/**
* Changes global epoch in all executors
*/
public void globalEpochChange(long newEpoch) {
for(AnalysisTaskExecutor ate : liveExecutors.values()) {
ate.globalEpochChanged(newEpoch);
}
for(AnalysisTaskExecutor ate : endingExecutors) {
ate.globalEpochChanged(newEpoch);
}
}
/**
* Waits for all executors to process an epoch
*/
public void waitForAllToProcessEpoch(long epochToProcess) {
try {
for(AnalysisTaskExecutor ate : liveExecutors.values()) {
ate.waitForEpochProcessing(epochToProcess);
}
for(AnalysisTaskExecutor ate : endingExecutors) {
ate.waitForEpochProcessing(epochToProcess);
}
} catch (InterruptedException e) {
throw new DiSLREServerFatalException(
"Interupt occured while waiting for processing of an epoch",
e);
}
}
/**
* Waits for all executors to end
*/
public void waitForAllToEnd() {
try {
for(AnalysisTaskExecutor ate : liveExecutors.values()) {
ate.awaitTermination();
}
for(AnalysisTaskExecutor ate : endingExecutors) {
ate.awaitTermination();
}
} catch (InterruptedException e) {
throw new DiSLREServerFatalException(
"Interupt occured while waiting for executor termination",
e);
}
}
/**
* Announces executor end. Can be called concurrently.
*/
public void executorEndConcurrentCallback(AnalysisTaskExecutor ate) {
endingExecutors.remove(ate);
}
}
package ch.usi.dag.dislreserver.msg.analyze.mtdispatch;
import java.util.List;
import ch.usi.dag.dislreserver.msg.analyze.AnalysisInvocation;
// Each thread has dedicated queue where new tasks are submitted.
public class AnalysisDispatcher {
// Epoch is used during object free event sending. Each task is assigned
// with current epoch number. When free event arrives it increments the
// epoch and adds task for object free thread. The thread has to wait until
// all threads processed all tasks from this epoch and then can start
// with
protected long globalEpoch = 0;
protected final ATEManager ateManager = new ATEManager();
protected final ObjectFreeTaskExecutor oftExec =
new ObjectFreeTaskExecutor(ateManager);
public AnalysisDispatcher() {
super();
// start object free thread
oftExec.start();
}
public void addTask(long orderingID,
List<AnalysisInvocation> invocations) {
// add task to the executor
// create new task to add
AnalysisTask at = new AnalysisTask(invocations, globalEpoch);
// add task
ateManager.getExecutor(orderingID).addTask(at);
}
public void objectsFreedEvent(long[] objFreeIDs) {
// create object free task
ObjectFreeTask oft = new ObjectFreeTask(objFreeIDs, globalEpoch);
// send event
oftExec.addTask(oft);
// start new epoch
++globalEpoch;
// notify all analysis executors about epoch change
// this is important for AnalysisTaskExecutors that do not have tasks
// from new epoch
ateManager.globalEpochChange(globalEpoch);
}
// called by analysis handler when thread ended on the application vm
public void threadEndedEvent(long threadId) {
// create end of processing analysis task
AnalysisTask at = new AnalysisTask();
// send end of processing
ateManager.getExecutor(threadId).addTask(at);
// update ate manager
ateManager.executorIsEnding(threadId);
}
public void exit() {
// create end of processing analysis task
AnalysisTask at = new AnalysisTask();
// broadcast end of processing to all
for(AnalysisTaskExecutor ate : ateManager.getAllLiveExecutors()) {
ate.addTask(at);
}
// NOTE: we are not updating the executor is ending state because
// whole shadow vm is ending
// wait for ending
ateManager.waitForAllToEnd();
}
}
package ch.usi.dag.dislreserver.msg.analyze.mtdispatch;
import java.util.List;
import ch.usi.dag.dislreserver.msg.analyze.AnalysisInvocation;
/**
* Holds unprocessed task data for some thread
*/
class AnalysisTask {
protected boolean signalsEnd = false;
protected List<AnalysisInvocation> invocations;
protected long epoch;
/**
* Constructed task signals end of the processing
*/
public AnalysisTask() {
signalsEnd = true;
}
public AnalysisTask(List<AnalysisInvocation> invocations, long epoch) {
super();
this.invocations = invocations;
this.epoch = epoch;
}
public boolean isSignalingEnd() {
return signalsEnd;
}
public List<AnalysisInvocation> getInvocations() {
return invocations;
}
public long getEpoch() {
return epoch;
}
}
\ No newline at end of file
package ch.usi.dag.dislreserver.msg.analyze.mtdispatch;
import java.util.LinkedList;
import java.util.Queue;
class AnalysisTaskExecutor {
// NOTE: no epoch should have this value
private static final long THREAD_SHUTDOWN = -1;
final protected ATEManager ateManager;
protected final AnalysisThread executingThread;
// !! RULES !!
// Lock on "this" is protecting globalEpoch, executorEpoch, and taskQueue.
// All methods working with the values should be synchronized.
// Every change to any of the variable (queue) should trigger notifyAll().
// If we follow the rules, it should simply work :)
// Note that we could use two locking objects.
// One for announcing executorEpoch changes and one for announcing
// globalEpoch and taskQueue changes, but it would require more
// sophisticated locking also.
protected long globalEpoch = 0;
protected long executorEpoch = 0;
protected final Queue<AnalysisTask> taskQueue;
public AnalysisTaskExecutor(ATEManager ateManager) {
super();
this.ateManager = ateManager;
this.taskQueue = new LinkedList<AnalysisTask>();
this.executingThread = new AnalysisThread(this);
}
public synchronized void addTask(AnalysisTask at) {
taskQueue.add(at);
// changed taskQueue -> according to the rules notifyAll
this.notifyAll();
// start thread if it is not started
// we cannot start the thread in the constructor because it has
// pointer to "this"
// we could have some init function but this is exposing better API
if(! executingThread.isAlive()) {
executingThread.start();
}
}
public synchronized AnalysisTask getTask() throws InterruptedException {
// executor thread is driving whole executor from this method
// - the state of the executor epoch is updated here
// - tasks are requested here
// - executor is finalized from here
// note that at the beginning, the new task is added right away
// and executorEpoch will be updated to some meaningful value
AnalysisTask atToProcess = taskQueue.poll();
// waiting for a task
while(atToProcess == null) {
// ** no task - epoch updating only **
// update executorEpoch, notifyAll and wait
executorEpoch = globalEpoch;
// changed executorEpoch -> according to the rules notifyAll
this.notifyAll();
// wait for new task or globalEpoch update
this.wait();
atToProcess = taskQueue.poll();
}
// ** executor end **
// set proper executorEpoch, notifyAll, notify ATEManager, and
// forward end task
if(atToProcess.isSignalingEnd()) {
executorEpoch = THREAD_SHUTDOWN;
// changed executorEpoch -> according to the rules notifyAll
this.notifyAll();
ateManager.executorEndConcurrentCallback(this);
return atToProcess;
}
// ** normal task **
executorEpoch = atToProcess.epoch;
// changed executorEpoch -> according to the rules notifyAll
this.notifyAll();
return atToProcess;
}
// works with executorEpoch -> synchronized
public synchronized void globalEpochChanged(long newEpoch) {
globalEpoch = newEpoch;
// changed globalEpoch -> according to the rules notifyAll
this.notifyAll();
}
// works with executorEpoch -> synchronized
public synchronized void waitForEpochProcessing(long epochToProcess)
throws InterruptedException {
while(true) {
// epoch was reached or executor thread is shutting down
if(executorEpoch > epochToProcess
|| executorEpoch == THREAD_SHUTDOWN) {
return;
}
// wait for globalEpoch change
this.wait();
}
}
// await for executor to finish all jobs
public void awaitTermination() throws InterruptedException {
executingThread.join();
}
}
\ No newline at end of file
package ch.usi.dag.dislreserver.msg.analyze.mtdispatch;
import ch.usi.dag.dislreserver.exception.DiSLREServerFatalException;
import ch.usi.dag.dislreserver.msg.analyze.AnalysisInvocation;
/**
* Thread processing analysis tasks
*/
class AnalysisThread extends Thread {
final protected AnalysisTaskExecutor taskExecutor;
public AnalysisThread(AnalysisTaskExecutor taskHolder) {
this.taskExecutor = taskHolder;
}
public void run() {
try {
// get task to process
AnalysisTask at = taskExecutor.getTask();
while(! at.isSignalingEnd()) {
// invoke all methods in this task
for(AnalysisInvocation ai : at.getInvocations()) {
ai.invoke();
}
// get task to process
at = taskExecutor.getTask();
}
} catch (InterruptedException e) {
throw new DiSLREServerFatalException(
"Object free thread interupted while waiting on task", e);
}
}
}
\ No newline at end of file
package ch.usi.dag.dislreserver.msg.analyze.mtdispatch;
class ObjectFreeTask {
protected boolean signalsEnd = false;
protected long[] objFreeIDs;
protected long closingEpoch;
/**
* Constructed task signals end of the processing
*/
public ObjectFreeTask() {
signalsEnd = true;
}
public ObjectFreeTask(long[] objFreeIDs, long closingEpoch) {
super();
this.objFreeIDs = objFreeIDs;
this.closingEpoch = closingEpoch;
}
public boolean isSignalingEnd() {
return signalsEnd;
}
public long[] getObjFreeIDs() {
return objFreeIDs;
}
public long getClosingEpoch() {
return closingEpoch;
}
}
package ch.usi.dag.dislreserver.msg.analyze.mtdispatch;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;