Commit 1437d12a authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Some modifications in the messages lib, deleted useless modules, added some example messages in the CLI component

parent 76f55dfc
......@@ -92,7 +92,7 @@ class NullLogHandler(logging.Handler):
class AmqpSniffer(threading.Thread):
COMPONENT_ID = 'amqp_sniffer'
COMPONENT_ID = 'amqp_sniffer_%s' % uuid.uuid1()
DEFAULT_EXCHAGE = 'amq.topic'
def __init__(self, conn, exchange=None, topics=None):
......@@ -542,7 +542,7 @@ if __name__ == '__main__':
'snif0': MsgSniffingStart(
capture_id='TD_COAP_CORE_01',
filter_if='tun0',
filter_proto='udp port 5683'
filter_proto='udp'
),
'snif1': MsgSniffingStop(),
# get a particular capture file
......@@ -629,7 +629,8 @@ if __name__ == '__main__':
# 'tt11': MsgStepCheckExecute(step_id="TD_COAP_CORE_01_v01_step_02"),
# 'tt12': MsgStepCheckExecute(step_id="TD_COAP_CORE_01_v01_step_03"),
'tt13': MsgStepVerifyExecute(step_id="TD_COAP_CORE_01_v01_step_04"),
'tt100': MsgTestSuiteReport(),
'ttver':MsgTestCaseVerdict(),
'ttrepo': MsgTestSuiteReport(),
# for 6lowpan TT tests
's_hc_01': MsgStepStimuliExecute(step_id='TD_6LowPAN_HC_01_step_01', node='eut1'),
's_hc_02': MsgStepStimuliExecute(step_id='TD_6LowPAN_HC_02_step_01', node='eut1'),
......
......@@ -461,7 +461,7 @@ class MsgSessionLog(Message):
_msg_data_template = {
"_type": "log",
"component": "the_drummer",
"description": "I've got blisters on my fingers!"
"message": "I've got blisters on my fingers!"
}
......@@ -1267,7 +1267,7 @@ class MsgSniffingStart(Message):
"_type": "sniffing.start",
"capture_id": "TD_COAP_CORE_01",
"filter_if": "tun0",
"filter_proto": "udp port 5683"
"filter_proto": "udp"
}
......
import six
import os
import json
import pika
import signal
import shutil
import logging
from multiprocessing import Process
from datetime import datetime
import os
from messages import *
from pure_pcapy import Dumper, Pkthdr, DLT_IEEE802_15_4, DLT_RAW
from pure_pcapy import Dumper, Pkthdr, DLT_RAW, DLT_IEEE802_15_4_NOFCS
try:
# For Python 3.0 and later
from urllib.parse import urlparse
except ImportError:
# Fall back to Python 2
from urlparse import urlparse
logger = logging.getLogger(__name__)
VERSION = '0.0.1'
def launch_amqp_data_to_pcap_dumper(amqp_url=None, amqp_exchange=None, topics=None, dump_dir=None):
    """
    Resolve AMQP connection settings and run an AmqpDataPacketDumper until SIGINT.

    :param amqp_url: broker URL; falls back to the AMQP_URL env var, then to guest@localhost
    :param amqp_exchange: exchange name; falls back to the AMQP_EXCHANGE env var, then 'amq.topic'
    :param topics: routing keys to subscribe to; defaults to the CoAP agents' data topics
    :param dump_dir: directory for the pcap files (dumper's own default used when None)
    """
    # Bind the name before installing the handler: the original referenced
    # pcap_dumper from the closure before any assignment, so a SIGINT arriving
    # before the dumper was built raised NameError inside the handler.
    pcap_dumper = None

    def signal_int_handler(signum, frame):
        logger.info('got SIGINT, stopping dumper..')
        if pcap_dumper:
            pcap_dumper.stop()

    signal.signal(signal.SIGINT, signal_int_handler)

    if amqp_url and amqp_exchange:
        amqp_exchange = amqp_exchange
        amqp_url = amqp_url
    else:
        try:
            amqp_exchange = str(os.environ['AMQP_EXCHANGE'])
            print('Imported AMQP_EXCHANGE env var: %s' % amqp_exchange)
        except KeyError:
            amqp_exchange = "amq.topic"
            print('Cannot retrieve environment variables for AMQP EXCHANGE. Loading default: %s' % amqp_exchange)

        try:
            amqp_url = str(os.environ['AMQP_URL'])
            print('Imported AMQP_URL env var: %s' % amqp_url)
            p = urlparse(amqp_url)
            user = p.username
            server = p.hostname
            logger.info(
                "Env variables imported for AMQP connection, User: {0} @ Server: {1} ".format(user,
                                                                                              server))
        except KeyError:
            print('Cannot retrieve environment variables for AMQP connection. Loading defaults..')
            # load default values
            amqp_url = "amqp://{0}:{1}@{2}/{3}".format("guest", "guest", "localhost", "/")

    if topics:
        pcap_amqp_topic_subscriptions = topics
    else:
        pcap_amqp_topic_subscriptions = ['data.serial.fromAgent.coap_client_agent',
                                         'data.serial.fromAgent.coap_server_agent',
                                         'data.tun.fromAgent.coap_server_agent',
                                         'data.tun.fromAgent.coap_client_agent',
                                         ]

    # init pcap_dumper
    pcap_dumper = AmqpDataPacketDumper(
        amqp_url=amqp_url,
        amqp_exchange=amqp_exchange,
        topics=pcap_amqp_topic_subscriptions,
        dump_dir=dump_dir,
    )
    # start pcap_dumper (blocks until stopped)
    pcap_dumper.run()
class AmqpDataPacketDumper:
"""
Sniffs data.serial and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
Sniffs data.tun and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
Sniffs data.tun and dumps into pcap file (assumes that frames are DLT_RAW)
about pcap header:
ts_sec: the date and time when this packet was captured. This value is in seconds since January 1, 1970 00:00:00 GMT; this is also known as a UN*X time_t. You can use the ANSI C time() function from time.h to get this value, but you might use a more optimized way to get this timestamp value. If this timestamp isn't based on GMT (UTC), use thiszone from the global header for adjustments.
ts_usec: in regular pcap files, the microseconds when this packet was captured, as an offset to ts_sec. In nanosecond-resolution files, this is, instead, the nanoseconds when the packet was captured, as an offset to ts_sec /!\ Beware: this value shouldn't reach 1 second (in regular pcap files 1 000 000; in nanosecond-resolution files, 1 000 000 000); in this case ts_sec must be increased instead!
incl_len: the number of bytes of packet data actually captured and saved in the file. This value should never become larger than orig_len or the snaplen value of the global header.
orig_len: the length of the packet as it appeared on the network when it was captured. If incl_len and orig_len differ, the actually saved packet size was limited by snaplen.
ts_sec: the date and time when this packet was captured. This value is in seconds since January 1,
1970 00:00:00 GMT; this is also known as a UN*X time_t. You can use the ANSI C time() function
from time.h to get this value, but you might use a more optimized way to get this timestamp value.
If this timestamp isn't based on GMT (UTC), use thiszone from the global header for adjustments.
ts_usec: in regular pcap files, the microseconds when this packet was captured, as an offset to ts_sec.
In nanosecond-resolution files, this is, instead, the nanoseconds when the packet was captured, as
an offset to ts_sec
/!\ Beware: this value shouldn't reach 1 second (in regular pcap files 1 000 000;
in nanosecond-resolution files, 1 000 000 000); in this case ts_sec must be increased instead!
incl_len: the number of bytes of packet data actually captured and saved in the file. This value should
never become larger than orig_len or the snaplen value of the global header.
orig_len: the length of the packet as it appeared on the network when it was captured. If incl_len and
orig_len differ, the actually saved packet size was limited by snaplen.
"""
COMPONENT_ID = 'capture_dumper_%s' % uuid.uuid1() # uuid in case several dumpers listening to bus
DEFAULT_DUMP_DIR = 'dumps'
DEFAULT_DUMP_DIR = 'tmp'
DEFAULT_RAWIP_DUMP_FILENAME = "DLT_RAW.pcap"
DEFAULT_802154_DUMP_FILENAME = "DLT_IEEE802_15_4_NO_FCS.pcap"
NETWORK_DUMPS = [DEFAULT_802154_DUMP_FILENAME, DEFAULT_RAWIP_DUMP_FILENAME]
def __init__(self, amqp_url, amqp_exchange, topics, dump_dir):
DEFAULT_RAWIP_DUMP_FILENAME_WR = "DLT_RAW.pcap~"
DEFAULT_802154_DUMP_FILENAME_WR = "DLT_IEEE802_15_4_NO_FCS.pcap~"
NETWORK_DUMPS_TEMP = [DEFAULT_RAWIP_DUMP_FILENAME_WR, DEFAULT_802154_DUMP_FILENAME_WR]
QUANTITY_MESSAGES_PER_PCAP = 100
def __init__(self, amqp_url, amqp_exchange, topics, dump_dir=None):
self.messages_dumped = 0
self.url = amqp_url
self.exchange = amqp_exchange
# setup connection
self.connection = pika.BlockingConnection(pika.URLParameters(self.url))
if dump_dir:
self.dump_dir = dump_dir
else:
self.dump_dir = self.DEFAULT_DUMP_DIR
# queues & default exchange declaration
self.messages_dumped = 0
if not os.path.exists(self.dump_dir):
os.makedirs(self.dump_dir)
# pcap dumpers
self.pcap_15_4_dumper = None
self.pcap_raw_ip_dumper = None
self.dumpers_init()
# AMQP stuff
self.connection = pika.BlockingConnection(pika.URLParameters(self.url)) # queues & default exchange declaration
self.channel = self.connection.channel()
self.data_queue_name = 'data@%s' % self.COMPONENT_ID
self.channel.queue_declare(
queue=self.data_queue_name,
auto_delete=True,
arguments={'x-max-length': 100}
)
self.channel.queue_declare(queue=self.data_queue_name,
auto_delete=True,
arguments={'x-max-length': 1000}
)
# subscribe to data plane channels
for t in topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.data_queue_name,
routing_key=t)
if dump_dir:
self.dump_dir = dump_dir
else:
self.dump_dir = self.DEFAULT_DUMP_DIR
# subscribe to channel where the terminate session message is published
self.channel.queue_bind(exchange=self.exchange,
queue=self.data_queue_name,
routing_key='control.session')
if not os.path.exists(self.dump_dir):
os.makedirs(self.dump_dir)
# Hello world message
# publish Hello message in bus
self.channel.basic_publish(
body=json.dumps({'_type': '%s.info' % self.COMPONENT_ID,
'value': '%s is up!' % self.COMPONENT_ID, }
......@@ -68,33 +166,101 @@ class AmqpDataPacketDumper:
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.data_queue_name)
datetime_string = time.strftime("%Y%m%d_%H%M")
network_type = "DLT_IEEE802_15_4"
def dumpers_init(self):
    """
    (Re)create the pcap working files (*.pcap~), removing any stale ones first.

    One dumper is opened per link type: 802.15.4 (no FCS) for serial captures
    and raw IP for tun captures. Called again after each rotation.
    """
    # drop leftover working files from a previous run/rotation
    for net_dump_filename in self.NETWORK_DUMPS_TEMP:
        full_path = os.path.join(self.dump_dir, net_dump_filename)
        if os.path.isfile(full_path):
            os.remove(full_path)

    self.pcap_15_4_dumper = Dumper(
        filename=os.path.join(self.dump_dir, self.DEFAULT_802154_DUMP_FILENAME_WR),
        snaplen=200,
        network=DLT_IEEE802_15_4_NOFCS
    )

    self.pcap_raw_ip_dumper = Dumper(
        filename=os.path.join(self.dump_dir, self.DEFAULT_RAWIP_DUMP_FILENAME_WR),
        snaplen=200,
        network=DLT_RAW
    )
def dump_packet(self, message):
    """
    Append one sniffed frame to the pcap dump matching its interface.

    Frames from 'serial' interfaces go to the 802.15.4 dump; frames from
    'tun' interfaces go to the raw-IP dump; anything else is logged and
    skipped. After each write the working file (*.pcap~) is copied over the
    stable *.pcap so readers always see a consistent capture.

    :param message: event carrying .interface_name and .data (raw frame bytes)
    """
    try:
        t = time.time()
        t_s = int(t)
        # microsecond offset within the current second, for the pcap record header
        t_u_delta = int((t - t_s) * 1000000)

        if 'serial' in message.interface_name:
            raw_packet = bytes(message.data)

            # lets build pcap header for packet
            pcap_packet_header = Pkthdr(
                ts_sec=t_s,
                ts_usec=t_u_delta,
                incl_len=len(raw_packet),
                orig_len=len(raw_packet),
            )

            self.pcap_15_4_dumper.dump(pcap_packet_header, raw_packet)

            self.messages_dumped += 1

            shutil.copyfile(
                os.path.join(self.dump_dir, self.DEFAULT_802154_DUMP_FILENAME_WR),
                os.path.join(self.dump_dir, self.DEFAULT_802154_DUMP_FILENAME)
            )

        elif 'tun' in message.interface_name:
            raw_packet = bytes(message.data)

            # lets build pcap header for packet
            pcap_packet_header = Pkthdr(
                ts_sec=t_s,
                ts_usec=t_u_delta,
                incl_len=len(raw_packet),
                orig_len=len(raw_packet),
            )

            self.pcap_raw_ip_dumper.dump(pcap_packet_header, raw_packet)

            self.messages_dumped += 1

            shutil.copyfile(
                os.path.join(self.dump_dir, self.DEFAULT_RAWIP_DUMP_FILENAME_WR),
                os.path.join(self.dump_dir, self.DEFAULT_RAWIP_DUMP_FILENAME)
            )

        else:
            logger.info('Raw packet not dumped to pcap: ' + repr(message))
            return

    except Exception:
        # logger.exception keeps the traceback, which logger.error(e) lost
        logger.exception('Failed to dump packet to pcap')

    print('Messages dumped : ' + str(self.messages_dumped))
def dumps_rotate(self):
    """Archive every existing stable pcap dump under a timestamped filename."""
    for dump_name in self.NETWORK_DUMPS:
        source = os.path.join(self.dump_dir, dump_name)
        if not os.path.isfile(source):
            continue
        logger.info('rotating file dump: %s' % source)
        destination = os.path.join(
            self.dump_dir,
            datetime.now().strftime('%Y%m%d_%H%M%S_') + dump_name,
        )
        shutil.copyfile(source, destination)
def stop(self):
    """Delete the data queue, stop consuming, and close the AMQP connection."""
    logger.info("Stopping packet dumper..")
    queue_name = self.data_queue_name
    self.channel.queue_delete(queue_name)
    self.channel.stop_consuming()
    self.connection.close()
def on_request(self, ch, method, props, body):
now = datetime.now()
# obj hook so json.loads respects the order of the fields sent -just for visualization purposeses-
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
logging.info("Message sniffed: %s, body: %s" % (json.dumps(req_body_dict), str(body)))
self.messages_dumped += 1
try:
......@@ -111,53 +277,38 @@ class AmqpDataPacketDumper:
m = Message.from_json(body)
m.update_properties(**props_dict)
logger.info('got event: %s' % type(m))
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
self.stop()
if isinstance(m, MsgPacketSniffedRaw):
try:
if 'serial' in m.interface_name:
raw_packet = bytes(m.data)
packet_slip = bytes(m.data_slip)
# lets build pcap header for packet
pcap_packet_header = Pkthdr(
ts_sec=now.second,
ts_usec=now.microsecond,
incl_len=len(raw_packet),
orig_len=len(raw_packet),
)
self.pcap_15_4_dumper.dump(pcap_packet_header, raw_packet)
elif 'tun' in m.interface_name:
raw_packet = bytes(m.data)
# lets build pcap header for packet
pcap_packet_header = Pkthdr(
ts_sec=now.second,
ts_usec=now.microsecond,
incl_len=len(raw_packet),
orig_len=len(raw_packet),
)
self.pcap_raw_ip_dumper.dump(pcap_packet_header, raw_packet)
else:
logging.info('raw packet not dumped to pcap: ' + repr(m))
except TypeError as e:
logging.error(str(e))
logging.error(repr(m))
self.dump_packet(m)
try: # rotate files each X messages dumped
if self.messages_dumped != 0 and self.messages_dumped % self.QUANTITY_MESSAGES_PER_PCAP == 0:
self.dumps_rotate()
self.dumpers_init()
except Exception as e:
logger.error(e)
else:
logging.info('drop amqp message: ' + repr(m))
logger.info('drop amqp message: ' + repr(m))
except NonCompliantMessageFormatError as e:
print('* * * * * * API VALIDATION ERROR * * * * * * * ')
print("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
print('* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise NonCompliantMessageFormatError("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
# raise NonCompliantMessageFormatError("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
except Exception as e:
logger.error(e)
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
logger.error("Message: %s, body: %s" % (json.dumps(req_body_dict), str(body)))
def run(self):
print("Starting thread listening on the event bus")
......@@ -165,65 +316,15 @@ class AmqpDataPacketDumper:
print('Bye byes!')
def amqp_data_packet_dumper():
    """
    Resolve AMQP settings from the environment (or built-in defaults), declare
    the exchange, and run an AmqpDataPacketDumper on the default CoAP agent
    data topics. Blocks until the dumper stops.
    """
    try:
        exchange_name = str(os.environ['AMQP_EXCHANGE'])
        print('Imported AMQP_EXCHANGE env var: %s' % exchange_name)
    except KeyError:
        exchange_name = "amq.topic"
        print('Cannot retrieve environment variables for AMQP EXCHANGE. Loading default: %s' % exchange_name)

    try:
        broker_url = str(os.environ['AMQP_URL'])
        print('Imported AMQP_URL env var: %s' % broker_url)
        parsed = six.moves.urllib_parse.urlparse(broker_url)
        logging.info(
            "Env variables imported for AMQP connection, User: {0} @ Server: {1} ".format(parsed.username,
                                                                                          parsed.hostname))
    except KeyError:
        print('Cannot retrieve environment variables for AMQP connection. Loading defaults..')
        # load default values
        broker_url = "amqp://{0}:{1}@{2}/{3}".format("guest", "guest", "localhost", "/")

    connection = pika.BlockingConnection(pika.URLParameters(broker_url))

    # declare the exchange in case it does not exist yet
    connection.channel().exchange_declare(exchange=exchange_name,
                                          type='topic',
                                          durable=True,
                                          )

    # default data-plane topics for the CoAP agents
    subscriptions = ['data.serial.fromAgent.coap_client_agent',
                     'data.serial.fromAgent.coap_server_agent',
                     'data.tun.fromAgent.coap_server_agent',
                     'data.tun.fromAgent.coap_client_agent',
                     ]

    dumper = AmqpDataPacketDumper(
        amqp_url=broker_url,
        amqp_exchange=exchange_name,
        topics=subscriptions,
        dump_dir=None
    )
    dumper.run()
if __name__ == '__main__':
    import multiprocessing

    logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)

    # Pass the callable itself — the original called
    # launch_amqp_data_to_pcap_dumper() here, which ran the dumper in this
    # process (blocking) instead of in the child.
    p = multiprocessing.Process(target=launch_amqp_data_to_pcap_dumper, args=())
    p.start()
    for i in range(1, 1000):
        time.sleep(1)
        print(i)
    p.join()
......@@ -268,18 +268,20 @@ class AmqpSniffer(threading.Thread):
self.channel = self.connection.channel()
self.services_queu_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queu_name, auto_delete=True)
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queue_name,
auto_delete = True,
arguments = {'x-max-length': 100})
if topics: # susbscribe only to passed list
for t in topics:
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=self.services_queu_name,
queue=self.services_queue_name,
routing_key=t)
else: # subscribe to all events
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=self.services_queu_name,
queue=self.services_queue_name,
routing_key='#')
# Hello world message
......@@ -293,10 +295,10 @@ class AmqpSniffer(threading.Thread):
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.services_queu_name)
self.channel.basic_consume(self.on_request, queue=self.services_queue_name)
def stop(self):
    """Delete the services queue, stop consuming, and close the connection."""
    # The pasted diff kept both the old (misspelled services_queu_name) and
    # new delete calls; only the corrected attribute name is deleted once.
    self.channel.queue_delete(self.services_queue_name)
    self.channel.stop_consuming()
    self.connection.close()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment