Commit 4161cc12 authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'messages_API_1.0' into 'master'

Messages api 1.0

See merge request !26
parents c5726116 1fb1c9a8
......@@ -34,8 +34,6 @@ import multiprocessing
from connectors.tun import TunConnector
from connectors.core import CoreConnector
from connectors.http import HTTPConnector
from connectors.ping import PingConnector
from connectors.zeromq import ZMQConnector
from connectors.serialconn import SerialConnector
from utils import arrow_down, arrow_up, finterop_banner
......@@ -46,7 +44,7 @@ try:
except ImportError:
from urlparse import urlparse
__version__ = (0, 0, 1)
__version__ = (0, 1, 0)
DEFAULT_PLATFORM = 'f-interop.paris.inria.fr'
LOGGER = logging.getLogger(__name__)
......@@ -163,8 +161,6 @@ For more information, visit: http://doc.f-interop.eu
self.plugins["serial"] = SerialConnector(**data)
else:
self.plugins["tun"] = TunConnector(**data)
# self.plugins["zmq"] = ZMQConnector(**data)
# self.plugins["ping"] = PingConnector(**data)
# self.plugins["http"] = HTTPConnector(**data)
for p in self.plugins.values():
......
......@@ -5,18 +5,15 @@ import json
import logging
from multiprocessing import Process
from kombu import Connection
from kombu import Exchange
from kombu import Queue
from utils.messages import Message as EventBusMessage, MsgTestingToolComponentReady as EventBusMessageComponentReady
from kombu import Connection, Queue, Exchange, Consumer
from kombu.mixins import ConsumerMixin
DEFAULT_EXCHANGE_NAME = "amq.topic"
class BaseConsumer(ConsumerMixin):
DEFAULT_EXCHANGE_NAME = "amq.topic"
def __init__(self, user, password, session, server, exchange, name, consumer_name):
def __init__(self, user, password, session, server, exchange, name, consumer_name, topics):
"""
Args:
......@@ -27,6 +24,7 @@ class BaseConsumer(ConsumerMixin):
exchange: RMQ exchange for sending messages
name: Identity of the agent. Used by testing tools to identify/differentiate each agent on the session
consumer_name: Name to easily identify a process consuming.
topics: Topics subscriptions for the consumer
"""
self.log = logging.getLogger(__name__)
self.log.setLevel(logging.DEBUG)
......@@ -53,109 +51,54 @@ class BaseConsumer(ConsumerMixin):
type="topic",
durable=True)
self.control_queue = Queue("control.{consumer_name}@{name}".format(name=name,
consumer_name=consumer_name),
exchange=self.exchange,
routing_key='control.{consumer_name}.toAgent.{name}'.format(
consumer_name=consumer_name,
name=name),
durable=False,
auto_delete=True)
self.data_queue = Queue("data.{consumer_name}@{name}".format(name=name,
consumer_name=consumer_name),
exchange=self.exchange,
routing_key='data.{consumer_name}.toAgent.{name}'.format(consumer_name=consumer_name,
name=name),
durable=False,
auto_delete=True)
# queues created for topic subscriptions
self.queues = []
# handle subscriptions
self.subscribe_to_topics(topics)
def subscribe_to_topics(self, topic_list):
for t in topic_list:
queue = Queue(
name="{name}.{consumer_name}::{rkey}".format(
name=self.name,
consumer_name=self.consumer_name,
rkey=t
),
exchange=self.exchange,
routing_key=t,
durable=False,
auto_delete=True
)
self.queues.append(
queue
)
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=[self.control_queue],
callbacks=[self.handle_control],
no_ack=True,
accept=['json']),
Consumer(queues=[self.data_queue],
callbacks=[self.handle_data],
no_ack=True,
accept=["json"])
Consumer(self.queues, callbacks=[self.on_message], accept=['json']),
]
def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs):
# control plane info
self.log.info(
"{consumer_name} listening to control plane ".format(consumer_name=self.consumer_name))
self.log.info(
"Queue: control.{consumer_name}@{name} ".format(consumer_name=self.consumer_name, name=self.name))
self.log.info(
"Topic: control.{consumer_name}.toAgent.{name}".format(consumer_name=self.consumer_name, name=self.name))
# data plane info
self.log.info(
"{consumer_name} listening to data plane".format(consumer_name=self.consumer_name))
self.log.info(
"Queue: data.{consumer_name}@{name}".format(consumer_name=self.consumer_name, name=self.name))
self.log.info(
"Topic: data.{consumer_name}.toAgent.{name}".format(consumer_name=self.consumer_name, name=self.name))
def handle_control(self, body, message):
self.log.debug("DEFAULT HANDLE CONTROL")
self.log.debug(("Payload", message.payload))
self.log.debug(("Properties", message.properties))
self.log.debug(("Headers", message.headers))
self.log.debug(("body", message.body))
msg = None
try:
msg = json.loads(body)
self.log.debug(message)
except ValueError as e:
message.ack()
self.log.error(e)
self.log.error("Incorrect message: {0}".format(body))
if msg is not None:
self.log.debug("Just received that packet")
self.log.debug(msg)
def handle_data(self, body, message):
"""
Args:
msg:
Returns:
def on_message(self, body, message):
assert type(body) is dict # assert that kombu deserialized it already
json_body = json.dumps(body)
"""
self.log.debug("DEFAULT HANDLE DATA")
self.log.debug(("Payload", message.payload))
self.log.debug(("Properties", message.properties))
self.log.debug(("Headers", message.headers))
self.log.debug(("body", message.body))
# msg = None
# try:
# msg = json.loads(body)
# self.log.debug(message)
# except ValueError as e:
# message.ack()
# self.log.error(e)
# self.log.error("Incorrect message: {0}".format(body))
#
# if msg is not None:
# self.log.debug("Just received that packet")
# self.log.debug(msg)
def test_connection(self):
"""
Test if a component can talk on the event bus.
self.log.debug("DEFAULT on_message callback, got: {}".format(message.delivery_info.get('routing_key')))
msg = EventBusMessage.load(json_body, message.delivery_info.get('routing_key'))
self._on_message(msg)
Returns:
def _on_message(self, message):
"Class to be overridden by children calss"
return NotImplementedError()
"""
# for key in [control key, data key]:
# log.info("Testing on local routing key: %s" % key)
# self.basic_publish(key, "PING!!")
pass
def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs):
# control plane info
for q in self.queues:
self.log.info(
"Listening on {queue_name} bound to {rkey} ".format(queue_name=q.name,
rkey=q.routing_key)
)
class BaseController(Process):
......
......@@ -3,13 +3,12 @@ Plugin to connect to the F-interop backend
"""
import json
import logging
from utils.messages import *
from kombu import Producer
from .base import BaseController, BaseConsumer
__version__ = (0, 0, 1)
__version__ = (0, 1, 0)
log = logging.getLogger(__name__)
......@@ -20,59 +19,33 @@ class CoreConsumer(BaseConsumer):
"""
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(CoreConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=[self.control_queue],
callbacks=[self.handle_control],
no_ack=True,
accept=['json']),
Consumer(queues=[self.data_queue],
callbacks=[self.handle_data],
no_ack=True,
accept=["json"])
]
subscriptions = [MsgTestingToolComponentReady.routing_key]
super(CoreConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name,
subscriptions)
def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs):
log.info("Backend ready to consume data")
# log.info("-------------------------------------------------")
# log.info("Go to this URL: http://{platform}/session/{session}".format(platform=self.server_url,
# session=self.session))
# log.info("-------------------------------------------------")
# let's send bootstrap message
msg = {
'_type': 'testingtool.component.ready',
'component': self.name,
"description": "Component READY to start test suite."
}
producer = Producer(connection,serializer='json')
producer.publish(msg,
exchange=self.exchange,
routing_key='control.session'
)
def handle_control(self, body, message):
log.debug("DEFAULT HANDLE CONTROL")
log.debug(("Payload", message.payload))
log.debug(("Properties", message.properties))
log.debug(("Headers", message.headers))
log.debug(("body", message.body))
msg = None
try:
msg = json.loads(body)
log.debug(message)
except ValueError as e:
message.ack()
log.error(e)
log.error("Incorrect message: {0}".format(body))
if msg is not None:
log.debug("Just received that packet")
log.debug(msg)
msg = MsgTestingToolComponentReady(
component=self.name,
description="Component READY to start test suite."
)
producer = Producer(connection, serializer='json')
producer.publish(
body=msg.to_dict(),
exchange=self.exchange,
routing_key=msg.routing_key
)
def _on_message(self, message):
self.log.debug(
"Consumer specialized handler <{consumer_name}> got: {message}".format(
consumer_name=self.consumer_name,
message=repr(message)
)
)
class CoreConnector(BaseController):
......
import json
__version__ = (0, 0, 1)
__version__ = (0, 1, 0)
import logging
from requests import Request, Session
......
"""
Create ICMP packets to test if all nodes are up
"""
import json
__version__ = (0, 0, 1)
import logging
from .base import BaseController, BaseConsumer
import subprocess
class PingConsumer(BaseConsumer):
"""
AMQP helper
"""
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(PingConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def handle_control(self, body, message):
"""
Args:
body:
message:
Returns:
"""
required_keys = {"host"}
msg = None
try:
msg = json.loads(body)
self.log.debug(message)
except ValueError as e:
message.ack()
self.log.error(e)
self.log.error("Incorrect message: {0}".format(body))
if msg is not None:
present_keys = set(msg.keys())
if not present_keys.issuperset(required_keys):
self.log("Error you need those keys: %s" % required_keys)
else:
# Default version is ping
version = {"IPv6": "ping6", "IPv4": "ping"}
executable = version.get(msg["version"], "ping")
# default packet count is 1
count_option = msg.get("count", 1)
# network interface, default is None
interface_option = msg.get("interface", "")
# run the ping
command = [executable, msg["host"], "-c", str(count_option)]
self.log.info("command launched: %s" % command)
p = subprocess.call(command)
self.log.info("result: %s" % p)
class PingConnector(BaseController):
NAME = "ping"
def __init__(self, **kwargs):
super(PingConnector, self).__init__(name=PingConnector.NAME)
kwargs["consumer_name"] = PingConnector.NAME
self.consumer = PingConsumer(**kwargs)
self.consumer.log = logging.getLogger(__name__)
self.consumer.log.setLevel(logging.DEBUG)
def run(self):
self.consumer.run()
......@@ -9,19 +9,32 @@ import threading
from connectors.base import BaseController, BaseConsumer
from utils.serial_listener import SerialListener
from utils import arrow_down, arrow_up, finterop_banner
from utils.messages import *
__version__ = (0, 0, 1)
__version__ = (0, 1, 0)
log = logging.getLogger(__name__)
class SerialConsumer(BaseConsumer):
"""
AMQP helper
Serial interface consumer:
- Set up process which communicates with serial interface of mote through USB port
- transmits in and out 802.15.4 packet messages
"""
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(SerialConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
self.dispatcher = {
MsgPacketInjectRaw: self.handle_data,
}
subscriptions = [
MsgPacketInjectRaw.routing_key.replace('*', name).replace('ip.tun', '802154.serial'),
]
super(SerialConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name,
subscriptions)
self.message_count = 0
self.output = ''
self.serial_listener = None
......@@ -49,24 +62,15 @@ class SerialConsumer(BaseConsumer):
serial_listener_th.daemon = True
serial_listener_th.start()
except KeyError as e:
logging.warning(
'Cannot retrieve environment variables for serial connection: '
'FINTEROP_CONNECTOR_SERIAL_PORT/FINTEROP_CONNECTOR_BAUDRATE '
'If no sniffer/injector needed for test ignore this warning ')
def handle_data(self, body, message):
def handle_data(self, message):
"""
Function that will handle serial management messages
exchange:
- default
example:
{
}
Forwards data packets from AMQP BUS to serial interface
"""
if self.serial_port is None:
......@@ -80,7 +84,7 @@ class SerialConsumer(BaseConsumer):
try:
self.output = 'c0'
for c in body['data']:
for c in message.data:
if format(c, '02x') == 'c0':
# endslip
self.output += 'db'
......@@ -100,28 +104,9 @@ class SerialConsumer(BaseConsumer):
print(arrow_down)
log.info('\n # # # # # # # # # # # # SERIAL INTERFACE # # # # # # # # # # # # ' +
'\n data packet EventBus -> Serial' +
'\n' + json.dumps(body) +
'\n' + message.to_json() +
'\n # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # '
)
message.ack()
def handle_control(self, body, message):
msg = None
try:
msg = json.loads(body)
log.debug(message)
except ValueError as e:
message.ack()
log.error(e)
log.error("Incorrect message: {0}".format(body))
return
if msg["_type"] in self.dispatcher.keys():
self.dispatcher[msg["_type"]](msg)
else:
log.warning("Not supported action")
class SerialConnector(BaseController):
......
......@@ -7,48 +7,54 @@ import json
import logging
import sys
import datetime
from utils.opentun import OpenTunLinux, OpenTunMACOS
from utils import arrow_down, arrow_up, finterop_banner
from utils.messages import *
from kombu import Producer
from connectors.base import BaseController, BaseConsumer
from utils import arrow_down, arrow_up, finterop_banner
from utils.opentun import OpenTunLinux, OpenTunMACOS
__version__ = (0, 0, 1)
__version__ = (0, 1, 0)
class TunConsumer(BaseConsumer):
"""
AMQP helper
Tun interface consumer:
- creates tunnel interface (RAW_IP)
- inyects IPv6 packets comming from event bus into tun interaface
- sniffs and forwards packets from tun to event bus
"""
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(TunConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
self.dispatcher = {
"tun.start": self.handle_start,
MsgAgentTunStart: self.handle_start,
MsgPacketInjectRaw: self.handle_raw_packet_to_inject,
}
self.tun = None
self.packet_count = 0
def handle_start(self, msg):
"""
Function that will handle tun management messages
exchange:
- message
subscriptions = [
MsgAgentTunStart.routing_key.replace('*', name),
MsgPacketInjectRaw.routing_key.replace('*', name)
]
queue:
- tun
super(TunConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name, subscriptions)
order:
- start_tun
example:
{
"_type": "tun.start",
"ipv6_host": ":2",
"ipv6_prefix": "cccc"
}
def _on_message(self, message):
msg_type = type(message)
assert msg_type in self.dispatcher.keys(), 'Event message couldnt be dispatched %s' % repr(message)
self.log.debug(
"Consumer specialized handler <{consumer_name}> got: {message}".format(
consumer_name=self.consumer_name,
message=repr(message)
)
)
self.dispatcher[msg_type](message)
- stop_tun
def handle_start(self, msg):
"""
Function that will handle tun start event emitted coming from backend
"""
if self.tun is not None:
self.log.warning('Received open tun control message, but TUN already created')
......@@ -56,16 +62,16 @@ class TunConsumer(BaseConsumer):
else:
self.log.info('starting tun interface')
try:
ipv6_host = msg.get("ipv6_host", None)
ipv6_prefix = msg.get("ipv6_prefix", None)
ipv6_no_forwarding = msg.get("ipv6_no_forwarding", None)
ipv4_host = msg.get("ipv4_host", None)
ipv4_network = msg.get("ipv4_network", None)
ipv4_netmask = msg.get("ipv4_netmask", None)
ipv6_host = msg.ipv6_host
ipv6_prefix = msg.ipv6_prefix
ipv6_no_forwarding = msg.ipv6_no_forwarding
ipv4_host = msg.ipv4_host
ipv4_network = msg.ipv4_network
ipv4_netmask = msg.ipv4_netmask
except AttributeError as ae:
self.log.error(
'Wrong message format: {0}'.format(msg.payload)
'Wrong message format: {0}'.format(repr(msg))
)
return
......@@ -96,80 +102,49 @@ class TunConsumer(BaseConsumer):
self.log.error('Agent TunTap not yet supported for: {0}'.format(sys.platform))
sys.exit(1)
msg = {
"_type": "tun.started",
'name': self.name,
'ipv6_host': ipv6_host,
'ipv6_prefix': ipv6_prefix,
'ipv4_host': ipv4_host,
'ipv4_network': ipv4_network,
'ipv4_netmask': ipv4_netmask,
'ipv6_no_forwarding': ipv6_no_forwarding,
}
self.log.info("Tun started. Publishing msg: %s" % json.dumps(msg))
msg = MsgAgentTunStarted(
name=self.name,
ipv6_host=ipv6_host,
ipv6_prefix=ipv6_prefix,
ipv4_host=ipv4_host,
ipv4_network=ipv4_network,
ipv4_netmask=ipv4_netmask,
ipv6_no_forwarding=ipv6_no_forwarding,
)
self.log.info("Tun started. Publishing msg: %s" % repr(msg))
producer = Producer(self.connection, serializer='json')
producer.publish(msg,
exchange=self.exchange,
routing_key='control.tun.fromAgent.%s' % self.name
)
producer.publish(
body=msg.to_dict(),
exchange=self.exchange,
routing_key='fromAgent.{0}.ip.tun.started'.format(self.name)
)
def handle_data(self, body, message):
def handle_raw_packet_to_inject(self, message):
"""
Args:
msg:
Returns:
Handles data messages to be injected in network interface
"""
if self.tun is None:
self.log.error("Cannot handle data packet, no tun interface yet configured")
return
self.packet_count += 1
print(arrow_down)
self.log.debug('\n* * * * * * HANDLE DATA (%s) * * * * * * *' % self.packet_count)
self.log.debug('\n* * * * * * HANDLE INCOMING PACKET (%s) * * * * * * *' % self.packet_count)
self.log.debug("TIME: %s" % datetime.datetime.time(datetime.datetime.now()))
self.log.debug(" - - - ")
self.log.debug(("Payload", message.payload))
self.log.debug(("Properties", message.properties))
self.log.debug(("Headers", message.headers))
self.log.debug(("Body", body))
self.log.debug(("Message type (_type field)", body["_type"]))
self.log.debug(("Interface", message.interface_name))
self.log.debug(("Data", message.data))
self.log.debug('\n* * * * * * * * * * * * * * * * * * * * * * *')
# body is already a dict, no need to json.load it
msg = body
if msg["_type"] == 'packet.to_inject.raw':
self.log.info("Message received from F-Interop. Injecting in Tun. Message count (downlink): %s"
% self.packet_count)
self.tun._eventBusToTun(
sender="F-Interop server",
signal="tun inject",
data=msg["data"]
)
def handle_control(self, body, message):
msg = None
try:
msg = body
self.log.debug(message)
if msg["_type"]:
self.log.debug('HANDLE CONTROL from tun processing event type: {0}'.format(msg["_type"]))
except (ValueError, KeyError) as e:
message.ack()
self.log.error(e)
self.log.error("Incorrect message: {0}".format(message.payload))
return
if msg["_type"] in self.dispatcher.keys():
self.dispatcher[msg["_type"]](msg)
else:
self.log.debug("Not supported action")
self.log.info("Message received from F-Interop. Injecting in Tun. Message count (downlink): %s"
% self.packet_count)
self.tun._eventBusToTun(
sender="F-Interop server",
signal="tun inject",
data=message.data
)
class TunConnector(BaseController):
......@@ -180,11 +155,6 @@ class TunConnector(BaseController):
NAME = "tun"
def __init__(self, **kwargs):
"""
Args:
key: ZMQ message
"""
super(TunConnector, self).__init__(name=TunConnector.NAME)
self.tun = None
kwargs["consumer_name"] = TunConnector.NAME
......
"""
Send ZMQ messages
"""
import json
import pdb
from multiprocessing import Process
from kombu import Connection
from kombu import Producer
__version__ = (0, 0, 1)
import logging
import zmq
from .base import BaseController,