Commit c03392ec authored by Federico Sismondi's avatar Federico Sismondi

added _api_version field in SessionLog messages, also adopter a let-it-crash...

added _api_version field in SessionLog messages, also adopter a let-it-crash approach when sending or receiving messages on the bus regarding amqp connection, channels and broken pipes
parent 568dc306
Pipeline #1956 passed with stage
in 0 seconds
...@@ -10,7 +10,7 @@ try: ...@@ -10,7 +10,7 @@ try:
except: except:
from .messages import * from .messages import *
VERSION = '0.0.9' VERSION = '0.0.10'
AMQP_EXCHANGE = 'amq.topic' AMQP_EXCHANGE = 'amq.topic'
MAX_LOG_LINE_LENGTH = 120 MAX_LOG_LINE_LENGTH = 120
...@@ -168,23 +168,18 @@ def publish_message(connection, message): ...@@ -168,23 +168,18 @@ def publish_message(connection, message):
Creates temporary channel on it's own Creates temporary channel on it's own
Connection must be a pika.BlockingConnection Connection must be a pika.BlockingConnection
""" """
channel = None
try:
channel = connection.channel()
properties = pika.BasicProperties(**message.get_properties()) channel = connection.channel()
properties = pika.BasicProperties(**message.get_properties())
channel.basic_publish( channel.basic_publish(
exchange=AMQP_EXCHANGE, exchange=AMQP_EXCHANGE,
routing_key=message.routing_key, routing_key=message.routing_key,
properties=properties, properties=properties,
body=message.to_json(), body=message.to_json(),
) )
finally: if channel and channel.is_open:
if channel and channel.is_open: channel.close()
channel.close()
def amqp_request(connection, request_message, component_id, retries=10): def amqp_request(connection, request_message, component_id, retries=10):
...@@ -204,63 +199,60 @@ def amqp_request(connection, request_message, component_id, retries=10): ...@@ -204,63 +199,60 @@ def amqp_request(connection, request_message, component_id, retries=10):
time_between_requests = 0.5 time_between_requests = 0.5
channel = None channel = None
response = None
try:
response = None reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
channel = connection.channel()
channel = connection.channel() result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True) callback_queue = result.method.queue
callback_queue = result.method.queue
# bind and listen to reply_to topic
# bind and listen to reply_to topic channel.queue_bind(
channel.queue_bind( exchange=AMQP_EXCHANGE,
exchange=AMQP_EXCHANGE, queue=callback_queue,
queue=callback_queue, routing_key=request_message.reply_to
routing_key=request_message.reply_to )
)
channel.basic_publish(
channel.basic_publish( exchange=AMQP_EXCHANGE,
exchange=AMQP_EXCHANGE, routing_key=request_message.routing_key,
routing_key=request_message.routing_key, properties=pika.BasicProperties(**request_message.get_properties()),
properties=pika.BasicProperties(**request_message.get_properties()), body=request_message.to_json(),
body=request_message.to_json(), )
)
time.sleep(0.2)
time.sleep(0.2) retries_left = retries
retries_left = retries
while retries_left > 0:
while retries_left > 0: time.sleep(time_between_requests)
time.sleep(time_between_requests) method, props, body = channel.basic_get(reply_queue_name)
method, props, body = channel.basic_get(reply_queue_name) if method:
if method: channel.basic_ack(method.delivery_tag)
channel.basic_ack(method.delivery_tag) if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id: break
break retries_left -= 1
retries_left -= 1
if retries_left > 0:
if retries_left > 0: body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict) else:
response = MsgReply(request_message, **body_dict) # clean up
channel.queue_delete(reply_queue_name)
else: channel.close()
# clean up raise AmqpSynchCallTimeoutError(
channel.queue_delete(reply_queue_name) "Response timeout! rkey: %s , request type: %s" % (
raise AmqpSynchCallTimeoutError( request_message.routing_key,
"Response timeout! rkey: %s , request type: %s" % ( type(request_message)
request_message.routing_key,
type(request_message)
)
) )
)
return response if channel and channel.is_open:
# clean up
channel.queue_delete(reply_queue_name)
channel.close()
finally: return response
if channel and channel.is_open:
# clean up
channel.queue_delete(reply_queue_name)
channel.close()
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -40,6 +40,7 @@ except ImportError: ...@@ -40,6 +40,7 @@ except ImportError:
pass pass
VERSION = '0.0.8' VERSION = '0.0.8'
API_VERSION = '1.0.8'
# defaults vars # defaults vars
AMQP_URL = 'amqp://guest:guest@localhost' AMQP_URL = 'amqp://guest:guest@localhost'
...@@ -172,6 +173,7 @@ class JsonFormatter(logging.Formatter): ...@@ -172,6 +173,7 @@ class JsonFormatter(logging.Formatter):
try: try:
log_record = OrderedDict() log_record = OrderedDict()
log_record['component'] = record.name log_record['component'] = record.name
log_record['_api_version'] = API_VERSION
except NameError: except NameError:
log_record = {} log_record = {}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment