event_bus_utils.py 8.01 KB
Newer Older
1
2
import json
import logging
import os
import threading
import time
import uuid
from collections import OrderedDict

import pika

# for using it as library and as a __main__
try:
    from messages import *
except:
    from .messages import *

# Version string of this module.
VERSION = '0.0.8'
# Exchange used by the module-level helpers publish_message() and
# amqp_request(); when the file is run as a script it is re-read from the
# AMQP_EXCHANGE environment variable.
AMQP_EXCHANGE = 'amq.topic'


class AmqpSynchCallTimeoutError(Exception):
    """Raised by amqp_request() when no matching reply arrives in time."""

19

20
21
class AmqpListener(threading.Thread):
    """Thread that consumes event-bus messages and forwards them to a callback.

    On construction it opens its own blocking connection, declares an
    auto-delete queue capped at 200 messages, binds that queue to the
    requested topics, publishes a "component ready" message and registers
    ``on_request`` as the consumer. ``run()`` (i.e. ``start()``) then enters
    the consume loop.

    Class attributes:
        COMPONENT_ID: identifier computed once at class-definition time, so
            every instance created in the same process shares it (and hence
            the same queue name).
        DEFAULT_TOPIC_SUSBCRIPTIONS: routing keys used when no topics are given.
        DEFAULT_EXCHAGE: exchange used when no exchange is given.
        MAX_CONSUME_ATTEMPTS: how many times ``run()`` enters the consume loop
            when the connection keeps getting closed.
    """

    COMPONENT_ID = 'amqp_listener_%s' % uuid.uuid1()
    DEFAULT_TOPIC_SUSBCRIPTIONS = ['#']
    DEFAULT_EXCHAGE = 'amq.topic'
    MAX_CONSUME_ATTEMPTS = 3

    def __init__(self, amqp_url, amqp_exchange, callback, topics=None):
        """Connect to the broker and set up the queue, bindings and consumer.

        Args:
            amqp_url: AMQP URL handed to ``pika.URLParameters``.
            amqp_exchange: exchange to bind to and publish on; a falsy value
                selects ``DEFAULT_EXCHAGE``.
            callback: callable invoked with each decoded message; ``None``
                selects ``print``.
            topics: routing key patterns to subscribe to (a list, or a single
                string); a falsy value selects ``DEFAULT_TOPIC_SUSBCRIPTIONS``.
        """
        threading.Thread.__init__(self)

        self.message_dispatcher = print if callback is None else callback

        self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
        self.channel = self.connection.channel()

        self.exchange = amqp_exchange if amqp_exchange else self.DEFAULT_EXCHAGE

        # queues & default exchange declaration
        self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
        self.channel.queue_declare(queue=self.services_queue_name,
                                   auto_delete=True,
                                   arguments={'x-max-length': 200})

        # Subscribe only to the passed topics, or to the defaults if none given.
        self.topics = self._resolve_topics(topics)

        for topic in self.topics:
            self.channel.queue_bind(exchange=self.exchange,
                                    queue=self.services_queue_name,
                                    routing_key=topic)

        # Hello world message
        m = MsgTestingToolComponentReady(
            component=self.COMPONENT_ID,
            description="%s is READY" % self.COMPONENT_ID
        )

        self.channel.basic_publish(
            body=m.to_json(),
            routing_key=m.routing_key,
            exchange=self.exchange,
            properties=pika.BasicProperties(
                content_type='application/json',
            )
        )

        self.channel.basic_qos(prefetch_count=1)
        # NOTE(review): callback-first argument order matches pika < 1.0 --
        # confirm against the pinned pika version before upgrading.
        self.channel.basic_consume(self.on_request, queue=self.services_queue_name)

    @classmethod
    def _resolve_topics(cls, topics):
        """Return the list of routing keys to bind for the given ``topics`` argument."""
        if not topics:
            # Copy so that callers mutating self.topics cannot alter the class default.
            return list(cls.DEFAULT_TOPIC_SUSBCRIPTIONS)
        if isinstance(topics, str):
            # A bare string would otherwise be iterated character by character.
            return [topics]
        return topics

    def stop(self):
        """Delete the listener queue, stop consuming and close the connection.

        NOTE(review): pika documents BlockingConnection as not thread-safe;
        if this is called from a thread other than the one executing run(),
        confirm that is acceptable.
        """
        self.channel.queue_delete(self.services_queue_name)
        self.channel.stop_consuming()
        self.connection.close()

    def on_request(self, ch, method, props, body):
        """Consumer callback: decode ``body`` and pass the message to the dispatcher.

        Decoding or dispatch failures are logged, never raised, and the
        delivery is acknowledged in every case.
        """
        props_dict = {
            'content_type': props.content_type,
            'delivery_mode': props.delivery_mode,
            'correlation_id': props.correlation_id,
            'reply_to': props.reply_to,
            'message_id': props.message_id,
            'timestamp': props.timestamp,
            'user_id': props.user_id,
            'app_id': props.app_id,
        }

        try:
            m = Message.from_json(body)
            m.update_properties(**props_dict)
            m.routing_key = method.routing_key
            self.message_dispatcher(m)

        except NonCompliantMessageFormatError as e:
            logging.error('%s got a non compliant message error %s' % (self.__class__.__name__, e))

        except Exception as e:
            # Keep the traceback: this also catches errors raised by the callback.
            logging.exception(e)

        finally:
            # Ack regardless of outcome so a bad message is not redelivered.
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def run(self):
        """Consume until stopped, re-entering the loop if the connection is closed.

        The consume loop is entered at most ``MAX_CONSUME_ATTEMPTS`` times; a
        normal return from ``start_consuming()`` ends the thread immediately.
        """
        logging.info("Starting thread listening on the event bus on topics %s" % self.topics)

        for attempt in range(1, self.MAX_CONSUME_ATTEMPTS + 1):
            try:
                self.channel.start_consuming()
            except pika.exceptions.ConnectionClosed as err:
                logging.error('Unexpected connection closed, retrying %s/%s'
                              % (attempt, self.MAX_CONSUME_ATTEMPTS))
                logging.error(err)
            else:
                # Consuming ended cleanly (e.g. after stop()); do not restart it.
                break

        logging.info('Bye byes!')
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238


def publish_message(connection, message):
    """
    Publish `message` on AMQP_EXCHANGE using the message's own routing key
    and properties.

    A temporary channel is opened on `connection` for the publish and is
    closed again afterwards, also when publishing fails.
    `connection` must be a pika.BlockingConnection.
    """
    tmp_channel = connection.channel()

    try:
        amqp_properties = pika.BasicProperties(**message.get_properties())
        tmp_channel.basic_publish(
            exchange=AMQP_EXCHANGE,
            routing_key=message.routing_key,
            properties=amqp_properties,
            body=message.to_json(),
        )
    finally:
        # Release the temporary channel unless it is already closed.
        if tmp_channel and tmp_channel.is_open:
            tmp_channel.close()

def amqp_request(connection, request_message, component_id, retries=10, time_between_retries=0.5):
    """
    Publish a request on the bus and synchronously wait for its reply.

    The request is published on AMQP_EXCHANGE with its own routing key. A
    temporary auto-delete queue is bound to the request's `reply_to` routing
    key and polled until a message carrying the request's `correlation_id`
    shows up. Messages with another correlation id are acknowledged and
    dropped. The temporary channel and queue are always cleaned up.

    Args:
        connection: must be a pika.BlockingConnection.
        request_message: message to send; must have `reply_to` and
            `correlation_id` set.
        component_id: name of the caller, used in the temporary queue name.
        retries: number of times the reply queue is polled.
        time_between_retries: seconds slept before each poll.

    Returns:
        MsgReply built from the request and the decoded JSON body of the reply.

    Raises:
        AmqpSynchCallTimeoutError: no matching reply arrived within `retries` polls.
    """

    # check first that sender didnt forget about reply to and corr id
    assert request_message.reply_to
    assert request_message.correlation_id

    channel = None
    # Bound before the try so that the cleanup in `finally` can always see it.
    reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)

    try:
        channel = connection.channel()
        result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
        callback_queue = result.method.queue

        # bind and listen to reply_to topic (before publishing, so the reply
        # cannot be routed before the queue is bound)
        channel.queue_bind(
            exchange=AMQP_EXCHANGE,
            queue=callback_queue,
            routing_key=request_message.reply_to
        )

        channel.basic_publish(
            exchange=AMQP_EXCHANGE,
            routing_key=request_message.routing_key,
            properties=pika.BasicProperties(**request_message.get_properties()),
            body=request_message.to_json(),
        )

        time.sleep(0.2)

        reply_body = None
        for _ in range(retries):
            time.sleep(time_between_retries)
            method, props, body = channel.basic_get(reply_queue_name)
            if not method:
                # Queue was empty on this poll.
                continue
            channel.basic_ack(method.delivery_tag)
            if getattr(props, 'correlation_id', None) == request_message.correlation_id:
                reply_body = body
                break

        if reply_body is None:
            # The temporary queue is removed once, in the `finally` below.
            raise AmqpSynchCallTimeoutError(
                "Response timeout! rkey: %s , request type: %s" % (
                    request_message.routing_key,
                    request_message._type
                )
            )

        body_dict = json.loads(reply_body.decode('utf-8'), object_pairs_hook=OrderedDict)
        return MsgReply(request_message, **body_dict)

    finally:
        if channel and channel.is_open:
            # clean up
            channel.queue_delete(reply_queue_name)
            channel.close()


if __name__ == '__main__':
    # Demo: start a listener on every topic, publish one test message, then
    # send one request and print the reply (or report that nobody answered).

    AMQP_EXCHANGE = str(os.environ.get('AMQP_EXCHANGE', 'amq.topic'))

    try:
        AMQP_URL = str(os.environ['AMQP_URL'])
        print('Env vars for AMQP connection succesfully imported')

    except KeyError:
        AMQP_URL = "amqp://guest:guest@localhost/"


    def callback_function(message_received):
        """Print every message the listener receives."""
        print("Callback function received: \n\t" + repr(message_received))


    # amqp listener example:
    amqp_listener_thread = AmqpListener(
        amqp_url=AMQP_URL,
        amqp_exchange=AMQP_EXCHANGE,
        callback=callback_function,
        topics=['#']
    )

    try:
        amqp_listener_thread.start()
    except Exception as e:
        print(e)

    # publish message example
    connection = None
    retries_left = 3
    while retries_left > 0:
        try:
            connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
            m = MsgTest()
            publish_message(connection, m)
            break
        except pika.exceptions.ConnectionClosed:
            # Forget the failed connection so it is not reused below.
            connection = None
            retries_left -= 1
            print('retrying..')
            time.sleep(0.2)

    if connection is None:
        print('Could not publish to the AMQP broker, skipping the request example')
    else:
        # example of a request sent into the bus
        m = MsgTestSuiteGetTestCases()
        try:
            r = amqp_request(connection, m, 'someImaginaryComponent')
            print("This is the response I got:\n\t" + repr(r))

        except AmqpSynchCallTimeoutError:
            print("Nobody answered to our request :'(")

        finally:
            # Only the publisher connection is closed; the listener thread is
            # left running, so the process keeps printing received messages.
            if connection.is_open:
                connection.close()