activemqlistermanager.py 3.56 KB
Newer Older
1
import stomp, os, json, time
2
3
from threading import Thread

4
5
data_format = os.environ.get("DATA_FORMAT", "json")

6
7

class Listener(object):
8
    def __init__(self, conn, handler):
9
10
        self.conn = conn
        self.count = 0
11
        self.handler = handler
12
        self.start = time.time()
13

14
    def on_error(self, headers, message):
15
        print("received an error %s" % message)
16
17
18
19

    def on_message(self, headers, message):
        self.handler(message)

20

21
class Worker(Thread):
22
23
24
25
26
    def __init__(
        self, hostname, port, username, password, topic, handler, sleeping, index
    ):
        self.hostname = hostname
        self.port = port
27
        self.topic = topic
28
        self.handler = handler
29
        self.sleeping = sleeping
30
31
        self.index = index
        self.username = username
32
        self.password = password
33
34
        self.status = None
        self.normal_stop = False
35
36
37
38
39
40
        super(Worker, self).__init__()

    def getStatus(self):
        return self.status

    def getIndex(self):
41
        return self.index
42
43

    def stop(self):
44
        self.normal_stop = True
45
46
47

    def run(self):
        print("Worker {0} started".format(self.index))
48
49
50
51
52
        print(
            "Hostname : {0}\nPort: {1}\nTopic: {2}".format(
                self.hostname, self.port, self.topic
            )
        )
53
54
        while True:
            if self.normal_stop:
55
                break
56
57
            print("Trying to connect ...")
            try:
58
59
60
61
                conn = stomp.Connection(host_and_ports=[(self.hostname, self.port)])
                conn.set_listener("", Listener(conn, self.handler))
                conn.connect(login=self.username, passcode=self.password)
                conn.subscribe(destination=self.topic, id=1, ack="auto")
62
63
                self.status = "started"
                print("Waiting for messages...")
64
                while 1:
65
                    if self.normal_stop:
66
67
                        break
                    time.sleep(self.sleeping)
68
69
70
71
72
73
74
75
76
            except Exception as e:
                print("Could not connect to ActiveMQ broker")
                self.status = "error"
                print(e)
                time.sleep(5)
        print("End process")
        self.status = "stopped"


77
class ActiveMQManager:
78
79
80
81
82
    def __init__(self, handler):
        self.all_threads = []
        self.handler = handler
        thread_controller = Thread(target=self.workerController)
        thread_controller.start()
83
84

    def getData(self, data):
85
        if data_format == "json":
86
            _data = None
87
88
89
90
91
            try:
                _data = json.loads(data)
            except Exception as e:
                print("Could not decode json content")
                print("data content", data)
92
                return None
93
94
95
96
97
98
99
100
101
102
103
104
105
            self.handler(_data)

    def workerController(self):
        print("Worker controller started")
        while True:
            for w in self.all_threads:
                if w.getStatus() == "stopped" or w.getStatus() == "error":
                    w.stop()
                    print("Worker {0} will restart in 5 seconds".format(w.getIndex()))
                    time.sleep(5)
                    w.start()
            time.sleep(20)

106
    def startWorker(self, hostname, port, username, password, topic, key):
107
108
109
        for w in self.all_threads:
            if w.getIndex() == key:
                print("Connection already registered")
110
111
112
113
114
                return None
        sleeping = 5  # 5 seconds
        worker = Worker(
            hostname, port, username, password, topic, self.handler, sleeping, key
        )
115
116
        self.all_threads.append(worker)
        worker.start()