Commit 0047be2f authored by Lubomir Bulej's avatar Lubomir Bulej
Browse files

Fix termination of sender/receiver threads in unified agent

parent 42eb42ea
......@@ -26,6 +26,8 @@
// ******************* Communication *******************
static int connection_sockfd;
static void
send_data (int sockfd, buffer_t * restrict buffer) {
const size_t end_position = buffer_position (buffer);
......@@ -82,13 +84,13 @@ close_connection (int sockfd) {
}
// ******************* Receiver routines *******************
static pthread_t receiver;
static volatile int stop_sender_thread = 0;
static pthread_t receiver;
static volatile bool stop_requested = false;
static void *
receiver_loop (void * arg) {
int sockfd = (int) arg;
int sockfd = *((int *) arg);
// Create fake connection struct so we can use the disl recv methods.
// Get rid of this once the disl network subsystem becomes redundant.
......@@ -96,7 +98,7 @@ receiver_loop (void * arg) {
.sockfd = sockfd
};
while (!stop_sender_thread) {
while (!stop_requested) {
fd_set rfds;
FD_ZERO (&rfds);
FD_SET (sockfd, &rfds);
......@@ -140,7 +142,7 @@ static blocking_queue send_q;
static void *
sender_loop (void * arg) {
ldebug ("sender thread 0x%lx started\n", pthread_self ());
int sockfd = (int) arg;
int sockfd = *((int *) arg);
uint32_t request_id = 1;
......@@ -148,7 +150,7 @@ sender_loop (void * arg) {
buffer_init (&temp_buffer, BUFFER_INITIAL_CAPACITY);
// exit when the jvm is terminated and there are no msg to process
while (!stop_sender_thread || bq_length (&send_q) != 0) {
while (!stop_requested || bq_length (&send_q) != 0) {
ldebug (
"waiting for buffer (stop_sender: %d, queue length: %lu)\n",
stop_sender_thread, bq_length (&send_q)
......@@ -211,7 +213,6 @@ sender_loop (void * arg) {
}
ldebug ("sender thread terminating\n");
close_connection (sockfd);
return NULL;
}
......@@ -231,13 +232,13 @@ sender_init () {
void
sender_start () {
ldebug ("creating sender thread\n");
int sockfd = open_connection ();
connection_sockfd = open_connection ();
int result = pthread_create (&sender, NULL, sender_loop, (void *) sockfd);
int result = pthread_create (&sender, NULL, sender_loop, &connection_sockfd);
check_error (result != 0, "failed to create sender thread");
ldebug ("created sender thread: %d\n", result);
result = pthread_create (&receiver, NULL, receiver_loop, (void *) sockfd);
result = pthread_create (&receiver, NULL, receiver_loop, &connection_sockfd);
check_error (result != 0, "failed to create receiver thread");
ldebug ("created receiver thread: %d\n", result);
}
......@@ -245,7 +246,7 @@ sender_start () {
void
sender_stop () {
ldebug ("stopping sender thread\n");
ldebug ("stopping sender/receiver threads\n");
//
// Signal the sender thread to stop working. Because the thread may be
......@@ -258,18 +259,24 @@ sender_stop () {
// NOTE/TODO: Also, the buffers should be numbered according to their
// arrival in the send queue. This has to be supported by the queue itself.
//
stop_sender_thread = 1;
stop_requested = true;
process_buffs * empty = pb_normal_get (0);
sender_enqueue (empty);
ldebug ("waiting for receiver thread to stop \n");
ldebug ("waiting for receiver thread to stop\n");
int result = pthread_join (receiver, NULL);
check_error (result != 0, "failed to join receiver thread");
ldebug ("waiting for sender thread to stop \n");
ldebug ("waiting for sender thread to stop\n");
result = pthread_join (sender, NULL);
check_error (result != 0, "failed to join sender thread");
//
// Send the "session close" message and close the connection
// after both threads terminate.
//
close_connection (connection_sockfd);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment