Commit f26380e7 authored by Lubomir Bulej's avatar Lubomir Bulej

Use protocol buffers in DiSLServer and RequestProcessor.

parent 7cc011f0
package ch.usi.dag.dislserver;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
......@@ -14,6 +17,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import ch.usi.dag.dislserver.Protocol.InstrumentClassRequest;
import ch.usi.dag.dislserver.Protocol.InstrumentClassResponse;
import ch.usi.dag.dislserver.Protocol.InstrumentClassResult;
import ch.usi.dag.util.logging.Logger;
......@@ -37,7 +46,7 @@ public final class DiSLServer {
//
private enum ElapsedTime {
RECEIVE, PROCESS, TRANSMIT;
RECEIVE, UNPACK, PROCESS, PACK, TRANSMIT;
}
//
......@@ -69,42 +78,108 @@ public final class DiSLServer {
public void run () {
__workerCount.incrementAndGet ();
try (
final MessageChannel mc = new MessageChannel (__clientSocket);
) {
final CounterSet <ElapsedTime> stats = new CounterSet <ElapsedTime> (ElapsedTime.class);
final IntervalTimer <ElapsedTime> timer = new IntervalTimer <ElapsedTime> (ElapsedTime.class);
try {
//
// Process requests until a shutdown request is received, a
// communication error occurs, or an internal error occurs.
//
final CounterSet <ElapsedTime> stats = new CounterSet <ElapsedTime> (ElapsedTime.class);
final IntervalTimer <ElapsedTime> timer = new IntervalTimer <ElapsedTime> (ElapsedTime.class);
final ByteBuffer headBuffer = __allocBuffer (Integer.BYTES);
ByteBuffer recvBuffer = __allocDirect (4096);
ByteBuffer sendBuffer = __allocDirect (4096);
REQUEST_LOOP: while (true) {
timer.reset ();
final Message request = mc.recvMessage ();
timer.mark (ElapsedTime.RECEIVE);
//
__log.trace ("receiving instrumentation request");
headBuffer.clear ();
__bufferRecvFrom (__clientSocket, Integer.BYTES, headBuffer);
headBuffer.flip ();
final int messageLength = headBuffer.getInt ();
__log.trace ("expecting message of length %d", messageLength);
if (request.isShutdown ()) {
if (messageLength == 0) {
__log.debug ("received empty message, exiting");
timer.mark (ElapsedTime.RECEIVE);
stats.update (timer);
break REQUEST_LOOP;
}
recvBuffer.clear ();
if (recvBuffer.remaining () < messageLength) {
recvBuffer = __expandBuffer (recvBuffer, messageLength);
__log.debug ("expanded receive buffer to %d bytes", recvBuffer.capacity ());
}
__bufferRecvFrom (__clientSocket, messageLength, recvBuffer);
timer.mark (ElapsedTime.RECEIVE);
//
__log.trace ("unpacking instrumentation request");
recvBuffer.flip ();
final CodedInputStream recvStream = CodedInputStream.newInstance (recvBuffer);
final InstrumentClassRequest request = InstrumentClassRequest.parseFrom (recvStream);
timer.mark (ElapsedTime.UNPACK);
//
// Process the request and send the response to the client.
// Update the timing stats if everything goes well.
//
final Message response = __requestProcessor.process (request);
__log.trace ("processing instrumentation request");
final InstrumentClassResponse response = __requestProcessor.process (request);
timer.mark (ElapsedTime.PROCESS);
mc.sendMessage (response);
//
// Note: the CodedOutputStream remembers buffer position at
// creation time, so anything put into a byte buffer after
// CodedOutputStream has been created will be lost/corrupted.
//
// Also, it cannot be reused and needs to be created for
// every message packed.
//
__log.trace ("packing instrumentation response");
final int responseLength = response.getSerializedSize ();
sendBuffer.clear ();
sendBuffer.putInt (responseLength);
if (sendBuffer.remaining () < responseLength) {
sendBuffer = __expandBuffer (sendBuffer, responseLength);
__log.debug ("expanded send buffer to %d bytes", sendBuffer.capacity ());
}
if (responseLength > 0) {
final CodedOutputStream sendStream = CodedOutputStream.newInstance (sendBuffer);
response.writeTo (sendStream);
sendStream.flush ();
}
timer.mark (ElapsedTime.PACK);
//
__log.trace ("sending instrumentation response");
sendBuffer.flip ();
__bufferSendTo (sendBuffer, __clientSocket);
timer.mark (ElapsedTime.TRANSMIT);
//
stats.update (timer);
if (response.isError ()) {
if (response.getResult () == InstrumentClassResult.ERROR) {
//
// Error during instrumentation. Report it to the client
// and stop receiving requests from this connection.
......@@ -112,15 +187,15 @@ public final class DiSLServer {
break REQUEST_LOOP;
}
} // REQUEST_LOOP
//
// Merge thread-local stats with global stats when leaving
// the request loop.
// the request loop in a peaceful manner.
//
__globalStats.update (stats);
} catch (final IOException ioe) {
//
// Communication error -- just log a message here.
......@@ -145,6 +220,58 @@ public final class DiSLServer {
}
}
//
private void __bufferSendTo (
final ByteBuffer buffer, final SocketChannel sc
) throws IOException {
while (buffer.hasRemaining ()) {
final int bytesWritten = sc.write (buffer);
__log.trace ("sent %d bytes, %d remaining", bytesWritten, buffer.remaining ());
}
}
private void __bufferRecvFrom (
final SocketChannel sc, final int length, final ByteBuffer buffer
) throws IOException {
final int expectedPosition = buffer.position () + length;
while (buffer.position () < expectedPosition) {
final int bytesRead = sc.read (buffer);
if (bytesRead < 0) {
throw new EOFException ("unexpected end of stream");
}
}
}
private ByteBuffer __expandBuffer (
final ByteBuffer buffer, final int messageLength
) {
//
// The buffer needs to be in receive mode, i.e., the buffer
// position indicates the first available byte. Any bytes before
// current position will be copied to the new buffer.
//
final int requiredCapacity = buffer.position () + messageLength;
int newCapacity = 2 * buffer.capacity ();
while (newCapacity < requiredCapacity) {
newCapacity *= 2;
}
buffer.flip ();
return __allocDirect (newCapacity).put (buffer);
}
private ByteBuffer __allocBuffer (final int capacity) {
return ByteBuffer.allocate (capacity).order (ByteOrder.BIG_ENDIAN);
}
private ByteBuffer __allocDirect (final int capacity) {
return ByteBuffer.allocateDirect (capacity).order (ByteOrder.BIG_ENDIAN);
}
}
//
......@@ -186,7 +313,9 @@ public final class DiSLServer {
//
__log.debug ("receiving data took %d ms", __stats (ElapsedTime.RECEIVE));
__log.debug ("unpacking data took %d ms", __stats (ElapsedTime.UNPACK));
__log.debug ("processing took %d ms", __stats (ElapsedTime.PROCESS));
__log.debug ("packing data took %d ms", __stats (ElapsedTime.PACK));
__log.debug ("transmitting data took %d ms", __stats (ElapsedTime.TRANSMIT));
}
......
......@@ -12,10 +12,14 @@ import java.util.UUID;
import org.objectweb.asm.ClassReader;
import com.google.protobuf.ByteString;
import ch.usi.dag.disl.DiSL;
import ch.usi.dag.disl.DiSL.CodeOption;
import ch.usi.dag.disl.exception.DiSLException;
import ch.usi.dag.disl.util.JavaNames;
import ch.usi.dag.dislserver.Protocol.InstrumentClassRequest;
import ch.usi.dag.dislserver.Protocol.InstrumentClassResponse;
import ch.usi.dag.util.Strings;
import ch.usi.dag.util.logging.Logger;
......@@ -42,10 +46,10 @@ final class RequestProcessor {
//
public Message process (final Message request) throws DiSLServerException {
final byte [] classBytes = request.payload ();
final String className = __getClassName (request.control (), classBytes);
final Set <CodeOption> options = CodeOption.setOf (request.flags ());
public InstrumentClassResponse process (final InstrumentClassRequest request) {
final byte [] classBytes = request.getClassBytes ().toByteArray ();
final String className = __getClassName (request.getClassNameBytes ().toByteArray (), classBytes);
final Set <CodeOption> options = CodeOption.setOf (request.getFlags ());
if (__log.traceIsLoggable ()) {
__log.trace (
......@@ -76,10 +80,15 @@ final class RequestProcessor {
__dumpClass (newClassBytes, className, instrPath);
}
return Message.createClassModifiedResponse (newClassBytes);
return InstrumentClassResponse.newBuilder ()
.setResult (Protocol.InstrumentClassResult.CLASS_MODIFIED)
.setClassBytes (ByteString.copyFrom (newClassBytes))
.build ();
} else {
return Message.createNoOperationResponse ();
return InstrumentClassResponse.newBuilder ()
.setResult (Protocol.InstrumentClassResult.CLASS_UNMODIFIED)
.build ();
}
} catch (final Exception e) {
......@@ -89,7 +98,10 @@ final class RequestProcessor {
__log.error (message);
return Message.createErrorResponse (message);
return InstrumentClassResponse.newBuilder ()
.setResult (Protocol.InstrumentClassResult.ERROR)
.setErrorMessage (message)
.build ();
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment