Commit f372b209 authored by Lukáš Marek's avatar Lukáš Marek

Added support for total ordering

parent 85878831
......@@ -13,7 +13,6 @@ typedef struct {
unsigned char * buff;
size_t occupied;
size_t capacity;
volatile int available;
} buffer;
// ******************* Buffer routines *******************
......@@ -23,7 +22,6 @@ void buffer_alloc(buffer * b) {
b->buff = (unsigned char *) malloc(INIT_BUFF_SIZE);
b->capacity = INIT_BUFF_SIZE;
b->occupied = 0;
b->available = TRUE;
}
void buffer_free(buffer * b) {
......@@ -32,7 +30,6 @@ void buffer_free(buffer * b) {
b->buff = NULL;
b->capacity = 0;
b->occupied = 0;
b->available = TRUE;
}
void buffer_fill(buffer * b, const void * data, size_t data_length) {
......@@ -81,7 +78,6 @@ size_t buffer_filled(buffer * b) {
}
void buffer_clean(buffer * b) {
b->occupied = 0;
}
......
......@@ -89,6 +89,22 @@ typedef struct {
jobject obj_to_tag;
} objtag_rec;
// *** buffers for total ordering ***
typedef struct {
process_buffs * pb;
jint analysis_count;
size_t analysis_count_pos;
} to_buff_struct;
#define INVALID_BUFF_ID -1
#define BYTE_MAX_VAL 127
static jrawMonitorID to_buff_lock;
static to_buff_struct to_buff_array[BYTE_MAX_VAL];
// *** Protected by tagging lock ***
// can require other locks while holding this
......@@ -104,6 +120,8 @@ static volatile jshort avail_analysis_id = 1;
// *** Thread locals ***
static __thread jlong t_net_ref = 0;
static __thread process_buffs * t_local_pb = NULL;
static __thread jbyte t_to_buff_id = INVALID_BUFF_ID;
static __thread process_buffs * t_pb = NULL;
static __thread buffer * t_analysis_buff = NULL;
static __thread buffer * t_command_buff = NULL;
......@@ -436,6 +454,61 @@ static jlong tag_thread(JNIEnv * jni_env) {
return thread_net_ref;
}
void analysis_start_buff(JNIEnv * jni_env, jshort analysis_method_id,
jbyte buffer_id) {
#ifdef DEBUG
printf("Analysis (buffer) start enter (thread %ld)\n", t_net_ref);
#endif
check_error(buffer_id < 0, "Buffer id has negative value");
if(t_local_pb == NULL) {
// tag thread
if(t_net_ref == 0) {
t_net_ref = tag_thread(jni_env);
}
// get buffers
t_local_pb = buffs_get();
}
// set local buffers for this buffering
t_analysis_buff = t_local_pb->analysis_buff;
t_command_buff = t_local_pb->command_buff;
t_to_buff_id = buffer_id;
// analysis method desc
pack_short(t_analysis_buff, analysis_method_id);
#ifdef DEBUG
printf("Analysis (buffer) start exit (thread %ld)\n", t_net_ref);
#endif
}
static size_t createAnalysisMsg(buffer * buff, jlong id) {
// crate analysis message
// analysis msg
pack_byte(buff, MSG_ANALYZE);
// thread (total order buffer) id
pack_long(buff, id);
// get pointer to the location where count of requests will stored
size_t pos = buffer_filled(buff);
// space initialization
pack_int(buff, 0);
return pos;
}
static void analysis_start(JNIEnv * jni_env, jshort analysis_method_id) {
#ifdef DEBUG
......@@ -444,6 +517,7 @@ static void analysis_start(JNIEnv * jni_env, jshort analysis_method_id) {
if(t_analysis_buff == NULL) {
// tag thread
if(t_net_ref == 0) {
t_net_ref = tag_thread(jni_env);
......@@ -454,22 +528,11 @@ static void analysis_start(JNIEnv * jni_env, jshort analysis_method_id) {
t_analysis_buff = t_pb->analysis_buff;
t_command_buff = t_pb->command_buff;
// Crate analysis message
// analysis msg
pack_byte(t_analysis_buff, MSG_ANALYZE);
// thread id
pack_long(t_analysis_buff, t_net_ref);
// determines, how many analysis requests are send in one message
t_analysis_count = 0;
// get pointer to the location where count of requests will stored
t_analysis_count_pos = buffer_filled(t_analysis_buff);
// space initialization
pack_int(t_analysis_buff, 0);
// crate analysis message
t_analysis_count_pos = createAnalysisMsg(t_analysis_buff, t_net_ref);
}
// analysis method desc
......@@ -480,12 +543,108 @@ static void analysis_start(JNIEnv * jni_env, jshort analysis_method_id) {
#endif
}
static void analysis_end_buff() {
#ifdef DEBUG
printf("Analysis (buffer) end enter (thread %ld)\n", t_net_ref);
#endif
// TODO lock for each buffer id
// sending of half-full buffer is done in shutdown hook
// write analysis to total order buffer - with lock
enter_critical_section(jvmti_env, to_buff_lock);
{
// pointer to the total order buffer structure
to_buff_struct * tobs = &(to_buff_array[t_to_buff_id]);
// allocate new buffer
if(tobs->pb == NULL) {
tobs->pb = buffs_get();
// set thread_net_ref as t_buffid
tobs->pb->thread_net_ref = t_to_buff_id;
// determines, how many analysis requests are send in one message
tobs->analysis_count = 0;
// crate analysis message
tobs->analysis_count_pos = createAnalysisMsg(
tobs->pb->analysis_buff, t_to_buff_id);
}
// fill total order buffers
buffer_fill(tobs->pb->analysis_buff,
// NOTE: normally access the buffer using methods
t_local_pb->analysis_buff->buff,
t_local_pb->analysis_buff->occupied);
printf("Analysis buffer put %ld\n", t_local_pb->analysis_buff->occupied);
buffer_fill(tobs->pb->command_buff,
// NOTE: normally access the buffer using methods
t_local_pb->command_buff->buff,
t_local_pb->command_buff->occupied);
printf("Command buffer put %ld\n", t_local_pb->command_buff->occupied);
// empty local buffers
buffer_clean(t_local_pb->analysis_buff);
buffer_clean(t_local_pb->command_buff);
// add number of completed requests
++(tobs->analysis_count);
// buffer has to be updated each time because jvm could end and buffer
// has to be up-to date
buff_put_int(tobs->pb->analysis_buff, tobs->analysis_count_pos,
tobs->analysis_count);
// send only when the method count is reached
if(tobs->analysis_count >= ANALYSIS_COUNT) {
// send buffers for object tagging
buffs_objtag(tobs->pb);
// invalidate buffer pointer
tobs->pb = NULL;
}
}
exit_critical_section(jvmti_env, to_buff_lock);
// reset analysis and command buffers for normal buffering
if(t_pb != NULL) {
t_analysis_buff = t_pb->analysis_buff;
t_command_buff = t_pb->command_buff;
}
else {
t_analysis_buff = NULL;
t_command_buff = NULL;
}
// invalidate buffer id
t_to_buff_id = INVALID_BUFF_ID;
#ifdef DEBUG
printf("Analysis (buffer) end exit (thread %ld)\n", t_net_ref);
#endif
}
static void analysis_end() {
#ifdef DEBUG
printf("Analysis end enter (thread %ld)\n", t_net_ref);
#endif
// same method is called for end of the analysis for ordering buffer
if(t_to_buff_id >= 0) {
analysis_end_buff();
return;
}
// sending of half-full buffer is done in thread end hook
// add number of completed requests
......@@ -504,6 +663,9 @@ static void analysis_end() {
// send buffers for object tagging
buffs_objtag(t_pb);
// invalidate buffer pointer
t_pb = NULL;
}
#ifdef DEBUG
......@@ -513,7 +675,7 @@ static void analysis_end() {
// ******************* Object tagging thread *******************
// TODO ! add cache - ??
// TODO add cache - ??
static void ot_pack_string_cache(JNIEnv * jni_env, buffer * buff,
jstring to_send, jlong str_net_ref) {
......@@ -685,6 +847,7 @@ static void * objtag_thread_loop(void * obj) {
static void _send_buffer(buffer * b) {
// send data
// NOTE: normally access the buffer using methods
send_data(connection, b->buff, b->occupied);
}
......@@ -744,6 +907,9 @@ static void * send_thread_loop(void * obj) {
while(! (no_sending_work && bq_length(&send_q) == 0) ) {
// get buffer
// TODO thread could timeout here with timeout about 5 sec and check
// if all of the buffers are allocated by the application threads
// and all application threads are waiting on free buffer - deadlock
process_buffs * pb = _buffs_send_get();
#ifdef DEBUG
......@@ -830,6 +996,23 @@ void JNICALL jvmti_callback_class_file_load_hook(jvmtiEnv *jvmti_env,
// ******************* OBJECT FREE callback *******************
static void send_all_to_buffers() {
int i;
for(i = 0; i < BYTE_MAX_VAL; ++i) {
// send all buffers for occupied ids
if(to_buff_array[i].pb != NULL) {
// send buffers for object tagging
buffs_objtag(to_buff_array[i].pb);
// invalidate buffer pointer
to_buff_array[i].pb = NULL;
}
}
}
void JNICALL jvmti_callback_object_free_hook(jvmtiEnv *jvmti_env,
jlong tag) {
......@@ -837,9 +1020,12 @@ void JNICALL jvmti_callback_object_free_hook(jvmtiEnv *jvmti_env,
printf("Sending object free (thread %ld)\n", t_net_ref);
#endif
// send all buffers for total order
send_all_to_buffers();
// send new obj free message
// TODO ! buffer more msgs - ??
// TODO buffer more msgs (send buffer at shutdown) - ??
// obtain buffer
process_buffs * buffs = buffs_get();
buffer * buff = buffs->command_buff;
......@@ -896,7 +1082,8 @@ void JNICALL jvmti_callback_vm_death_hook(jvmtiEnv *jvmti_env,
printf("Shutting down (thread %ld)\n", t_net_ref);
#endif
// TODO ! send obj free buff
// send all buffers for total order
send_all_to_buffers();
// shutdown - first tagging then sending thread
......@@ -1091,6 +1278,10 @@ JNIEXPORT jint JNICALL Agent_OnLoad(JavaVM *jvm, char *options, void *reserved)
&tagging_lock);
check_jvmti_error(jvmti_env, error, "Cannot create raw monitor");
error = (*jvmti_env)->CreateRawMonitor(jvmti_env, "buffids",
&to_buff_lock);
check_jvmti_error(jvmti_env, error, "Cannot create raw monitor");
// read options (port/hostname)
parse_agent_options(options);
......@@ -1119,6 +1310,11 @@ JNIEXPORT jint JNICALL Agent_OnLoad(JavaVM *jvm, char *options, void *reserved)
_buffs_release(pb);
}
for(i = 0; i < BYTE_MAX_VAL; ++i) {
to_buff_array[i].pb = NULL;
}
return 0;
}
......@@ -1130,12 +1326,19 @@ JNIEXPORT jshort JNICALL Java_ch_usi_dag_dislre_REDispatch_registerMethod
return register_method(jni_env, analysis_method_desc);
}
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisStart
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisStart__S
(JNIEnv * jni_env, jclass this_class, jshort analysis_method_id) {
analysis_start(jni_env, analysis_method_id);
}
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisStart__SB
(JNIEnv * jni_env, jclass this_class, jshort analysis_method_id,
jbyte buffer_id) {
analysis_start_buff(jni_env, analysis_method_id, buffer_id);
}
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisEnd
(JNIEnv * jni_env, jclass this_class) {
......
......@@ -25,9 +25,17 @@ JNIEXPORT jshort JNICALL Java_ch_usi_dag_dislre_REDispatch_registerMethod
* Method: analysisStart
* Signature: (S)V
*/
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisStart
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisStart__S
(JNIEnv *, jclass, jshort);
/*
* Class: ch_usi_dag_dislre_REDispatch
* Method: analysisStart
* Signature: (SB)V
*/
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisStart__SB
(JNIEnv *, jclass, jshort, jbyte);
/*
* Class: ch_usi_dag_dislre_REDispatch
* Method: analysisEnd
......
......@@ -2,14 +2,31 @@ package ch.usi.dag.dislre;
public class REDispatch {
/**
* Register method and receive id for this transmission
*
* @param analysisMethodDesc
* @return
*/
public static native short registerMethod(String analysisMethodDesc);
/**
* Announce start of an analysis transmission
*
* @param analysisMethodDesc remote analysis method descriptor
* @param analysisMethodDesc remote analysis method id
*/
public static native void analysisStart(short analysisMethodId);
/**
* Announce start of an analysis transmission with total ordering (among
* several threads) under the same orderingId
*
* @param analysisMethodId remote analysis method id
* @param orderingId analyses with the same orderingId are guaranteed to
* be ordered. Only non-negative values are valid.
*/
public static native void analysisStart(short analysisMethodId,
byte orderingId);
/**
* Announce end of an analysis transmission
......
......@@ -26,7 +26,8 @@ public class CodeExecutedRE {
public static void bytecodesExecuted(int count) {
REDispatch.analysisStart(beId);
final byte orderingid = 1;
REDispatch.analysisStart(beId, orderingid);
REDispatch.sendInt(count);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment