Commit 6bee27e0 authored by Vít Kabele's avatar Vít Kabele

Communication is done via packed Server/Client messages.

ConnectionHandler refactored.
parent e17471d3
Pipeline #3132 passed with stages
in 3 minutes and 3 seconds
......@@ -108,7 +108,8 @@ LIBRARY := $(TARGET_DIR)/$(LIBRARY_NAME)
# Source and object files needed to create the library
SOURCES = bytecode.c common.c jvmtiutil.c connection.c \
connpool.c msgchannel.c network.c classparser.c \
dislserver.pb-c.c protobuf-c.c dislagent.c
dislserver.pb-c.c protobuf-c.c dislagent.c \
sessions.c
HEADERS = $(wildcard *.h) codeflags.h dislserver.pb-c.h
GENSRCS = bytecode.c codeflags.h \
......
......@@ -218,19 +218,24 @@ __instrument_class (
// send the it to the server. Receive the response and release the
// connection again.
//
ClientMessage message = CLIENT_MESSAGE__INIT;
InstrumentClassRequest request = INSTRUMENT_CLASS_REQUEST__INIT;
request.flags = request_flags;
request.classname = (char *) class_name;
request.classbytes.len = class_def->class_byte_count;
request.classbytes.data = (uint8_t *) class_def->class_bytes;
size_t send_size = instrument_class_request__get_packed_size (&request);
message.request_case = CLIENT_MESSAGE__REQUEST_INSTRUMENT_CLASS_REQUEST;
message.instrument_class_request = &request;
size_t send_size = client_message__get_packed_size(&message);
void * send_buffer = malloc (send_size);
assert (send_buffer != NULL);
instrument_class_request__pack (&request, send_buffer);
client_message__pack(&message, send_buffer);
struct connection * conn = network_acquire_connection ();
message_send (conn, send_buffer, send_size);
free(send_buffer);
//
......@@ -238,10 +243,14 @@ __instrument_class (
size_t recv_size = message_recv (conn, &recv_buffer);
network_release_connection (conn);
InstrumentClassResponse * response = instrument_class_response__unpack (NULL, recv_size, recv_buffer);
assert (response != NULL);
ServerMessage * resp = server_message__unpack(NULL,recv_size,recv_buffer);
assert (resp != NULL);
free (recv_buffer);
assert( resp->response_case == SERVER_MESSAGE__RESPONSE_INSTRUMENT_CLASS_RESPONSE );
InstrumentClassResponse * response = resp->instrument_class_response;
//server_message__free_unpacked(resp, NULL);
//
// Check if error occurred on the server.
// The control field of the response contains the error message.
......
/*
* This file is part of src_disl_agent
*
* Created by Vít Kabele on 12/10/2018.
*/
#include <stdlib.h>
#include <assert.h>
#include "sessions.h"
#include "common.h"
#include "dislserver.pb-c.h"
#include "network.h"
int
session_start()
{
/* Create, pack and send the session init request */
ClientMessage message = CLIENT_MESSAGE__INIT;
SessionInitRequest req = SESSION_INIT_REQUEST__INIT;
message.request_case = CLIENT_MESSAGE__REQUEST_SESSION_INIT_REQUEST;
message.session_init_request = &req;
message.session_init_request->code = 42;
size_t send_size = client_message__get_packed_size(&message);
void * buffer = malloc(send_size); // FREE!
assert(buffer != NULL);
client_message__pack(&message, buffer);
debug("Attempting to acquire the session id");
struct connection * conn = network_acquire_connection();
message_send(conn, buffer, send_size);
free(buffer);
return (0);
}
/*
* Allow usage of the session enabled protocol
* This file is part of src_disl_agent
*
* Created by Vít Kabele on 12/10/2018.
*/
#ifndef _SESSIONS_H_
#define _SESSIONS_H_
#include "msgchannel.h"
/*
* Global variable that holds the id of the session.
* Zero means that there is no active session initiated.
*/
int session_id;
/*
* Negotiate the session with the server.
*
* @return Zero on success, nonzero otherwise.
*/
int
session_start();
/*
* Announce the end of the session to the server.
*/
void
session_end();
#endif /* _SESSIONS_H_ */
......@@ -17,6 +17,8 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import ch.usi.dag.dislserver.Protocol.ClientMessage;
import ch.usi.dag.dislserver.Protocol.ServerMessage;
final class ConnectionHandler implements Runnable {
......@@ -60,111 +62,55 @@ final class ConnectionHandler implements Runnable {
// Process requests until a shutdown request is received, a
// communication error occurs, or an internal error occurs.
//
final ByteBuffer headBuffer = __allocDirect (4096);
ByteBuffer recvBuffer = __allocDirect (4096);
ByteBuffer sendBuffer = __allocDirect (4096);
REQUEST_LOOP:
while (true) {
timer.reset ();
//
__log.trace ("receiving instrumentation request");
headBuffer.clear ();
__bufferRecvFrom (__clientSocket, Integer.BYTES, headBuffer);
headBuffer.flip ();
final int messageLength = headBuffer.getInt ();
__log.trace ("expecting message of length %d", messageLength);
if (messageLength == 0) {
__log.debug ("received empty message, exiting");
timer.mark (DiSLServer.ElapsedTime.RECEIVE);
stats.update (timer);
break REQUEST_LOOP;
}
recvBuffer.clear ();
if (recvBuffer.remaining () < messageLength) {
recvBuffer = __expandBuffer (recvBuffer, messageLength);
__log.debug ("expanded receive buffer to %d bytes", recvBuffer.capacity ());
}
__bufferRecvFrom (__clientSocket, messageLength, recvBuffer);
timer.mark (DiSLServer.ElapsedTime.RECEIVE);
//
__log.trace ("unpacking instrumentation request");
ClientMessage request = receiveMessage (timer,stats);
ServerMessage response = ServerMessage.getDefaultInstance ();
recvBuffer.flip ();
final CodedInputStream recvStream = CodedInputStream.newInstance (recvBuffer);
final Protocol.InstrumentClassRequest request = Protocol.InstrumentClassRequest.parseFrom (
recvStream);
timer.mark (DiSLServer.ElapsedTime.UNPACK);
if (request == null) break;
//
// Process the request and send the response to the client.
// Update the timing stats if everything goes well.
//
__log.trace ("processing instrumentation request");
ClientMessage.RequestCase requestCase = request.getRequestCase ();
final Protocol.InstrumentClassResponse response = __requestProcessor.process (request);
timer.mark (DiSLServer.ElapsedTime.PROCESS);
switch (requestCase){
case SESSION_INIT_REQUEST:
System.out.println ("Delivered code is: ");
System.out.print ( request.getSessionInitRequest ().getCode () ); System.out.println ("");
break;
case INSTRUMENTATION_DELIVERY:
break;
case INSTRUMENT_CLASS_REQUEST:
timer.mark (DiSLServer.ElapsedTime.UNPACK);
//
// Note: the CodedOutputStream remembers buffer position at
// creation time, so anything put into a byte buffer after
// CodedOutputStream has been created will be lost/corrupted.
//
// Also, it cannot be reused and needs to be created for
// every message packed.
//
__log.trace ("packing instrumentation response");
final int responseLength = response.getSerializedSize ();
sendBuffer.clear ();
sendBuffer.putInt (responseLength);
if (sendBuffer.remaining () < responseLength) {
sendBuffer = __expandBuffer (sendBuffer, responseLength);
__log.debug ("expanded send buffer to %d bytes", sendBuffer.capacity ());
}
if (responseLength > 0) {
final CodedOutputStream sendStream = CodedOutputStream.newInstance (
sendBuffer);
response.writeTo (sendStream);
sendStream.flush ();
}
timer.mark (DiSLServer.ElapsedTime.PACK);
//
__log.trace ("sending instrumentation response");
sendBuffer.flip ();
__bufferSendTo (sendBuffer, __clientSocket);
timer.mark (DiSLServer.ElapsedTime.TRANSMIT);
//
stats.update (timer);
if (response.getResult () == Protocol.InstrumentClassResponse.InstrumentClassResult.ERROR) {
//
// Error during instrumentation. Report it to the client
// and stop receiving requests from this connection.
// Process the request and send the response to the client.
// Update the timing stats if everything goes well.
//
__log.trace ("processing instrumentation request");
response = ServerMessage
.newBuilder ()
.setInstrumentClassResponse (
__requestProcessor.process (request.getInstrumentClassRequest ())
).build ();
timer.mark (DiSLServer.ElapsedTime.PROCESS);
sendMessage (response, timer);
if (response.getInstrumentClassResponse ().getResult () == Protocol.InstrumentClassResponse.InstrumentClassResult.ERROR) {
//
// Error during instrumentation. Report it to the client
// and stop receiving requests from this connection.
//
break REQUEST_LOOP;
}
break;
case CLOSE_CONNECTION:
case REQUEST_NOT_SET:
default:
break REQUEST_LOOP;
}
} // REQUEST_LOOP
}
//
// Merge thread-local stats with global stats when leaving
......@@ -199,6 +145,100 @@ final class ConnectionHandler implements Runnable {
//
/**
* Send the message.
* @param response
* @param timer
* @throws IOException
*/
private void sendMessage (
final ServerMessage response,
final IntervalTimer<DiSLServer.ElapsedTime> timer
) throws IOException {
ByteBuffer sendBuffer = __allocDirect (4096);
//
// Note: the CodedOutputStream remembers buffer position at
// creation time, so anything put into a byte buffer after
// CodedOutputStream has been created will be lost/corrupted.
//
// Also, it cannot be reused and needs to be created for
// every message packed.
//
__log.trace ("packing instrumentation response");
final int responseLength = response.getSerializedSize ();
sendBuffer.clear ();
sendBuffer.putInt (responseLength);
if (sendBuffer.remaining () < responseLength) {
sendBuffer = __expandBuffer (sendBuffer, responseLength);
__log.debug ("expanded send buffer to %d bytes", sendBuffer.capacity ());
}
if (responseLength > 0) {
final CodedOutputStream sendStream = CodedOutputStream.newInstance (
sendBuffer);
response.writeTo (sendStream);
sendStream.flush ();
}
timer.mark (DiSLServer.ElapsedTime.PACK);
//
__log.trace ("sending instrumentation response");
sendBuffer.flip ();
__bufferSendTo (sendBuffer, __clientSocket);
timer.mark (DiSLServer.ElapsedTime.TRANSMIT);
}
private ClientMessage receiveMessage(
final IntervalTimer<DiSLServer.ElapsedTime> timer,
final CounterSet<DiSLServer.ElapsedTime> stats
) throws IOException {
final ByteBuffer headBuffer = __allocDirect (4096);
ByteBuffer recvBuffer = __allocDirect (4096);
// message receive
__log.trace ("receiving instrumentation request");
headBuffer.clear ();
__bufferRecvFrom (__clientSocket, Integer.BYTES, headBuffer);
headBuffer.flip ();
final int messageLength = headBuffer.getInt ();
__log.trace ("expecting message of length %d", messageLength);
if (messageLength == 0) {
__log.debug ("received empty message, exiting");
timer.mark (DiSLServer.ElapsedTime.RECEIVE);
stats.update (timer);
return null;
}
recvBuffer.clear ();
if (recvBuffer.remaining () < messageLength) {
recvBuffer = __expandBuffer (recvBuffer, messageLength);
__log.debug ("expanded receive buffer to %d bytes", recvBuffer.capacity ());
}
__bufferRecvFrom (__clientSocket, messageLength, recvBuffer);
timer.mark (DiSLServer.ElapsedTime.RECEIVE);
//
__log.trace ("unpacking instrumentation request");
recvBuffer.flip ();
final CodedInputStream recvStream = CodedInputStream.newInstance (recvBuffer);
return ClientMessage.parseFrom (recvStream);
}
private void __bufferSendTo (
final ByteBuffer buffer, final SocketChannel sc
) throws IOException {
......
......@@ -29,7 +29,7 @@ public final class DiSLServer {
private static final String PROP_CONT = "dislserver.continuous";
public static final boolean continuous = Boolean.getBoolean (PROP_CONT);
final boolean continuous = Boolean.getBoolean (PROP_CONT);
//
......@@ -39,15 +39,15 @@ public final class DiSLServer {
//
public enum ElapsedTime {
enum ElapsedTime {
RECEIVE, UNPACK, PROCESS, PACK, TRANSMIT
}
//
public final AtomicInteger __workerCount = new AtomicInteger ();
final AtomicInteger __workerCount = new AtomicInteger ();
public final CounterSet<ElapsedTime> __globalStats = new CounterSet<ElapsedTime> (
final CounterSet<ElapsedTime> __globalStats = new CounterSet<ElapsedTime> (
ElapsedTime.class);
//
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment