Commit 0a52618e authored by Vít Kabele's avatar Vít Kabele

Analysis RPC via protobuffs

parent c1460b46
Pipeline #4581 passed with stages
in 4 minutes and 7 seconds
......@@ -7,6 +7,8 @@ import ch.usi.dag.util.logging.Logger;
/**
* Request dispatcher will take the protocol buffer message and pass the data to appropriate method
* call.
*
* TODO: Consider making this class static, since it's only helper class with only one method.
*/
final class RequestDispatcher {
......@@ -17,6 +19,14 @@ final class RequestDispatcher {
__shadowVM = shadowVM;
}
/**
* Process the agent message, and pass the unpacked data to {@link ISHVM} instance.
*
* @param agentMessage
* @param logger
* @throws DiSLREServerException
*/
void ProcessMessage(final AgentMessage agentMessage, final Logger logger)
throws DiSLREServerException
{
......@@ -24,7 +34,12 @@ final class RequestDispatcher {
switch (agentMessage.getRequestCase ())
{
case ANALYZE:
__shadowVM.HandleAnalyze (agentMessage.getAnalyze ().getRawData ().toByteArray ());
Analyze analyze = agentMessage.getAnalyze ();
__shadowVM.HandleAnalyze (
analyze.getOrderingID (),
analyze.getInvocationCount (),
analyze.getRawData ().toByteArray ()
);
break;
case CLOSE:
__shadowVM.HandleClose ();
......@@ -91,6 +106,9 @@ final class RequestDispatcher {
break;
case REQUEST_NOT_SET:
default:
logger.error (
"Serious problem in data transfer: Unknown type of message from agent"
);
}
}
......
......@@ -7,7 +7,6 @@
package ch.usi.dag.dislreserver;
import java.io.*;
import java.nio.channels.ServerSocketChannel;
import ch.usi.dag.dislreserver.remoteanalysis.RemoteAnalysis;
import ch.usi.dag.util.logging.Logger;
......@@ -57,14 +56,17 @@ public class SHVM implements ISHVM {
* Process the batch of analysis invocations on the ShadowVM. Only shadowVM knows, how to
* unmarshall the data.
*
* @param orderingId The total ordering ID
* @param invocationCount Number of method invocations
* @param rawData Packed raw data
*/
@Override
public void HandleAnalyze (byte[] rawData) throws DiSLREServerException
public void HandleAnalyze (long orderingId, int invocationCount, byte[] rawData) throws DiSLREServerException
{
__analysisHandler
.handle (
orderingId,
invocationCount,
new DataInputStream (new ByteArrayInputStream (rawData))
);
}
......
......@@ -39,32 +39,30 @@ public final class AnalysisHandler {
dispatcher = new AnalysisDispatcher (__shvmContext);
}
public void handle (
final DataInputStream is
) throws DiSLREServerException {
try {
// get net reference for the thread
long orderingID = is.readLong ();
// read and create method invocations
final int invocationCount = is.readInt ();
if (invocationCount < 0) {
/**
* Handle the RPC request to analysis call.
*
* @param orderingID Total ordering ID
* @param invocationCount Number of method invocations
* @param is Input stream with the rest of the data.
* @throws DiSLREServerException
*/
public void handle (final long orderingID, final int invocationCount, final DataInputStream is)
throws DiSLREServerException
{
if (invocationCount < 0) {
throw new DiSLREServerException (String.format (
"invalid number of analysis invocation requests: %d",
invocationCount
));
}
}
List <AnalysisInvocation> invocations = __unmarshalInvocations (
invocationCount, is
);
List <AnalysisInvocation> invocations = __unmarshalInvocations (invocationCount, is);
dispatcher.addTask (orderingID, invocations);
dispatcher.addTask (orderingID, invocations);
} catch (final IOException ioe) {
throw new DiSLREServerException(ioe);
}
}
......
......@@ -13,7 +13,17 @@ public final class Protocol {
com.google.protobuf.MessageLiteOrBuilder {
/**
* <code>optional bytes RawData = 1;</code>
* <code>optional int64 OrderingID = 1;</code>
*/
long getOrderingID();
/**
* <code>optional int32 InvocationCount = 2;</code>
*/
int getInvocationCount();
/**
* <code>optional bytes RawData = 3;</code>
*/
com.google.protobuf.ByteString getRawData();
}
......@@ -28,16 +38,62 @@ public final class Protocol {
private Analyze() {
rawData_ = com.google.protobuf.ByteString.EMPTY;
}
public static final int RAWDATA_FIELD_NUMBER = 1;
public static final int ORDERINGID_FIELD_NUMBER = 1;
private long orderingID_;
/**
* <code>optional int64 OrderingID = 1;</code>
*/
public long getOrderingID() {
return orderingID_;
}
/**
* <code>optional int64 OrderingID = 1;</code>
*/
private void setOrderingID(long value) {
orderingID_ = value;
}
/**
* <code>optional int64 OrderingID = 1;</code>
*/
private void clearOrderingID() {
orderingID_ = 0L;
}
public static final int INVOCATIONCOUNT_FIELD_NUMBER = 2;
private int invocationCount_;
/**
* <code>optional int32 InvocationCount = 2;</code>
*/
public int getInvocationCount() {
return invocationCount_;
}
/**
* <code>optional int32 InvocationCount = 2;</code>
*/
private void setInvocationCount(int value) {
invocationCount_ = value;
}
/**
* <code>optional int32 InvocationCount = 2;</code>
*/
private void clearInvocationCount() {
invocationCount_ = 0;
}
public static final int RAWDATA_FIELD_NUMBER = 3;
private com.google.protobuf.ByteString rawData_;
/**
* <code>optional bytes RawData = 1;</code>
* <code>optional bytes RawData = 3;</code>
*/
public com.google.protobuf.ByteString getRawData() {
return rawData_;
}
/**
* <code>optional bytes RawData = 1;</code>
* <code>optional bytes RawData = 3;</code>
*/
private void setRawData(com.google.protobuf.ByteString value) {
if (value == null) {
......@@ -47,7 +103,7 @@ public final class Protocol {
rawData_ = value;
}
/**
* <code>optional bytes RawData = 1;</code>
* <code>optional bytes RawData = 3;</code>
*/
private void clearRawData() {
......@@ -56,8 +112,14 @@ public final class Protocol {
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (orderingID_ != 0L) {
output.writeInt64(1, orderingID_);
}
if (invocationCount_ != 0) {
output.writeInt32(2, invocationCount_);
}
if (!rawData_.isEmpty()) {
output.writeBytes(1, rawData_);
output.writeBytes(3, rawData_);
}
}
......@@ -66,9 +128,17 @@ public final class Protocol {
if (size != -1) return size;
size = 0;
if (orderingID_ != 0L) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(1, orderingID_);
}
if (invocationCount_ != 0) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(2, invocationCount_);
}
if (!rawData_.isEmpty()) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(1, rawData_);
.computeBytesSize(3, rawData_);
}
memoizedSerializedSize = size;
return size;
......@@ -157,13 +227,59 @@ public final class Protocol {
/**
* <code>optional bytes RawData = 1;</code>
* <code>optional int64 OrderingID = 1;</code>
*/
public long getOrderingID() {
return instance.getOrderingID();
}
/**
* <code>optional int64 OrderingID = 1;</code>
*/
public Builder setOrderingID(long value) {
copyOnWrite();
instance.setOrderingID(value);
return this;
}
/**
* <code>optional int64 OrderingID = 1;</code>
*/
public Builder clearOrderingID() {
copyOnWrite();
instance.clearOrderingID();
return this;
}
/**
* <code>optional int32 InvocationCount = 2;</code>
*/
public int getInvocationCount() {
return instance.getInvocationCount();
}
/**
* <code>optional int32 InvocationCount = 2;</code>
*/
public Builder setInvocationCount(int value) {
copyOnWrite();
instance.setInvocationCount(value);
return this;
}
/**
* <code>optional int32 InvocationCount = 2;</code>
*/
public Builder clearInvocationCount() {
copyOnWrite();
instance.clearInvocationCount();
return this;
}
/**
* <code>optional bytes RawData = 3;</code>
*/
public com.google.protobuf.ByteString getRawData() {
return instance.getRawData();
}
/**
* <code>optional bytes RawData = 1;</code>
* <code>optional bytes RawData = 3;</code>
*/
public Builder setRawData(com.google.protobuf.ByteString value) {
copyOnWrite();
......@@ -171,7 +287,7 @@ public final class Protocol {
return this;
}
/**
* <code>optional bytes RawData = 1;</code>
* <code>optional bytes RawData = 3;</code>
*/
public Builder clearRawData() {
copyOnWrite();
......@@ -200,6 +316,10 @@ public final class Protocol {
case VISIT: {
Visitor visitor = (Visitor) arg0;
ch.usi.dag.dislreserver.Protocol.Analyze other = (ch.usi.dag.dislreserver.Protocol.Analyze) arg1;
orderingID_ = visitor.visitLong(orderingID_ != 0L, orderingID_,
other.orderingID_ != 0L, other.orderingID_);
invocationCount_ = visitor.visitInt(invocationCount_ != 0, invocationCount_,
other.invocationCount_ != 0, other.invocationCount_);
rawData_ = visitor.visitByteString(rawData_ != com.google.protobuf.ByteString.EMPTY, rawData_,
other.rawData_ != com.google.protobuf.ByteString.EMPTY, other.rawData_);
if (visitor == com.google.protobuf.GeneratedMessageLite.MergeFromVisitor
......@@ -226,7 +346,17 @@ public final class Protocol {
}
break;
}
case 10: {
case 8: {
orderingID_ = input.readInt64();
break;
}
case 16: {
invocationCount_ = input.readInt32();
break;
}
case 26: {
rawData_ = input.readBytes();
break;
......
......@@ -30,9 +30,11 @@ public interface ISHVM {
* Process the batch of analysis invocations on the ShadowVM. Only shadowVM knows, how to
* unmarshall the data.
*
* @param orderingId The total ordering ID
* @param invocationCount Number of method invocations
* @param rawData Packed raw data
*/
void HandleAnalyze(byte[] rawData) throws DiSLREServerException;
void HandleAnalyze(long orderingId, int invocationCount, byte[] rawData) throws DiSLREServerException;
/**
* Handle the message about closing the connection and end of the analysis.
......
#include "bytebuffer.h"
#include "bufferpack.h"
#include "../shvm.pb-c.h"
#include "messagetype.h"
#include <stdint.h>
#include <stdlib.h>
......@@ -22,6 +23,8 @@
#include <endian.h>
#endif
#define INCOMPATIBLE_SIZES "Incompatible sizes of real and transport data types"
// ****************************************************************************
// DATA PACKING OPERATIONS
......@@ -145,10 +148,25 @@ buffer_repack_analysis(buffer_t * restrict source, buffer_t * destination)
if(buffer_length(source) <= 0)
return;
// Shift by raw data header. See messager_analyze_header()
uint8_t shift = sizeof (uint8_t);
// If the size of buffer is smaller then header, probably
// we get a corrupted buffer
assert(buffer_length(source) >= MSG_ANALYZE_HEADER_SIZE);
Analyze analyze = ANALYZE__INIT;
// Shift by raw data header. See messager_analyze_header()
uint8_t shift = sizeof (uint8_t); // MSG_ANALYZE
_Static_assert(sizeof (analyze.orderingid) >= sizeof (jlong),
INCOMPATIBLE_SIZES);
analyze.orderingid = htobe64(*((jlong *)(source->bytes + shift)));
shift += sizeof(jlong);
_Static_assert(sizeof (analyze.invocationcount) >= sizeof (jint),
INCOMPATIBLE_SIZES);
analyze.invocationcount = htonl(*((jint *)(source->bytes + shift)));
shift += sizeof(jint);
analyze.rawdata.len = source->position - shift;
analyze.rawdata.data = source->bytes + shift;
......@@ -169,12 +187,18 @@ buffer_repack_analysis(buffer_t * restrict source, buffer_t * destination)
void
buffer_repack_objfree(buffer_t * restrict source, buffer_t * destination)
{
uint8_t shift = sizeof (uint8_t) + sizeof (jint);
if (buffer_length(source) <= 0)
return;
uint8_t shift = sizeof (uint8_t);
ObjectFree objectFree = OBJECT_FREE__INIT;
_Static_assert(sizeof(jint) == sizeof(int32_t), "Unable to transport data");
objectFree.count = *(source->bytes + sizeof(uint8_t));
_Static_assert(sizeof (objectFree.count) >= sizeof (jint),
INCOMPATIBLE_SIZES);
objectFree.count = htonl(*((jint*)(source->bytes + shift)));
shift += sizeof (jint);
objectFree.rawdata.len = source->position - shift;
objectFree.rawdata.data = source->bytes + shift;
......
......@@ -19,6 +19,8 @@
//
#define MSG_ANALYZE_HEADER_SIZE (sizeof (uint8_t) + sizeof (jlong) + sizeof (jint))
void messager_close (buffer_t * buff);
size_t messager_analyze_header (buffer_t * buffer, jlong ordering_id);
......
......@@ -457,12 +457,36 @@ void agent_message__free_unpacked
assert(message->base.descriptor == &agent_message__descriptor);
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
}
static const ProtobufCFieldDescriptor analyze__field_descriptors[1] =
static const ProtobufCFieldDescriptor analyze__field_descriptors[3] =
{
{
"RawData",
"OrderingID",
1,
PROTOBUF_C_LABEL_NONE,
PROTOBUF_C_TYPE_INT64,
0, /* quantifier_offset */
offsetof(Analyze, orderingid),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"InvocationCount",
2,
PROTOBUF_C_LABEL_NONE,
PROTOBUF_C_TYPE_INT32,
0, /* quantifier_offset */
offsetof(Analyze, invocationcount),
NULL,
NULL,
0, /* flags */
0,NULL,NULL /* reserved1,reserved2, etc */
},
{
"RawData",
3,
PROTOBUF_C_LABEL_NONE,
PROTOBUF_C_TYPE_BYTES,
0, /* quantifier_offset */
offsetof(Analyze, rawdata),
......@@ -473,12 +497,14 @@ static const ProtobufCFieldDescriptor analyze__field_descriptors[1] =
},
};
static const unsigned analyze__field_indices_by_name[] = {
0, /* field[0] = RawData */
1, /* field[1] = InvocationCount */
0, /* field[0] = OrderingID */
2, /* field[2] = RawData */
};
static const ProtobufCIntRange analyze__number_ranges[1 + 1] =
{
{ 1, 0 },
{ 0, 1 }
{ 0, 3 }
};
const ProtobufCMessageDescriptor analyze__descriptor =
{
......@@ -488,7 +514,7 @@ const ProtobufCMessageDescriptor analyze__descriptor =
"Analyze",
"",
sizeof(Analyze),
1,
3,
analyze__field_descriptors,
analyze__field_indices_by_name,
1, analyze__number_ranges,
......
......@@ -35,11 +35,13 @@ typedef struct _AgentMessage AgentMessage;
struct _Analyze
{
ProtobufCMessage base;
int64_t orderingid;
int32_t invocationcount;
ProtobufCBinaryData rawdata;
};
#define ANALYZE__INIT \
{ PROTOBUF_C_MESSAGE_INIT (&analyze__descriptor) \
, {0,NULL} }
, 0, 0, {0,NULL} }
struct _Close
......
......@@ -5,7 +5,9 @@ option java_outer_classname = "Protocol";
option optimize_for = LITE_RUNTIME;
message Analyze {
bytes RawData = 1;
int64 OrderingID = 1;
int32 InvocationCount = 2;
bytes RawData = 3;
}
message Close {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment