amqp_synch_call.py 4.69 KB
Newer Older
1
import os
2
3
import pika

4
5
6
7
8
9
# for using it as library and as a __main__
try:
    from messages import *
except:
    from .messages import *

10
VERSION = '0.0.7'
11
AMQP_EXCHANGE = 'amq.topic'
12

13

14
15
16
17
class AmqpSynchCallTimeoutError(Exception):
    pass


18
def publish_message(connection, message):
19
20
21
22
    """
    Publishes message into the correct topic (uses Message object metadata)
    Creates temporary channel on it's own
    Connection must be a pika.BlockingConnection
23
    """
24
    channel = None
25

26
27
    try:
        channel = connection.channel()
28

29
        properties = pika.BasicProperties(**message.get_properties())
30

31
        channel.basic_publish(
32
            exchange=AMQP_EXCHANGE,
33
34
35
36
37
38
39
40
            routing_key=message.routing_key,
            properties=properties,
            body=message.to_json(),
        )

    finally:
        if channel and channel.is_open:
            channel.close()
41

42

43
def amqp_request(connection, request_message, component_id):
44
45
46
47
48
49
50
    """
    Publishes message into the correct topic (uses Message object metadata)
    Returns reply message.
    Uses reply_to and corr id amqp's properties for matching the reply
    Creates temporary channel, and queues on it's own
    Connection must be a pika.BlockingConnection
    """
51

52
53
54
55
    # check first that sender didnt forget about reply to and corr id
    assert request_message.reply_to
    assert request_message.correlation_id

56
    channel = None
57

58
59
60
    try:
        response = None
        reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
61

Federico Sismondi's avatar
Federico Sismondi committed
62
        channel = connection.channel()
63
64
        result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
        callback_queue = result.method.queue
65

66
67
68
69
70
        # bind and listen to reply_to topic
        channel.queue_bind(
            exchange=AMQP_EXCHANGE,
            queue=callback_queue,
            routing_key=request_message.reply_to
71
72
        )

73
74
75
76
77
78
        channel.basic_publish(
            exchange=AMQP_EXCHANGE,
            routing_key=request_message.routing_key,
            properties=pika.BasicProperties(**request_message.get_properties()),
            body=request_message.to_json(),
        )
79

80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
        time.sleep(0.2)
        retries_left = 10

        while retries_left > 0:
            time.sleep(0.5)
            method, props, body = channel.basic_get(reply_queue_name)
            if method:
                channel.basic_ack(method.delivery_tag)
                if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
                    break
            retries_left -= 1

        if retries_left > 0:

            body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
            response = MsgReply(request_message, **body_dict)

        else:
            # clean up
            channel.queue_delete(reply_queue_name)
100
            raise AmqpSynchCallTimeoutError(
101
102
103
104
105
                "Response timeout! rkey: %s , request type: %s" % (
                    request_message.routing_key,
                    request_message._type
                )
            )
106

107
        return response
108

109
110
111
112
113
    finally:
        if channel and channel.is_open:
            # clean up
            channel.queue_delete(reply_queue_name)
            channel.close()
114
115


116
if __name__ == '__main__':
117
118
119
120
121
122
123

    try:
        AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
    except KeyError as e:
        AMQP_EXCHANGE = "amq.topic"

    try:
124
        from urllib.parse import urlparse
125

126
        AMQP_URL = str(os.environ['AMQP_URL'])
127
        p = urlparse(AMQP_URL)
128
        AMQP_USER = p.username
129
        AMQP_PASS = p.password
130
        AMQP_SERVER = p.hostname
131
        AMQP_VHOST = p.path.strip('/')
132

133
        print('Env vars for AMQP connection succesfully imported')
134
135
136
137
138

    except KeyError as e:

        print('Cannot retrieve environment variables for AMQP connection. Loading defaults..')
        # load default values
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
        AMQP_SERVER = "localhost"
        AMQP_USER = "guest"
        AMQP_PASS = "guest"
        AMQP_VHOST = "/"
        AMQP_URL = "amqp://{0}:{1}@{2}/{3}".format(AMQP_USER, AMQP_PASS, AMQP_SERVER, AMQP_VHOST)

    print(json.dumps(
        {
            'server': AMQP_SERVER,
            'session': AMQP_VHOST,
            'user': AMQP_USER,
            'pass': '#' * len(AMQP_PASS),
            'exchange': AMQP_EXCHANGE
        }
    ))
154

155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
    retries_left = 3
    while retries_left > 0:
        try:
            connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
            m = MsgTest()
            publish_message(connection, m)
            break
        except pika.exceptions.ConnectionClosed:
            retries_left -= 1
            print('retrying..')
            time.sleep(0.2)

    # r = amqp_request(connection, m, 'someImaginaryComponent')
    # print(repr(r))