Commit 48ece151 authored by Lukáš Marek's avatar Lukáš Marek

dropped java buffering support

parent 9bde191a
......@@ -1234,9 +1234,6 @@ static void send_thread_buffers(struct tldata * tld) {
tld->pb = NULL;
}
// forward declaration
static void cleanup_jb_workers(JNIEnv * jni_env);
void JNICALL jvmti_callback_vm_death_hook(
jvmtiEnv *jvmti_env, JNIEnv* jni_env
) {
......@@ -1246,8 +1243,6 @@ void JNICALL jvmti_callback_vm_death_hook(
printf("Shutting down (thread %ld)\n", tld_get()->id);
#endif
cleanup_jb_workers(jni_env);
// send all buffers for total order
send_all_to_buffers();
......@@ -1605,260 +1600,3 @@ JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendClass
struct tldata * tld = tld_get ();
pack_class(jni_env, tld->analysis_buff, tld->command_buff, to_send);
}
// *****************************************************************************
static int jb_initialized = FALSE;
// java buffering threads
#define JB_WORKER_THREADS 3
static pthread_t jb_workers[JB_WORKER_THREADS];
static volatile int no_jb_work = FALSE;
static jobject all_buffs;
static jobject full_buffs;
static jobject empty_buffs;
static jclass cl_buffer_pool;
// JBBufferPool methods
static jmethodID mtd_get_empty;
static jmethodID mtd_put_full;
static jmethodID mtd_get_full;
static jmethodID mtd_put_empty;
static jmethodID mtd_full_queue_size;
// JBBuffer methods
static jmethodID mtd_get_data_array;
static jmethodID mtd_get_data_array_size;
static jmethodID mtd_get_object_tb;
// JBBuffer methods
static jmethodID mtd_get_tag_obj_array;
static jmethodID mtd_get_tag_pos_array;
static jmethodID mtd_get_tag_ele_count;
// NOTE: this tagging can cooperate with normal tagging thread
static void jb_object_tagging(JNIEnv * jni_env, process_buffs * pb,
jint tag_size, jobjectArray tag_objects_jarray, jint * tag_pos_array) {
// TODO ! local references ???
// tag the objects - with lock
enter_critical_section(jvmti_env, tagging_lock);
{
jsize i;
for(i = 0; i < tag_size; ++i) {
jint pos = tag_pos_array[i];
jobject obj = (*jni_env)->GetObjectArrayElement(jni_env,
tag_objects_jarray, i);
ot_tag_object(jni_env, pb->analysis_buff, pos, obj,
pb->command_buff);
(*jni_env)->DeleteLocalRef(jni_env, obj);
}
}
exit_critical_section(jvmti_env, tagging_lock);
}
// TODO !! buffers are not sorted correctly - buffer id needed for each thread
static void * jb_worker_loop(void * obj) {
// TODO ! local references ???
// TODO ! err check
JNIEnv * jni_env;
(*java_vm)->AttachCurrentThreadAsDaemon(java_vm, (void **)&jni_env, NULL);
// exit when requested
while(! no_jb_work) {
// ** buff acquire **
// acquire full buffer
jobject obj_buff = (*jni_env)->CallStaticObjectMethod(jni_env, cl_buffer_pool, mtd_get_full);
// get size of the data
jint data_size = (*jni_env)->CallIntMethod(jni_env, obj_buff, mtd_get_data_array_size);
// get data array object
jbyteArray data_jarray = (*jni_env)->CallObjectMethod(jni_env, obj_buff, mtd_get_data_array);
// get data as C array
jbyte * data = (*jni_env)->GetByteArrayElements(jni_env, data_jarray, NULL);
// get tagging object
jobject obj_tag_buffer = (*jni_env)->CallObjectMethod(jni_env, obj_buff, mtd_get_object_tb);
// get size of the tag buffers
jint tag_size = (*jni_env)->CallIntMethod(jni_env, obj_tag_buffer, mtd_get_tag_ele_count);
// get tag object array as object
jobjectArray tag_objects_jarray = (*jni_env)->CallObjectMethod(jni_env, obj_tag_buffer, mtd_get_tag_obj_array);
// get tag pos array as object
jintArray tag_pos_jarray = (*jni_env)->CallObjectMethod(jni_env, obj_tag_buffer, mtd_get_tag_pos_array);
// get tag pos as C array
jint * tag_pos = (*jni_env)->GetIntArrayElements(jni_env, tag_pos_jarray, NULL);
// ** buff processing **
// acquire sending buffer
// TODO ! const
process_buffs * pb = buffs_get(-12345);
// TODO perf improvement - you are copying the data twice
// copy data to the analysis buffer
buffer_fill(pb->analysis_buff, data, data_size);
// tag data
jb_object_tagging(jni_env, pb, tag_size, tag_objects_jarray, tag_pos);
// dispatch sending buffer
buffs_send(pb);
// ** buff return **
// return processed buffer - the reset (to empty) is done in java
(*jni_env)->CallStaticVoidMethod(jni_env, cl_buffer_pool, mtd_put_empty, obj_buff);
// ** tmp buff + local ref release **
// JNI_ABORT releases the memory without copying the data back to java array
(*jni_env)->ReleaseByteArrayElements(jni_env, data_jarray, data, JNI_ABORT);
(*jni_env)->ReleaseIntArrayElements(jni_env, tag_pos_jarray, tag_pos, JNI_ABORT);
(*jni_env)->DeleteLocalRef(jni_env, obj_buff);
(*jni_env)->DeleteLocalRef(jni_env, data_jarray);
(*jni_env)->DeleteLocalRef(jni_env, obj_tag_buffer);
(*jni_env)->DeleteLocalRef(jni_env, tag_objects_jarray);
(*jni_env)->DeleteLocalRef(jni_env, tag_pos_jarray);
}
// TODO ! free thread local memory and release thread local global jobject references
return NULL;
}
static jclass get_java_class(JNIEnv * jni_env, char * class_name) {
jclass jclazz = (*jni_env)->FindClass(jni_env, class_name);
// TODO ! use check_error
if (jclazz == NULL) {
printf("Class not found: %s\n", class_name);
exit(1);
}
return jclazz;
}
static jmethodID get_java_method_id(JNIEnv * jni_env, jclass class_interface,
char * method_name, char * method_signature) {
jmethodID jmid = (*jni_env)->GetMethodID(jni_env, class_interface,
method_name, method_signature);
// TODO ! use check_error
if (jmid == NULL) {
printf("Method not found: %s\n", method_name);
exit(1);
}
return jmid;
}
static jmethodID get_java_static_method_id(JNIEnv * jni_env,
jclass class_interface, char * method_name, char * method_signature) {
jmethodID jmid = (*jni_env)->GetStaticMethodID(jni_env, class_interface,
method_name, method_signature);
// TODO ! use check_error
if (jmid == NULL) {
printf("Method not found: %s\n", method_name);
exit(1);
}
return jmid;
}
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_jb_JBBufferPool_register
(JNIEnv * jni_env, jclass this_class, jobject all_buffers,
jobject full_buffers, jobject empty_buffers) {
all_buffs = (*jni_env)->NewGlobalRef(jni_env, all_buffers);
full_buffs = (*jni_env)->NewGlobalRef(jni_env, full_buffers);
empty_buffs = (*jni_env)->NewGlobalRef(jni_env, empty_buffers);
cl_buffer_pool = get_java_class(jni_env, "ch/usi/dag/dislre/jb/JBBufferPool");
mtd_get_empty = get_java_static_method_id(jni_env, cl_buffer_pool, "getEmpty", "()Lch/usi/dag/dislre/jb/JBBuffer;");
mtd_put_full = get_java_static_method_id(jni_env, cl_buffer_pool, "putFull", "(Lch/usi/dag/dislre/jb/JBBuffer;)V");
mtd_get_full = get_java_static_method_id(jni_env, cl_buffer_pool, "getFull", "()Lch/usi/dag/dislre/jb/JBBuffer;");
mtd_put_empty = get_java_static_method_id(jni_env, cl_buffer_pool, "putEmpty", "(Lch/usi/dag/dislre/jb/JBBuffer;)V");
mtd_full_queue_size = get_java_static_method_id(jni_env, cl_buffer_pool, "fullQueueSize", "()I");
jclass cl_buffer = get_java_class(jni_env, "ch/usi/dag/dislre/jb/JBBuffer");
mtd_get_data_array = get_java_method_id(jni_env, cl_buffer, "getDataAsArray", "()[B");
mtd_get_data_array_size = get_java_method_id(jni_env, cl_buffer, "sizeInBytes", "()I");
mtd_get_object_tb = get_java_method_id(jni_env, cl_buffer, "getObjectTB", "()Lch/usi/dag/dislre/jb/TagBuffer;");
jclass cl_tag_buffer = get_java_class(jni_env, "ch/usi/dag/dislre/jb/TagBuffer");
mtd_get_tag_obj_array = get_java_method_id(jni_env, cl_tag_buffer, "getObjArray", "()[Ljava/lang/Object;");
mtd_get_tag_pos_array = get_java_method_id(jni_env, cl_tag_buffer, "getPosArray", "()[I");
mtd_get_tag_ele_count = get_java_method_id(jni_env, cl_tag_buffer, "size", "()I");
// TODO !
//jfieldID jfid = (*jni_env)->GetStaticFieldID(jni_env, buffer_holder, "NUMBER_OF_SENDER_THREADS", "I");
//WORKER_THREADS = (*jni_env)->GetStaticIntField(jni_env, buffer_holder, jfid);
int i;
for(i = 0; i < JB_WORKER_THREADS; ++i) {
int pcr = pthread_create(&jb_workers[i], NULL, jb_worker_loop, NULL);
check_error(pcr != 0, "Failed to create worker thread");
}
jb_initialized = TRUE;
}
static jint full_queue_size(JNIEnv * jni_env) {
return (*jni_env)->CallStaticIntMethod(jni_env, cl_buffer_pool, mtd_full_queue_size);
}
static void cleanup_jb_workers(JNIEnv * jni_env) {
if(jb_initialized) {
// wait for full buffer queue to get empty
while(full_queue_size(jni_env) != 0) {
sleep(1);
}
no_jb_work = TRUE;
// send empty buff according to the thread count
// this should provoke the threads to check no_jb_work
int i;
for(i = 0; i < JB_WORKER_THREADS; ++i) {
jobject obj_buff = (*jni_env)->CallStaticObjectMethod(jni_env, cl_buffer_pool, mtd_get_empty);
(*jni_env)->CallStaticVoidMethod(jni_env, cl_buffer_pool, mtd_put_full, obj_buff);
}
// wait for thread exits
int j;
for(j = 0; j < JB_WORKER_THREADS; ++j) {
int rc = pthread_join(jb_workers[j], NULL);
check_error(rc != 0, "Cannot join java buffering thread.");
}
}
}
package ch.usi.dag.dislre.jb;
import java.io.DataOutputStream;
import java.io.IOException;
// goal of this class is to mimic the buffering of the native agent
public final class JBBuffer {
// TODO ! JB - replace with extensible arrays and number of analysis
private static final int ONE_K = 1024;
private static final int THRESHOLD = 50 * ONE_K;
private static final int MAX_SIZE = THRESHOLD + 2 * ONE_K;
// TODO ! JB - add constant from server
private static final byte ANALYSIS_MSG_NUM = 1;
private Long ownerThread;
private UglyByteArrayOutputStream uos = new UglyByteArrayOutputStream(
MAX_SIZE);
private DataOutputStream dos = new DataOutputStream(uos);
private int requestCount;
private int requestCountPos;
private int requestLenPos;
private TagBuffer objectTB = new TagBuffer(MAX_SIZE);
public void init() {
ownerThread = Thread.currentThread().getId();
}
public void reset() {
uos.reset();
objectTB.reset();
ownerThread = null;
requestCount = 0;
requestCountPos = -1;
requestLenPos = -1;
}
public long getOwnerThread() {
return ownerThread;
}
public byte[] getDataAsArray() {
return uos.getBuffer();
}
public int sizeInBytes() {
return uos.size();
}
public TagBuffer getObjectTB() {
return objectTB;
}
public void analysisStart(short analysisMethodId) {
// initialization (see below) should not be in init() because sometimes
// we need really empty buffer
// initialize buffer
if (requestCount == 0) {
// msg type
putByte(ANALYSIS_MSG_NUM);
// TODO ! JB - this number is non-negative - problem with total
// order buffers
// thread id
putLong(ownerThread);
requestCountPos = sizeInBytes();
// request count space init
putInt(requestCount);
}
// analysis method id
putShort(analysisMethodId);
requestLenPos = sizeInBytes();
// request len space init
putShort((short) 0);
}
// indicates whether buffer is full
public boolean analysisEnd() {
// update the length of analysis request
int requestLen = sizeInBytes() - requestLenPos
- (Short.SIZE / Byte.SIZE);
uos.setPosition(requestLenPos);
putShort((short) requestLen);
uos.resetPosition();
++requestCount;
uos.setPosition(requestCountPos);
putInt(requestCount);
uos.resetPosition();
// send the buffer
if (sizeInBytes() > THRESHOLD) {
return true;
}
return false;
}
public void putBoolean(boolean toPut) {
try {
dos.writeBoolean(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putByte(byte toPut) {
try {
dos.writeByte(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putChar(char toPut) {
try {
dos.writeChar(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putShort(short toPut) {
try {
dos.writeShort(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putInt(int toPut) {
try {
dos.writeInt(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putLong(long toPut) {
try {
dos.writeLong(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putFloat(int toPut) {
try {
dos.writeFloat(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putDouble(long toPut) {
try {
dos.writeDouble(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putObject(Object toPut) {
objectTB.add(toPut, sizeInBytes());
// net reference type is long
putLong(0);
}
}
package ch.usi.dag.dislre.jb;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// TODO ! JB - this class is not unique
// a) instance from some jcl class
// b) pretend that we are jcl class
public class JBBufferPool {
// TODO ! JB - better exception handling
// TODO ! variable not in sync with native code
public static final int NUMBER_OF_FETCHING_THREADS = 4;
private static final int BUFFERS = NUMBER_OF_FETCHING_THREADS * 10;
private static final List<JBBuffer> allBuffers;
private static final BlockingQueue<JBBuffer> fullBuffers;
private static final BlockingQueue<JBBuffer> emptyBuffers;
static {
try {
allBuffers = new ArrayList<JBBuffer>(BUFFERS);
fullBuffers = new ArrayBlockingQueue<JBBuffer>(BUFFERS);
emptyBuffers = new ArrayBlockingQueue<JBBuffer>(BUFFERS);
// fill desired queues
for(int i = 0; i < BUFFERS; i++) {
allBuffers.add(new JBBuffer());
emptyBuffers.put(new JBBuffer());
}
register(allBuffers, fullBuffers, emptyBuffers);
} catch(Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
// called (also) by native code - during cleanup
public static JBBuffer getEmpty() {
while(true) {
try {
JBBuffer buffer = emptyBuffers.take();
buffer.init();
return buffer;
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
// called (also) by native code - during cleanup
public static void putFull(JBBuffer buffer) {
boolean done = false;
while(!done) {
try {
fullBuffers.put(buffer);
done = true;
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
// called by native code
private static JBBuffer getFull() {
// get buffer
while(true) {
try {
return fullBuffers.take();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
// called (also) by native code
private static void putEmpty(JBBuffer buffer) {
boolean done = false;
while(! done) {
try {
buffer.reset();
emptyBuffers.put(buffer);
done = true;
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
// called by native code
private static int fullQueueSize() {
return fullBuffers.size();
}
private static native void register(
List<JBBuffer> allBuffers,
BlockingQueue<JBBuffer> fullBuffers,
BlockingQueue<JBBuffer> emptyBuffers);
}
package ch.usi.dag.dislre.jb;
import ch.usi.dag.dislre.REDispatch;
public class REDispatchJ {
// TODO ! JB - this is not unique in the case of multiple classloaders
static final ThreadLocal<JBBuffer> buffer = new ThreadLocal<JBBuffer>() {
@Override
protected JBBuffer initialValue() {
return JBBufferPool.getEmpty();
}
};
/**
* Register method and receive id for this transmission
*
* @param analysisMethodDesc
* @return
*/
public static short registerMethod(String analysisMethodDesc) {
return REDispatch.registerMethod(analysisMethodDesc);
}
/**
* Announce start of an analysis transmission
*
* @param analysisMethodDesc remote analysis method id
*/
public static void analysisStart(short analysisMethodId) {
buffer.get().analysisStart(analysisMethodId);
}
/**
* Announce start of an analysis transmission with total ordering (among
* several threads) under the same orderingId
*
* @param analysisMethodId remote analysis method id
* @param orderingId analyses with the same orderingId are guaranteed to
* be ordered. Only non-negative values are valid.
*/
public static void analysisStart(short analysisMethodId,
byte orderingId) {
// TODO ! JB
}
/**
* Announce end of an analysis transmission
*/
public static void analysisEnd() {
JBBuffer buff = buffer.get();
// submit full buffer and obtain empty one
if(buff.analysisEnd()) {
JBBufferPool.putFull(buff);
buffer.set(JBBufferPool.getEmpty());
}
}
// allows transmitting types
public static void sendBoolean(boolean booleanToSend) {
buffer.get().putBoolean(booleanToSend);
}
public static void sendByte(byte byteToSend) {
buffer.get().putByte(byteToSend);
}
public static void sendChar(char charToSend) {
buffer.get().putChar(charToSend);
}
public static void sendShort(short shortToSend) {
buffer.get().putShort(shortToSend);
}
public static void sendInt(int intToSend) {
buffer.get().putInt(intToSend);