Commit f75f374a authored by Federico Sismondi's avatar Federico Sismondi
Browse files

no longer threading class, added example on how to execute as a separate process

parent c0b5b623
import six
import pika
import threading
import logging
from datetime import timedelta, datetime
from multiprocessing import Process
from datetime import datetime
import os
from messages import *
from pure_pcapy import Dumper, Pkthdr, DLT_IEEE802_15_4, DLT_RAW
class AmqpDataPacketDumper(threading.Thread):
class AmqpDataPacketDumper:
"""
Sniffs data.serial and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
Sniffs data.tun and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
......@@ -19,32 +19,45 @@ class AmqpDataPacketDumper(threading.Thread):
incl_len: the number of bytes of packet data actually captured and saved in the file. This value should never become larger than orig_len or the snaplen value of the global header.
orig_len: the length of the packet as it appeared on the network when it was captured. If incl_len and orig_len differ, the actually saved packet size was limited by snaplen.
"""
COMPONENT_ID = 'capture_dumper'
COMPONENT_ID = 'capture_dumper_%s' % uuid.uuid1() # uuid in case several dumpers listening to bus
DEFAULT_DUMP_DIR = './dumps'
def __init__(self, connection, topics):
threading.Thread.__init__(self)
# queues & default exchange declaration
self.messages_dumped = 0
def __init__(self, amqp_url, amqp_exchange, topics, dump_dir=None):
self.connection = connection
self.url = amqp_url
self.exchange = amqp_exchange
# setup connection
self.connection = pika.BlockingConnection(pika.URLParameters(self.url))
# queues & default exchange declaration
self.messages_dumped = 0
self.channel = self.connection.channel()
self.data_queue_name = 'data@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.data_queue_name, auto_delete=True)
self.channel.queue_declare(
queue=self.data_queue_name,
auto_delete=True,
arguments={'x-max-length': 100}
)
for t in topics:
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
self.channel.queue_bind(exchange=self.exchange,
queue=self.data_queue_name,
routing_key=t)
if dump_dir and os.path.isdir(dump_dir):
self.dump_dir = dump_dir
else:
self.dump_dir = self.DEFAULT_DUMP_DIR
# Hello world message
self.channel.basic_publish(
body=json.dumps({'_type': '%s.info' % self.COMPONENT_ID,
'value': '%s is up!' % self.COMPONENT_ID, }
),
routing_key='control.%s.info' % self.COMPONENT_ID,
exchange=AMQP_EXCHANGE,
exchange=self.exchange,
properties=pika.BasicProperties(
content_type='application/json',
)
......@@ -56,14 +69,14 @@ class AmqpDataPacketDumper(threading.Thread):
datetime_string = time.strftime("%Y%m%d_%H%M")
network_type = "DLT_IEEE802_15_4"
self.pcap_15_4_dumper = Dumper(
filename="{0}_{1}.pcap".format(datetime_string, network_type),
filename=os.path.join(self.dump_dir, "{0}_{1}.pcap".format(datetime_string, network_type)),
snaplen=200,
network=DLT_IEEE802_15_4
)
network_type = "DLT_RAW"
self.pcap_raw_ip_dumper = Dumper(
filename="{0}_{1}.pcap".format(datetime_string, network_type),
filename=os.path.join(self.dump_dir, "{0}_{1}.pcap".format(datetime_string, network_type)),
snaplen=200,
network=DLT_RAW
)
......@@ -151,12 +164,7 @@ class AmqpDataPacketDumper(threading.Thread):
print('Bye byes!')
if __name__ == '__main__':
MESSAGE_UI_SELECTOR = 1
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
def amqp_data_packet_dumper():
try:
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
print('Imported AMQP_EXCHANGE env var: %s' % AMQP_EXCHANGE)
......@@ -185,8 +193,6 @@ if __name__ == '__main__':
AMQP_URL = "amqp://{0}:{1}@{2}/{3}".format("guest", "guest", "localhost", "/")
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
channel = connection.channel()
logging.info("AMQP connection established")
# in case its not declared
connection.channel().exchange_declare(exchange=AMQP_EXCHANGE,
......@@ -201,7 +207,22 @@ if __name__ == '__main__':
'data.tun.fromAgent.coap_client_agent',
]
pcap_dumper = AmqpDataPacketDumper(
connection=connection,
topics=pcap_amqp_topic_subscriptions
amqp_url=AMQP_URL,
amqp_exchange=AMQP_EXCHANGE,
topics=pcap_amqp_topic_subscriptions,
dump_dir=None
)
pcap_dumper.start()
# pcap_dumper.start()
pcap_dumper.run()
if __name__ == '__main__':
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
p = Process(target=amqp_data_packet_dumper, args=())
p.start()
for i in range(1, 1000):
time.sleep(1)
print(i)
p.join()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment