Commit 939289b8 authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'setup_py' into 'master'

Setup py

See merge request !29
parents 5d752ccf 68839a61
This diff is collapsed.
Agent for the f-interop platform (ioppytest-agent) Agent for the f-interop platform (ioppytest-agent)
-------------------------------------------------- --------------------------------------------------
About
-----
Agent (~VPN client) is a component which connects the environment where
the IUT runs to testing tool using the AMQP bus.
This component is part of the ioppytest framework ecosystem.
This components needs to run in the user's host and must share some
kind on interface with the implementation under test (IUT), it will
enable the creation of a private network between all devices in the
session.
Installation
------------
create virtual env for not messing up your current environment
```
pip install virtualenv
virtualenv -p /usr/bin/python2.7 my_venv
source my_venv/bin/activate
```
install ioppytest-agent using pip
```
pip install ioppytest-agent
```
Design Design
------ ------
The design of the f-interop agent is modular by design.
An agent is made of different processes that connect to AMQP message An agent is made of different processes that connect to AMQP message
broker and exchange messages (in and out) with other components using broker and exchange messages (in and out) with other components using
the same AMQP broker. the same AMQP broker.
...@@ -19,13 +44,10 @@ charge of launching all the other components. ...@@ -19,13 +44,10 @@ charge of launching all the other components.
If new components needs to be added they just need to be launched If new components needs to be added they just need to be launched
from this component. from this component.
Core open the default ZMQ socket that is used by other components to
communicate with each others.
Error handling Error handling
-------------- --------------
When there is a Ctrl-C the agent should kill all other components and When there is a user interrupt signal (Ctrl-C) the agent should kill
disconnect as gracefully as possible. all other components and disconnect as gracefully as possible.
Serial mode (with 802.15.4 probe) Serial mode (with 802.15.4 probe)
......
from __future__ import absolute_import
from .agent_cli import Agent
from __future__ import absolute_import
from .agent_cli import main
if __name__ == "__main__":
main()
...@@ -23,18 +23,17 @@ Features of the agent ...@@ -23,18 +23,17 @@ Features of the agent
* The agent isn't the way the user interact with test coordinator/manager. It simply connects to backend to establish a * The agent isn't the way the user interact with test coordinator/manager. It simply connects to backend to establish a
sort of virtual network. sort of virtual network.
""" """
from __future__ import absolute_import
import logging import logging
import click import click
import uuid
import multiprocessing
from connectors.tun import TunConnector from .connectors import TunConnector
from connectors.core import CoreConnector from .connectors import CoreConnector
from connectors.http import HTTPConnector from .connectors import SerialConnector
from connectors.serialconn import SerialConnector
from utils import arrow_down, arrow_up, finterop_banner from .utils import ioppytest_banner
from utils.packet_dumper import launch_amqp_data_to_pcap_dumper from .utils import packet_dumper
try: try:
from urllib.parse import urlparse from urllib.parse import urlparse
...@@ -55,8 +54,13 @@ class Agent(object): ...@@ -55,8 +54,13 @@ class Agent(object):
""" """
header = """ header = """
Agent (~VPN client) for connecting your implementation under test (IUT) to the private network of the remote interop Agent (~VPN client) is a component which connects the environment where
session. the IUT runs to testing tool using the AMQP bus.
This component is part of the ioppytest framework ecosystem.
This components needs to run in the user's host and must share some
kind on interface with the implementation under test (IUT), it will
enable the creation of a private network between all devices in the
session.
Some examples on the different modes of running the agent (depending on the cabling and networking of your IUT): Some examples on the different modes of running the agent (depending on the cabling and networking of your IUT):
...@@ -65,7 +69,7 @@ Note: We assume that a session was a already created and user has url ...@@ -65,7 +69,7 @@ Note: We assume that a session was a already created and user has url
and it has been exported as environment variable and it has been exported as environment variable
e.g.: e.g.:
export AMQP_URL=amqp://alfredo:zitarrosa@exampleRmqHost[:port]/sessionXX export AMQP_URL=amqp://alfredo:zitarrosa@example.com[:port]/sessionXX
--------------------------------------------------------------------- ---------------------------------------------------------------------
1. user runs an IPv6 based implementation (e.g. coap_client) which runs in same PC where agent runs (default mode): 1. user runs an IPv6 based implementation (e.g. coap_client) which runs in same PC where agent runs (default mode):
...@@ -73,7 +77,7 @@ export AMQP_URL=amqp://alfredo:zitarrosa@exampleRmqHost[:port]/sessionXX ...@@ -73,7 +77,7 @@ export AMQP_URL=amqp://alfredo:zitarrosa@exampleRmqHost[:port]/sessionXX
\b \b
command: command:
sudo python -m agent connect \\ sudo -E python -m agent connect \\
--url $AMQP_URL \\ --url $AMQP_URL \\
--name coap_client --name coap_client
...@@ -88,7 +92,7 @@ bootstrap ( virtual interface creation, and forced IP assignation) ...@@ -88,7 +92,7 @@ bootstrap ( virtual interface creation, and forced IP assignation)
\b \b
command: command:
sudo python -m agent connect \\ sudo -E python -m agent connect \\
--url $AMQP_URL \\ --url $AMQP_URL \\
--name coap_client \\ --name coap_client \\
--force-bootstrap \\ --force-bootstrap \\
...@@ -106,7 +110,7 @@ This can be used for example when the implementation under test is a device in a ...@@ -106,7 +110,7 @@ This can be used for example when the implementation under test is a device in a
\b \b
command: command:
sudo python -m agent connect \\ sudo -E python -m agent connect \\
--url $AMQP_URL \\ --url $AMQP_URL \\
--name coap_client \\ --name coap_client \\
--force-bootstrap \\ --force-bootstrap \\
...@@ -139,7 +143,7 @@ For more information: README.md ...@@ -139,7 +143,7 @@ For more information: README.md
def __init__(self): def __init__(self):
print(finterop_banner) print(ioppytest_banner)
self.cli = click.Group( self.cli = click.Group(
add_help_option=Agent.header, add_help_option=Agent.header,
...@@ -160,9 +164,8 @@ For more information: README.md ...@@ -160,9 +164,8 @@ For more information: README.md
self.name_option = click.Option( self.name_option = click.Option(
param_decls=["--name"], param_decls=["--name"],
default=str(uuid.uuid1()), required=True,
required=False, help="Agent identity, normally associated with the IUT role (coap_client, comi_server, etc)")
help="Agent identity (default: random generated)")
self.dump_option = click.Option( self.dump_option = click.Option(
param_decls=["--dump"], param_decls=["--dump"],
...@@ -261,13 +264,16 @@ For more information: README.md ...@@ -261,13 +264,16 @@ For more information: README.md
# TODO fix pcap_dumper support for py2, python3 -m utils.packet_dumper works fine tho # TODO fix pcap_dumper support for py2, python3 -m utils.packet_dumper works fine tho
# if dump: # if dump:
# dump_p = multiprocessing.Process(target=launch_amqp_data_to_pcap_dumper, args=()) # dump_p = multiprocessing.Process(target=packet_dumper.launch_amqp_data_to_pcap_dumper, args=())
# dump_p.start() # dump_p.start()
def run(self): def run(self):
self.cli() self.cli()
if __name__ == "__main__": def main():
agent = Agent() agent = Agent()
agent.run() agent.run()
if __name__ == "__main__":
main()
from __future__ import absolute_import
from .tun import TunConnector
from .core import CoreConnector
from .serialconn import SerialConnector
...@@ -5,7 +5,7 @@ import json ...@@ -5,7 +5,7 @@ import json
import logging import logging
from multiprocessing import Process from multiprocessing import Process
from utils.messages import Message as EventBusMessage from ..utils.messages import Message as EventBusMessage
from amqp.exceptions import UnexpectedFrame from amqp.exceptions import UnexpectedFrame
from kombu import Connection, Queue, Exchange, Consumer from kombu import Connection, Queue, Exchange, Consumer
...@@ -64,7 +64,7 @@ class BaseConsumer(ConsumerMixin): ...@@ -64,7 +64,7 @@ class BaseConsumer(ConsumerMixin):
def subscribe_to_topics(self, topic_list): def subscribe_to_topics(self, topic_list):
for t in topic_list: for t in topic_list:
queue = Queue( queue = Queue(
name="consumer: {name}.{consumer_name}::RKey:{rkey}".format( name="consumer: {name}.{consumer_name}?rkey={rkey}".format(
name=self.name, name=self.name,
consumer_name=self.consumer_name, consumer_name=self.consumer_name,
rkey=t rkey=t
...@@ -88,7 +88,7 @@ class BaseConsumer(ConsumerMixin): ...@@ -88,7 +88,7 @@ class BaseConsumer(ConsumerMixin):
assert type(body) is dict # assert that kombu deserialized it already assert type(body) is dict # assert that kombu deserialized it already
json_body = json.dumps(body) json_body = json.dumps(body)
self.log.debug("DEFAULT on_message callback, got: {}".format(message.delivery_info.get('routing_key'))) self.log.debug("base on_message() callback, got: {}".format(message.delivery_info.get('routing_key')))
msg = EventBusMessage.load(json_body, message.delivery_info.get('routing_key')) msg = EventBusMessage.load(json_body, message.delivery_info.get('routing_key'))
try: try:
self._on_message(msg) self._on_message(msg)
...@@ -98,16 +98,13 @@ class BaseConsumer(ConsumerMixin): ...@@ -98,16 +98,13 @@ class BaseConsumer(ConsumerMixin):
) )
def _on_message(self, message): def _on_message(self, message):
"Class to be overridden by children calss" "Class to be overridden by children class"
return NotImplementedError() return NotImplementedError()
def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs): def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs):
# control plane info # control plane info
for q in self.queues: for q in self.queues:
self.log.info( self.log.info("Queue: {queue_name} bound to: {rkey} ".format(queue_name=q.name, rkey=q.routing_key))
"Queue: {queue_name} bound to: {rkey} ".format(queue_name=q.name,
rkey=q.routing_key)
)
class BaseController(Process): class BaseController(Process):
......
...@@ -3,10 +3,10 @@ Plugin to connect to the AMQP broker ...@@ -3,10 +3,10 @@ Plugin to connect to the AMQP broker
""" """
import json import json
import logging import logging
from utils.messages import *
from kombu import Producer from kombu import Producer
from .base import BaseController, BaseConsumer from .base import BaseController, BaseConsumer
from ..utils.messages import MsgTestingToolComponentReady
__version__ = (0, 1, 0) __version__ = (0, 1, 0)
...@@ -24,7 +24,6 @@ class CoreConsumer(BaseConsumer): ...@@ -24,7 +24,6 @@ class CoreConsumer(BaseConsumer):
subscriptions) subscriptions)
def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs): def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs):
log.info("Backend ready to consume data")
# let's send bootstrap message # let's send bootstrap message
msg = MsgTestingToolComponentReady( msg = MsgTestingToolComponentReady(
...@@ -39,9 +38,11 @@ class CoreConsumer(BaseConsumer): ...@@ -39,9 +38,11 @@ class CoreConsumer(BaseConsumer):
routing_key=msg.routing_key routing_key=msg.routing_key
) )
log.info("Agent READY, listening on the event bus for ctrl messages and data packets..")
def _on_message(self, message): def _on_message(self, message):
self.log.debug( self.log.warning(
"Consumer specialized handler <{consumer_name}> got: {message}".format( "<{consumer_name}> got {message}, no callback bound to it.".format(
consumer_name=self.consumer_name, consumer_name=self.consumer_name,
message=repr(message) message=repr(message)
) )
......
...@@ -6,10 +6,10 @@ import serial ...@@ -6,10 +6,10 @@ import serial
import logging import logging
import threading import threading
from connectors.base import BaseController, BaseConsumer from .base import BaseController, BaseConsumer
from utils.serial_listener import SerialListener from ..utils import messages
from utils import arrow_down, arrow_up, finterop_banner from ..utils import arrow_down
from utils.messages import * from ..utils.serial_listener import SerialListener
__version__ = (0, 1, 0) __version__ = (0, 1, 0)
...@@ -26,11 +26,11 @@ class SerialConsumer(BaseConsumer): ...@@ -26,11 +26,11 @@ class SerialConsumer(BaseConsumer):
def __init__(self, user, password, session, server, exchange, name, consumer_name): def __init__(self, user, password, session, server, exchange, name, consumer_name):
self.dispatcher = { self.dispatcher = {
MsgPacketInjectRaw: self.handle_data, messages.MsgPacketInjectRaw: self.handle_data,
} }
subscriptions = [ subscriptions = [
MsgPacketInjectRaw.routing_key.replace('*', name).replace('ip.tun', '802154.serial'), messages.MsgPacketInjectRaw.routing_key.replace('*', name).replace('ip.tun', '802154.serial'),
] ]
super(SerialConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name, super(SerialConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name,
......
...@@ -7,15 +7,12 @@ import json ...@@ -7,15 +7,12 @@ import json
import logging import logging
import sys import sys
import datetime import datetime
from utils.opentun import OpenTunLinux, OpenTunMACOS
from utils import arrow_down, arrow_up, finterop_banner
from utils.messages import *
from kombu import Producer from kombu import Producer
from connectors.base import BaseController, BaseConsumer
__version__ = (0, 1, 0) from .base import BaseController, BaseConsumer
from ..utils.opentun import OpenTunLinux, OpenTunMACOS
from ..utils import arrow_up, arrow_down
from ..utils import messages
class TunConsumer(BaseConsumer): class TunConsumer(BaseConsumer):
...@@ -29,22 +26,22 @@ class TunConsumer(BaseConsumer): ...@@ -29,22 +26,22 @@ class TunConsumer(BaseConsumer):
def __init__(self, user, password, session, server, exchange, name, consumer_name, force_bootstrap, ipv6_host, def __init__(self, user, password, session, server, exchange, name, consumer_name, force_bootstrap, ipv6_host,
ipv6_prefix): ipv6_prefix):
self.dispatcher = { self.dispatcher = {
MsgAgentTunStart: self.handle_start, messages.MsgAgentTunStart: self.handle_start,
MsgPacketInjectRaw: self.handle_raw_packet_to_inject, messages.MsgPacketInjectRaw: self.handle_raw_packet_to_inject,
} }
self.tun = None self.tun = None
self.packet_count = 0 self.packet_count = 0
subscriptions = [ subscriptions = [
MsgAgentTunStart.routing_key.replace('*', name), # default rkey is "toAgent.*.ip.tun.start" messages.MsgAgentTunStart.routing_key.replace('*', name), # default rkey is "toAgent.*.ip.tun.start"
MsgPacketInjectRaw.routing_key.replace('*', name) messages.MsgPacketInjectRaw.routing_key.replace('*', name)
] ]
super(TunConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name, subscriptions) super(TunConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name, subscriptions)
if force_bootstrap: if force_bootstrap:
self.handle_start( self.handle_start(
MsgAgentTunStart( messages.MsgAgentTunStart(
name=name, name=name,
ipv6_host=ipv6_host, ipv6_host=ipv6_host,
ipv6_prefix=ipv6_prefix, ipv6_prefix=ipv6_prefix,
...@@ -75,7 +72,7 @@ class TunConsumer(BaseConsumer): ...@@ -75,7 +72,7 @@ class TunConsumer(BaseConsumer):
conf_params = self.tun.get_tun_configuration() conf_params = self.tun.get_tun_configuration()
conf_params.update({'name': self.name}) conf_params.update({'name': self.name})
# publish message in event bus # publish message in event bus
msg = MsgAgentTunStarted(**conf_params) msg = messages.MsgAgentTunStarted(**conf_params)
logging.info('Publishing %s' % repr(msg)) logging.info('Publishing %s' % repr(msg))
producer = Producer(self.connection, serializer='json') producer = Producer(self.connection, serializer='json')
...@@ -146,17 +143,17 @@ class TunConsumer(BaseConsumer): ...@@ -146,17 +143,17 @@ class TunConsumer(BaseConsumer):
return return
self.packet_count += 1 self.packet_count += 1
print(arrow_down)
self.log.debug('\n* * * * * * HANDLE INCOMING PACKET (%s) * * * * * * *' % self.packet_count)
self.log.debug("TIME: %s" % datetime.datetime.time(datetime.datetime.now()))
self.log.debug(" - - - ")
self.log.debug(("Interface", message.interface_name))
self.log.debug(("Data", message.data))
self.log.debug('\n* * * * * * * * * * * * * * * * * * * * * * *')
self.log.info("Message received from testing tool. Injecting in Tun. Message count (downlink): %s" self.log.info("Message received from testing tool. Injecting in Tun. Message count (downlink): %s"
% self.packet_count) % self.packet_count)
print(arrow_down)
self.log.info('\n # # # # # # # # # # # # OPEN TUN # # # # # # # # # # # # ' +
'\n data packet EventBus -> TUN interface' +
'\n' + message.to_json() +
'\n # # # # # # # # # # # # # # # # # # # # # # # # # # # # #'
)
self.tun._eventBusToTun( self.tun._eventBusToTun(
sender="Testing Tool", sender="Testing Tool",
signal="tun inject", signal="tun inject",
......
from . import messages
from . import packet_dumper
from . import serial_listener
import logging import logging
import os import os
...@@ -64,4 +68,15 @@ finterop_banner = \ ...@@ -64,4 +68,15 @@ finterop_banner = \
|_| |_____|_| |_|\\__\\___|_| \\___/| .__/ |_| |_____|_| |_|\\__\\___|_| \\___/| .__/
| | | |
|_| |_|
""" """
\ No newline at end of file
ioppytest_banner=\
"""
_ _ _ _
(_) ___ _ __ _ __ _ _ | |_ ___ ___ | |_ __ _ __ _ ___ _ __ | |_
| | / _ \\ | '_ \\ | '_ \\ | | | || __|/ _ \\/ __|| __|_____ / _` | / _` | / _ \\| '_ \\ | __|
| || (_) || |_) || |_) || |_| || |_| __/\\__ \\| |_|_____|| (_| || (_| || __/| | | || |_
|_| \\___/ | .__/ | .__/ \\__, | \\__|\\___||___/ \\__| \\__,_| \\__, | \\___||_| |_| \\__|
|_| |_| |___/ |___/
"""
\ No newline at end of file
...@@ -12,8 +12,8 @@ import sys ...@@ -12,8 +12,8 @@ import sys
from kombu import Exchange from kombu import Exchange
from utils import arrow_down, arrow_up, finterop_banner from . import arrow_down, arrow_up
from utils.messages import * from . import messages
DEFAULT_IPV6_PREFIX = 'bbbb' DEFAULT_IPV6_PREFIX = 'bbbb'
...@@ -503,7 +503,7 @@ class OpenTunLinux(object): ...@@ -503,7 +503,7 @@ class OpenTunLinux(object):
This function forwards the data to the the EventBus. This function forwards the data to the the EventBus.
""" """
routing_key = MsgPacketSniffedRaw.routing_key.replace('*', self.name) routing_key = messages.MsgPacketSniffedRaw.routing_key.replace('*', self.name)
log.debug("Pushing message to topic: %s" % routing_key) log.debug("Pushing message to topic: %s" % routing_key)
self.packet_count += 1 self.packet_count += 1
...@@ -511,14 +511,14 @@ class OpenTunLinux(object): ...@@ -511,14 +511,14 @@ class OpenTunLinux(object):
% self.packet_count) % self.packet_count)
# dispatch to EventBus # dispatch to EventBus
m = MsgPacketSniffedRaw( m = messages.MsgPacketSniffedRaw(
interface_name=self.ifname, interface_name=self.ifname,
timestamp=time.time(), timestamp=time.time(),
data=data data=data
) )
print(arrow_up) print(arrow_up)
log.info('\n # # # # # # # # # # # # OPEN TUN # # # # # # # # # # # # ' + log.info('\n # # # # # # # # # # # # OPEN TUN # # # # # # # # # # # # ' +
'\n data packet TUN -> EventBus' + '\n data packet TUN interface -> EventBus' +
'\n' + m.to_json() + '\n' + m.to_json() +
'\n # # # # # # # # # # # # # # # # # # # # # # # # # # # # #' '\n # # # # # # # # # # # # # # # # # # # # # # # # # # # # #'
) )
...@@ -548,7 +548,7 @@ class OpenTunLinux(object): ...@@ -548,7 +548,7 @@ class OpenTunLinux(object):
# write over tuntap interface # write over tuntap interface
os.write(self.tunIf, data) os.write(self.tunIf, data)
if log.isEnabledFor(logging.DEBUG): if log.isEnabledFor(logging.DEBUG):
log.debug("data dispatched to tun correctly {0}, {1}".format(signal, sender)) log.debug("data dispatched to tun correctly, event: {0}, sender: {1}".format(signal, sender))
except Exception as err: except Exception as err:
errMsg = formatCriticalMessage(err) errMsg = formatCriticalMessage(err)
log.critical(errMsg) log.critical(errMsg)
...@@ -718,7 +718,7 @@ class OpenTunMACOS(object): ...@@ -718,7 +718,7 @@ class OpenTunMACOS(object):
This function forwards the data to the the EventBus. This function forwards the data to the the EventBus.
""" """
routing_key = MsgPacketSniffedRaw.routing_key.replace('*', self.name) routing_key = messages.MsgPacketSniffedRaw.routing_key.replace('*', self.name)
log.debug("Pushing message to topic: %s" % routing_key) log.debug("Pushing message to topic: %s" % routing_key)
self.packet_count += 1 self.packet_count += 1
...@@ -726,14 +726,14 @@ class OpenTunMACOS(object): ...@@ -726,14 +726,14 @@ class OpenTunMACOS(object):
% self.packet_count) % self.packet_count)
# dispatch to EventBus # dispatch to EventBus
m = MsgPacketSniffedRaw( m = messages.MsgPacketSniffedRaw(
interface_name=self.ifname, interface_name=self.ifname,
timestamp=time.time(), timestamp=time.time(),
data=data data=data
) )
print(arrow_up) print(arrow_up)
log.info('\n # # # # # # # # # # # # OPEN TUN # # # # # # # # # # # # ' + log.info('\n # # # # # # # # # # # # OPEN TUN # # # # # # # # # # # # ' +
'\n data packet TUN -> EventBus' + '\n data packet TUN interface -> EventBus' +
'\n' + m.to_json() + '\n' + m.to_json() +
'\n # # # # # # # # # # # # # # # # # # # # # # # # # # # # #' '\n # # # # # # # # # # # # # # # # # # # # # # # # # # # # #'
) )
...@@ -778,7 +778,7 @@ class OpenTunMACOS(object): ...@@ -778,7 +778,7 @@ class OpenTunMACOS(object):
# write over tuntap interface # write over tuntap interface
os.write(self.tunIf, data) os.write(self.tunIf, data)
if log.isEnabledFor(logging.DEBUG): if log.isEnabledFor(logging.DEBUG):
log.debug("data dispatched to tun correctly {0}, {1}".format(signal, sender)) log.debug("data dispatched to tun correctly, event: {0}, sender: {1}".format(signal, sender))
except Exception as err: except Exception as err:
errMsg = formatCriticalMessage(err) errMsg = formatCriticalMessage(err)
log.critical(errMsg) log.critical(errMsg)
......
...@@ -4,10 +4,11 @@ import json ...@@ -4,10 +4,11 @@ import json
import serial import serial
import logging import logging
import sys import sys
from utils import messages
from kombu import Exchange from kombu import Exchange
from collections import OrderedDict from collections import OrderedDict
from utils import arrow_down, arrow_up, finterop_banner
from ..utils import messages
STATE_OK = 0 STATE_OK = 0
STATE_ESC = 1 STATE_ESC = 1
......
import json
__version__ = (0, 1, 0)
import logging
from requests import Request, Session
from .base import BaseController, BaseConsumer
log = logging.getLogger(__name__)
class HTTPConsumer(BaseConsumer):
"""
AMQP helper
"""
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(HTTPConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def handle_control(self, body, message):
"""
Args:
body:
message:
Returns:
"""
required_keys = {"url", "verb", "data"}
msg = None
try:
msg = json.loads(body)
self.log.debug(message)
except ValueError as e:
message.ack()
self.log.error(e)
self.log.error("Incorrect message: {0}".format(body))
if msg is not None:
present_keys = set(msg.keys())
if not present_keys.issuperset(required_keys):
self.log.error("Error you need those keys: %s" % required_keys)
else:
s = Session()
req = Request(msg["verb"], msg["url"], data=msg["data"])