dislreagent.c 51 KB
Newer Older
1 2 3 4 5 6 7 8
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <netdb.h>
#include <unistd.h>
#include <netinet/tcp.h>

9 10
#include <pthread.h>

11 12 13
#include <jvmti.h>
#include <jni.h>

14 15
// has to be defined for jvmtihelper.h
#define ERR_PREFIX "DiSL-RE agent error: "
16

17 18
#include "jvmtihelper.h"
#include "comm.h"
19

20 21 22
#include "messagetype.h"
#include "buffer.h"
#include "buffpack.h"
23 24
#include "blockingqueue.h"
#include "netref.h"
25 26 27 28

#include "dislreagent.h"


29 30 31 32 33 34 35 36 37
// defaults - be sure that space in host_name is long enough
static const char * DEFAULT_HOST = "localhost";
static const char * DEFAULT_PORT = "11218";

// port and name of the instrumentation server
static char host_name[1024];
static char port_number[6]; // including final 0

static jvmtiEnv * jvmti_env;
38 39
static JavaVM * java_vm;

Lukáš Marek's avatar
Lukáš Marek committed
40
static int jvm_started = FALSE;
41

42 43
static volatile int no_tagging_work = FALSE;
static volatile int no_sending_work = FALSE;
44

45 46 47
// *** Sync queues ***

// queues contain process_buffs structure
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
// Utility queue (buffer) is specifically reserved for sending different
// messages then analysis messages. The rationale behind utility buffers is that
// at least one utility buffer is available or will be available in the near
// future no matter what. One of the two must hold:
// 1) Acquired buffer is send without any additional locking in between.
// 2) Particular "usage" (place of use) may request only constant number of
//    buffers. Place and constant is described right below and the queue size is
//    sum of all constants here

//    buffer for case 1)                     1
//    object free message                    1
//    new class info message                 1
//    just to be sure (parallelism for 1)    3
#define BQ_UTILITY 6

// queue with empty utility buffers
static blocking_queue utility_q;

// number of all buffers - used for analysis with some exceptions
#define BQ_BUFFERS 32

// number of analysis requests in one message
#define ANALYSIS_COUNT 16384


// queue with empty buffers
static blocking_queue empty_q;
76

77 78
// queue where buffers are queued for sending
static blocking_queue send_q;
Lukáš Marek's avatar
Lukáš Marek committed
79

80 81
// queue where buffers are queued for object
static blocking_queue objtag_q;
82

83 84 85
typedef struct {
	buffer * command_buff;
	buffer * analysis_buff;
86
	jlong owner_id;
87 88 89
} process_buffs;

// list of all allocated bq buffers
90
static process_buffs pb_list[BQ_BUFFERS + BQ_UTILITY];
91 92

#define OT_OBJECT 1
93
#define OT_DATA_OBJECT 2
94 95 96 97 98 99

typedef struct {
	unsigned char obj_type;
	size_t buff_pos;
	jobject obj_to_tag;
} objtag_rec;
100

101 102 103 104 105 106 107 108 109 110
// *** buffers for total ordering ***

typedef struct {
	process_buffs * pb;
	jint analysis_count;
	size_t analysis_count_pos;
} to_buff_struct;

#define INVALID_BUFF_ID -1

111 112
#define TO_BUFFER_MAX_ID 127 // byte is the holding type
#define TO_BUFFER_COUNT (TO_BUFFER_MAX_ID + 1) // +1 for buffer id 0
113 114 115

static jrawMonitorID to_buff_lock;

116
static to_buff_struct to_buff_array[TO_BUFFER_COUNT];
117

118 119 120 121 122 123 124 125 126 127
// *** buffer for object free ***

#define MAX_OBJ_FREE_EVENTS 4096

static process_buffs * obj_free_buff = NULL;
static jint obj_free_event_count = 0;
static size_t obj_free_event_count_pos = 0;

static jrawMonitorID obj_free_lock;

128 129
// *** Protected by tagging lock ***
// can require other locks while holding this
130

131 132
#define NULL_NET_REF 0

133
static jrawMonitorID tagging_lock;
134

135 136 137 138 139 140
// first available id for object tagging
static volatile jlong avail_object_id = 1;
static volatile jint avail_class_id = 1;

// first available id for new messages
static volatile jshort avail_analysis_id = 1;
141

142 143 144 145
// *** Thread ids ***

#define INVALID_THREAD_ID -1

146 147
#define STARTING_THREAD_ID (TO_BUFFER_MAX_ID + 1)

148
// initial ids are reserved for total ordering buffers
149
static volatile jlong avail_thread_id = STARTING_THREAD_ID;
150

151

152
// *** Thread locals ***
Lukáš Marek's avatar
Lukáš Marek committed
153

154 155 156 157 158
// NOTE: The JVMTI functionality allows to implement everything
// using JVM, but the GNU implementation is faster and WORKING


struct tldata {
159 160 161 162 163 164 165 166 167
	jlong id;
	process_buffs * local_pb;
	jbyte to_buff_id;
	process_buffs * pb;
	buffer * analysis_buff;
	buffer * command_buff;
	jint analysis_count;
	size_t analysis_count_pos;
	size_t args_length_pos;
168 169 170 171 172 173 174 175 176 177 178 179 180
};


#if defined (__APPLE__) && defined (__MACH__)

//
// Use pthreads on Mac OS X
//

static pthread_key_t tls_key;


static void tls_init () {
181 182
	int result = pthread_key_create (& tls_key, NULL);
	check_error(result != 0, "Failed to allocate thread-local storage key");
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
}


inline static struct tldata * tld_init (struct tldata * tld) {
	tld->id= INVALID_THREAD_ID;
	tld->local_pb = NULL;
	tld->to_buff_id = INVALID_BUFF_ID;
	tld->pb = NULL;
	tld->analysis_buff = NULL;
	tld->analysis_count = 0;
	tld->analysis_count_pos = 0;

	return tld;
}

static struct tldata * tld_create ()  {
	struct tldata * tld = malloc (sizeof (struct tldata));
	check_error (tld == NULL, "Failed to allocate thread-local data");
	int result = pthread_setspecific (tls_key, tld);
	check_error (result != 0, "Failed to store thread-local data");
	return tld_init (tld);
}

inline static struct tldata * tld_get () {
	struct tldata * tld = pthread_getspecific (tls_key);
	return (tld != NULL) ? tld : tld_create ();
}

#else

//
// Use GNU __thread where supported
//

static void tls_init () {
218
	// empty
219 220 221 222
}


static __thread struct tldata tld = {
223 224 225 226 227 228 229 230
		.id = INVALID_THREAD_ID,
		.local_pb = NULL,
		.to_buff_id = INVALID_BUFF_ID,
		.pb = NULL,
		.analysis_buff = NULL,
		.command_buff = NULL,
		.analysis_count = 0,
		.analysis_count_pos = 0,
231 232 233 234 235 236 237
};

inline static struct tldata * tld_get () {
	return & tld;
}

#endif
238

239 240 241 242 243

// *** Threads ***

static pthread_t objtag_thread;
static pthread_t send_thread;
Lukáš Marek's avatar
Lukáš Marek committed
244

245 246
// ******************* Helper routines *******************

247
static void parse_agent_options(char *options) {
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272

	static const char PORT_DELIM = ':';

	// assign defaults
	strcpy(host_name, DEFAULT_HOST);
	strcpy(port_number, DEFAULT_PORT);

	// no options found
	if (options == NULL) {
		return;
	}

	char * port_start = strchr(options, PORT_DELIM);

	// process port number
	if(port_start != NULL) {

		// replace PORT_DELIM with end of the string (0)
		port_start[0] = '\0';

		// move one char forward to locate port number
		++port_start;

		// convert number
		int fitsP = strlen(port_start) < sizeof(port_number);
273
		check_error(! fitsP, "Port number is too long");
274 275 276 277 278 279

		strcpy(port_number, port_start);
	}

	// check if host_name is big enough
	int fitsH = strlen(options) < sizeof(host_name);
280
	check_error(! fitsH, "Host name is too long");
281 282 283 284

	strcpy(host_name, options);
}

285
// ******************* Advanced buffer routines *******************
286

287 288 289
// owner_id can have several states
// > 0 && <= TO_BUFFER_MAX_ID
//    - means that buffer is reserved for total ordering events
290
// >= STARTING_THREAD_ID
291 292
//    - means that buffer is owned by some thread that is marked
// == -1 - means that buffer is owned by some thread that is NOT tagged
293

294
// == PB_FREE - means that buffer is currently free
295
static const jlong PB_FREE = -100;
296

297
// == PB_OBJTAG - means that buffer is scheduled (processed) for object tagging
298
static const jlong PB_OBJTAG = -101;
299

300
// == PB_SEND - means that buffer is scheduled (processed) for sending
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
static const jlong PB_SEND = -102;

// == PB_UTILITY - means that this is special utility buffer
static const jlong PB_UTILITY = -1000;

static process_buffs * buffs_utility_get() {
#ifdef DEBUG
	printf("Acquiring buffer -- utility (thread %ld)\n", tld_get()->id);
#endif

	// retrieves pointer to buffer
	process_buffs * buffs;
	bq_pop(&utility_q, &buffs);

	// no owner setting - it is already PB_UTILITY

#ifdef DEBUG
	printf("Buffer acquired -- utility (thread %ld)\n", tld_get()->id);
#endif

	return buffs;
}

static process_buffs * buffs_utility_send(process_buffs * buffs) {
#ifdef DEBUG
	printf("Queuing buffer (utility) -- send (thread %ld)\n", tld_get()->id);
#endif

	// no owner setting - it is already PB_UTILITY
	bq_push(&send_q, &buffs);

#ifdef DEBUG
	printf("Buffer queued (utility) -- send (thread %ld)\n", tld_get()->id);
#endif

	return buffs;
}

// normally only sending thread should access this function
static void _buffs_utility_release(process_buffs * buffs) {
#ifdef DEBUG
	printf("Queuing buffer -- utility (thread %ld)\n", tld_get()->id);
#endif

	// empty buff
	buffer_clean(buffs->analysis_buff);
	buffer_clean(buffs->command_buff);

	// stores pointer to buffer
	buffs->owner_id = PB_UTILITY;
	bq_push(&utility_q, &buffs);

#ifdef DEBUG
	printf("Buffer queued -- utility (thread %ld)\n", tld_get()->id);
#endif
}
357

358
static process_buffs * buffs_get(jlong thread_id) {
359
#ifdef DEBUG
360
	printf("Acquiring buffer -- empty (thread %ld)\n", tld_get()->id);
361 362
#endif

363
	// retrieves pointer to buffer
364
	process_buffs * buffs;
365
	bq_pop(&empty_q, &buffs);
366

367
	buffs->owner_id = thread_id;
Lukáš Marek's avatar
Lukáš Marek committed
368

369
#ifdef DEBUG
370
	printf("Buffer acquired -- empty (thread %ld)\n", tld_get()->id);
371 372
#endif

373
	return buffs;
374 375
}

376
// normally only sending thread should access this function
377
static void _buffs_release(process_buffs * buffs) {
378
#ifdef DEBUG
379
	printf("Queuing buffer -- empty (thread %ld)\n", tld_get()->id);
380 381
#endif

382 383 384
	// empty buff
	buffer_clean(buffs->analysis_buff);
	buffer_clean(buffs->command_buff);
385

386
	// stores pointer to buffer
387
	buffs->owner_id = PB_FREE;
388
	bq_push(&empty_q, &buffs);
389 390

#ifdef DEBUG
391
	printf("Buffer queued -- empty (thread %ld)\n", tld_get()->id);
392
#endif
393 394
}

395
static void buffs_objtag(process_buffs * buffs) {
396
#ifdef DEBUG
397
	printf("Queuing buffer -- objtag (thread %ld)\n", tld_get()->id);
398 399
#endif

400
	buffs->owner_id = PB_OBJTAG;
401
	bq_push(&objtag_q, &buffs);
402 403

#ifdef DEBUG
404
	printf("Buffer queued -- objtag (thread %ld)\n", tld_get()->id);
405
#endif
406
}
407

408 409
// only objtag thread should access this function
static process_buffs * _buffs_objtag_get() {
410
#ifdef DEBUG
411
	printf("Acquiring buffer -- objtag (thread %ld)\n", tld_get()->id);
412 413
#endif

414 415
	process_buffs * buffs;
	bq_pop(&objtag_q, &buffs);
416

417
#ifdef DEBUG
418
	printf("Buffer acquired -- objtag (thread %ld)\n", tld_get()->id);
419 420
#endif

421 422
	return buffs;
}
423

424
static void _buffs_send(process_buffs * buffs) {
425
#ifdef DEBUG
426
	printf("Queuing buffer -- send (thread %ld)\n", tld_get()->id);
427 428
#endif

429
	buffs->owner_id = PB_SEND;
430
	bq_push(&send_q, &buffs);
431 432

#ifdef DEBUG
433
	printf("Buffer queued -- send (thread %ld)\n", tld_get()->id);
434
#endif
435 436
}

437 438
// only sending thread should access this function
static process_buffs * _buffs_send_get() {
439
#ifdef DEBUG
440
	printf("Acquiring buffer -- send (thread %ld)\n", tld_get()->id);
441 442
#endif

443 444
	process_buffs * buffs;
	bq_pop(&send_q, &buffs);
445

446
#ifdef DEBUG
447
	printf("Buffer acquired -- send (thread %ld)\n", tld_get()->id);
448 449
#endif

450
	return buffs;
451 452
}

453
// ******************* Advanced packing routines *******************
454

455 456
static void _fill_ot_rec(JNIEnv * jni_env, buffer * cmd_buff,
		unsigned char ot_type, buffer * buff, jstring to_send) {
457

458 459 460 461 462 463 464 465
	// crate object tagging record
	objtag_rec ot_rec;
	// type of object to be tagged
	ot_rec.obj_type = ot_type;
	// position in the buffer, where the data will be stored during tagging
	ot_rec.buff_pos = buffer_filled(buff);
	// global reference to the object to be tagged
	ot_rec.obj_to_tag = (*jni_env)->NewGlobalRef(jni_env, to_send);
466

467 468
	// save to command buff
	buffer_fill(cmd_buff, &ot_rec, sizeof(ot_rec));
469 470
}

471
static void pack_object(JNIEnv * jni_env, buffer * buff, buffer * cmd_buff,
472
		jobject to_send, unsigned char object_type) {
473

474 475
	// create entry for object tagging thread that will replace the null ref
	if(to_send != NULL) {
476
		_fill_ot_rec(jni_env, cmd_buff, object_type, buff, to_send);
477
	}
478

479
	// pack null net reference
480
	pack_long(buff, NULL_NET_REF);
481 482
}

483 484 485 486 487
static void buff_put_short(buffer * buff, size_t buff_pos, jshort to_put) {
	// put the short at the position in network order
	jshort nts = htons(to_put);
	buffer_fill_at_pos(buff, buff_pos, &nts, sizeof(jshort));
}
488

489

490 491 492
static void buff_put_int(buffer * buff, size_t buff_pos, jint to_put) {
	// put the int at the position in network order
	jint nts = htonl(to_put);
493
	buffer_fill_at_pos(buff, buff_pos, &nts, sizeof(jint));
494 495
}

496
static void buff_put_long(buffer * buff, size_t buff_pos, jlong to_put) {
497
	// put the long at the position in network order
498 499
	jlong nts = htobe64(to_put);
	buffer_fill_at_pos(buff, buff_pos, &nts, sizeof(jlong));
500 501 502
}


503
// ******************* analysis helper methods *******************
504

505
static jshort next_analysis_id () {
506 507 508
	// get id for this method string
	// this could use different lock then tagging but it should not be a problem
	// and it will be used rarely - bit unoptimized
509 510

	jshort result = -1;
511 512
	enter_critical_section(jvmti_env, tagging_lock);
	{
513
		result = avail_analysis_id++;
514 515
	}
	exit_critical_section(jvmti_env, tagging_lock);
516

517 518 519 520
	return result;
}

static jshort register_method(
521
		JNIEnv * jni_env, jstring analysis_method_desc,
522
		jlong thread_id) {
523 524 525 526
#ifdef DEBUG
	printf("Registering method (thread %ld)\n", tld_get()->id);
#endif

527 528 529 530 531 532 533 534
	// *** send register analysis method message ***

	// request unique id
	jshort new_analysis_id = next_analysis_id();

	// get string length
	jsize str_len =
			(*jni_env)->GetStringUTFLength(jni_env, analysis_method_desc);
535

536 537 538 539
	// get string data as utf-8
	const char * str =
			(*jni_env)->GetStringUTFChars(jni_env, analysis_method_desc, NULL);
	check_error(str == NULL, "Cannot get string from java");
540

541 542 543
	// check if the size is sendable
	int size_fits = str_len < UINT16_MAX;
	check_error(! size_fits, "Java string is too big for sending");
544

545
	// obtain buffer
546
	process_buffs * buffs = buffs_utility_get();
547
	buffer * buff = buffs->analysis_buff;
548

549 550 551 552 553
	// msg id
	pack_byte(buff, MSG_REG_ANALYSIS);
	// new id for analysis method
	pack_short(buff, new_analysis_id);
	// method descriptor
554
	pack_string_utf8(buff, str, str_len);
555

556
	// send message
557 558 559 560
	buffs_utility_send(buffs);

	// release string
	(*jni_env)->ReleaseStringUTFChars(jni_env, analysis_method_desc, str);
561

562
#ifdef DEBUG
563
	printf("Method registered (thread %ld)\n", tld_get()->id);
564 565
#endif

566
	return new_analysis_id;
567 568 569
}


570
static jlong next_thread_id () {
571
#ifdef DEBUG
572
	printf("Marking thread (thread %ld)\n", tld_get()->id);
573
#endif
574
	// mark the thread - with lock
575
	// TODO replace total ordering lock with private lock - perf. issue
576
	jlong result = -1;
577
	enter_critical_section(jvmti_env, to_buff_lock);
578
	{
579
		result = avail_thread_id++;
580
	}
581
	exit_critical_section(jvmti_env, to_buff_lock);
582

583
#ifdef DEBUG
584
	printf("Thread marked (thread %ld)\n", result);
585
#endif
586
	return result;
587
}
588

589

590
static size_t create_analysis_request_header (
591
		buffer * buff, jshort analysis_method_id
592 593 594 595 596 597 598 599 600 601 602 603 604 605
) {
	// analysis method id
	pack_short(buff, analysis_method_id);

	// position of the short indicating the length of marshalled arguments
	size_t pos = buffer_filled(buff);

	// initial value of the length of the marshalled arguments
	pack_short(buff, 0xBAAD);

	return pos;
}


606
void analysis_start_buff(
607 608
		JNIEnv * jni_env, jshort analysis_method_id, jbyte ordering_id,
		struct tldata * tld
609
) {
610
#ifdef DEBUGANL
611
	printf("Analysis (buffer) start enter (thread %ld)\n", tld_get()->id);
612 613
#endif

614
	check_error(ordering_id < 0, "Buffer id has negative value");
615

616 617 618 619 620 621 622 623 624 625 626 627 628 629
	// flush normal buffers before each global buffering
	if(tld->analysis_buff != NULL) {
		// invalidate buffer pointers
		tld->analysis_buff = NULL;
		tld->command_buff = NULL;

		// send buffers for object tagging
		buffs_objtag(tld->pb);

		// invalidate buffer pointer
		tld->pb = NULL;
	}

	// allocate special local buffer for this buffering
630
	if(tld->local_pb == NULL) {
631
		// mark thread
632 633
		if(tld->id == INVALID_THREAD_ID) {
			tld->id = next_thread_id ();
634 635 636
		}

		// get buffers
637
		tld->local_pb = buffs_get(tld->id);
638 639 640
	}

	// set local buffers for this buffering
641 642
	tld->analysis_buff = tld->local_pb->analysis_buff;
	tld->command_buff = tld->local_pb->command_buff;
643

644
	tld->to_buff_id = ordering_id;
645

646 647
	// create request header, keep track of the position
	// of the length of marshalled arguments
648
	tld->args_length_pos = create_analysis_request_header(tld->analysis_buff, analysis_method_id);
649

650
#ifdef DEBUGANL
651
	printf("Analysis (buffer) start exit (thread %ld)\n", tld_get()->id);
652 653 654 655
#endif
}


656
static size_t create_analysis_msg(buffer * buff, jlong id) {
657
	// create analysis message
658 659 660 661 662 663 664 665 666 667

	// analysis msg
	pack_byte(buff, MSG_ANALYZE);

	// thread (total order buffer) id
	pack_long(buff, id);

	// get pointer to the location where count of requests will stored
	size_t pos = buffer_filled(buff);

668 669
	// request count space initialization
	pack_int(buff, 0xBAADF00D);
670 671 672 673

	return pos;
}

674 675


676
static void analysis_start(
677 678
		JNIEnv * jni_env, jshort analysis_method_id,
		struct tldata * tld
679
) {
680
#ifdef DEBUGANL
681
	printf("Analysis start enter (thread %ld)\n", tld_get()->id);
682 683
#endif

684
	if(tld->analysis_buff == NULL) {
685

686
		// mark thread
687 688
		if(tld->id == INVALID_THREAD_ID) {
			tld->id = next_thread_id ();
689
		}
690

691
		// get buffers
692 693 694
		tld->pb = buffs_get(tld->id);
		tld->analysis_buff = tld->pb->analysis_buff;
		tld->command_buff = tld->pb->command_buff;
695

696
		// determines, how many analysis requests are sent in one message
697
		tld->analysis_count = 0;
698

699
		// create analysis message
700
		tld->analysis_count_pos = create_analysis_msg(tld->analysis_buff, tld->id);
701 702
	}

703 704
	// create request header, keep track of the position
	// of the length of marshalled arguments
705
	tld->args_length_pos = create_analysis_request_header(tld->analysis_buff, analysis_method_id);
706

707
#ifdef DEBUGANL
708
	printf("Analysis start exit (thread %ld)\n", tld_get()->id);
709
#endif
710 711
}

712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735
static void correct_cmd_buff_pos(buffer * cmd_buff, size_t shift) {

	size_t cmd_buff_len = buffer_filled(cmd_buff);
	size_t read = 0;

	objtag_rec ot_rec;

	// go through all records and shift the buffer position
	while(read < cmd_buff_len) {

		// read ot_rec data
		buffer_read(cmd_buff, read, &ot_rec, sizeof(ot_rec));

		// shift buffer position
		ot_rec.buff_pos += shift;

		// write ot_rec data
		buffer_fill_at_pos(cmd_buff, read, &ot_rec, sizeof(ot_rec));

		// next
		read += sizeof(ot_rec);
	}
}

736
static void analysis_end_buff(struct tldata * tld) {
737
#ifdef DEBUGANL
738
	printf("Analysis (buffer) end enter (thread %ld)\n", tld_get()->id);
739 740 741 742
#endif

	// TODO lock for each buffer id

743
	// sending of half-full buffer is done in shutdown hook and obj free hook
744 745 746 747 748

	// write analysis to total order buffer - with lock
	enter_critical_section(jvmti_env, to_buff_lock);
	{
		// pointer to the total order buffer structure
749
		to_buff_struct * tobs = &(to_buff_array[tld->to_buff_id]);
750 751 752 753

		// allocate new buffer
		if(tobs->pb == NULL) {

754
			tobs->pb = buffs_get(tld->id);
755

756
			// set owner_id as t_buffid
757
			tobs->pb->owner_id = tld->to_buff_id;
758

759
			// determines, how many analysis requests are sent in one message
760 761
			tobs->analysis_count = 0;

762
			// create analysis message
763
			tobs->analysis_count_pos = create_analysis_msg(
764
					tobs->pb->analysis_buff, tld->to_buff_id);
765 766
		}

767 768 769 770
		// first correct positions in command buffer
		// records in command buffer are positioned according to the local
		// analysis buffer but we want the position to be valid in total ordered
		// buffer
771
		correct_cmd_buff_pos(tld->local_pb->command_buff,
772 773
				buffer_filled(tobs->pb->analysis_buff));

774 775 776
		// fill total order buffers
		buffer_fill(tobs->pb->analysis_buff,
				// NOTE: normally access the buffer using methods
777 778
				tld->local_pb->analysis_buff->buff,
				tld->local_pb->analysis_buff->occupied);
779 780 781

		buffer_fill(tobs->pb->command_buff,
				// NOTE: normally access the buffer using methods
782 783
				tld->local_pb->command_buff->buff,
				tld->local_pb->command_buff->occupied);
784 785

		// empty local buffers
786 787
		buffer_clean(tld->local_pb->analysis_buff);
		buffer_clean(tld->local_pb->command_buff);
788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809

		// add number of completed requests
		++(tobs->analysis_count);

		// buffer has to be updated each time because jvm could end and buffer
		// has to be up-to date
		buff_put_int(tobs->pb->analysis_buff, tobs->analysis_count_pos,
				tobs->analysis_count);

		// send only when the method count is reached
		if(tobs->analysis_count >= ANALYSIS_COUNT) {

			// send buffers for object tagging
			buffs_objtag(tobs->pb);

			// invalidate buffer pointer
			tobs->pb = NULL;
		}
	}
	exit_critical_section(jvmti_env, to_buff_lock);

	// reset analysis and command buffers for normal buffering
810 811 812 813
	// set to NULL, because we've send the buffers at the beginning of
	// global buffer buffering
	tld->analysis_buff = NULL;
	tld->command_buff = NULL;
814 815

	// invalidate buffer id
816
	tld->to_buff_id = INVALID_BUFF_ID;
817

818
#ifdef DEBUGANL
819
	printf("Analysis (buffer) end exit (thread %ld)\n", tld_get()->id);
820 821 822
#endif
}

823
static void analysis_end(struct tldata * tld) {
824 825 826 827
	// update the length of the marshalled arguments
	jshort args_length = buffer_filled(tld->analysis_buff) - tld->args_length_pos - sizeof (jshort);
	buff_put_short(tld->analysis_buff, tld->args_length_pos, args_length);

828
	// this method is also called for end of analysis for totally ordered API
829 830
	if(tld->to_buff_id != INVALID_BUFF_ID) {
		analysis_end_buff(tld);
831 832 833
		return;
	}

834
#ifdef DEBUGANL
835
	printf("Analysis end enter (thread %ld)\n", tld_get()->id);
836 837
#endif

838
	// sending of half-full buffer is done in thread end hook
839

Lukáš Marek's avatar
Lukáš Marek committed
840
	// increment the number of completed requests
841
	tld->analysis_count++;
842

Lukáš Marek's avatar
Lukáš Marek committed
843
	// buffer has to be updated each time - the thread can end any time
844
	buff_put_int(tld->analysis_buff, tld->analysis_count_pos, tld->analysis_count);
845

Lukáš Marek's avatar
Lukáš Marek committed
846
	// send only after the proper count is reached
847
	if(tld->analysis_count >= ANALYSIS_COUNT) {
848
		// invalidate buffer pointers
849 850
		tld->analysis_buff = NULL;
		tld->command_buff = NULL;
851

852
		// send buffers for object tagging
853
		buffs_objtag(tld->pb);
854 855

		// invalidate buffer pointer
856
		tld->pb = NULL;
857
	}
858

859
#ifdef DEBUGANL
860
	printf("Analysis end exit (thread %ld)\n", tld_get()->id);
861
#endif
862
}
863

864
// ******************* Object tagging thread *******************
865

866
// TODO add cache - ??
867

868 869 870 871
static jclass THREAD_CLASS = NULL;
static jclass STRING_CLASS = NULL;

static void ot_pack_string_data(JNIEnv * jni_env, buffer * buff,
872
		jstring to_send, jlong str_net_ref) {
873

874 875 876 877 878 879
	// get string length
	jsize str_len = (*jni_env)->GetStringUTFLength(jni_env, to_send);

	// get string data as utf-8
	const char * str = (*jni_env)->GetStringUTFChars(jni_env, to_send, NULL);
	check_error(str == NULL, "Cannot get string from java");
880

881 882 883
	// check if the size is sendable
	int size_fits = str_len < UINT16_MAX;
	check_error(! size_fits, "Java string is too big for sending");
884

885
	// add message to the buffer
886 887

	// msg id
888
	pack_byte(buff, MSG_STRING_INFO);
889 890 891 892
	// send string net reference
	pack_long(buff, str_net_ref);
	// send string
	pack_string_utf8(buff, str, str_len);
893

894 895
	// release string
	(*jni_env)->ReleaseStringUTFChars(jni_env, to_send, str);
896 897
}

898 899 900 901 902 903
static void ot_pack_thread_data(JNIEnv * jni_env, buffer * buff,
		jstring to_send, jlong thr_net_ref) {

	jvmtiThreadInfo info;
	jvmtiError error = (*jvmti_env)->GetThreadInfo(jvmti_env, to_send, &info);
	check_error(error != JVMTI_ERROR_NONE, "Cannot get tread info");
904

905 906 907 908 909 910 911 912 913 914 915 916 917
	// pack thread info message

	// msg id
	pack_byte(buff, MSG_THREAD_INFO);

	// thread object id
	pack_long(buff, thr_net_ref);

	// thread name
	pack_string_utf8(buff, info.name, strlen(info.name));

	// is daemon thread
	pack_boolean(buff, info.is_daemon);
918
}
919

920
static void update_send_status(jobject to_send, jlong * net_ref) {
921

922 923 924
	net_ref_set_spec(net_ref, TRUE);
	update_net_reference(jvmti_env, to_send, *net_ref);
}
925

926 927
static void ot_pack_aditional_data(JNIEnv * jni_env, jlong * net_ref,
		jobject to_send, unsigned char obj_type, buffer * new_objs_buff) {
928

929 930
	// NOTE: we don't use lock for updating send status, so it is possible
	// that multiple threads will send it, but this will hurt only performance
931

932 933 934
	// test if the data was already sent to the server
	if(net_ref_get_spec(*net_ref) == TRUE) {
		return;
935
	}
936

937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952
	// NOTE: Tests for class types could be done by buffering threads.
	//       It depends, where we want to have the load.

	// String - pack data
	if((*jni_env)->IsInstanceOf(jni_env, to_send, STRING_CLASS)) {

		update_send_status(to_send, net_ref);
		ot_pack_string_data(jni_env, new_objs_buff, to_send, *net_ref);
	}

	// Thread - pack data
	if((*jni_env)->IsInstanceOf(jni_env, to_send, THREAD_CLASS)) {

		update_send_status(to_send, net_ref);
		ot_pack_thread_data(jni_env, new_objs_buff, to_send, *net_ref);
	}
953
}
954

955 956
static void ot_tag_record(JNIEnv * jni_env, buffer * buff, size_t buff_pos,
		jobject to_send, unsigned char obj_type, buffer * new_objs_buff) {
957

958
	// get net reference
959 960
	jlong net_ref =
			get_net_reference(jni_env, jvmti_env, new_objs_buff, to_send);
961

962 963 964 965 966 967 968 969 970
	// send additional data
	if(obj_type == OT_DATA_OBJECT) {

		// NOTE: can update net reference (net_ref)
		ot_pack_aditional_data(jni_env, &net_ref, to_send, obj_type,
				new_objs_buff);
	}

	// update the net reference
971
	buff_put_long(buff, buff_pos, net_ref);
972
}
973

974 975
static void ot_tag_buff(JNIEnv * jni_env, buffer * anl_buff, buffer * cmd_buff,
		buffer * new_objs_buff) {
976

977 978
	size_t cmd_buff_len = buffer_filled(cmd_buff);
	size_t read = 0;
979

980
	objtag_rec ot_rec;
981

982
	while(read < cmd_buff_len) {
983

984 985 986
		// read ot_rec data
		buffer_read(cmd_buff, read, &ot_rec, sizeof(ot_rec));
		read += sizeof(ot_rec);
987

988 989
		ot_tag_record(jni_env, anl_buff, ot_rec.buff_pos, ot_rec.obj_to_tag,
				ot_rec.obj_type, new_objs_buff);
990

991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009
		// global references are released after buffer is send
	}
}

// TODO code dup with ot_tag_buff
static void ot_relese_global_ref(JNIEnv * jni_env, buffer * cmd_buff) {

	size_t cmd_buff_len = buffer_filled(cmd_buff);
	size_t read = 0;

	objtag_rec ot_rec;

	while(read < cmd_buff_len) {

		// read ot_rec data
		buffer_read(cmd_buff, read, &ot_rec, sizeof(ot_rec));
		read += sizeof(ot_rec);

		// release global references
1010
		(*jni_env)->DeleteGlobalRef(jni_env, ot_rec.obj_to_tag);
1011
	}
1012 1013
}

1014
static void * objtag_thread_loop(void * obj) {
1015

1016 1017 1018 1019
#ifdef DEBUG
		printf("Object tagging thread start (thread %ld)\n", tld_get()->id);
#endif

1020 1021 1022 1023 1024
	// attach thread to jvm
	JNIEnv *jni_env;
	jvmtiError error = (*java_vm)->AttachCurrentThreadAsDaemon(java_vm,
			(void **)&jni_env, NULL);
	check_jvmti_error(jvmti_env, error, "Unable to attach objtag thread.");
1025

1026 1027 1028
	// one spare buffer for new objects
	buffer * new_obj_buff = malloc(sizeof(buffer));
	buffer_alloc(new_obj_buff);
1029

1030 1031 1032 1033 1034 1035 1036 1037
	// retrieve java types

	STRING_CLASS = (*jni_env)->FindClass(jni_env, "java/lang/String");
	check_error(STRING_CLASS == NULL, "String class not found");

	THREAD_CLASS = (*jni_env)->FindClass(jni_env, "java/lang/Thread");
	check_error(STRING_CLASS == NULL, "Thread class not found");

1038 1039
	// exit when the jvm is terminated and there are no msg to process
	while(! (no_tagging_work && bq_length(&objtag_q) == 0) ) {
1040

1041 1042
		// get buffer - before tagging lock
		process_buffs * pb = _buffs_objtag_get();
1043

1044
#ifdef DEBUG
1045
		printf("Object tagging started (thread %ld)\n", tld_get()->id);
1046 1047
#endif

1048 1049 1050
		// tag the objects - with lock
		enter_critical_section(jvmti_env, tagging_lock);
		{
1051

1052 1053 1054 1055
			// tag objcects from buffer
			// note that analysis buffer is not required
			ot_tag_buff(jni_env, pb->analysis_buff, pb->command_buff,
					new_obj_buff);
1056

1057
			// exchange command_buff and new_obj_buff
1058
			buffer * old_cmd_buff = pb->command_buff;
1059
			pb->command_buff = new_obj_buff;
1060

1061
			// send buffer
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072
			_buffs_send(pb);

			// global references are released after buffer is send
			// this is critical for ensuring that proper ordering of events
			// is maintained - see object free event for more info

			ot_relese_global_ref(jni_env, old_cmd_buff);

			// clean old_cmd_buff and make it as new_obj_buff for the next round
			buffer_clean(old_cmd_buff);
			new_obj_buff = old_cmd_buff;
1073 1074
		}
		exit_critical_section(jvmti_env, tagging_lock);
1075 1076

#ifdef DEBUG
1077
		printf("Object tagging ended (thread %ld)\n", tld_get()->id);
1078
#endif
1079 1080
	}

1081 1082 1083
	buffer_free(new_obj_buff);
	free(new_obj_buff);
	new_obj_buff = NULL;
1084

1085 1086 1087 1088
#ifdef DEBUG
		printf("Object tagging thread end (thread %ld)\n", tld_get()->id);
#endif

1089 1090
	return NULL;
}
1091

1092
// ******************* Sending thread *******************
1093

1094
static void _send_buffer(int connection, buffer * b) {
1095

1096
	// send data
1097
	// NOTE: normally access the buffer using methods
1098 1099
	send_data(connection, b->buff, b->occupied);
}
1100

1101
static int open_connection() {
1102

1103 1104 1105 1106
	// get host address
	struct addrinfo * addr;
	int gai_res = getaddrinfo(host_name, port_number, NULL, &addr);
	check_error(gai_res != 0, gai_strerror(gai_res));
1107

1108 1109 1110
	// create stream socket
	int sockfd = socket(addr->ai_family, SOCK_STREAM, 0);
	check_std_error(sockfd, -1, "Cannot create socket");
1111

1112 1113 1114
	// connect to server
	int conn_res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
	check_std_error(conn_res, -1, "Cannot connect to server");
1115

1116 1117
	// free host address info
	freeaddrinfo(addr);
1118

1119
	return sockfd;
Lukáš Marek's avatar
Lukáš Marek committed
1120 1121
}

1122
static void close_connection(int conn, jlong thread_id) {
Lukáš Marek's avatar
Lukáš Marek committed
1123

1124
	// send close message
1125

1126
	// obtain buffer
1127
	process_buffs * buffs = buffs_get(thread_id);
1128
	buffer * buff = buffs->command_buff;
1129

1130 1131
	// msg id
	pack_byte(buff, MSG_CLOSE);
1132

1133
	// send buffer directly
1134
	_send_buffer(conn, buff);
1135

1136 1137
	// release buffer
	_buffs_release(buffs);
Lukáš Marek's avatar
Lukáš Marek committed
1138

1139
	// close socket
1140
	shutdown(conn, SHUT_RDWR);
1141 1142
	close(conn);
}
1143
static void * send_thread_loop(void * obj) {
Lukáš Marek's avatar
Lukáš Marek committed
1144

1145 1146 1147
#ifdef DEBUG
	printf("Sending thread start (thread %ld)\n", tld_get()->id);
#endif
Lukáš Marek's avatar
Lukáš Marek committed
1148

1149
	// open connection
1150
	int connection = open_connection();
Lukáš Marek's avatar
Lukáš Marek committed
1151

1152 1153
	// exit when the jvm is terminated and there are no msg to process
	while(! (no_sending_work && bq_length(&send_q) == 0) ) {
Lukáš Marek's avatar
Lukáš Marek committed
1154

1155
		// get buffer
1156 1157 1158
		// TODO thread could timeout here with timeout about 5 sec and check
		// if all of the buffers are allocated by the application threads
		// and all application threads are waiting on free buffer - deadlock
1159
		process_buffs * pb = _buffs_send_get();
1160

1161
#ifdef DEBUG
1162
		printf("Sending buffer (thread %ld)\n", tld_get()->id);
1163 1164
#endif

1165
		// first send command buffer - contains new class or object ids,...
1166
		_send_buffer(connection, pb->command_buff);
1167
		// send analysis buffer
1168
		_send_buffer(connection, pb->analysis_buff);
Lukáš Marek's avatar
Lukáš Marek committed
1169

1170 1171 1172 1173 1174 1175 1176 1177 1178
		// release (enqueue) buffer according to the type
		if(pb->owner_id == PB_UTILITY) {
			// utility buffer
			_buffs_utility_release(pb);
		}
		else {
			// normal buffer
			_buffs_release(pb);
		}
1179 1180

#ifdef DEBUG
1181
		printf("Buffer sent (thread %ld)\n", tld_get()->id);
1182
#endif
1183
	}
Lukáš Marek's avatar
Lukáš Marek committed
1184

1185
	// close connection
1186
	close_connection(connection, tld_get()->id);
1187 1188 1189 1190 1191

#ifdef DEBUG
	printf("Sending thread end (thread %ld)\n", tld_get()->id);
#endif

1192 1193
	return NULL;
}
Lukáš Marek's avatar
Lukáš Marek committed
1194

1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259

// ******************* REDispatch methods *******************

JNIEXPORT jshort JNICALL Java_ch_usi_dag_dislre_REDispatch_registerMethod
(JNIEnv * jni_env, jclass this_class, jstring analysis_method_desc) {

	return register_method(jni_env, analysis_method_desc, tld_get()->id);
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisStart__S
(JNIEnv * jni_env, jclass this_class, jshort analysis_method_id) {

	analysis_start(jni_env, analysis_method_id, tld_get());
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisStart__SB
(JNIEnv * jni_env, jclass this_class, jshort analysis_method_id,
		jbyte ordering_id) {

	analysis_start_buff(jni_env, analysis_method_id, ordering_id, tld_get());
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_analysisEnd
(JNIEnv * jni_env, jclass this_class) {

	analysis_end(tld_get());
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendBoolean
(JNIEnv * jni_env, jclass this_class, jboolean to_send) {

	pack_boolean(tld_get()->analysis_buff, to_send);
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendByte
(JNIEnv * jni_env, jclass this_class, jbyte to_send) {

	pack_byte(tld_get()->analysis_buff, to_send);
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendChar
(JNIEnv * jni_env, jclass this_class, jchar to_send) {

	pack_char(tld_get()->analysis_buff, to_send);
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendShort
(JNIEnv * jni_env, jclass this_class, jshort to_send) {

	pack_short(tld_get()->analysis_buff, to_send);
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendInt
(JNIEnv * jni_env, jclass this_class, jint to_send) {

	pack_int(tld_get()->analysis_buff, to_send);
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendLong
(JNIEnv * jni_env, jclass this_class, jlong to_send) {

	pack_long(tld_get()->analysis_buff, to_send);
}


1260 1261 1262 1263
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendFloat
(JNIEnv * jni_env, jclass this_class, jfloat to_send) {

	pack_float(tld_get()->analysis_buff, to_send);
1264 1265
}

1266 1267
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendDouble
(JNIEnv * jni_env, jclass this_class, jdouble to_send) {
1268

1269
	pack_double(tld_get()->analysis_buff, to_send);
1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendObject
(JNIEnv * jni_env, jclass this_class, jobject to_send) {

	struct tldata * tld = tld_get ();
	pack_object(jni_env, tld->analysis_buff, tld->command_buff, to_send,
			OT_OBJECT);
}

JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendObjectPlusData
(JNIEnv * jni_env, jclass this_class, jobject to_send) {

	struct tldata * tld = tld_get ();
	pack_object(jni_env, tld->analysis_buff, tld->command_buff, to_send,
			OT_DATA_OBJECT);
}

1288 1289 1290 1291 1292 1293 1294 1295
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendObjectSize
(JNIEnv * jni_env, jclass this_class, jobject to_send) {
	jlong size = -1;
	jvmtiError error = (*jvmti_env)->GetObjectSize(jvmti_env, to_send, &size);
	check_jvmti_error(jvmti_env, error, "Cannot get object size");
	pack_long(tld_get()->analysis_buff, size);
}

1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312
JNIEXPORT void JNICALL Java_ch_usi_dag_dislre_REDispatch_sendCurrentThread
(JNIEnv * jni_env, jclass this_class) {
	//
	// GetCurrentThread may return null if called in the wrong phase,
	// and even if it fails for other reason, we at least send null.
	//
	// Consider sending some dummy data for an "init" thread while
	// GetCurrentThread is not yet ready to return the current
	// thread, to differentiate it from failure.
	//
	jthread thread = NULL;
	(*jvmti_env)->GetCurrentThread(jvmti_env, &thread);
	struct tldata * tld = tld_get ();
	pack_object(jni_env, tld->analysis_buff, tld->command_buff, thread,
			OT_DATA_OBJECT);
}

1313