Commit 4df6fdb2 authored by Lubomir Bulej's avatar Lubomir Bulej

Merge branch 'wip-cleanups' into 'devel'

First batch of cleanups along with a few fixes

These clean-ups and the (few) fixes result from working with the integrated server for a bit. One of the bigger changes is partially deferring processing of analysis data to analysis threads. This removes some load from the main receiver thread and also allows reusing many of the objects that were previously allocated for each analysis request, which visibly reduces the allocation rate in the server.

See merge request !10
parents b3e08013 57219167
Pipeline #6760 passed with stages
in 5 minutes and 26 seconds
......@@ -56,14 +56,6 @@ test shvm:
script:
- ant test-shvm
test util:
stage: test
cache:
<<: *cache
policy: pull
script:
- ant test-util
test server:
stage: test
cache:
......
......@@ -784,18 +784,13 @@
<ant antfile="${src.shvm.prefix}/build.xml" target="test" usenativebasedir="true"/>
</target>
<!-- Run the util unit tests -->
<target name="test-util">
<ant antfile="${src.util.prefix}/build.xml" target="test" usenativebasedir="true"/>
</target>
<!-- Run the server's unit tests -->
<target name="test-server">
<ant antfile="${server.prefix}/build.xml" target="test" usenativebasedir="true"/>
</target>
<!-- Run all tests -->
<target name="test" depends="test-disl,test-shvm,test-util,test-server,test-compound"></target>
<target name="test" depends="test-disl,test-shvm,test-server,test-compound"></target>
<!-- Run compound tests -->
<target name="test-compound" depends="build,build-test" description="Runs all tests or a selected (-Dtest.name=...) test suite.">
......
......@@ -6,7 +6,7 @@ ant-contrib.org=ant-contrib
junit.rev=4.12
junit.org=junit
asm.rev=6.1
asm.rev=7.2
asm.org=org.ow2.asm
log4j.rev=1.2.17
......@@ -16,6 +16,6 @@ protobuf.rev=3.0.1
protobuf.org=com.google.protobuf
protobuf.lib=protobuf-lite
jcommander.rev=1.72
jcommander.rev=1.78
jcommander.org=com.beust
jcommander.lib=jcommander
package ch.usi.dag.dislserver;
import ch.usi.dag.util.logging.Logger;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
......@@ -17,6 +15,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import ch.usi.dag.util.CounterSet;
import ch.usi.dag.util.logging.Logger;
public final class DiSLServer {
......@@ -30,7 +31,7 @@ public final class DiSLServer {
private static final String PROP_CONT = "dislserver.continuous";
final boolean continuous = Boolean.getBoolean (PROP_CONT);
private final boolean continuous = Boolean.getBoolean (PROP_CONT);
//
......@@ -40,26 +41,21 @@ public final class DiSLServer {
//
enum ElapsedTime {
RECEIVE, UNPACK, PROCESS, PACK, TRANSMIT
}
//
private final AtomicInteger __workerCount = new AtomicInteger ();
final AtomicInteger __workerCount = new AtomicInteger ();
final CounterSet<ElapsedTime> __globalStats = new CounterSet<ElapsedTime> (
ElapsedTime.class);
private final CounterSet <ElapsedTime> __globalStats = new CounterSet <> (ElapsedTime.class);
/**
* Session credentials storage.
*/
private final ConcurrentHashMap<Integer,SessionCredentials> SessionStorage = new ConcurrentHashMap<> ();
private final ConcurrentHashMap <Integer, Session> __sessionsById = new ConcurrentHashMap <> ();
/**
* Holds the last used session id.
*/
private final AtomicInteger last_session_id = new AtomicInteger (0);
private final AtomicInteger __lastSessionId = new AtomicInteger (0);
//
/**
*
......@@ -72,6 +68,7 @@ public final class DiSLServer {
) {
try {
final Thread serverThread = Thread.currentThread ();
final Runnable terminationCallback = () -> __notifyWorkerFinished (serverThread);
while (!serverThread.isInterrupted ()) {
final SocketChannel clientSocket = serverSocket.accept ();
......@@ -82,10 +79,9 @@ public final class DiSLServer {
);
// client socket handed off to connection handler
__workerCount.incrementAndGet ();
executor.submit (new ConnectionHandler (
this,
clientSocket,
serverThread
this, clientSocket, terminationCallback
));
}
......@@ -111,6 +107,19 @@ public final class DiSLServer {
}
private void __notifyWorkerFinished (final Thread serverThread) {
//
// If there are no more workers left and we are not operating
// in continuous mode, shut the server down.
//
if (__workerCount.decrementAndGet () == 0) {
if (!continuous) {
serverThread.interrupt ();
}
}
}
private long __stats (final ElapsedTime et) {
return TimeUnit.MILLISECONDS.convert (
__globalStats.get (et), TimeUnit.NANOSECONDS
......@@ -119,27 +128,38 @@ public final class DiSLServer {
/**
* Takes care about generating unique session_ids etc.
* Current strategy: Incremental
* @return
* Generates unique session identifiers. Current strategy is incremental.
*/
SessionCredentials registerSession(){
int curr = last_session_id.incrementAndGet ();
SessionCredentials credentials = new SessionCredentials (curr);
SessionStorage.put (curr, credentials);
return credentials;
Session registerSession () {
final int sessionId = __lastSessionId.incrementAndGet ();
final Session session = new Session (sessionId);
__sessionsById.put (sessionId, session);
return session;
}
/**
* Gets the session by the session id.
* @param session_id required session id.
* @return SessionCredentials on null while not present.
* Retrieves the session with the given session id.
*
* @return {@link Session} corresponding to the given session id.
* @throws DiSLServerException If a session for the given session id does not exist.
*/
SessionCredentials getSession(int session_id){
return SessionStorage.getOrDefault (session_id, null);
Session getSession (final int sessionId) throws DiSLServerException {
final Session result = __sessionsById.get(sessionId);
if (result != null) {
return result;
}
throw new DiSLServerException ("invalid session id: "+ sessionId);
}
void mergeStats (final CounterSet <ElapsedTime> stats) {
__globalStats.update (stats);
}
//
public static void main (final String[] args) {
__log.debug ("server starting");
__log.debug ("java.home: %s", System.getProperty ("java.home", ""));
......
package ch.usi.dag.dislserver;
/**
* Represents exceptions thrown by the DiSL server.
*/
@SuppressWarnings ("serial")
final class DiSLServerException extends Exception {
private static final long serialVersionUID = 5272000884539359236L;
public DiSLServerException (final String message) {
super (message);
}
public DiSLServerException (final String message, final Throwable cause) {
super (message, cause);
}
}
package ch.usi.dag.dislserver;
enum ElapsedTime {
RECEIVE, UNPACK, PROCESS, PACK, TRANSMIT
}
package ch.usi.dag.dislserver;
import ch.usi.dag.disl.DiSL;
import ch.usi.dag.disl.DiSL.CodeOption;
import ch.usi.dag.disl.DiSLException;
import ch.usi.dag.disl.JavaNames;
import ch.usi.dag.dislre.protocol.DiSL.InstrClassInfo;
import ch.usi.dag.dislre.protocol.DiSL.InstrumentClassRequest;
import ch.usi.dag.dislre.protocol.DiSL.InstrumentClassResponse;
import ch.usi.dag.util.Strings;
import ch.usi.dag.util.logging.Logger;
import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
......@@ -19,9 +9,21 @@ import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import com.google.protobuf.ByteString;
import ch.usi.dag.disl.CodeOption;
import ch.usi.dag.disl.DiSL;
import ch.usi.dag.disl.DiSLException;
import ch.usi.dag.disl.JavaNames;
import ch.usi.dag.dislre.protocol.DiSL.InstrumentClassResponse;
import ch.usi.dag.util.Strings;
import ch.usi.dag.util.logging.Logger;
final class RequestProcessor {
......@@ -29,10 +31,29 @@ final class RequestProcessor {
//
private static final String uninstrPath = System.getProperty ("dislserver.uninstrumented");
private static final String instrPath = System.getProperty ("dislserver.instrumented");
private static final Optional <String> originalDumpPath = Optional.ofNullable (
System.getProperty ("dislserver.uninstrumented")
);
private static final Optional <String> modifiedDumpPath = Optional.ofNullable (
System.getProperty ("dislserver.instrumented")
);
/**
* Determines whether to generate bypass code. If set, this property
* currently sets the {@code disl.disablebypass} property to {@code true}
* and so that DiSL will pick it up.
*/
private static final boolean disableBypass = Boolean.getBoolean ("dislserver.disablebypass");
/**
* The DiSL library does not interpret the {@code disl.exclusionList}
* property anymore, so we do it in the server.
*/
private static final Optional <File> exclusionListFile = Optional.ofNullable (
System.getProperty ("disl.exclusionList")
).map (File::new);
//
private final DiSL __disl;
......@@ -45,31 +66,29 @@ final class RequestProcessor {
//
InstrumentClassResponse process (final InstrumentClassRequest request) {
final byte [] classBytes = request.getClassBytes ().toByteArray ();
final InstrClassInfo classInfo = request.getClassInfo ();
final String className = __getClassName (classInfo.getClassName (), classBytes);
final Set <CodeOption> options = CodeOption.setOf (request.getFlags ());
InstrumentClassResponse process (
final long classLoaderTag, final Optional <String> optionalClassName,
final byte [] classBytes, final Set <CodeOption> options
) {
final String className = optionalClassName.orElseGet (() -> __getClassName (classBytes));
if (__log.debugIsLoggable ()) {
__log.debug (
"processing class %s of classloader %d [%d bytes, %s]",
className.isEmpty () ? "<unknown>" : className,
classInfo.getClassLoaderTag (), classBytes.length,
classLoaderTag, classBytes.length,
Strings.join ("+", options)
);
}
//
// If requested, dump the uninstrumented byte code, instrument the
// class, and again, if requested, dump the instrumented bytecode.
// Create a response corresponding to the request and re-throw any
// exception that might have been thrown as an server internal error.
// If requested, dump the original byte code, instrument the class, and
// again, if requested, dump the modified byte code. Create a response
// corresponding to the request and report any exception that might
// have been thrown in the response to the client.
//
try {
if (uninstrPath != null) {
__dumpClass (classBytes, className, uninstrPath);
}
originalDumpPath.ifPresent (path -> __dumpClass (classBytes, className, path));
// TODO: instrument the bytecode according to given options
// byte [] instrCode = disl.instrument (origCode, options);
......@@ -77,9 +96,7 @@ final class RequestProcessor {
final byte [] newClassBytes = __disl.instrument (classBytes);
if (newClassBytes != null) {
if (instrPath != null) {
__dumpClass (newClassBytes, className, instrPath);
}
modifiedDumpPath.ifPresent (path -> __dumpClass (newClassBytes, className, path));
return InstrumentClassResponse.newBuilder ()
.setResult (InstrumentClassResponse.InstrumentClassResult.CLASS_MODIFIED)
......@@ -114,19 +131,13 @@ final class RequestProcessor {
}
private static String __getClassName (
final String name, final byte [] classBytes
) {
if (!name.isEmpty ()) {
return name;
} else {
String result = JavaNames.getClassNameFromBytes (classBytes);
if (result == null || result.isEmpty ()) {
result = UUID.randomUUID ().toString ();
}
return result;
private static String __getClassName (final byte [] classBytes) {
final String result = JavaNames.getClassNameFromBytes (classBytes);
if (result == null || result.isEmpty ()) {
return UUID.randomUUID ().toString ();
}
return result;
}
......@@ -147,23 +158,27 @@ final class RequestProcessor {
*/
private static void __dumpClass (
final byte[] byteCode, final String className, final String root
) throws IOException {
// Create the package directory hierarchy
final Path dir = FileSystems.getDefault ().getPath (
root, JavaNames.typeToInternal (JavaNames.packageName (className))
);
) throws RuntimeException {
try {
// Create the package directory hierarchy
final Path dir = FileSystems.getDefault ().getPath (
root, JavaNames.typeToInternal (JavaNames.packageName (className))
);
Files.createDirectories (dir);
Files.createDirectories (dir);
// Dump the class byte code.
final Path file = dir.resolve (JavaNames.appendClassFileExtension (
JavaNames.simpleClassName (className)
));
// Dump the class byte code.
final Path file = dir.resolve (JavaNames.appendClassFileExtension (
JavaNames.simpleClassName (className)
));
try (
final OutputStream os = Files.newOutputStream (file);
) {
os.write (byteCode);
try (
final OutputStream os = Files.newOutputStream (file);
) {
os.write (byteCode);
}
} catch (final IOException e) {
throw new RuntimeException (e);
}
}
......@@ -175,21 +190,26 @@ final class RequestProcessor {
//
/**
* Creates new instance of RequestProcessor class.
* Creates a new {@link RequestProcessor} instance.
*
* @param instrumentationJars List of bytearrays representing instrumentation jars.
* @return New instance of RequestProcessor
* @throws DiSLException
* @param instrumentationJars
* List of byte arrays representing instrumentation jars.
* @return A new {@link RequestProcessor} instance.
* @throws DiSLException If DiSL initialization fails.
*/
static RequestProcessor newInstance (final List<byte[]> instrumentationJars) throws DiSLException {
static RequestProcessor newInstance (
final List <byte []> instrumentationJars
) throws DiSLException {
// TODO LB: Configure bypass on a per-request basis.
if (disableBypass) {
System.setProperty ("disl.disablebypass", "true");
}
final DiSL disl = new DiSL(System.getProperties (), instrumentationJars);
final DiSL disl = new DiSL (
System.getProperties (), instrumentationJars, exclusionListFile
);
return new RequestProcessor (disl);
}
......
package ch.usi.dag.dislserver;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import ch.usi.dag.disl.CodeOption;
import ch.usi.dag.disl.DiSLException;
import ch.usi.dag.dislre.protocol.DiSL.InstrClassInfo;
import ch.usi.dag.dislre.protocol.DiSL.InstrumentClassRequest;
import ch.usi.dag.dislre.protocol.DiSL.InstrumentClassResponse;
/**
* Represents a client session.
*
* @author Vit Kabele <vit@kabele.me>
* @author Lubomir Bulej <bulej@d3s.mff.cuni.cz>
*/
final class Session {
/** Session identifier. */
private final int __sessionId;
/**
* List of byte arrays representing JAR files containing instrumentation
* classes.
*/
private final List <byte []> __jarBinaries = new ArrayList <> ();
/**
* A request processor to handle the instrumentation requests.
*/
private RequestProcessor __processor;
/**
* Initialize the session.
*
* @param sessionId
* Session identifier.
*/
public Session (final int sessionId) {
__sessionId = sessionId;
}
public int id () {
return __sessionId;
}
public void addJar (final byte [] jarBytes) {
__jarBinaries.add (jarBytes);
}
public void initialize () throws DiSLException {
if (__processor == null) {
__processor = RequestProcessor.newInstance (__jarBinaries);
} else {
throw new IllegalStateException ("session already initialized");
}
}
public InstrumentClassResponse process (final InstrumentClassRequest request) {
final InstrClassInfo classInfo = request.getClassInfo ();
return __processor.process (
classInfo.getClassLoaderTag (),
Optional.of (classInfo.getClassName ()).filter (s -> !s.isEmpty ()),
request.getClassBytes ().toByteArray (),
CodeOption.setOf (request.getFlags ())
);
}
public void terminate () {
if (__processor != null) {
__processor.terminate ();
__processor = null;
} else {
throw new IllegalStateException ("session already terminated");
}
}
}
/*
* Pack the values related to one session.
*
* File: SessionCredentials.java
* Created by Vit Kabele <vit@kabele.me> on 16/10/2018
*/
package ch.usi.dag.dislserver;
import java.util.ArrayList;
import java.util.List;
/**
* Pack the values related to one session.
*/
class SessionCredentials {
/**
* Session identifier
*/
final int sessionId;
/**
* Available states of the session.
*/
enum SessionState {
/**
* Default
*/
INITIALIZED,
/**
* Instrumentation obtained and OK.
*/
READY,
/**
* Error while processing one of the previous requests.
*/
ERROR,
/**
* Session was closed by the client.
*/
CLOSED
}
/**
* State of the session.
*/
private SessionState state = SessionState.INITIALIZED;
void setState(SessionState state){ this.state = state; }
SessionState getState(){ return state; }
/**
* Byte arrays representing files with instrumentation.
*/
final List<byte[]> instrumentationJars = new ArrayList<>();
/**
* A request processor to handle the instrumentation requests.
*/
private RequestProcessor processor;
void setProcessor(RequestProcessor processor){ this.processor = processor; }
RequestProcessor getProcessor(){ return processor; }
/**
* Construct the class
* @param sessionId Session identifier.
*/
SessionCredentials(int sessionId){
this.sessionId = sessionId;