Commit e17471d3 authored by Vít Kabele's avatar Vít Kabele

ConnectionHandler extracted to the separate file.

parent 9733a035
/**
* Handle the connection from a client.
*
* File: ConnectionHandler.java
* Author: Vit Kabele <vit@kabele.me>
*/
package ch.usi.dag.dislserver;
import ch.usi.dag.util.logging.Logger;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
final class ConnectionHandler implements Runnable {
private final SocketChannel __clientSocket;
private final RequestProcessor __requestProcessor;
private final Thread __serverThread;
private final DiSLServer __server;
private static final Logger __log = Logging.getPackageInstance ();
//
ConnectionHandler (
final DiSLServer server,
final SocketChannel clientSocket,
final RequestProcessor requestProcessor,
final Thread serverThread
) {
__server = server;
__clientSocket = clientSocket;
__requestProcessor = requestProcessor;
__serverThread = serverThread;
}
@Override
public void run () {
__server.__workerCount.incrementAndGet ();
final CounterSet<DiSLServer.ElapsedTime> stats = new CounterSet<DiSLServer.ElapsedTime> (
DiSLServer.ElapsedTime.class);
final IntervalTimer<DiSLServer.ElapsedTime> timer = new IntervalTimer<DiSLServer.ElapsedTime> (
DiSLServer.ElapsedTime.class);
try {
//
// Process requests until a shutdown request is received, a
// communication error occurs, or an internal error occurs.
//
final ByteBuffer headBuffer = __allocDirect (4096);
ByteBuffer recvBuffer = __allocDirect (4096);
ByteBuffer sendBuffer = __allocDirect (4096);
REQUEST_LOOP:
while (true) {
timer.reset ();
//
__log.trace ("receiving instrumentation request");
headBuffer.clear ();
__bufferRecvFrom (__clientSocket, Integer.BYTES, headBuffer);
headBuffer.flip ();
final int messageLength = headBuffer.getInt ();
__log.trace ("expecting message of length %d", messageLength);
if (messageLength == 0) {
__log.debug ("received empty message, exiting");
timer.mark (DiSLServer.ElapsedTime.RECEIVE);
stats.update (timer);
break REQUEST_LOOP;
}
recvBuffer.clear ();
if (recvBuffer.remaining () < messageLength) {
recvBuffer = __expandBuffer (recvBuffer, messageLength);
__log.debug ("expanded receive buffer to %d bytes", recvBuffer.capacity ());
}
__bufferRecvFrom (__clientSocket, messageLength, recvBuffer);
timer.mark (DiSLServer.ElapsedTime.RECEIVE);
//
__log.trace ("unpacking instrumentation request");
recvBuffer.flip ();
final CodedInputStream recvStream = CodedInputStream.newInstance (recvBuffer);
final Protocol.InstrumentClassRequest request = Protocol.InstrumentClassRequest.parseFrom (
recvStream);
timer.mark (DiSLServer.ElapsedTime.UNPACK);
//
// Process the request and send the response to the client.
// Update the timing stats if everything goes well.
//
__log.trace ("processing instrumentation request");
final Protocol.InstrumentClassResponse response = __requestProcessor.process (request);
timer.mark (DiSLServer.ElapsedTime.PROCESS);
//
// Note: the CodedOutputStream remembers buffer position at
// creation time, so anything put into a byte buffer after
// CodedOutputStream has been created will be lost/corrupted.
//
// Also, it cannot be reused and needs to be created for
// every message packed.
//
__log.trace ("packing instrumentation response");
final int responseLength = response.getSerializedSize ();
sendBuffer.clear ();
sendBuffer.putInt (responseLength);
if (sendBuffer.remaining () < responseLength) {
sendBuffer = __expandBuffer (sendBuffer, responseLength);
__log.debug ("expanded send buffer to %d bytes", sendBuffer.capacity ());
}
if (responseLength > 0) {
final CodedOutputStream sendStream = CodedOutputStream.newInstance (
sendBuffer);
response.writeTo (sendStream);
sendStream.flush ();
}
timer.mark (DiSLServer.ElapsedTime.PACK);
//
__log.trace ("sending instrumentation response");
sendBuffer.flip ();
__bufferSendTo (sendBuffer, __clientSocket);
timer.mark (DiSLServer.ElapsedTime.TRANSMIT);
//
stats.update (timer);
if (response.getResult () == Protocol.InstrumentClassResponse.InstrumentClassResult.ERROR) {
//
// Error during instrumentation. Report it to the client
// and stop receiving requests from this connection.
//
break REQUEST_LOOP;
}
} // REQUEST_LOOP
//
// Merge thread-local stats with global stats when leaving
// the request loop in a peaceful manner.
//
__server.__globalStats.update (stats);
} catch (final IOException ioe) {
//
// Communication error -- just log a message here.
//
__log.error (
"error communicating with client: %s", ioe.getMessage ()
);
} catch (final Throwable t) {
__log.error (t, "failed to process instrumentation request");
} finally {
//
// If there are no more workers left and we are not operating
// in continuous mode, shut the server down.
//
if (__server.__workerCount.decrementAndGet () == 0) {
if (!__server.continuous) {
__serverThread.interrupt ();
}
}
}
}
//
private void __bufferSendTo (
final ByteBuffer buffer, final SocketChannel sc
) throws IOException {
while (buffer.hasRemaining ()) {
sc.write (buffer);
}
}
private void __bufferRecvFrom (
final SocketChannel sc, final int length, final ByteBuffer buffer
) throws IOException {
buffer.limit (buffer.position () + length);
while (buffer.hasRemaining ()) {
final int bytesRead = sc.read (buffer);
if (bytesRead < 0) {
throw new EOFException ("unexpected end of stream");
}
}
}
private ByteBuffer __expandBuffer (
final ByteBuffer buffer, final int messageLength
) {
//
// The buffer needs to be in receive mode, i.e., the buffer
// position indicates the first available byte. Any bytes before
// current position will be copied to the new buffer.
//
final int requiredCapacity = buffer.position () + messageLength;
int newCapacity = 2 * buffer.capacity ();
while (newCapacity < requiredCapacity) {
newCapacity *= 2;
}
buffer.flip ();
return __allocDirect (newCapacity).put (buffer);
}
private ByteBuffer __allocDirect (final int capacity) {
return ByteBuffer.allocateDirect (capacity).order (ByteOrder.BIG_ENDIAN);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment