Commit b1bd38a8 authored by Federico Sismondi's avatar Federico Sismondi

some fixes, now pcap dumper creates empty pcap from begining, so we can query...

some fixes, now pcap dumper creates empty pcap from begining, so we can query it and avoid getting an non exisiting file exception
parent 68fef9fe
Pipeline #1818 passed with stage
in 0 seconds
...@@ -81,12 +81,45 @@ class AmqpDataPacketDumper: ...@@ -81,12 +81,45 @@ class AmqpDataPacketDumper:
# fixme deprecate old rkey fromats # fixme deprecate old rkey fromats
DEFAULT_TOPICS = ['#.fromAgent.#', 'fromAgent.#'] DEFAULT_TOPICS = ['#.fromAgent.#', 'fromAgent.#']
DEFAULT_DUMP_DIR = "tmp" DEFAULT_DUMP_DIR = "tmp"
DEFAULT_LOG_LEVEL = 10 # 10-> debug
DEFAULT_FILENAME = "DLT_RAW.pcap" DEFAULT_FILENAME = "DLT_RAW.pcap"
DEFAULT_FILENAME_WR = "DLT_RAW.pcap~"
QUANTITY_MESSAGES_PER_PCAP = 100 QUANTITY_MESSAGES_PER_PCAP = 100
def __init__(self, dump_dir=None, filename=None, dlt=DLT_RAW, amqp_url=None, amqp_exchange=None, topics=None): def dumper_init(self):
# delete tmp pcap file (the one with ~)
full_path_temp_pcap_file = os.path.join(self.dump_dir, self.pcap_filename_wr)
# delete previous pcap file
full_path_pcap_file = os.path.join(self.dump_dir, self.pcap_filename)
for f in [full_path_pcap_file, full_path_temp_pcap_file]:
if os.path.exists(f):
if os.path.isfile(f):
os.remove(f)
self.pcap_dumper = Dumper(
filename=os.path.join(self.dump_dir, self.pcap_filename_wr),
snaplen=2000,
network=self.dlt
)
# copy filename.pcap~ to filename.pcap
shutil.copyfile(
os.path.join(self.dump_dir, self.pcap_filename_wr),
os.path.join(self.dump_dir, self.pcap_filename)
)
def __init__(self,
dump_dir=DEFAULT_DUMP_DIR,
log_level=DEFAULT_LOG_LEVEL,
filename=DEFAULT_FILENAME,
dlt=DLT_RAW,
topics=DEFAULT_TOPICS,
amqp_url=None,
amqp_exchange=None,
):
assert dlt in ALLOWED_DATA_LINK_TYPES, 'not accepted dlt %s' % dlt assert dlt in ALLOWED_DATA_LINK_TYPES, 'not accepted dlt %s' % dlt
self.COMPONENT_ID = 'capture_dumper_%s' % str(uuid.uuid4())[:8] # uuid in case several dumpers listening to bus self.COMPONENT_ID = 'capture_dumper_%s' % str(uuid.uuid4())[:8] # uuid in case several dumpers listening to bus
...@@ -100,18 +133,10 @@ class AmqpDataPacketDumper: ...@@ -100,18 +133,10 @@ class AmqpDataPacketDumper:
self.url = os.environ.get('AMQP_URL') self.url = os.environ.get('AMQP_URL')
self.exchange = os.environ.get('AMQP_EXCHANGE') self.exchange = os.environ.get('AMQP_EXCHANGE')
if dump_dir: self.pcap_filename = filename
self.dump_dir = dump_dir self.pcap_filename_wr = self.pcap_filename + "~"
else:
self.dump_dir = self.DEFAULT_DUMP_DIR
if filename:
self.pcap_filename = filename
self.pcap_filename_wr = self.pcap_filename + "~"
else:
self.pcap_filename = self.DEFAULT_FILENAME
self.pcap_filename_wr = self.pcap_filename + "~"
self.dump_dir = dump_dir
if not os.path.exists(self.dump_dir): if not os.path.exists(self.dump_dir):
os.makedirs(self.dump_dir) os.makedirs(self.dump_dir)
...@@ -123,7 +148,7 @@ class AmqpDataPacketDumper: ...@@ -123,7 +148,7 @@ class AmqpDataPacketDumper:
self.connection = pika.BlockingConnection(pika.URLParameters(self.url)) # queues & default exchange declaration self.connection = pika.BlockingConnection(pika.URLParameters(self.url)) # queues & default exchange declaration
self.channel = self.connection.channel() self.channel = self.connection.channel()
self.data_queue_name = 'data@%s' % self.COMPONENT_ID self.data_queue_name = '%s::packet.sniffed.raw' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.data_queue_name, self.channel.queue_declare(queue=self.data_queue_name,
auto_delete=True, auto_delete=True,
arguments={'x-max-length': 1000} arguments={'x-max-length': 1000}
...@@ -138,14 +163,10 @@ class AmqpDataPacketDumper: ...@@ -138,14 +163,10 @@ class AmqpDataPacketDumper:
rabbitmq_handler.setFormatter(json_formatter) rabbitmq_handler.setFormatter(json_formatter)
self.logger.addHandler(rabbitmq_handler) self.logger.addHandler(rabbitmq_handler)
self.logger.setLevel(LOG_LEVEL) self.logger.setLevel(log_level)
# subscribe to data plane channels # subscribe to data plane channels
if topics is None: self.topics = topics
self.topics = self.DEFAULT_TOPICS
else:
self.topics = topics
for t in self.topics: for t in self.topics:
self.channel.queue_bind(exchange=self.exchange, self.channel.queue_bind(exchange=self.exchange,
queue=self.data_queue_name, queue=self.data_queue_name,
...@@ -174,29 +195,6 @@ class AmqpDataPacketDumper: ...@@ -174,29 +195,6 @@ class AmqpDataPacketDumper:
self.channel.basic_qos(prefetch_count=1) self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.data_queue_name) self.channel.basic_consume(self.on_request, queue=self.data_queue_name)
def dumper_init(self):
# delete tmp pcap file (the one with ~)
full_path_temp_pcap_file = os.path.join(self.dump_dir, self.pcap_filename_wr)
# delete previous pcap file
full_path_pcap_file = os.path.join(self.dump_dir, self.pcap_filename)
for f in [full_path_pcap_file, full_path_temp_pcap_file]:
if os.path.exists(f):
if os.path.isfile(f):
os.remove(f)
self.pcap_dumper = Dumper(
filename=os.path.join(self.dump_dir, self.pcap_filename_wr),
snaplen=2000,
network=self.dlt
)
# copy filename.pcap~ to filename.pcap
shutil.copyfile(
os.path.join(self.dump_dir, self.pcap_filename_wr),
os.path.join(self.dump_dir, self.pcap_filename)
)
def dump_packet(self, message): def dump_packet(self, message):
try: try:
t = time.time() t = time.time()
...@@ -336,4 +334,3 @@ class AmqpDataPacketDumper: ...@@ -336,4 +334,3 @@ class AmqpDataPacketDumper:
if __name__ == '__main__': if __name__ == '__main__':
dumper = AmqpDataPacketDumper() dumper = AmqpDataPacketDumper()
dumper.run() dumper.run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment