dislreagent.c 50.1 KB
Newer Older
1 2 3 4 5 6 7 8
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <netdb.h>
#include <unistd.h>
#include <netinet/tcp.h>

9 10
#include <pthread.h>

11 12 13
#include <jvmti.h>
#include <jni.h>

14 15
// has to be defined for jvmtihelper.h
#define ERR_PREFIX "DiSL-RE agent error: "
16

17 18
#include "jvmtihelper.h"
#include "comm.h"
19

20 21 22
#include "messagetype.h"
#include "buffer.h"
#include "buffpack.h"
23 24
#include "blockingqueue.h"
#include "netref.h"
25 26 27 28 29

#include "dislreagent.h"

static const int ERR_SERVER = 10003;

30 31 32 33 34 35 36 37 38
// defaults - be sure that space in host_name is long enough
static const char * DEFAULT_HOST = "localhost";
static const char * DEFAULT_PORT = "11218";

// port and name of the instrumentation server
static char host_name[1024];
static char port_number[6]; // including final 0

static jvmtiEnv * jvmti_env;
39 40
static JavaVM * java_vm;

Lukáš Marek's avatar
Lukáš Marek committed
41
static int jvm_started = FALSE;
42

43 44
static volatile int no_tagging_work = FALSE;
static volatile int no_sending_work = FALSE;
Lukáš Marek's avatar
Lukáš Marek committed
45

46 47 48
// *** Sync queues ***

// queues contain process_buffs structure
Lukáš Marek's avatar
Lukáš Marek committed
49

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
// Utility queue (buffer) is specifically reserved for sending messages
// other than analysis messages. The rationale behind utility buffers is that
// at least one utility buffer is available or will be available in the near
// future no matter what. One of the two must hold:
// 1) Acquired buffer is sent without any additional locking in between.
// 2) Particular "usage" (place of use) may request only a constant number of
//    buffers. Each place and its constant are listed right below and the
//    queue size is the sum of all constants here

//    buffer for case 1)                     1
//    object free message                    1
//    new class info message                 1
//    just to be sure (parallelism for 1)    3
#define BQ_UTILITY 6

// queue with empty utility buffers
static blocking_queue utility_q;

// number of all buffers - used for analysis with some exceptions
#define BQ_BUFFERS 32

// number of analysis requests in one message
#define ANALYSIS_COUNT 16384


// queue with empty buffers
static blocking_queue empty_q;
77

78 79
// queue where buffers are queued for sending
static blocking_queue send_q;
Lukáš Marek's avatar
Lukáš Marek committed
80

81 82
// queue where buffers are queued for object
static blocking_queue objtag_q;
83

84 85 86
typedef struct {
	buffer * command_buff;
	buffer * analysis_buff;
87
	jlong owner_id;
88 89 90
} process_buffs;

// list of all allocated bq buffers
91
static process_buffs pb_list[BQ_BUFFERS + BQ_UTILITY];
92 93

#define OT_OBJECT 1
94
#define OT_DATA_OBJECT 2
95 96 97 98 99 100

// Record stored in a command buffer for every object reference that was
// packed as a null net reference and has to be patched later.
typedef struct {
	// OT_OBJECT or OT_DATA_OBJECT
	unsigned char obj_type;
	// position in the analysis buffer where the net reference is written
	size_t buff_pos;
	// global reference to the object that will be tagged
	jobject obj_to_tag;
} objtag_rec;
101

102 103 104 105 106 107 108 109 110 111
// *** buffers for total ordering ***

// State of one total ordering buffer (one entry per buffer id).
typedef struct {
	// buffers currently collecting requests, NULL when none is allocated
	process_buffs * pb;
	// number of analysis requests stored in the current message
	jint analysis_count;
	// position in the analysis buffer where the request count is written
	size_t analysis_count_pos;
} to_buff_struct;

#define INVALID_BUFF_ID -1

112 113
#define TO_BUFFER_MAX_ID 127 // byte is the holding type
#define TO_BUFFER_COUNT (TO_BUFFER_MAX_ID + 1) // +1 for buffer id 0
114 115 116

static jrawMonitorID to_buff_lock;

117
static to_buff_struct to_buff_array[TO_BUFFER_COUNT];
118

119 120 121 122 123 124 125 126 127 128
// *** buffer for object free ***

#define MAX_OBJ_FREE_EVENTS 4096

static process_buffs * obj_free_buff = NULL;
static jint obj_free_event_count = 0;
static size_t obj_free_event_count_pos = 0;

static jrawMonitorID obj_free_lock;

129 130
// *** Protected by tagging lock ***
// can require other locks while holding this
Lukáš Marek's avatar
Lukáš Marek committed
131

132 133
#define NULL_NET_REF 0

134
static jrawMonitorID tagging_lock;
135

136 137 138 139 140 141
// first available id for object tagging
static volatile jlong avail_object_id = 1;
static volatile jint avail_class_id = 1;

// first available id for new messages
static volatile jshort avail_analysis_id = 1;
142

143 144 145 146
// *** Thread ids ***

#define INVALID_THREAD_ID -1

Lukáš Marek's avatar
Lukáš Marek committed
147 148
#define STARTING_THREAD_ID (TO_BUFFER_MAX_ID + 1)

149
// initial ids are reserved for total ordering buffers
Lukáš Marek's avatar
Lukáš Marek committed
150
static volatile jlong avail_thread_id = STARTING_THREAD_ID;
151

152

153
// *** Thread locals ***
Lukáš Marek's avatar
Lukáš Marek committed
154

155 156 157 158 159
// NOTE: The JVMTI functionality allows to implement everything
// using JVM, but the GNU implementation is faster and WORKING


struct tldata {
160 161 162 163 164 165 166 167 168
	jlong id;
	process_buffs * local_pb;
	jbyte to_buff_id;
	process_buffs * pb;
	buffer * analysis_buff;
	buffer * command_buff;
	jint analysis_count;
	size_t analysis_count_pos;
	size_t args_length_pos;
169 170 171 172 173 174 175 176 177 178 179 180 181
};


#if defined (__APPLE__) && defined (__MACH__)

//
// Use pthreads on Mac OS X
//

static pthread_key_t tls_key;


static void tls_init () {
182 183
	int result = pthread_key_create (& tls_key, NULL);
	check_error(result != 0, "Failed to allocate thread-local storage key");
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
}


// Puts every field of the given thread-local record into its "unused" state
// and returns the same pointer.
inline static struct tldata * tld_init (struct tldata * tld) {
	tld->id = INVALID_THREAD_ID;
	tld->local_pb = NULL;
	tld->to_buff_id = INVALID_BUFF_ID;
	tld->pb = NULL;
	tld->analysis_buff = NULL;
	// the record is obtained from malloc, so nothing is zeroed implicitly;
	// these two fields have to be set here as well
	tld->command_buff = NULL;
	tld->analysis_count = 0;
	tld->analysis_count_pos = 0;
	tld->args_length_pos = 0;

	return tld;
}

// Allocates the thread-local record for the calling thread, stores it under
// tls_key and returns it initialized.
static struct tldata * tld_create ()  {
	struct tldata * fresh = malloc (sizeof (* fresh));
	check_error (fresh == NULL, "Failed to allocate thread-local data");

	const int store_status = pthread_setspecific (tls_key, fresh);
	check_error (store_status != 0, "Failed to store thread-local data");

	tld_init (fresh);
	return fresh;
}

// Returns the calling thread's record, creating it on first use.
inline static struct tldata * tld_get () {
	struct tldata * current = pthread_getspecific (tls_key);
	if (current == NULL) {
		current = tld_create ();
	}
	return current;
}

#else

//
// Use GNU __thread where supported
//

// Counterpart of the pthread variant; this configuration has nothing to set up.
static void tls_init () {
	// intentionally left blank
}


static __thread struct tldata tld = {
224 225 226 227 228 229 230 231
		.id = INVALID_THREAD_ID,
		.local_pb = NULL,
		.to_buff_id = INVALID_BUFF_ID,
		.pb = NULL,
		.analysis_buff = NULL,
		.command_buff = NULL,
		.analysis_count = 0,
		.analysis_count_pos = 0,
232 233 234 235 236 237 238
};

// Returns the address of the calling thread's record.
inline static struct tldata * tld_get () {
	struct tldata * const self = & tld;
	return self;
}

#endif
Lukáš Marek's avatar
Lukáš Marek committed
239

240 241 242 243 244

// *** Threads ***

static pthread_t objtag_thread;
static pthread_t send_thread;
Lukáš Marek's avatar
Lukáš Marek committed
245

246 247
// ******************* Helper routines *******************

248
// Splits the agent option string "host[:port]" into host_name and port_number.
// Both are first set to DEFAULT_HOST / DEFAULT_PORT; a NULL option string
// leaves the defaults in place. The option string is modified in place: the
// first ':' is overwritten with a string terminator.
static void parse_agent_options(char *options) {

	static const char PORT_DELIM = ':';

	// start from the defaults
	strcpy(host_name, DEFAULT_HOST);
	strcpy(port_number, DEFAULT_PORT);

	if (options == NULL) {
		// nothing to parse
		return;
	}

	char * delim = strchr(options, PORT_DELIM);

	if (delim != NULL) {

		// cut the host part where the delimiter was
		*delim = '\0';

		// the port text begins right behind the delimiter
		const char * port_text = delim + 1;

		// has to fit into port_number including the terminator
		int port_fits = strlen(port_text) < sizeof(port_number);
		check_error(! port_fits, "Port number is too long");

		strcpy(port_number, port_text);
	}

	// has to fit into host_name including the terminator
	int host_fits = strlen(options) < sizeof(host_name);
	check_error(! host_fits, "Host name is too long");

	strcpy(host_name, options);
}

Lukáš Marek's avatar
Lukáš Marek committed
286
// ******************* Advanced buffer routines *******************
287

288 289 290
// owner_id can have several states
// > 0 && <= TO_BUFFER_MAX_ID
//    - means that buffer is reserved for total ordering events
Lukáš Marek's avatar
Lukáš Marek committed
291
// >= STARTING_THREAD_ID
292 293
//    - means that buffer is owned by some thread that is marked
// == -1 - means that buffer is owned by some thread that is NOT tagged
294

295
// == PB_FREE - means that buffer is currently free
296
static const jlong PB_FREE = -100;
297

298
// == PB_OBJTAG - means that buffer is scheduled (processed) for object tagging
299
static const jlong PB_OBJTAG = -101;
300

301
// == PB_SEND - means that buffer is scheduled (processed) for sending
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
static const jlong PB_SEND = -102;

// == PB_UTILITY - means that this is special utility buffer
static const jlong PB_UTILITY = -1000;

static process_buffs * buffs_utility_get() {
#ifdef DEBUG
	printf("Acquiring buffer -- utility (thread %ld)\n", tld_get()->id);
#endif

	// retrieves pointer to buffer
	process_buffs * buffs;
	bq_pop(&utility_q, &buffs);

	// no owner setting - it is already PB_UTILITY

#ifdef DEBUG
	printf("Buffer acquired -- utility (thread %ld)\n", tld_get()->id);
#endif

	return buffs;
}

// Puts a utility buffer pair directly on the send queue and returns the
// pointer that was passed in. The owner marker stays PB_UTILITY.
static process_buffs * buffs_utility_send(process_buffs * buffs) {
#ifdef DEBUG
	printf("Queuing buffer (utility) -- send (thread %ld)\n", tld_get()->id);
#endif

	// the queue stores the pointer value, hence the address of the parameter
	bq_push(&send_q, &buffs);

#ifdef DEBUG
	printf("Buffer queued (utility) -- send (thread %ld)\n", tld_get()->id);
#endif

	return buffs;
}

// normally only sending thread should access this function
// Empties both buffers of a utility pair, marks it PB_UTILITY and returns it
// to the utility queue.
static void _buffs_utility_release(process_buffs * buffs) {
#ifdef DEBUG
	printf("Queuing buffer -- utility (thread %ld)\n", tld_get()->id);
#endif

	// discard the content of both buffers
	buffer_clean(buffs->analysis_buff);
	buffer_clean(buffs->command_buff);

	// mark and hand the pointer back to the queue
	buffs->owner_id = PB_UTILITY;
	bq_push(&utility_q, &buffs);

#ifdef DEBUG
	printf("Buffer queued -- utility (thread %ld)\n", tld_get()->id);
#endif
}
358

359
// Takes one buffer pair from the queue of empty buffers, records thread_id as
// its owner and returns it.
static process_buffs * buffs_get(jlong thread_id) {
#ifdef DEBUG
	printf("Acquiring buffer -- empty (thread %ld)\n", tld_get()->id);
#endif

	process_buffs * acquired = NULL;
	bq_pop(&empty_q, &acquired);

	acquired->owner_id = thread_id;

#ifdef DEBUG
	printf("Buffer acquired -- empty (thread %ld)\n", tld_get()->id);
#endif

	return acquired;
}

377
// normally only sending thread should access this function
378
// Empties both buffers of the pair, marks it PB_FREE and returns it to the
// queue of empty buffers.
static void _buffs_release(process_buffs * buffs) {
#ifdef DEBUG
	printf("Queuing buffer -- empty (thread %ld)\n", tld_get()->id);
#endif

	// discard the content of both buffers
	buffer_clean(buffs->analysis_buff);
	buffer_clean(buffs->command_buff);

	// mark and hand the pointer back to the queue
	buffs->owner_id = PB_FREE;
	bq_push(&empty_q, &buffs);

#ifdef DEBUG
	printf("Buffer queued -- empty (thread %ld)\n", tld_get()->id);
#endif
}

396
// Marks the buffer pair PB_OBJTAG and puts it on the object tagging queue.
static void buffs_objtag(process_buffs * buffs) {
#ifdef DEBUG
	printf("Queuing buffer -- objtag (thread %ld)\n", tld_get()->id);
#endif

	// mark first, then publish the pointer
	buffs->owner_id = PB_OBJTAG;
	bq_push(&objtag_q, &buffs);

#ifdef DEBUG
	printf("Buffer queued -- objtag (thread %ld)\n", tld_get()->id);
#endif
}
408

409 410
// only objtag thread should access this function
// Takes the next buffer pair from the object tagging queue and returns it.
static process_buffs * _buffs_objtag_get() {
#ifdef DEBUG
	printf("Acquiring buffer -- objtag (thread %ld)\n", tld_get()->id);
#endif

	process_buffs * to_tag = NULL;
	bq_pop(&objtag_q, &to_tag);

#ifdef DEBUG
	printf("Buffer acquired -- objtag (thread %ld)\n", tld_get()->id);
#endif

	return to_tag;
}
424

425
// Marks the buffer pair PB_SEND and puts it on the send queue.
static void _buffs_send(process_buffs * buffs) {
#ifdef DEBUG
	printf("Queuing buffer -- send (thread %ld)\n", tld_get()->id);
#endif

	// mark first, then publish the pointer
	buffs->owner_id = PB_SEND;
	bq_push(&send_q, &buffs);

#ifdef DEBUG
	printf("Buffer queued -- send (thread %ld)\n", tld_get()->id);
#endif
}

438 439
// only sending thread should access this function
// Takes the next buffer pair from the send queue and returns it.
static process_buffs * _buffs_send_get() {
#ifdef DEBUG
	printf("Acquiring buffer -- send (thread %ld)\n", tld_get()->id);
#endif

	process_buffs * to_send = NULL;
	bq_pop(&send_q, &to_send);

#ifdef DEBUG
	printf("Buffer acquired -- send (thread %ld)\n", tld_get()->id);
#endif

	return to_send;
}

454
// ******************* Advanced packing routines *******************
455

456 457
// Appends an object tagging record for to_send to the command buffer.
// The record remembers the object type, the current fill position of buff
// (where the net reference will be written during tagging) and a new global
// reference to the object.
static void _fill_ot_rec(JNIEnv * jni_env, buffer * cmd_buff,
		unsigned char ot_type, buffer * buff, jstring to_send) {

	objtag_rec record;

	// what kind of object is to be tagged
	record.obj_type = ot_type;
	// where in the buffer the data will be stored during tagging
	record.buff_pos = buffer_filled(buff);
	// keep the object reachable through a global reference
	record.obj_to_tag = (*jni_env)->NewGlobalRef(jni_env, to_send);

	// store the record in the command buffer
	buffer_fill(cmd_buff, &record, sizeof(record));
}

472
static void pack_object(JNIEnv * jni_env, buffer * buff, buffer * cmd_buff,
473
		jobject to_send, unsigned char object_type) {
Lukáš Marek's avatar
Lukáš Marek committed
474

Lukáš Marek's avatar
Lukáš Marek committed
475 476
	// create entry for object tagging thread that will replace the null ref
	if(to_send != NULL) {
477
		_fill_ot_rec(jni_env, cmd_buff, object_type, buff, to_send);
Lukáš Marek's avatar
Lukáš Marek committed
478
	}
479

Lukáš Marek's avatar
Lukáš Marek committed
480
	// pack null net reference
481
	pack_long(buff, NULL_NET_REF);
482 483
}

484 485 486 487 488
// Overwrites sizeof(jshort) bytes at buff_pos with to_put in network order.
static void buff_put_short(buffer * buff, size_t buff_pos, jshort to_put) {
	jshort wire_value = htons(to_put);
	buffer_fill_at_pos(buff, buff_pos, &wire_value, sizeof(wire_value));
}
489

490

491 492 493
// Overwrites sizeof(jint) bytes at buff_pos with to_put in network order.
static void buff_put_int(buffer * buff, size_t buff_pos, jint to_put) {
	jint wire_value = htonl(to_put);
	buffer_fill_at_pos(buff, buff_pos, &wire_value, sizeof(wire_value));
}

497
// Overwrites sizeof(jlong) bytes at buff_pos with to_put in big-endian order.
static void buff_put_long(buffer * buff, size_t buff_pos, jlong to_put) {
	jlong wire_value = htobe64(to_put);
	buffer_fill_at_pos(buff, buff_pos, &wire_value, sizeof(wire_value));
}


504
// ******************* analysis helper methods *******************
505

506
static jshort next_analysis_id () {
507 508 509
	// get id for this method string
	// this could use different lock then tagging but it should not be a problem
	// and it will be used rarely - bit unoptimized
510 511

	jshort result = -1;
512 513
	enter_critical_section(jvmti_env, tagging_lock);
	{
514
		result = avail_analysis_id++;
515 516
	}
	exit_critical_section(jvmti_env, tagging_lock);
517

518 519 520 521
	return result;
}

static jshort register_method(
522
		JNIEnv * jni_env, jstring analysis_method_desc,
523
		jlong thread_id) {
524 525 526 527
#ifdef DEBUG
	printf("Registering method (thread %ld)\n", tld_get()->id);
#endif

528 529 530 531 532 533 534 535
	// *** send register analysis method message ***

	// request unique id
	jshort new_analysis_id = next_analysis_id();

	// get string length
	jsize str_len =
			(*jni_env)->GetStringUTFLength(jni_env, analysis_method_desc);
536

537 538 539 540
	// get string data as utf-8
	const char * str =
			(*jni_env)->GetStringUTFChars(jni_env, analysis_method_desc, NULL);
	check_error(str == NULL, "Cannot get string from java");
541

542 543 544
	// check if the size is sendable
	int size_fits = str_len < UINT16_MAX;
	check_error(! size_fits, "Java string is too big for sending");
545

546
	// obtain buffer
547
	process_buffs * buffs = buffs_utility_get();
548
	buffer * buff = buffs->analysis_buff;
549

550 551 552 553 554
	// msg id
	pack_byte(buff, MSG_REG_ANALYSIS);
	// new id for analysis method
	pack_short(buff, new_analysis_id);
	// method descriptor
555
	pack_string_utf8(buff, str, str_len);
556

557
	// send message
558 559 560 561
	buffs_utility_send(buffs);

	// release string
	(*jni_env)->ReleaseStringUTFChars(jni_env, analysis_method_desc, str);
562

563
#ifdef DEBUG
564
	printf("Method registered (thread %ld)\n", tld_get()->id);
565 566
#endif

567
	return new_analysis_id;
568 569 570
}


571
static jlong next_thread_id () {
572
#ifdef DEBUG
573
	printf("Marking thread (thread %ld)\n", tld_get()->id);
574
#endif
575
	// mark the thread - with lock
576
	// TODO replace total ordering lock with private lock - perf. issue
577
	jlong result = -1;
578
	enter_critical_section(jvmti_env, to_buff_lock);
579
	{
580
		result = avail_thread_id++;
581
	}
582
	exit_critical_section(jvmti_env, to_buff_lock);
583

584
#ifdef DEBUG
585
	printf("Thread marked (thread %ld)\n", result);
586
#endif
587
	return result;
588
}
589

590

591
static size_t create_analysis_request_header (
592
		buffer * buff, jshort analysis_method_id
593 594 595 596 597 598 599 600 601 602 603 604 605 606
) {
	// analysis method id
	pack_short(buff, analysis_method_id);

	// position of the short indicating the length of marshalled arguments
	size_t pos = buffer_filled(buff);

	// initial value of the length of the marshalled arguments
	pack_short(buff, 0xBAAD);

	return pos;
}


607
void analysis_start_buff(
608 609
		JNIEnv * jni_env, jshort analysis_method_id, jbyte ordering_id,
		struct tldata * tld
610
) {
611
#ifdef DEBUGANL
612
	printf("Analysis (buffer) start enter (thread %ld)\n", tld_get()->id);
613 614
#endif

615
	check_error(ordering_id < 0, "Buffer id has negative value");
616

617 618 619 620 621 622 623 624 625 626 627 628 629 630
	// flush normal buffers before each global buffering
	if(tld->analysis_buff != NULL) {
		// invalidate buffer pointers
		tld->analysis_buff = NULL;
		tld->command_buff = NULL;

		// send buffers for object tagging
		buffs_objtag(tld->pb);

		// invalidate buffer pointer
		tld->pb = NULL;
	}

	// allocate special local buffer for this buffering
631
	if(tld->local_pb == NULL) {
632
		// mark thread
633 634
		if(tld->id == INVALID_THREAD_ID) {
			tld->id = next_thread_id ();
635 636 637
		}

		// get buffers
638
		tld->local_pb = buffs_get(tld->id);
639 640 641
	}

	// set local buffers for this buffering
642 643
	tld->analysis_buff = tld->local_pb->analysis_buff;
	tld->command_buff = tld->local_pb->command_buff;
644

645
	tld->to_buff_id = ordering_id;
646

647 648
	// create request header, keep track of the position
	// of the length of marshalled arguments
649
	tld->args_length_pos = create_analysis_request_header(tld->analysis_buff, analysis_method_id);
650

651
#ifdef DEBUGANL
652
	printf("Analysis (buffer) start exit (thread %ld)\n", tld_get()->id);
653 654 655 656
#endif
}


657
// Writes the head of an analysis message: MSG_ANALYZE, the thread (or total
// ordering buffer) id and a placeholder int for the number of requests.
// Returns the buffer position of that placeholder.
static size_t create_analysis_msg(buffer * buff, jlong id) {

	// message type
	pack_byte(buff, MSG_ANALYZE);

	// thread (total order buffer) id
	pack_long(buff, id);

	// the request count starts at the current fill position
	const size_t count_field_pos = buffer_filled(buff);

	// placeholder for the request count
	pack_int(buff, 0xBAADF00D);

	return count_field_pos;
}

675 676


677
static void analysis_start(
678 679
		JNIEnv * jni_env, jshort analysis_method_id,
		struct tldata * tld
680
) {
681
#ifdef DEBUGANL
682
	printf("Analysis start enter (thread %ld)\n", tld_get()->id);
683 684
#endif

685
	if(tld->analysis_buff == NULL) {
686

687
		// mark thread
688 689
		if(tld->id == INVALID_THREAD_ID) {
			tld->id = next_thread_id ();
690
		}
691

692
		// get buffers
693 694 695
		tld->pb = buffs_get(tld->id);
		tld->analysis_buff = tld->pb->analysis_buff;
		tld->command_buff = tld->pb->command_buff;
696

697
		// determines, how many analysis requests are sent in one message
698
		tld->analysis_count = 0;
699

700
		// create analysis message
701
		tld->analysis_count_pos = create_analysis_msg(tld->analysis_buff, tld->id);
702 703
	}

704 705
	// create request header, keep track of the position
	// of the length of marshalled arguments
706
	tld->args_length_pos = create_analysis_request_header(tld->analysis_buff, analysis_method_id);
707

708
#ifdef DEBUGANL
709
	printf("Analysis start exit (thread %ld)\n", tld_get()->id);
710
#endif
711 712
}

713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736
// Adds shift to the buff_pos of every objtag_rec stored in cmd_buff.
static void correct_cmd_buff_pos(buffer * cmd_buff, size_t shift) {

	const size_t total = buffer_filled(cmd_buff);

	objtag_rec rec;

	// records are stored back to back, one sizeof(rec) apart
	for(size_t offset = 0; offset < total; offset += sizeof(rec)) {

		// load the record, move its position, store it back in place
		buffer_read(cmd_buff, offset, &rec, sizeof(rec));
		rec.buff_pos += shift;
		buffer_fill_at_pos(cmd_buff, offset, &rec, sizeof(rec));
	}
}

737
static void analysis_end_buff(struct tldata * tld) {
738
#ifdef DEBUGANL
739
	printf("Analysis (buffer) end enter (thread %ld)\n", tld_get()->id);
740 741 742 743
#endif

	// TODO lock for each buffer id

744
	// sending of half-full buffer is done in shutdown hook and obj free hook
745 746 747 748 749

	// write analysis to total order buffer - with lock
	enter_critical_section(jvmti_env, to_buff_lock);
	{
		// pointer to the total order buffer structure
750
		to_buff_struct * tobs = &(to_buff_array[tld->to_buff_id]);
751 752 753 754

		// allocate new buffer
		if(tobs->pb == NULL) {

755
			tobs->pb = buffs_get(tld->id);
756

757
			// set owner_id as t_buffid
758
			tobs->pb->owner_id = tld->to_buff_id;
759

760
			// determines, how many analysis requests are sent in one message
761 762
			tobs->analysis_count = 0;

763
			// create analysis message
764
			tobs->analysis_count_pos = create_analysis_msg(
765
					tobs->pb->analysis_buff, tld->to_buff_id);
766 767
		}

768 769 770 771
		// first correct positions in command buffer
		// records in command buffer are positioned according to the local
		// analysis buffer but we want the position to be valid in total ordered
		// buffer
772
		correct_cmd_buff_pos(tld->local_pb->command_buff,
773 774
				buffer_filled(tobs->pb->analysis_buff));

775 776 777
		// fill total order buffers
		buffer_fill(tobs->pb->analysis_buff,
				// NOTE: normally access the buffer using methods
778 779
				tld->local_pb->analysis_buff->buff,
				tld->local_pb->analysis_buff->occupied);
780 781 782

		buffer_fill(tobs->pb->command_buff,
				// NOTE: normally access the buffer using methods
783 784
				tld->local_pb->command_buff->buff,
				tld->local_pb->command_buff->occupied);
785 786

		// empty local buffers
787 788
		buffer_clean(tld->local_pb->analysis_buff);
		buffer_clean(tld->local_pb->command_buff);
789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810

		// add number of completed requests
		++(tobs->analysis_count);

		// buffer has to be updated each time because jvm could end and buffer
		// has to be up-to date
		buff_put_int(tobs->pb->analysis_buff, tobs->analysis_count_pos,
				tobs->analysis_count);

		// send only when the method count is reached
		if(tobs->analysis_count >= ANALYSIS_COUNT) {

			// send buffers for object tagging
			buffs_objtag(tobs->pb);

			// invalidate buffer pointer
			tobs->pb = NULL;
		}
	}
	exit_critical_section(jvmti_env, to_buff_lock);

	// reset analysis and command buffers for normal buffering
811 812 813 814
	// set to NULL, because we've send the buffers at the beginning of
	// global buffer buffering
	tld->analysis_buff = NULL;
	tld->command_buff = NULL;
815 816

	// invalidate buffer id
817
	tld->to_buff_id = INVALID_BUFF_ID;
818

819
#ifdef DEBUGANL
820
	printf("Analysis (buffer) end exit (thread %ld)\n", tld_get()->id);
821 822 823
#endif
}

824
static void analysis_end(struct tldata * tld) {
825 826 827 828
	// update the length of the marshalled arguments
	jshort args_length = buffer_filled(tld->analysis_buff) - tld->args_length_pos - sizeof (jshort);
	buff_put_short(tld->analysis_buff, tld->args_length_pos, args_length);

829
	// this method is also called for end of analysis for totally ordered API
830 831
	if(tld->to_buff_id != INVALID_BUFF_ID) {
		analysis_end_buff(tld);
832 833 834
		return;
	}

835
#ifdef DEBUGANL
836
	printf("Analysis end enter (thread %ld)\n", tld_get()->id);
837 838
#endif

839
	// sending of half-full buffer is done in thread end hook
840

Lukáš Marek's avatar
Lukáš Marek committed
841
	// increment the number of completed requests
842
	tld->analysis_count++;
843

Lukáš Marek's avatar
Lukáš Marek committed
844
	// buffer has to be updated each time - the thread can end any time
845
	buff_put_int(tld->analysis_buff, tld->analysis_count_pos, tld->analysis_count);
846

Lukáš Marek's avatar
Lukáš Marek committed
847
	// send only after the proper count is reached
848
	if(tld->analysis_count >= ANALYSIS_COUNT) {
849
		// invalidate buffer pointers
850 851
		tld->analysis_buff = NULL;
		tld->command_buff = NULL;
852

853
		// send buffers for object tagging
854
		buffs_objtag(tld->pb);
855 856

		// invalidate buffer pointer
857
		tld->pb = NULL;
858
	}
859

860
#ifdef DEBUGANL
861
	printf("Analysis end exit (thread %ld)\n", tld_get()->id);
862
#endif
863
}
864

865
// ******************* Object tagging thread *******************
866

867
// TODO add cache - ??
868

869 870 871 872
static jclass THREAD_CLASS = NULL;
static jclass STRING_CLASS = NULL;

static void ot_pack_string_data(JNIEnv * jni_env, buffer * buff,
873
		jstring to_send, jlong str_net_ref) {
874

875 876 877 878 879 880
	// get string length
	jsize str_len = (*jni_env)->GetStringUTFLength(jni_env, to_send);

	// get string data as utf-8
	const char * str = (*jni_env)->GetStringUTFChars(jni_env, to_send, NULL);
	check_error(str == NULL, "Cannot get string from java");
881

882 883 884
	// check if the size is sendable
	int size_fits = str_len < UINT16_MAX;
	check_error(! size_fits, "Java string is too big for sending");
885

886
	// add message to the buffer
887 888

	// msg id
889
	pack_byte(buff, MSG_STRING_INFO);
890 891 892 893
	// send string net reference
	pack_long(buff, str_net_ref);
	// send string
	pack_string_utf8(buff, str, str_len);
894

895 896
	// release string
	(*jni_env)->ReleaseStringUTFChars(jni_env, to_send, str);
897 898
}

899 900 901 902 903 904
// Appends a MSG_THREAD_INFO message to buff: the net reference of the thread,
// its name and its daemon flag.
//
// to_send is the thread object (declared as jstring, used as a thread
// reference for GetThreadInfo). Everything GetThreadInfo hands out is released
// before returning: the name is memory allocated by JVMTI and the thread
// group / context class loader are JNI local references.
static void ot_pack_thread_data(JNIEnv * jni_env, buffer * buff,
		jstring to_send, jlong thr_net_ref) {

	jvmtiThreadInfo info;
	jvmtiError error = (*jvmti_env)->GetThreadInfo(jvmti_env, to_send, &info);
	check_error(error != JVMTI_ERROR_NONE, "Cannot get tread info");

	// same limit as for the other strings packed with pack_string_utf8
	size_t name_len = strlen(info.name);
	int size_fits = name_len < UINT16_MAX;
	check_error(! size_fits, "Java string is too big for sending");

	// pack thread info message

	// msg id
	pack_byte(buff, MSG_THREAD_INFO);

	// thread object id
	pack_long(buff, thr_net_ref);

	// thread name
	pack_string_utf8(buff, info.name, name_len);

	// is daemon thread
	pack_boolean(buff, info.is_daemon);

	// the name was allocated by JVMTI and has to be returned to it
	error = (*jvmti_env)->Deallocate(jvmti_env, (unsigned char *) info.name);
	check_error(error != JVMTI_ERROR_NONE, "Cannot deallocate thread name");

	// the remaining results are local references owned by this call
	if(info.thread_group != NULL) {
		(*jni_env)->DeleteLocalRef(jni_env, info.thread_group);
	}
	if(info.context_class_loader != NULL) {
		(*jni_env)->DeleteLocalRef(jni_env, info.context_class_loader);
	}
}
920

921
// Sets the "spec" flag in *net_ref and stores the updated net reference for
// the object.
static void update_send_status(jobject to_send, jlong * net_ref) {

	// flag first, so the stored value already carries it
	net_ref_set_spec(net_ref, TRUE);

	const jlong flagged_ref = *net_ref;
	update_net_reference(jvmti_env, to_send, flagged_ref);
}
926

927 928
// Packs additional data for String and Thread objects into new_objs_buff,
// unless the net reference is already flagged as sent. Flags *net_ref as sent
// for every object it packs. obj_type is not used by this function.
static void ot_pack_aditional_data(JNIEnv * jni_env, jlong * net_ref,
		jobject to_send, unsigned char obj_type, buffer * new_objs_buff) {

	// NOTE: we don't use lock for updating send status, so it is possible
	// that multiple threads will send it, but this will hurt only performance

	// nothing to do when the data was already sent to the server
	const int already_sent = (net_ref_get_spec(*net_ref) == TRUE);
	if(already_sent) {
		return;
	}

	// NOTE: Tests for class types could be done by buffering threads.
	//       It depends, where we want to have the load.

	// String - pack data
	const jboolean is_string =
			(*jni_env)->IsInstanceOf(jni_env, to_send, STRING_CLASS);
	if(is_string) {
		update_send_status(to_send, net_ref);
		ot_pack_string_data(jni_env, new_objs_buff, to_send, *net_ref);
	}

	// Thread - pack data
	const jboolean is_thread =
			(*jni_env)->IsInstanceOf(jni_env, to_send, THREAD_CLASS);
	if(is_thread) {
		update_send_status(to_send, net_ref);
		ot_pack_thread_data(jni_env, new_objs_buff, to_send, *net_ref);
	}
}
955

956 957
// Processes one tagging record: obtains the net reference of the object,
// packs additional data for OT_DATA_OBJECT records and writes the net
// reference into buff at buff_pos.
static void ot_tag_record(JNIEnv * jni_env, buffer * buff, size_t buff_pos,
		jobject to_send, unsigned char obj_type, buffer * new_objs_buff) {

	jlong object_ref =
			get_net_reference(jni_env, jvmti_env, new_objs_buff, to_send);

	const int wants_data = (obj_type == OT_DATA_OBJECT);
	if(wants_data) {
		// NOTE: can update net reference (object_ref)
		ot_pack_aditional_data(jni_env, &object_ref, to_send, obj_type,
				new_objs_buff);
	}

	// replace the placeholder in the buffer
	buff_put_long(buff, buff_pos, object_ref);
}
974

975 976
// Runs ot_tag_record for every objtag_rec stored in cmd_buff. The global
// references held by the records are not released here.
static void ot_tag_buff(JNIEnv * jni_env, buffer * anl_buff, buffer * cmd_buff,
		buffer * new_objs_buff) {

	const size_t total = buffer_filled(cmd_buff);

	objtag_rec rec;

	// records are stored back to back, one sizeof(rec) apart
	for(size_t offset = 0; offset < total; offset += sizeof(rec)) {

		buffer_read(cmd_buff, offset, &rec, sizeof(rec));

		ot_tag_record(jni_env, anl_buff, rec.buff_pos, rec.obj_to_tag,
				rec.obj_type, new_objs_buff);

		// global references are released after buffer is send
	}
}

// TODO code dup with ot_tag_buff
// Deletes the global reference held by every objtag_rec stored in cmd_buff.
static void ot_relese_global_ref(JNIEnv * jni_env, buffer * cmd_buff) {

	const size_t total = buffer_filled(cmd_buff);

	objtag_rec rec;

	// records are stored back to back, one sizeof(rec) apart
	for(size_t offset = 0; offset < total; offset += sizeof(rec)) {

		buffer_read(cmd_buff, offset, &rec, sizeof(rec));

		(*jni_env)->DeleteGlobalRef(jni_env, rec.obj_to_tag);
	}
}

1015
static void * objtag_thread_loop(void * obj) {
1016

1017 1018 1019 1020
#ifdef DEBUG
		printf("Object tagging thread start (thread %ld)\n", tld_get()->id);
#endif

1021 1022 1023 1024 1025
	// attach thread to jvm
	JNIEnv *jni_env;
	jvmtiError error = (*java_vm)->AttachCurrentThreadAsDaemon(java_vm,
			(void **)&jni_env, NULL);
	check_jvmti_error(jvmti_env, error, "Unable to attach objtag thread.");
1026

1027 1028 1029
	// one spare buffer for new objects
	buffer * new_obj_buff = malloc(sizeof(buffer));
	buffer_alloc(new_obj_buff);
1030

1031 1032 1033 1034 1035 1036 1037 1038
	// retrieve java types

	STRING_CLASS = (*jni_env)->FindClass(jni_env, "java/lang/String");
	check_error(STRING_CLASS == NULL, "String class not found");

	THREAD_CLASS = (*jni_env)->FindClass(jni_env, "java/lang/Thread");
	check_error(STRING_CLASS == NULL, "Thread class not found");

1039 1040
	// exit when the jvm is terminated and there are no msg to process
	while(! (no_tagging_work && bq_length(&objtag_q) == 0) ) {
1041

1042 1043
		// get buffer - before tagging lock
		process_buffs * pb = _buffs_objtag_get();
1044

1045
#ifdef DEBUG
1046
		printf("Object tagging started (thread %ld)\n", tld_get()->id);
1047 1048
#endif

1049 1050 1051
		// tag the objects - with lock
		enter_critical_section(jvmti_env, tagging_lock);
		{
1052

1053 1054 1055 1056
			// tag objcects from buffer
			// note that analysis buffer is not required
			ot_tag_buff(jni_env, pb->analysis_buff, pb->command_buff,
					new_obj_buff);
1057

1058
			// exchange command_buff and new_obj_buff
1059
			buffer * old_cmd_buff = pb->command_buff;
1060
			pb->command_buff = new_obj_buff;
1061

1062
			// send buffer
1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073
			_buffs_send(pb);

			// global references are released after buffer is send
			// this is critical for ensuring that proper ordering of events
			// is maintained - see object free event for more info

			ot_relese_global_ref(jni_env, old_cmd_buff);

			// clean old_cmd_buff and make it as new_obj_buff for the next round
			buffer_clean(old_cmd_buff);
			new_obj_buff = old_cmd_buff;
1074 1075
		}
		exit_critical_section(jvmti_env, tagging_lock);
1076 1077

#ifdef DEBUG
1078
		printf("Object tagging ended (thread %ld)\n", tld_get()->id);
1079
#endif
1080 1081
	}

1082 1083 1084
	buffer_free(new_obj_buff);
	free(new_obj_buff);
	new_obj_buff = NULL;
1085

1086 1087 1088 1089
#ifdef DEBUG
		printf("Object tagging thread end (thread %ld)\n", tld_get()->id);
#endif

1090 1091
	return NULL;
}
1092

1093
// ******************* Sending thread *******************
1094

1095
static void _send_buffer(int connection, buffer * b) {
1096

1097