Commit ddac3ef8 authored by Lukáš Marek's avatar Lukáš Marek

Implemented better shutdown for java buffering

parent 7a0fabce
......@@ -1234,6 +1234,9 @@ static void send_thread_buffers(struct tldata * tld) {
tld->pb = NULL;
}
// forward declaration
static void cleanup_jb_workers(JNIEnv * jni_env);
void JNICALL jvmti_callback_vm_death_hook(
jvmtiEnv *jvmti_env, JNIEnv* jni_env
) {
......@@ -1243,6 +1246,8 @@ void JNICALL jvmti_callback_vm_death_hook(
printf("Shutting down (thread %ld)\n", tld_get()->id);
#endif
cleanup_jb_workers(jni_env);
// send all buffers for total order
send_all_to_buffers();
......@@ -1603,6 +1608,14 @@ JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendClass
// *****************************************************************************
static int jb_initialized = FALSE;
// java buffering threads
#define JB_WORKER_THREADS 3
static pthread_t jb_workers[JB_WORKER_THREADS];
static volatile int no_jb_work = FALSE;
static jobject all_buffs;
static jobject full_buffs;
static jobject empty_buffs;
......@@ -1610,8 +1623,12 @@ static jobject empty_buffs;
static jclass cl_buffer_pool;
// JBBufferPool methods
static jmethodID mtd_get_empty;
static jmethodID mtd_put_full;
static jmethodID mtd_get_full;
static jmethodID mtd_put_empty;
static jmethodID mtd_full_queue_size;
// JBBuffer methods
static jmethodID mtd_get_data_array;
......@@ -1623,8 +1640,6 @@ static jmethodID mtd_get_tag_obj_array;
static jmethodID mtd_get_tag_pos_array;
static jmethodID mtd_get_tag_ele_count;
#define JB_WORKER_THREADS 3
// NOTE: this tagging can cooperate with normal tagging thread
static void jb_object_tagging(JNIEnv * jni_env, process_buffs * pb,
jint tag_size, jobjectArray tag_objects_jarray, jint * tag_pos_array) {
......@@ -1660,8 +1675,8 @@ static void * jb_worker_loop(void * obj) {
JNIEnv * jni_env;
(*java_vm)->AttachCurrentThreadAsDaemon(java_vm, (void **)&jni_env, NULL);
// TODO ! do better stopping
while(TRUE) {
// exit when requested
while(no_jb_work) {
// ** buff acquire **
......@@ -1772,8 +1787,11 @@ JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_jb_JBBufferPool_register
cl_buffer_pool = get_java_class(jni_env, "ch/usi/dag/dislre/jb/JBBufferPool");
mtd_get_empty = get_java_static_method_id(jni_env, cl_buffer_pool, "getEmpty", "()Lch/usi/dag/dislre/jb/JBBuffer;");
mtd_put_full = get_java_static_method_id(jni_env, cl_buffer_pool, "putFull", "(Lch/usi/dag/dislre/jb/JBBuffer;)V");
mtd_get_full = get_java_static_method_id(jni_env, cl_buffer_pool, "getFull", "()Lch/usi/dag/dislre/jb/JBBuffer;");
mtd_put_empty = get_java_static_method_id(jni_env, cl_buffer_pool, "putEmpty", "(Lch/usi/dag/dislre/jb/JBBuffer;)V");
mtd_full_queue_size = get_java_static_method_id(jni_env, cl_buffer_pool, "fullQueueSize", "()I");
jclass cl_buffer = get_java_class(jni_env, "ch/usi/dag/dislre/jb/JBBuffer");
......@@ -1795,8 +1813,44 @@ JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_jb_JBBufferPool_register
int i;
for(i = 0; i < JB_WORKER_THREADS; ++i) {
pthread_t pthread;
int pcr = pthread_create(&pthread, NULL, jb_worker_loop, NULL);
int pcr = pthread_create(&jb_workers[i], NULL, jb_worker_loop, NULL);
check_error(pcr != 0, "Failed to create worker thread");
}
jb_initialized = TRUE;
}
static jint full_queue_size(JNIEnv * jni_env) {
return (*jni_env)->CallStaticIntMethod(jni_env, cl_buffer_pool, mtd_full_queue_size);
}
static void cleanup_jb_workers(JNIEnv * jni_env) {
if(jb_initialized) {
// wait for full buffer queue to get empty
while(full_queue_size(jni_env) == 0) {
sleep(1);
}
no_jb_work = TRUE;
// send empty buff according to the thread count
// this should provoke the threads to check no_jb_work
int i;
for(i = 0; i < JB_WORKER_THREADS; ++i) {
jobject obj_buff = (*jni_env)->CallStaticObjectMethod(jni_env, cl_buffer_pool, mtd_get_empty);
(*jni_env)->CallStaticVoidMethod(jni_env, cl_buffer_pool, mtd_put_full, obj_buff);
}
// wait for thread exits
int j;
for(j = 0; j < JB_WORKER_THREADS; ++j) {
int rc = pthread_join(jb_workers[j], NULL);
check_error(rc != 0, "Cannot join java buffering thread.");
}
}
}
......@@ -61,6 +61,9 @@ public final class JBBuffer {
public void analysisStart(short analysisMethodId) {
// initialization (see below) should not be in init() because sometimes
// we need really empty buffer
// initialize buffer
if (requestCount == 0) {
......
......@@ -13,6 +13,7 @@ public class JBBufferPool {
// TODO ! JB - better exception handling
// TODO ! variable not in sync with native code
public static final int NUMBER_OF_FETCHING_THREADS = 4;
private static final int BUFFERS = NUMBER_OF_FETCHING_THREADS * 10;
......@@ -43,6 +44,7 @@ public class JBBufferPool {
}
}
// called (also) by native code - during cleanup
public static JBBuffer getEmpty() {
while(true) {
......@@ -58,6 +60,7 @@ public class JBBufferPool {
}
}
// called (also) by native code - during cleanup
public static void putFull(JBBuffer buffer) {
boolean done = false;
......@@ -75,7 +78,7 @@ public class JBBufferPool {
}
}
// called by the native code
// called by native code
private static JBBuffer getFull() {
// get buffer
......@@ -91,7 +94,7 @@ public class JBBufferPool {
}
}
// called (also) by the native code
// called (also) by native code
private static void putEmpty(JBBuffer buffer) {
boolean done = false;
......@@ -109,6 +112,11 @@ public class JBBufferPool {
}
}
}
// called by native code
private static int fullQueueSize() {
return fullBuffers.size();
}
private static native void register(
List<JBBuffer> allBuffers,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment