Commit 24a94e9b authored by Lukáš Marek's avatar Lukáš Marek

First java buffering attempt

parent 41647933
......@@ -1600,3 +1600,201 @@ JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendClass
struct tldata * tld = tld_get ();
pack_class(jni_env, tld->analysis_buff, tld->command_buff, to_send);
}
// *****************************************************************************
static jobject all_buffs;
static jobject full_buffs;
static jobject empty_buffs;
static jclass cl_buffer_pool;
// JBBufferPool methods
static jmethodID mtd_get_full;
static jmethodID mtd_put_empty;
// JBBuffer methods
static jmethodID mtd_get_data_array;
static jmethodID mtd_get_data_array_size;
static jmethodID mtd_get_object_tb;
// JBBuffer methods
static jmethodID mtd_get_tag_obj_array;
static jmethodID mtd_get_tag_pos_array;
static jmethodID mtd_get_tag_ele_count;
#define JB_WORKER_THREADS 3
// NOTE: this tagging can cooperate with normal tagging thread
static void jb_object_tagging(JNIEnv * jni_env, process_buffs * pb,
jint tag_size, jobjectArray tag_objects_jarray, jint * tag_pos_array) {
// TODO ! local references ???
// tag the objects - with lock
enter_critical_section(jvmti_env, tagging_lock);
{
jsize i;
for(i = 0; i < tag_size; ++i) {
jint pos = tag_pos_array[i];
jobject obj = (*jni_env)->GetObjectArrayElement(jni_env,
tag_objects_jarray, i);
ot_tag_object(jni_env, pb->analysis_buff, pos, obj,
pb->command_buff);
}
}
exit_critical_section(jvmti_env, tagging_lock);
}
static void * jb_worker_loop(void * obj) {
// TODO ! local references ???
// TODO ! err check
JNIEnv * jni_env;
(*java_vm)->AttachCurrentThreadAsDaemon(java_vm, (void **)&jni_env, NULL);
// TODO ! do better stopping
while(TRUE) {
// ** buff acquire **
// acquire full buffer
jobject obj_buff = (*jni_env)->CallStaticObjectMethod(jni_env, cl_buffer_pool, mtd_get_full);
// get size of the data
jint data_size = (*jni_env)->CallIntMethod(jni_env, obj_buff, mtd_get_data_array_size);
// get data array object
jbyteArray data_jarray = (*jni_env)->CallObjectMethod(jni_env, obj_buff, mtd_get_data_array);
// get data as C array
jbyte * data = (*jni_env)->GetByteArrayElements(jni_env, data_jarray, NULL);
// get tagging object
jobject obj_tag_buffer = (*jni_env)->CallObjectMethod(jni_env, obj_buff, mtd_get_object_tb);
// get size of the tag buffers
jint tag_size = (*jni_env)->CallIntMethod(jni_env, obj_tag_buffer, mtd_get_tag_ele_count);
// get tag object array as object
jobjectArray tag_objects_jarray = (*jni_env)->CallObjectMethod(jni_env, obj_tag_buffer, mtd_get_tag_obj_array);
// get tag pos array as object
jintArray tag_pos_jarray = (*jni_env)->CallObjectMethod(jni_env, obj_tag_buffer, mtd_get_tag_pos_array);
// get tag pos as C array
jint * tag_pos = (*jni_env)->GetIntArrayElements(jni_env, tag_pos_jarray, NULL);
// ** buff processing **
// acquire sending buffer
// TODO ! const
process_buffs * pb = buffs_get(-12345);
// copy data to the analysis buffer
buffer_fill(pb->analysis_buff, data, data_size);
// tag data
jb_object_tagging(jni_env, pb, tag_size, tag_objects_jarray, tag_pos);
// dispatch sending buffer
buffs_send(pb);
// ** tmp buff release **
//TODO ! check args + checking of return value
(*jni_env)->ReleaseByteArrayElements(jni_env, data_jarray, data, JNI_ABORT);
(*jni_env)->ReleaseIntArrayElements(jni_env, tag_pos_jarray, tag_pos, JNI_ABORT);
// ** buff return **
// return processed buffer - the reset (to empty) is done in java
(*jni_env)->CallStaticVoidMethod(jni_env, cl_buffer_pool, mtd_put_empty, obj_buff);
}
// TODO ! free thread local memory and release thread local global jobject references
return NULL;
}
static jclass get_java_class(JNIEnv * jni_env, char * class_name) {
jclass jclazz = (*jni_env)->FindClass(jni_env, class_name);
// TODO ! use check_error
if (jclazz == NULL) {
printf("Class not found: %s\n", class_name);
exit(1);
}
return jclazz;
}
static jmethodID get_java_method_id(JNIEnv * jni_env, jclass class_interface,
char * method_name, char * method_signature) {
jmethodID jmid = (*jni_env)->GetMethodID(jni_env, class_interface,
method_name, method_signature);
// TODO ! use check_error
if (jmid == NULL) {
printf("Method not found: %s\n", method_name);
exit(1);
}
return jmid;
}
static jmethodID get_java_static_method_id(JNIEnv * jni_env,
jclass class_interface, char * method_name, char * method_signature) {
jmethodID jmid = (*jni_env)->GetStaticMethodID(jni_env, class_interface,
method_name, method_signature);
// TODO ! use check_error
if (jmid == NULL) {
printf("Method not found: %s\n", method_name);
exit(1);
}
return jmid;
}
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_jb_JBBufferPool_register
(JNIEnv * jni_env, jclass this_class, jobject all_buffers,
jobject full_buffers, jobject empty_buffers) {
all_buffs = (*jni_env)->NewGlobalRef(jni_env, all_buffers);
full_buffs = (*jni_env)->NewGlobalRef(jni_env, full_buffers);
empty_buffs = (*jni_env)->NewGlobalRef(jni_env, empty_buffers);
cl_buffer_pool = get_java_class(jni_env, "ch/usi/dag/dislre/jb/JBBufferPool");
mtd_get_full = get_java_static_method_id(jni_env, cl_buffer_pool, "getFull", "()Lch/usi/dag/dislre/jb/JBBuffer;");
mtd_put_empty = get_java_static_method_id(jni_env, cl_buffer_pool, "putEmpty", "(Lch/usi/dag/dislre/jb/JBBuffer;)V");
jclass cl_buffer = get_java_class(jni_env, "ch/usi/dag/dislre/jb/JBBuffer");
mtd_get_data_array = get_java_method_id(jni_env, cl_buffer, "getDataAsArray", "()[B");
mtd_get_data_array_size = get_java_method_id(jni_env, cl_buffer, "sizeInBytes", "()I");
mtd_get_object_tb = get_java_method_id(jni_env, cl_buffer, "getObjectTB", "()Lch/usi/dag/dislre/jb/TagBuffer;");
jclass cl_tag_buffer = get_java_class(jni_env, "ch/usi/dag/dislre/jb/TagBuffer");
mtd_get_tag_obj_array = get_java_method_id(jni_env, cl_tag_buffer, "getObjArray", "()[Ljava/lang/Object;");
mtd_get_tag_pos_array = get_java_method_id(jni_env, cl_tag_buffer, "getPosArray", "()[I");
mtd_get_tag_ele_count = get_java_method_id(jni_env, cl_tag_buffer, "size", "()I");
// TODO !
//jfieldID jfid = (*jni_env)->GetStaticFieldID(jni_env, buffer_holder, "NUMBER_OF_SENDER_THREADS", "I");
//WORKER_THREADS = (*jni_env)->GetStaticIntField(jni_env, buffer_holder, jfid);
int i;
for(i = 0; i < JB_WORKER_THREADS; ++i) {
pthread_t pthread;
int pcr = pthread_create(&pthread, NULL, jb_worker_loop, NULL);
check_error(pcr != 0, "Failed to create worker thread");
}
}
package ch.usi.dag.dislre.jb;
import java.io.DataOutputStream;
import java.io.IOException;
// goal of this class is to mimic the buffering of the native agent
public final class JBBuffer {
// TODO ! JB - replace with extensible arrays and number of analysis
private static final int ONE_K = 1024;
private static final int THRESHOLD = 50 * ONE_K;
private static final int MAX_SIZE = THRESHOLD + 2 * ONE_K;
// TODO ! JB - add constant from server
private static final byte ANALYSIS_MSG_NUM = 1;
private Long ownerThread;
private UglyByteArrayOutputStream uos = new UglyByteArrayOutputStream(MAX_SIZE);
private DataOutputStream dos = new DataOutputStream(uos);
private int requestCount;
private int requestCountPos;
private int requestLenPos;
private TagBuffer objectTB = new TagBuffer(MAX_SIZE);
public void init() {
ownerThread = Thread.currentThread().getId();
}
public void reset() {
uos.reset();
objectTB.reset();
ownerThread = null;
}
public long getOwnerThread() { return ownerThread; }
public byte[] getDataAsArray() {
return uos.getBuffer();
}
public int sizeInBytes() {
return uos.size();
}
public TagBuffer getObjectTB() {
return objectTB;
}
public void analysisStart(short analysisMethodId) {
// initialize buffer
if(requestCount == 0) {
// msg type
putByte(ANALYSIS_MSG_NUM);
// TODO ! JB - this number is non-negative - problem with total order buffers
// thread id
putLong(ownerThread);
requestCountPos = sizeInBytes();
// request count space init
putInt(requestCount);
}
// analysis method id
putShort(analysisMethodId);
requestLenPos = sizeInBytes();
// request len space init
putShort((short) 0);
}
// indicates whether buffer is full
public boolean analysisEnd() {
// update the length of analysis request
int requestLen = sizeInBytes() - requestCountPos - (Short.SIZE / Byte.SIZE);
uos.setPosition(requestLenPos);
putShort((short) requestLen);
uos.resetPosition();
++requestCount;
uos.setPosition(requestCountPos);
putInt(requestCount);
uos.resetPosition();
// send the buffer
if(sizeInBytes() > THRESHOLD) {
return true;
}
return false;
}
public void putBoolean(boolean toPut) {
try {
dos.writeBoolean(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putByte(byte toPut) {
try {
dos.writeByte(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putChar(char toPut) {
try {
dos.writeChar(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putShort(short toPut) {
try {
dos.writeShort(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putInt(int toPut) {
try {
dos.writeInt(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putLong(long toPut) {
try {
dos.writeLong(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putFloat(int toPut) {
try {
dos.writeFloat(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putDouble(long toPut) {
try {
dos.writeDouble(toPut);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void putObject(Object toPut) {
objectTB.add(toPut, sizeInBytes());
// net reference type is long
putLong(0);
}
}
package ch.usi.dag.dislre.jb;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// TODO ! JB - this class is not unique
// a) instance from some jcl class
// b) pretend that we are jcl class
public class JBBufferPool {
// TODO ! JB - better exception handling
public static final int NUMBER_OF_FETCHING_THREADS = 4;
private static final int BUFFERS = NUMBER_OF_FETCHING_THREADS * 10;
private static final List<JBBuffer> allBuffers;
private static final BlockingQueue<JBBuffer> fullBuffers;
private static final BlockingQueue<JBBuffer> emptyBuffers;
static {
try {
allBuffers = new ArrayList<JBBuffer>(BUFFERS);
fullBuffers = new ArrayBlockingQueue<JBBuffer>(BUFFERS);
emptyBuffers = new ArrayBlockingQueue<JBBuffer>(BUFFERS);
// fill desired queues
for(int i = 0; i < BUFFERS; i++) {
allBuffers.add(new JBBuffer());
emptyBuffers.put(new JBBuffer());
}
register(allBuffers, fullBuffers, emptyBuffers);
} catch(Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public static JBBuffer getEmpty() {
while(true) {
try {
JBBuffer buffer = emptyBuffers.take();
buffer.init();
return buffer;
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
public static void putFull(JBBuffer buffer) {
boolean done = false;
while(!done) {
try {
fullBuffers.put(buffer);
done = true;
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
// called by the native code
private static JBBuffer getFull() {
// get buffer
while(true) {
try {
return fullBuffers.take();
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
// called (also) by the native code
private static void putEmpty(JBBuffer buffer) {
boolean done = false;
while(! done) {
try {
buffer.reset();
emptyBuffers.put(buffer);
done = true;
}
catch(InterruptedException e) {
e.printStackTrace();
}
}
}
private static native void register(
List<JBBuffer> allBuffers,
BlockingQueue<JBBuffer> fullBuffers,
BlockingQueue<JBBuffer> emptyBuffers);
}
package ch.usi.dag.dislre.jb;
import ch.usi.dag.dislre.REDispatch;
public class REDispatchJ {
// TODO ! JB - this is not unique in the case of multiple classloaders
static final ThreadLocal<JBBuffer> buffer = new ThreadLocal<JBBuffer>() {
@Override
protected JBBuffer initialValue() {
return JBBufferPool.getEmpty();
}
};
/**
* Register method and receive id for this transmission
*
* @param analysisMethodDesc
* @return
*/
public static short registerMethod(String analysisMethodDesc) {
return REDispatch.registerMethod(analysisMethodDesc);
}
/**
* Announce start of an analysis transmission
*
* @param analysisMethodDesc remote analysis method id
*/
public static void analysisStart(short analysisMethodId) {
buffer.get().analysisStart(analysisMethodId);
}
/**
* Announce start of an analysis transmission with total ordering (among
* several threads) under the same orderingId
*
* @param analysisMethodId remote analysis method id
* @param orderingId analyses with the same orderingId are guaranteed to
* be ordered. Only non-negative values are valid.
*/
public static void analysisStart(short analysisMethodId,
byte orderingId) {
// TODO ! JB
}
/**
* Announce end of an analysis transmission
*/
public static void analysisEnd() {
JBBuffer buff = buffer.get();
// submit full buffer and obtain empty one
if(buff.analysisEnd()) {
JBBufferPool.putFull(buff);
buffer.set(JBBufferPool.getEmpty());
}
}
// allows transmitting types
public static void sendBoolean(boolean booleanToSend) {
buffer.get().putBoolean(booleanToSend);
}
public static void sendByte(byte byteToSend) {
buffer.get().putByte(byteToSend);
}
public static void sendChar(char charToSend) {
buffer.get().putChar(charToSend);
}
public static void sendShort(short shortToSend) {
buffer.get().putShort(shortToSend);
}
public static void sendInt(int intToSend) {
buffer.get().putInt(intToSend);
}
public static void sendLong(long longToSend) {
buffer.get().putLong(longToSend);
}
public static void sendFloat(int floatToSend) {
buffer.get().putFloat(floatToSend);
}
public static void sendDouble(long doubleToSend) {
buffer.get().putDouble(doubleToSend);
}
public static void sendString(String stringToSend) {
// TODO ! JB
}
public static void sendObject(Object objToSend) {
buffer.get().putObject(objToSend);
}
public static void sendClass(Class<?> classToSend) {
// TODO ! JB
}
}
package ch.usi.dag.dislre.jb;
public class TagBuffer {
private Object[] objArray;
private int[] posArray;
private int count;
public TagBuffer(int size) {
this.objArray = new Object[size];;
this.posArray = new int[size];
this.count = 0;
}
public void add(Object obj, int pos) {
// TODO ! JB - throws an exception if too many added, ok for now
objArray[count] = obj;
posArray[count] = pos;
++count;
}
public void reset() {
//set buffered object references to null
for(int i = 0; i < count; ++i) {
objArray[i] = null;
}
count = 0;
}
public Object[] getObjArray() {
return objArray;
}
public int[] getPosArray() {
return posArray;
}
public int size() {
return count;
}
}
package ch.usi.dag.dislre.jb;
import java.io.ByteArrayOutputStream;
// This class allows to write in the middle of the stream by setting the count
// value of the underlying ByteArrayInputStream to specific values
public class UglyByteArrayOutputStream extends ByteArrayOutputStream {
protected int realCount = -1;
public UglyByteArrayOutputStream() {
super();
}
public UglyByteArrayOutputStream(int size) {
super(size);
}
// until the resetPosition is called, buffer is treated as smaller
public void setPosition(int pos) {
// wrong argument, position can be set only in filled space
if(pos > count) {
throw new IllegalArgumentException(
"Position is higher then (filled) size of the buffer");
}
realCount = count;
count = pos;
}
public void resetPosition() {
// nothing to reset
if(realCount < count) {
return;
}
count = realCount;
realCount = -1;
}
public byte[] getBuffer() {
return buf;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment