dislreagent.c 49.2 KB
Newer Older
1 2 3 4 5 6 7 8
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <netdb.h>
#include <unistd.h>
#include <netinet/tcp.h>

9 10
#include <pthread.h>

11 12 13
#include <jvmti.h>
#include <jni.h>

14 15
// has to be defined for jvmtihelper.h
#define ERR_PREFIX "DiSL-RE agent error: "
16

17 18
#include "../src-agent-c/jvmtihelper.h"
#include "../src-agent-c/comm.h"
19

20 21 22
#include "messagetype.h"
#include "buffer.h"
#include "buffpack.h"
23 24
#include "blockingqueue.h"
#include "netref.h"
25 26 27 28 29

#include "dislreagent.h"

static const int ERR_SERVER = 10003;

30 31 32 33 34 35 36 37
// defaults - be sure that space in host_name is long enough
static const char * DEFAULT_HOST = "localhost";
static const char * DEFAULT_PORT = "11218";

// port and name of the instrumentation server
static char host_name[1024];
static char port_number[6]; // including final 0

38 39 40
// number of analysis requests in one message
static jint ANALYSIS_COUNT = 1024;

41
static jvmtiEnv * jvmti_env;
42 43
static JavaVM * java_vm;

Lukáš Marek's avatar
Lukáš Marek committed
44
static int jvm_started = FALSE;
45

46 47
static volatile int no_tagging_work = FALSE;
static volatile int no_sending_work = FALSE;
48

49
// *** Accessed only by sending thread ***
50 51 52 53 54

// communication connection socket descriptor
// access must be protected by monitor
static int connection = 0;

55 56 57 58 59 60 61
// *** Sync queues ***

// !!! There should be enough buffers for initial class loading
// Sending thread is not jet running but buffers are consumed
#define BQ_BUFFERS 512

// queues contain process_buffs structure
62

63 64
// queues with empty buffers
static blocking_queue empty_buff_q;
65

66 67
// queue where buffers are queued for sending
static blocking_queue send_q;
Lukáš Marek's avatar
Lukáš Marek committed
68

69 70
// queue where buffers are queued for object
static blocking_queue objtag_q;
71

72 73 74
typedef struct {
	buffer * command_buff;
	buffer * analysis_buff;
75
	jlong owner_id;
76 77 78 79 80 81 82 83 84 85 86 87 88 89
} process_buffs;

// list of all allocated bq buffers
static process_buffs pb_list[BQ_BUFFERS];

#define OT_OBJECT 1
#define OT_STRING 2
#define OT_CLASS 3

typedef struct {
	unsigned char obj_type;
	size_t buff_pos;
	jobject obj_to_tag;
} objtag_rec;
90

91 92 93 94 95 96 97 98 99 100
// *** buffers for total ordering ***

typedef struct {
	process_buffs * pb;
	jint analysis_count;
	size_t analysis_count_pos;
} to_buff_struct;

#define INVALID_BUFF_ID -1

101 102
#define TO_BUFFER_MAX_ID 127 // byte is the holding type
#define TO_BUFFER_COUNT (TO_BUFFER_MAX_ID + 1) // +1 for buffer id 0
103 104 105

static jrawMonitorID to_buff_lock;

106
static to_buff_struct to_buff_array[TO_BUFFER_COUNT];
107

108 109
// *** Protected by tagging lock ***
// can require other locks while holding this
110

111 112
#define NULL_NET_REF 0

113
static jrawMonitorID tagging_lock;
114

115 116 117 118 119 120
// first available id for object tagging
static volatile jlong avail_object_id = 1;
static volatile jint avail_class_id = 1;

// first available id for new messages
static volatile jshort avail_analysis_id = 1;
121

122 123 124 125
// *** Thread ids ***

#define INVALID_THREAD_ID -1

126 127
#define STARTING_THREAD_ID (TO_BUFFER_MAX_ID + 1)

128
// initial ids are reserved for total ordering buffers
129
static volatile jlong avail_thread_id = STARTING_THREAD_ID;
130

131

132
// *** Thread locals ***
Lukáš Marek's avatar
Lukáš Marek committed
133

134 135 136 137 138
// NOTE: The JVMTI functionality allows to implement everything
// using JVM, but the GNU implementation is faster and WORKING


struct tldata {
139 140 141 142 143 144 145 146 147
	jlong id;
	process_buffs * local_pb;
	jbyte to_buff_id;
	process_buffs * pb;
	buffer * analysis_buff;
	buffer * command_buff;
	jint analysis_count;
	size_t analysis_count_pos;
	size_t args_length_pos;
148 149 150 151 152 153 154 155 156 157 158 159 160
};


#if defined (__APPLE__) && defined (__MACH__)

//
// Use pthreads on Mac OS X
//

static pthread_key_t tls_key;


static void tls_init () {
161 162
	int result = pthread_key_create (& tls_key, NULL);
	check_error(result != 0, "Failed to allocate thread-local storage key");
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
}


inline static struct tldata * tld_init (struct tldata * tld) {
	tld->id= INVALID_THREAD_ID;
	tld->local_pb = NULL;
	tld->to_buff_id = INVALID_BUFF_ID;
	tld->pb = NULL;
	tld->analysis_buff = NULL;
	tld->analysis_count = 0;
	tld->analysis_count_pos = 0;

	return tld;
}

static struct tldata * tld_create ()  {
	struct tldata * tld = malloc (sizeof (struct tldata));
	check_error (tld == NULL, "Failed to allocate thread-local data");
	int result = pthread_setspecific (tls_key, tld);
	check_error (result != 0, "Failed to store thread-local data");
	return tld_init (tld);
}

inline static struct tldata * tld_get () {
	struct tldata * tld = pthread_getspecific (tls_key);
	return (tld != NULL) ? tld : tld_create ();
}

#else

//
// Use GNU __thread where supported
//

static void tls_init () {
198
	// empty
199 200 201 202
}


static __thread struct tldata tld = {
203 204 205 206 207 208 209 210
		.id = INVALID_THREAD_ID,
		.local_pb = NULL,
		.to_buff_id = INVALID_BUFF_ID,
		.pb = NULL,
		.analysis_buff = NULL,
		.command_buff = NULL,
		.analysis_count = 0,
		.analysis_count_pos = 0,
211 212 213 214 215 216 217
};

inline static struct tldata * tld_get () {
	return & tld;
}

#endif
218

219 220 221 222 223

// *** Threads ***

static pthread_t objtag_thread;
static pthread_t send_thread;
Lukáš Marek's avatar
Lukáš Marek committed
224

225 226
// ******************* Helper routines *******************

227
static void parse_agent_options(char *options) {
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252

	static const char PORT_DELIM = ':';

	// assign defaults
	strcpy(host_name, DEFAULT_HOST);
	strcpy(port_number, DEFAULT_PORT);

	// no options found
	if (options == NULL) {
		return;
	}

	char * port_start = strchr(options, PORT_DELIM);

	// process port number
	if(port_start != NULL) {

		// replace PORT_DELIM with end of the string (0)
		port_start[0] = '\0';

		// move one char forward to locate port number
		++port_start;

		// convert number
		int fitsP = strlen(port_start) < sizeof(port_number);
253
		check_error(! fitsP, "Port number is too long");
254 255 256 257 258 259

		strcpy(port_number, port_start);
	}

	// check if host_name is big enough
	int fitsH = strlen(options) < sizeof(host_name);
260
	check_error(! fitsH, "Host name is too long");
261 262 263 264

	strcpy(host_name, options);
}

265
// ******************* Advanced buffer routines *******************
266

267 268 269
// owner_id can have several states
// > 0 && <= TO_BUFFER_MAX_ID
//    - means that buffer is reserved for total ordering events
270
// >= STARTING_THREAD_ID
271 272
//    - means that buffer is owned by some thread that is marked
// == -1 - means that buffer is owned by some thread that is NOT tagged
273

274
// == PB_FREE - means that buffer is currently free
275
static const jlong PB_FREE = -2;
276

277 278
// == PB_OBJTAG - means that buffer is scheduled (processed) for object tagging
static const jlong PB_OBJTAG = -100;
279

280 281
// == PB_SEND - means that buffer is scheduled (processed) for sending
static const jlong PB_SEND = -101;
282

283
static process_buffs * buffs_get(jlong thread_id) {
284
#ifdef DEBUG
285
	printf("Acquiring buffer -- empty (thread %ld)\n", tld_get()->id);
286 287
#endif

288
	// retrieves pointer to buffer
289
	process_buffs * buffs;
290
	bq_pop(&empty_buff_q, &buffs);
291

292
	buffs->owner_id = thread_id;
Lukáš Marek's avatar
Lukáš Marek committed
293

294
#ifdef DEBUG
295
	printf("Buffer acquired -- empty (thread %ld)\n", tld_get()->id);
296 297
#endif

298
	return buffs;
299 300
}

301 302
// only objtag or sending thread should access this function
static void _buffs_release(process_buffs * buffs) {
303
#ifdef DEBUG
304
	printf("Queuing buffer -- empty (thread %ld)\n", tld_get()->id);
305 306
#endif

307 308 309
	// empty buff
	buffer_clean(buffs->analysis_buff);
	buffer_clean(buffs->command_buff);
310

311
	// stores pointer to buffer
312
	buffs->owner_id = PB_FREE;
313
	bq_push(&empty_buff_q, &buffs);
314 315

#ifdef DEBUG
316
	printf("Buffer queued -- empty (thread %ld)\n", tld_get()->id);
317
#endif
318 319
}

320
static void buffs_objtag(process_buffs * buffs) {
321
#ifdef DEBUG
322
	printf("Queuing buffer -- objtag (thread %ld)\n", tld_get()->id);
323 324
#endif

325
	buffs->owner_id = PB_OBJTAG;
326
	bq_push(&objtag_q, &buffs);
327 328

#ifdef DEBUG
329
	printf("Buffer queued -- objtag (thread %ld)\n", tld_get()->id);
330
#endif
331
}
332

333 334
// only objtag thread should access this function
static process_buffs * _buffs_objtag_get() {
335
#ifdef DEBUG
336
	printf("Acquiring buffer -- objtag (thread %ld)\n", tld_get()->id);
337 338
#endif

339 340
	process_buffs * buffs;
	bq_pop(&objtag_q, &buffs);
341

342
#ifdef DEBUG
343
	printf("Buffer acquired -- objtag (thread %ld)\n", tld_get()->id);
344 345
#endif

346 347
	return buffs;
}
348

349
static void buffs_send(process_buffs * buffs) {
350
#ifdef DEBUG
351
	printf("Queuing buffer -- send (thread %ld)\n", tld_get()->id);
352 353
#endif

354
	buffs->owner_id = PB_SEND;
355
	bq_push(&send_q, &buffs);
356 357

#ifdef DEBUG
358
	printf("Buffer queued -- send (thread %ld)\n", tld_get()->id);
359
#endif
360 361
}

362 363
// only sending thread should access this function
static process_buffs * _buffs_send_get() {
364
#ifdef DEBUG
365
	printf("Acquiring buffer -- send (thread %ld)\n", tld_get()->id);
366 367
#endif

368 369
	process_buffs * buffs;
	bq_pop(&send_q, &buffs);
370

371
#ifdef DEBUG
372
	printf("Buffer acquired -- send (thread %ld)\n", tld_get()->id);
373 374
#endif

375
	return buffs;
376 377
}

378
// ******************* Advanced packing routines *******************
379

380 381
static void _fill_ot_rec(JNIEnv * jni_env, buffer * cmd_buff,
		unsigned char ot_type, buffer * buff, jstring to_send) {
382

383 384 385 386 387 388 389 390
	// crate object tagging record
	objtag_rec ot_rec;
	// type of object to be tagged
	ot_rec.obj_type = ot_type;
	// position in the buffer, where the data will be stored during tagging
	ot_rec.buff_pos = buffer_filled(buff);
	// global reference to the object to be tagged
	ot_rec.obj_to_tag = (*jni_env)->NewGlobalRef(jni_env, to_send);
391

392 393
	// save to command buff
	buffer_fill(cmd_buff, &ot_rec, sizeof(ot_rec));
394 395
}

396 397
static void pack_object(JNIEnv * jni_env, buffer * buff, buffer * cmd_buff,
		jobject to_send) {
398

399 400 401 402
	// create entry for object tagging thread that will replace the null ref
	if(to_send != NULL) {
		_fill_ot_rec(jni_env, cmd_buff, OT_OBJECT, buff, to_send);
	}
403

404
	// pack null net reference
405
	pack_long(buff, NULL_NET_REF);
406
}
407

408 409
static void pack_string_java(JNIEnv * jni_env, buffer * buff, buffer * cmd_buff,
		jstring to_send) {
410

411 412 413 414
	// create entry for object tagging thread that will replace the null ref
	if(to_send != NULL) {
		_fill_ot_rec(jni_env, cmd_buff, OT_STRING, buff, to_send);
	}
415

416
	// pack null net reference
417
	pack_long(buff, NULL_NET_REF);
418 419
}

420 421
static void pack_class(JNIEnv * jni_env, buffer * buff, buffer * cmd_buff,
		jclass to_send) {
422

423 424 425 426
	// create entry for object tagging thread that will replace the null ref
	if(to_send != NULL) {
		_fill_ot_rec(jni_env, cmd_buff, OT_CLASS, buff, to_send);
	}
427

428
	// pack null class id
429
	pack_long(buff, NULL_NET_REF);
430
}
431

432 433 434 435 436
static void buff_put_short(buffer * buff, size_t buff_pos, jshort to_put) {
	// put the short at the position in network order
	jshort nts = htons(to_put);
	buffer_fill_at_pos(buff, buff_pos, &nts, sizeof(jshort));
}
437

438

439 440 441
static void buff_put_int(buffer * buff, size_t buff_pos, jint to_put) {
	// put the int at the position in network order
	jint nts = htonl(to_put);
442
	buffer_fill_at_pos(buff, buff_pos, &nts, sizeof(jint));
443 444
}

445
static void buff_put_long(buffer * buff, size_t buff_pos, jlong to_put) {
446
	// put the long at the position in network order
447 448
	jlong nts = htobe64(to_put);
	buffer_fill_at_pos(buff, buff_pos, &nts, sizeof(jlong));
449 450 451
}


452
// ******************* analysis helper methods *******************
453

454
static jshort next_analysis_id () {
455 456 457
	// get id for this method string
	// this could use different lock then tagging but it should not be a problem
	// and it will be used rarely - bit unoptimized
458 459

	jshort result = -1;
460 461
	enter_critical_section(jvmti_env, tagging_lock);
	{
462
		result = avail_analysis_id++;
463 464
	}
	exit_critical_section(jvmti_env, tagging_lock);
465

466 467 468 469
	return result;
}

static jshort register_method(
470 471
		JNIEnv * jni_env, jstring analysis_method_desc,
		jlong thread_id
472 473 474 475 476 477 478 479 480
) {
#ifdef DEBUG
	printf("Registering method (thread %ld)\n", tld_get()->id);
#endif

	// *** send register analysis ***

	jshort new_analysis_id = next_analysis_id ();

481
	// send register analysis message
482

483
	// obtain buffer
484
	process_buffs * buffs = buffs_get(thread_id);
485
	buffer * buff = buffs->analysis_buff;
486

487 488 489 490 491 492 493
	// msg id
	pack_byte(buff, MSG_REG_ANALYSIS);
	// new id for analysis method
	pack_short(buff, new_analysis_id);
	// method descriptor
	// uses string case and additional message for string - bit unoptimized
	pack_string_java(jni_env, buff, buffs->command_buff, analysis_method_desc);
494

495 496
	// send message
	buffs_objtag(buffs);
497

498
#ifdef DEBUG
499
	printf("Method registered (thread %ld)\n", tld_get()->id);
500 501
#endif

502
	return new_analysis_id;
503 504 505
}


506
static jlong next_thread_id () {
507
#ifdef DEBUG
508
	printf("Marking thread (thread %ld)\n", tld_get()->id);
509
#endif
510
	// mark the thread - with lock
511
	// TODO replace total ordering lock with private lock - perf. issue
512
	jlong result = -1;
513
	enter_critical_section(jvmti_env, to_buff_lock);
514
	{
515
		result = avail_thread_id++;
516
	}
517
	exit_critical_section(jvmti_env, to_buff_lock);
518

519
#ifdef DEBUG
520
	printf("Thread marked (thread %ld)\n", result);
521
#endif
522
	return result;
523
}
524

525 526

static size_t createAnalysisRequestHeader (
527
		buffer * buff, jshort analysis_method_id
528 529 530 531 532 533 534 535 536 537 538 539 540 541
) {
	// analysis method id
	pack_short(buff, analysis_method_id);

	// position of the short indicating the length of marshalled arguments
	size_t pos = buffer_filled(buff);

	// initial value of the length of the marshalled arguments
	pack_short(buff, 0xBAAD);

	return pos;
}


542
void analysis_start_buff(
543 544
		JNIEnv * jni_env, jshort analysis_method_id, jbyte ordering_id,
		struct tldata * tld
545
) {
546
#ifdef DEBUG
547
	printf("Analysis (buffer) start enter (thread %ld)\n", tld_get()->id);
548 549
#endif

550
	check_error(ordering_id < 0, "Buffer id has negative value");
551

552
	if(tld->local_pb == NULL) {
553
		// mark thread
554 555
		if(tld->id == INVALID_THREAD_ID) {
			tld->id = next_thread_id ();
556 557 558
		}

		// get buffers
559
		tld->local_pb = buffs_get(tld->id);
560 561 562
	}

	// set local buffers for this buffering
563 564
	tld->analysis_buff = tld->local_pb->analysis_buff;
	tld->command_buff = tld->local_pb->command_buff;
565

566
	tld->to_buff_id = ordering_id;
567

568 569 570
	// create request header, keep track of the position
	// of the length of marshalled arguments
	tld->args_length_pos = createAnalysisRequestHeader(tld->analysis_buff, analysis_method_id);
571 572

#ifdef DEBUG
573
	printf("Analysis (buffer) start exit (thread %ld)\n", tld_get()->id);
574 575 576 577
#endif
}


578
static size_t createAnalysisMsg(buffer * buff, jlong id) {
579
	// create analysis message
580 581 582 583 584 585 586 587 588 589

	// analysis msg
	pack_byte(buff, MSG_ANALYZE);

	// thread (total order buffer) id
	pack_long(buff, id);

	// get pointer to the location where count of requests will stored
	size_t pos = buffer_filled(buff);

590 591
	// request count space initialization
	pack_int(buff, 0xBAADF00D);
592 593 594 595

	return pos;
}

596 597


598
static void analysis_start(
599 600
		JNIEnv * jni_env, jshort analysis_method_id,
		struct tldata * tld
601
) {
602
#ifdef DEBUG
603
	printf("Analysis start enter (thread %ld)\n", tld_get()->id);
604 605
#endif

606
	if(tld->analysis_buff == NULL) {
607

608
		// mark thread
609 610
		if(tld->id == INVALID_THREAD_ID) {
			tld->id = next_thread_id ();
611
		}
612

613
		// get buffers
614 615 616
		tld->pb = buffs_get(tld->id);
		tld->analysis_buff = tld->pb->analysis_buff;
		tld->command_buff = tld->pb->command_buff;
617

618
		// determines, how many analysis requests are sent in one message
619
		tld->analysis_count = 0;
620

621
		// create analysis message
622
		tld->analysis_count_pos = createAnalysisMsg(tld->analysis_buff, tld->id);
623 624
	}

625 626 627
	// create request header, keep track of the position
	// of the length of marshalled arguments
	tld->args_length_pos = createAnalysisRequestHeader(tld->analysis_buff, analysis_method_id);
628 629

#ifdef DEBUG
630
	printf("Analysis start exit (thread %ld)\n", tld_get()->id);
631
#endif
632 633
}

634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657
static void correct_cmd_buff_pos(buffer * cmd_buff, size_t shift) {

	size_t cmd_buff_len = buffer_filled(cmd_buff);
	size_t read = 0;

	objtag_rec ot_rec;

	// go through all records and shift the buffer position
	while(read < cmd_buff_len) {

		// read ot_rec data
		buffer_read(cmd_buff, read, &ot_rec, sizeof(ot_rec));

		// shift buffer position
		ot_rec.buff_pos += shift;

		// write ot_rec data
		buffer_fill_at_pos(cmd_buff, read, &ot_rec, sizeof(ot_rec));

		// next
		read += sizeof(ot_rec);
	}
}

658
static void analysis_end_buff(struct tldata * tld) {
659
#ifdef DEBUG
660
	printf("Analysis (buffer) end enter (thread %ld)\n", tld_get()->id);
661 662 663 664
#endif

	// TODO lock for each buffer id

665
	// sending of half-full buffer is done in shutdown hook and obj free hook
666 667 668 669 670

	// write analysis to total order buffer - with lock
	enter_critical_section(jvmti_env, to_buff_lock);
	{
		// pointer to the total order buffer structure
671
		to_buff_struct * tobs = &(to_buff_array[tld->to_buff_id]);
672 673 674 675

		// allocate new buffer
		if(tobs->pb == NULL) {

676
			tobs->pb = buffs_get(tld->id);
677

678
			// set owner_id as t_buffid
679
			tobs->pb->owner_id = tld->to_buff_id;
680

681
			// determines, how many analysis requests are sent in one message
682 683
			tobs->analysis_count = 0;

684
			// create analysis message
685
			tobs->analysis_count_pos = createAnalysisMsg(
686
					tobs->pb->analysis_buff, tld->to_buff_id);
687 688
		}

689 690 691 692
		// first correct positions in command buffer
		// records in command buffer are positioned according to the local
		// analysis buffer but we want the position to be valid in total ordered
		// buffer
693
		correct_cmd_buff_pos(tld->local_pb->command_buff,
694 695
				buffer_filled(tobs->pb->analysis_buff));

696 697 698
		// fill total order buffers
		buffer_fill(tobs->pb->analysis_buff,
				// NOTE: normally access the buffer using methods
699 700
				tld->local_pb->analysis_buff->buff,
				tld->local_pb->analysis_buff->occupied);
701 702 703

		buffer_fill(tobs->pb->command_buff,
				// NOTE: normally access the buffer using methods
704 705
				tld->local_pb->command_buff->buff,
				tld->local_pb->command_buff->occupied);
706 707

		// empty local buffers
708 709
		buffer_clean(tld->local_pb->analysis_buff);
		buffer_clean(tld->local_pb->command_buff);
710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731

		// add number of completed requests
		++(tobs->analysis_count);

		// buffer has to be updated each time because jvm could end and buffer
		// has to be up-to date
		buff_put_int(tobs->pb->analysis_buff, tobs->analysis_count_pos,
				tobs->analysis_count);

		// send only when the method count is reached
		if(tobs->analysis_count >= ANALYSIS_COUNT) {

			// send buffers for object tagging
			buffs_objtag(tobs->pb);

			// invalidate buffer pointer
			tobs->pb = NULL;
		}
	}
	exit_critical_section(jvmti_env, to_buff_lock);

	// reset analysis and command buffers for normal buffering
732 733 734
	if(tld->pb != NULL) {
		tld->analysis_buff = tld->pb->analysis_buff;
		tld->command_buff = tld->pb->command_buff;
735 736
	}
	else {
737 738
		tld->analysis_buff = NULL;
		tld->command_buff = NULL;
739 740 741
	}

	// invalidate buffer id
742
	tld->to_buff_id = INVALID_BUFF_ID;
743 744

#ifdef DEBUG
745
	printf("Analysis (buffer) end exit (thread %ld)\n", tld_get()->id);
746 747 748
#endif
}

749
static void analysis_end(struct tldata * tld) {
750 751 752 753
	// update the length of the marshalled arguments
	jshort args_length = buffer_filled(tld->analysis_buff) - tld->args_length_pos - sizeof (jshort);
	buff_put_short(tld->analysis_buff, tld->args_length_pos, args_length);

754
	// this method is also called for end of analysis for totally ordered API
755 756
	if(tld->to_buff_id != INVALID_BUFF_ID) {
		analysis_end_buff(tld);
757 758 759
		return;
	}

760
#ifdef DEBUG
761
	printf("Analysis end enter (thread %ld)\n", tld_get()->id);
762 763
#endif

764
	// sending of half-full buffer is done in thread end hook
765

Lukáš Marek's avatar
Lukáš Marek committed
766
	// increment the number of completed requests
767
	tld->analysis_count++;
768

Lukáš Marek's avatar
Lukáš Marek committed
769
	// buffer has to be updated each time - the thread can end any time
770
	buff_put_int(tld->analysis_buff, tld->analysis_count_pos, tld->analysis_count);
771

Lukáš Marek's avatar
Lukáš Marek committed
772
	// send only after the proper count is reached
773
	if(tld->analysis_count >= ANALYSIS_COUNT) {
774
		// invalidate buffer pointers
775 776
		tld->analysis_buff = NULL;
		tld->command_buff = NULL;
777

778
		// send buffers for object tagging
779
		buffs_objtag(tld->pb);
780 781

		// invalidate buffer pointer
782
		tld->pb = NULL;
783
	}
784 785

#ifdef DEBUG
786
	printf("Analysis end exit (thread %ld)\n", tld_get()->id);
787
#endif
788
}
789

790
// ******************* Object tagging thread *******************
791

792
// TODO add cache - ??
793

794 795
static void ot_pack_string_cache(JNIEnv * jni_env, buffer * buff,
		jstring to_send, jlong str_net_ref) {
796

797 798 799 800 801 802
	// get string length
	jsize str_len = (*jni_env)->GetStringUTFLength(jni_env, to_send);

	// get string data as utf-8
	const char * str = (*jni_env)->GetStringUTFChars(jni_env, to_send, NULL);
	check_error(str == NULL, "Cannot get string from java");
803

804 805 806
	// check if the size is sendable
	int size_fits = str_len < UINT16_MAX;
	check_error(! size_fits, "Java string is too big for sending");
807

808
	// add message to the buffer
809 810

	// msg id
811 812 813 814 815
	pack_byte(buff, MSG_NEW_STRING);
	// send string net reference
	pack_long(buff, str_net_ref);
	// send string
	pack_string_utf8(buff, str, str_len);
816

817 818
	// release string
	(*jni_env)->ReleaseStringUTFChars(jni_env, to_send, str);
819 820
}

821 822
static void ot_tag_object(JNIEnv * jni_env, buffer * buff, size_t buff_pos,
		jobject to_send, buffer * new_objs_buff) {
823

824 825 826 827
	// get net reference and put it on proper position
	buff_put_long(buff, buff_pos,
			get_net_reference(jni_env, jvmti_env, new_objs_buff, to_send));
}
828

829 830 831
// NOTE: this tagging uses cache
static void ot_tag_string(JNIEnv * jni_env, buffer * buff, size_t buff_pos,
		jobject to_send, buffer * new_objs_buff) {
832

833 834
	jlong net_ref =
			get_net_reference(jni_env, jvmti_env, new_objs_buff, to_send);
835

836 837 838 839
	// test if the string was already sent to the server
	// NOTE: we don't use lock here, so it is possible that multiple threads
	//       will send it, but this will not hurt (only performance)
	if(net_ref_get_spec(net_ref) == FALSE) {
840

841 842 843
		// update the send status
		net_ref_set_spec(&net_ref, TRUE);
		update_net_reference(jvmti_env, to_send, net_ref);
844

845 846 847
		// add cached string to the buffer
		ot_pack_string_cache(jni_env, new_objs_buff, to_send, net_ref);
	}
848

849 850
	buff_put_long(buff, buff_pos, net_ref);
}
851

852 853
static void ot_tag_class(JNIEnv * jni_env, buffer * buff, size_t buff_pos,
		jobject to_send, buffer * new_objs_buff) {
854

855 856 857
	// get class net reference...
	jlong net_ref =
			get_net_reference(jni_env, jvmti_env, new_objs_buff, to_send);
858

859
	// ... and put it on proper position
860
	buff_put_long(buff, buff_pos, net_ref);
861
}
862

863 864
static void ot_tag_buff(JNIEnv * jni_env, buffer * anl_buff, buffer * cmd_buff,
		buffer * new_objs_buff) {
865

866 867
	size_t cmd_buff_len = buffer_filled(cmd_buff);
	size_t read = 0;
868

869
	objtag_rec ot_rec;
870

871
	while(read < cmd_buff_len) {
872

873 874 875
		// read ot_rec data
		buffer_read(cmd_buff, read, &ot_rec, sizeof(ot_rec));
		read += sizeof(ot_rec);
876

877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896
		// tag
		switch(ot_rec.obj_type) {
		case OT_OBJECT: {
			ot_tag_object(jni_env, anl_buff, ot_rec.buff_pos, ot_rec.obj_to_tag,
					new_objs_buff);
			break;
		}
		case OT_STRING: {
			ot_tag_string(jni_env, anl_buff, ot_rec.buff_pos, ot_rec.obj_to_tag,
					new_objs_buff);
			break;
		}
		case OT_CLASS: {
			ot_tag_class(jni_env, anl_buff, ot_rec.buff_pos, ot_rec.obj_to_tag,
					new_objs_buff);
			break;
		}
		default:
			check_error(TRUE, "Undefined type to pack.");
		}
897

898 899
		// free global reference
		(*jni_env)->DeleteGlobalRef(jni_env, ot_rec.obj_to_tag);
900
	}
901 902
}

903
static void * objtag_thread_loop(void * obj) {
904

905 906 907 908 909
	// attach thread to jvm
	JNIEnv *jni_env;
	jvmtiError error = (*java_vm)->AttachCurrentThreadAsDaemon(java_vm,
			(void **)&jni_env, NULL);
	check_jvmti_error(jvmti_env, error, "Unable to attach objtag thread.");
910

911 912 913
	// one spare buffer for new objects
	buffer * new_obj_buff = malloc(sizeof(buffer));
	buffer_alloc(new_obj_buff);
914

915 916
	// exit when the jvm is terminated and there are no msg to process
	while(! (no_tagging_work && bq_length(&objtag_q) == 0) ) {
917

918 919
		// get buffer - before tagging lock
		process_buffs * pb = _buffs_objtag_get();
920

921
#ifdef DEBUG
922
		printf("Object tagging started (thread %ld)\n", tld_get()->id);
923 924
#endif

925 926 927
		// tag the objects - with lock
		enter_critical_section(jvmti_env, tagging_lock);
		{
928

929 930 931 932
			// tag objcects from buffer
			// note that analysis buffer is not required
			ot_tag_buff(jni_env, pb->analysis_buff, pb->command_buff,
					new_obj_buff);
933

934 935 936 937
			// exchange command_buff and new_obj_buff
			buffer * tmp = pb->command_buff;
			pb->command_buff = new_obj_buff;
			new_obj_buff = tmp;
938

939 940
			// clean new new_obj_buff
			buffer_clean(new_obj_buff);
941

942 943 944 945
			// send buffer
			buffs_send(pb);
		}
		exit_critical_section(jvmti_env, tagging_lock);
946 947

#ifdef DEBUG
948
		printf("Object tagging ended (thread %ld)\n", tld_get()->id);
949
#endif
950 951
	}

952 953 954
	buffer_free(new_obj_buff);
	free(new_obj_buff);
	new_obj_buff = NULL;
955

956 957
	return NULL;
}
958

959
// ******************* Sending thread *******************
960

961
static void _send_buffer(buffer * b) {
962

963
	// send data
964
	// NOTE: normally access the buffer using methods
965 966
	send_data(connection, b->buff, b->occupied);
}
967

968
static int open_connection() {
969

970 971 972 973
	// get host address
	struct addrinfo * addr;
	int gai_res = getaddrinfo(host_name, port_number, NULL, &addr);
	check_error(gai_res != 0, gai_strerror(gai_res));
974

975 976 977
	// create stream socket
	int sockfd = socket(addr->ai_family, SOCK_STREAM, 0);
	check_std_error(sockfd, -1, "Cannot create socket");
978

979 980 981
	// connect to server
	int conn_res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
	check_std_error(conn_res, -1, "Cannot connect to server");
982

983 984
	// free host address info
	freeaddrinfo(addr);
985

986
	return sockfd;
Lukáš Marek's avatar
Lukáš Marek committed
987 988
}

989
static void close_connection(int conn, jlong thread_id) {
Lukáš Marek's avatar
Lukáš Marek committed
990

991
	// send close message
992

993
	// obtain buffer
994
	process_buffs * buffs = buffs_get(thread_id);
995
	buffer * buff = buffs->command_buff;
996

997 998
	// msg id
	pack_byte(buff, MSG_CLOSE);
999

1000 1001
	// send buffer directly
	_send_buffer(buff);
1002

1003 1004
	// release buffer
	_buffs_release(buffs);
Lukáš Marek's avatar
Lukáš Marek committed
1005

1006 1007 1008
	// close socket
	close(conn);
}
Lukáš Marek's avatar
Lukáš Marek committed
1009 1010


1011
static void attach_current_thread_to_jvm () {
1012
	JNIEnv *jni_env;
1013
	jvmtiError error = (*java_vm)->AttachCurrentThreadAsDaemon(
1014
			java_vm, (void **)&jni_env, NULL
1015
	);
1016
	check_jvmti_error(jvmti_env, error, "Unable to attach send thread.");
1017 1018 1019 1020 1021
}

static void * send_thread_loop(void * obj) {
	connection = open_connection();
	attach_current_thread_to_jvm ();
Lukáš Marek's avatar
Lukáš Marek committed
1022

1023 1024
	// exit when the jvm is terminated and there are no msg to process
	while(! (no_sending_work && bq_length(&send_q) == 0) ) {
Lukáš Marek's avatar
Lukáš Marek committed
1025

1026
		// get buffer
1027 1028 1029
		// TODO thread could timeout here with timeout about 5 sec and check
		// if all of the buffers are allocated by the application threads
		// and all application threads are waiting on free buffer - deadlock
1030
		process_buffs * pb = _buffs_send_get();
1031

1032
#ifdef DEBUG
1033
		printf("Sending buffer (thread %ld)\n", tld_get()->id);
1034 1035
#endif

1036 1037 1038 1039
		// first send command buffer - contains new class or object ids,...
		_send_buffer(pb->command_buff);
		// send analysis buffer
		_send_buffer(pb->analysis_buff);
Lukáš Marek's avatar
Lukáš Marek committed
1040

1041 1042
		// release buffer
		_buffs_release(pb);
1043 1044

#ifdef DEBUG
1045
		printf("Buffer sent (thread %ld)\n", tld_get()->id);
1046
#endif
1047
	}
Lukáš Marek's avatar
Lukáš Marek committed
1048

1049
	// close connection
1050
	close_connection(connection, tld_get()->id);
1051 1052
	return NULL;
}
Lukáš Marek's avatar
Lukáš Marek committed
1053

1054
// ******************* CLASS LOAD callback *******************
Lukáš Marek's avatar
Lukáš Marek committed
1055

1056
void JNICALL jvmti_callback_class_file_load_hook(
1057 1058 1059 1060 1061
		jvmtiEnv *jvmti_env, JNIEnv* jni_env,
		jclass class_being_redefined, jobject loader,
		const char* name, jobject protection_domain,
		jint class_data_len, const unsigned char* class_data,
		jint* new_class_data_len, unsigned char** new_class_data
1062 1063
) {
	struct tldata * tld = tld_get();
Lukáš Marek's avatar
Lukáš Marek committed
1064

1065
	// TODO instrument analysis classes
Lukáš Marek's avatar
Lukáš Marek committed
1066

1067
#ifdef DEBUG
1068
	printf("Sending new class (thread %ld)\n", tld_get()->id);
1069 1070
#endif

1071 1072 1073 1074 1075
	// *** send class info ***

	// send new class message

	// obtain buffer - before tagging lock
1076
	process_buffs * buffs = buffs_get(tld->id);
1077
	buffer * buff = buffs->analysis_buff;
Lukáš Marek's avatar
Lukáš Marek committed
1078

1079 1080 1081 1082
	// tag the class loader - with lock
	enter_critical_section(jvmti_env, tagging_lock);
	{
		// retrieve class loader net ref
1083
		jlong loader_id = NULL_NET_REF;
1084 1085 1086 1087

		// this callback can be called before the jvm is started
		// the loaded classes are mostly java.lang.*
		// classes will be (hopefully) loaded by the same class loader
1088 1089
		// this phase is indicated by NULL_NET_REF in the class loader id and it
		// is then handled by server
1090
		if(jvm_started) {
1091
			loader_id = get_net_reference(
1092 1093
					jni_env, jvmti_env,
					buffs->command_buff, loader
1094
			);
1095
		}
1096

1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111
		// msg id
		pack_byte(buff, MSG_NEW_CLASS);
		// class name
		pack_string_utf8(buff, name, strlen(name));
		// class loader id
		pack_long(buff, loader_id);
		// class code length
		pack_int(buff, class_data_len);
		// class code
		pack_bytes(buff, class_data, class_data_len);

		// send message
		buffs_send(buffs);
	}
	exit_critical_section(jvmti_env, tagging_lock);
1112 1113

#ifdef DEBUG
1114
	printf("New class sent (thread %ld)\n", tld_get()->id);
1115
#endif
1116 1117
}

1118

1119
// ******************* OBJECT FREE callback *******************
1120

1121
void JNICALL jvmti_callback_object_free_hook(
1122
		jvmtiEnv *jvmti_env, jlong tag
1123
) {
1124
#ifdef DEBUG
1125
	printf("Sending object free (thread %ld)\n", tld_get()->id);
1126 1127
#endif

1128 1129
	// NOTE: we don't need to send any other buffer with this one because
	// in the buffers are only life objects (global references)
1130

1131
	// send new obj free message
1132

1133
	// TODO buffer more msgs (send buffer at shutdown) - ??
1134
	// obtain buffer
1135
	process_buffs * buffs = buffs_get(tld_get()->id);
1136
	buffer * buff = buffs->analysis_buff;
1137

1138 1139 1140 1141
	// msg id
	pack_byte(buff, MSG_OBJ_FREE);
	// obj id
	pack_long(buff, tag);
1142

1143
	// send message
1144 1145 1146 1147 1148 1149 1150 1151 1152 1153
	// NOTE !: It is critical for proper ordering to send the buffer to the
	// object tagging queue.
	// Explanation: It is guaranteed, that no buffer held by an analysis thread
	// has this object, because all buffers have references to the objects they
	// are holding. The object tagging thread is the one who is releasing the
	// references. It is then necessary, that this event is put to the sending
	// queue after the object tagging thread puts the currently processed buffer
	// to the sending queue. This is easily arranged by putting this buffer to
	// the object tagging queue.
	buffs_objtag(buffs);
1154 1155

#ifdef DEBUG
1156
	printf("Object free sent (thread %ld)\n", tld_get()->id);
1157
#endif
1158
}
Lukáš Marek's avatar
Lukáš Marek committed
1159 1160


1161
// ******************* START callback *******************
Lukáš Marek's avatar
Lukáš Marek committed
1162

1163
void JNICALL jvmti_callback_vm_start_hook(
1164
		jvmtiEnv *jvmti_env, JNIEnv* jni_env
1165
) {
1166 1167
	jvm_started = TRUE;
}
Lukáš Marek's avatar
Lukáš Marek committed
1168

1169

1170
// ******************* INIT callback *******************