Commit 76eb4501 authored by Lubomir Bulej's avatar Lubomir Bulej

Switch to handle-based analysis invocation

parent 399e767e
package ch.usi.dag.shvm;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -21,27 +20,13 @@ class ATEManager {
/**
* Retrieves executor. Creates new one if it does not exists.
*/
AnalysisTaskExecutor getExecutor(long id) {
AnalysisTaskExecutor ate = liveExecutors.get(id);
// create new executor
if(ate == null) {
ate = new AnalysisTaskExecutor(this);
liveExecutors.put(id, ate);
}
return ate;
AnalysisTaskExecutor getExecutor (final long orderingId) {
// Get or create executor.
return liveExecutors.computeIfAbsent (
orderingId, id -> new AnalysisTaskExecutor (id, this)
);
}
/**
* Retrieves all live executors
*/
Iterable<AnalysisTaskExecutor> getAllLiveExecutors() {
return Collections.unmodifiableCollection(liveExecutors.values());
}
/**
* Moves executor from live queue to the ending queue
......@@ -94,7 +79,15 @@ class ATEManager {
/**
* Signals all live task executors to stop processing tasks.
*/
void signalEndToAll () {
// Create a termination task and submit it to all task executors.
final AnalysisTask terminationTask = new AnalysisTask ();
for (final AnalysisTaskExecutor ate : liveExecutors.values ()) {
ate.addTask (terminationTask);
}
}
/**
......
package ch.usi.dag.shvm;
import java.util.List;
import ch.usi.dag.shvm.AnalysisMethod.Invocation;
import ch.usi.dag.util.logging.Logger;
......@@ -41,7 +40,7 @@ class AnalysisDispatcher {
* given ordering id.
*/
void addTask (
final long orderingId, final List <AnalysisInvocation> invocations
final long orderingId, final Invocation [] invocations
) {
final AnalysisTask task = new AnalysisTask (invocations, __globalEpoch);
__ateManager.getExecutor (orderingId).addTask (task);
......@@ -79,13 +78,7 @@ class AnalysisDispatcher {
void exit() {
// create end of processing analysis task
final AnalysisTask at = new AnalysisTask ();
// broadcast end of processing to all
for (final AnalysisTaskExecutor ate : __ateManager.getAllLiveExecutors ()) {
ate.addTask (at);
}
__ateManager.signalEndToAll ();
// NOTE: we are not updating the executor is ending state because
// whole shvm vm is ending
......
package ch.usi.dag.shvm;
import java.util.List;
import ch.usi.dag.shvm.AnalysisMethod.Invocation;
import ch.usi.dag.util.logging.Logger;
/**
* Holds unprocessed task data for some thread.
* Represents a task responsible for invoking analysis methods.
*/
final class AnalysisTask {
/** Analysis invocations to be performed. */
final List <AnalysisInvocation> __invocations;
private final Invocation [] __invocations;
/** The epoch to which the analysis invocations belong. */
private final long __epoch;
......@@ -31,7 +32,9 @@ final class AnalysisTask {
* Analysis invocations to perform.
* @param epoch
*/
AnalysisTask (final List <AnalysisInvocation> invocations, final long epoch) {
AnalysisTask (
final Invocation [] invocations, final long epoch
) {
__invocations = invocations;
__epoch = epoch;
}
......@@ -42,13 +45,27 @@ final class AnalysisTask {
}
List <AnalysisInvocation> invocations () {
return __invocations;
public long epoch () {
return __epoch;
}
/**
* Invokes all analysis methods methods in this task. Exceptions occurring
* during execution of analysis methods are logged, but do not prevent the
* task from completing all invocations.
*/
public void run (final Logger log) {
for (final Invocation invocation : __invocations) {
try {
invocation.invoke ();
} catch (final Throwable t) {
log.error (
t, "analysis method invocation failed: %s", invocation
);
}
}
}
}
......@@ -31,18 +31,8 @@ final class AnalysisThread implements Runnable {
AnalysisTask task = __taskExecutor.getTask ();
while (!task.isSignalingEnd ()) {
// invoke all methods in this task
for (final AnalysisInvocation ai : task.invocations ()) {
try {
ai.invoke();
} catch (final Exception e) {
// Report failures in analysis invocations but keep going.
__log.error (e, "exception in analysis %s: %s\n",
ai.toString (), e.getMessage ()
);
}
}
// Invoke analysis methods.
task.run (__log);
// Wait for the next task to process.
task = __taskExecutor.getTask ();
......
......@@ -3,9 +3,6 @@ package ch.usi.dag.shvm;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import ch.usi.dag.dislreserver.remoteanalysis.RemoteAnalysis;
import ch.usi.dag.dislreserver.shadow.ShadowObject;
import ch.usi.dag.dislreserver.shadow.ShadowObjectTable;
import ch.usi.dag.util.logging.Logger;
......@@ -45,35 +42,6 @@ final class ObjectFreeTaskExecutor implements Runnable {
__taskQueue.add (task);
}
private void invokeObjectFreeAnalysisHandlers (final long objectId) {
//
// Convert object id to shadow object, notify all analyzes about the
// object being released, and then release it.
//
// TODO YZ: Object-free events should only be sent to analyzes that
// have seen the shadow object.
//
final ShadowObjectTable shadowObjectTable = __shvmContext.getShadowObjectTable ();
final ShadowObject object = shadowObjectTable.get (objectId);
for (final RemoteAnalysis ra : __shvmContext.getAnalysisResolver ().getAllAnalyses ()) {
try {
ra.objectFree (object);
} catch (final Throwable t) {
// Report failures in analysis invocations but keep going.
__log.error (
t, "exception in analysis %s.objectFree(): %s\n",
ra.getClass ().getName (), t.getMessage ()
);
}
}
shadowObjectTable.freeShadowObject (object);
}
/**
* Thread main method.
*
......@@ -97,10 +65,7 @@ final class ObjectFreeTaskExecutor implements Runnable {
// Wait for all analysis executors to finish the closing epoch.
__ateManager.waitForAllToProcessEpoch (task.closingEpoch ());
// Invoke object free analysis handler for each free object.
for (final long objectFreeID : task.objectIds ()) {
invokeObjectFreeAnalysisHandlers (objectFreeID);
}
__shvmContext.notifyAnalysesOnObjectFree (task.objectIds ());
// Wait for next task to process.
task = __taskQueue.take();
......
......@@ -3,11 +3,13 @@ package ch.usi.dag.shvm;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import ch.usi.dag.dislreserver.remoteanalysis.RemoteAnalysis;
import ch.usi.dag.dislreserver.shadow.ShadowObject;
import ch.usi.dag.dislreserver.shadow.ShadowObjectTable;
import ch.usi.dag.shvm.AnalysisResolver.AnalysisMethodHolder;
import ch.usi.dag.shvm.ArgumentUnmarshaller.UnmarshallerException;
import ch.usi.dag.shvm.SHVMContext.LookupException;
import ch.usi.dag.util.logging.Logger;
......@@ -47,19 +49,24 @@ final class AnalysisHandler {
* Number of method invocations
* @param rawData
* Input stream with the rest of the data.
* @throws LookupException
* @throws UnmarshallerException
*/
void handle (
final long orderingId, final int invocationCount,
final ByteBuffer rawData
) throws DiSLREServerException {
if (invocationCount < 0) {
throw new DiSLREServerException (
"invalid number of analysis invocation requests: %d",
invocationCount
);
}
final List <AnalysisInvocation> invocations = __unmarshalInvocations (
) throws UnmarshallerException, LookupException {
//
// TODO LB: Consider unmarshalling in the analysis invocation thread.
//
// The data has been already received, so the unpacking could be
// handled in the thread corresponding to the ordering id instead of
// the main thread. The client is not expecting answers anyway.
//
// Also, the invocations could be kept in a per-thread array list that
// would only grow, but would not have to be reallocated all the time.
//
final AnalysisMethod.Invocation [] invocations = __unmarshalInvocations (
invocationCount, rawData
);
......@@ -67,13 +74,16 @@ final class AnalysisHandler {
}
private List <AnalysisInvocation> __unmarshalInvocations (
private AnalysisMethod.Invocation [] __unmarshalInvocations (
final int invocationCount, final ByteBuffer rawData
) throws DiSLREServerException {
final List <AnalysisInvocation> result = new LinkedList <> ();
) throws UnmarshallerException, LookupException {
final AnalysisMethod.Invocation [] result = new AnalysisMethod.Invocation [invocationCount];
final ShadowObjectTable shadowObjects = __shvmContext.getShadowObjectTable ();
for (int i = 0; i < invocationCount; ++i) {
result.add (__unmarshalInvocation (rawData));
final short methodId = rawData.getShort ();
final AnalysisMethod am = __shvmContext.lookupAnalysisMethod (methodId);
result [i] = am.unmarshalInvocation (rawData, shadowObjects);
}
return result;
......@@ -88,8 +98,7 @@ final class AnalysisHandler {
// read method id from network and retrieve method
final short methodId = rawData.getShort ();
final AnalysisMethodHolder amh = __shvmContext
.getAnalysisResolver ().getMethod (methodId);
final AnalysisMethodHolder amh = null; // __shvmContext.getAnalysisResolver ().getMethod (methodId);
// *** retrieve method argument values ***
......@@ -221,7 +230,14 @@ final class AnalysisHandler {
__dispatcher.objectsFreedEvent (objectIds);
}
void exit() {
__dispatcher.exit();
/**
* Waits for all analyses to finish and calls the
* {@link RemoteAnalysis#atExit()} method on each of them. *
*/
void exit () {
__dispatcher.exit ();
__shvmContext.notifyAnalysesOnVmExit ();
}
}
......@@ -5,6 +5,7 @@ import ch.usi.dag.dislreserver.shadow.ShadowClassTable;
import ch.usi.dag.dislreserver.shadow.ShadowObject;
import ch.usi.dag.dislreserver.shadow.ShadowObjectTable;
import ch.usi.dag.shvm.AnalysisRegistry.ResolveException;
import ch.usi.dag.shvm.ArgumentUnmarshaller.UnmarshallerException;
import ch.usi.dag.util.logging.Logger;
......@@ -25,7 +26,17 @@ public class SHVMContext {
private final ShadowObjectTable __shadowObjectTable;
private final AnalysisResolver __analysisResolver;
/**
* A registry of {@link RemoteAnalysis} instances. Thread unsafe.
*/
private final AnalysisRegistry __analyses;
/**
* A registry of {@link AnalysisMethod} instances. Thread unsafe.
*/
private final AnalysisMethodRegistry __analysisMethods;
/**
......@@ -40,7 +51,8 @@ public class SHVMContext {
SHVMContext(final Logger log) {
__shadowClassTable = new ShadowClassTable (this);
__shadowObjectTable = new ShadowObjectTable (this);
__analysisResolver = new AnalysisResolver ();
__analyses = new AnalysisRegistry (this);
__analysisMethods = new AnalysisMethodRegistry (this);
__log = log;
}
......@@ -62,20 +74,85 @@ public class SHVMContext {
/**
* Gets the {@link AnalysisResolver}
* @return An registered instance of {@link AnalysisResolver}
* @return The {@link Logger} associated with this context.
*/
public AnalysisResolver getAnalysisResolver()
{
return __analysisResolver;
public Logger log () {
return __log;
}
/**
* Resolves an analysis method and registers it under the given analysis id.
* This will cause analysis class to be loaded and instantiated (if it does
* not already exist).
*
* @param analysisId
* The analysis method identifier (assigned by client).
* @param methodQualifier
* The fully qualified name of the analysis method (without
* signature).
* @throws ResolveException
* If the method could not be resolved.
* @throws UnmarshallerException
* If an argument unmarshaller cannot be created.
* @throws IllegalAccessException
* If a method handle cannot be created.
*/
public void registerAnalysisMethod (
final short analysisId, final String methodQualifier
) throws ResolveException, IllegalAccessException, UnmarshallerException {
__analysisMethods.register (
analysisId,
__analyses.resolveMethod (methodQualifier)
);
}
public AnalysisMethod lookupAnalysisMethod (
final short analysisId
) throws LookupException {
final AnalysisMethod result = __analysisMethods.lookup (analysisId);
if (result != null) {
return result;
}
throw new LookupException ("invalid analysis method id: %s", analysisId);
}
public void notifyAnalysesOnVmExit () {
__analyses.notifyAnalysesOnVmExit ();
}
public void notifyAnalysesOnObjectFree (final long [] objectIds) {
final ShadowObjectTable shadowObjects = __shadowObjectTable;
for (final long objectId : objectIds) {
//
// Convert object id to shadow object, notify all analyzes about the
// object being released, and then release it.
//
// TODO YZ: Object-free events should be (ideally) only sent to analyzes
// that have seen the shadow object.
//
final ShadowObject object = shadowObjects.get (objectId);
__analyses.notifyAnalysesOnObjectFree (object);
shadowObjects.freeShadowObject (object);
}
}
//
@SuppressWarnings ("serial")
static final class LookupException extends Exception {
public LookupException (final String format, final Object... args) {
super (String.format (format, args));
}
public LookupException (final Throwable cause, final String format,final Object... args) {
super (String.format (format, args), cause);
}
}
}
......@@ -2,7 +2,9 @@ package ch.usi.dag.shvm;
import java.nio.ByteBuffer;
import ch.usi.dag.dislreserver.remoteanalysis.RemoteAnalysis;
import ch.usi.dag.shvm.AnalysisRegistry.ResolveException;
import ch.usi.dag.shvm.ArgumentUnmarshaller.UnmarshallerException;
import ch.usi.dag.shvm.SHVMContext.LookupException;
import ch.usi.dag.util.logging.Logger;
......@@ -64,9 +66,22 @@ public final class SHVMImpl implements SHVM {
public void HandleAnalyze (
final long orderingId, final int invocationCount, final ByteBuffer rawData
) throws DiSLREServerException {
__analysisHandler.handle (
orderingId, invocationCount, rawData
);
if (invocationCount < 0) {
throw new DiSLREServerException (
"invalid number of invocation requests: %d", invocationCount
);
}
try {
__analysisHandler.handle (
orderingId, invocationCount, rawData
);
} catch (UnmarshallerException | LookupException e) {
throw new DiSLREServerException (
"failed to handle analysis invocations for ordering id %d", orderingId
);
}
}
......@@ -96,27 +111,11 @@ public final class SHVMImpl implements SHVM {
/**
* Handles the request to close connection. Waits for all analyses to finish
* and calls the {@link RemoteAnalysis#atExit()} method on each of them.
* Handles the request to close connection.
*/
@Override
public void HandleClose () {
// Wait for all executors to complete run of the analysis invocation
__analysisHandler.exit ();
// invoke atExit on all analyses
for (final RemoteAnalysis analysis : __shvmContext.getAnalysisResolver ().getAllAnalyses ()) {
try {
analysis.atExit ();
} catch (final Exception e) {
// report error during analysis invocation
__log.error (
e, "exception in analysis %s.atExit(): %s\n",
analysis.getClass ().getName (), e.getMessage ()
);
}
}
}
......@@ -153,19 +152,29 @@ public final class SHVMImpl implements SHVM {
* Id of the analysis method.
* @param methodQualifier
* Fully qualified method name without signature.
* @throws DiSLREServerException
* If analysis registraction fails.
*/
@Override
public void HandleRegisterAnalysis (
final int analysisId, final String methodQualifier
) throws DiSLREServerException {
__shvmContext.getAnalysisResolver ().registerMethodId (
(short) analysisId, methodQualifier
);
__log.debug (
"registered '%s' as analysis method %d",
methodQualifier, analysisId
);
try {
__shvmContext.registerAnalysisMethod (
(short) analysisId, methodQualifier
);
__log.debug (
"registered '%s' as analysis method %d",
methodQualifier, analysisId
);
} catch (final ResolveException | UnmarshallerException | IllegalAccessException e) {
throw new DiSLREServerException (
"failed to register '%s' as analysis %d: %s",
methodQualifier, analysisId, e.getMessage ()
);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment