Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
f-interop-contributors
agent
Commits
db5809e5
Commit
db5809e5
authored
Jan 15, 2018
by
Federico Sismondi
Browse files
midifications and fixes for using the agent with the new APIv.1.0
parent
c5726116
Changes
17
Expand all
Hide whitespace changes
Inline
Side-by-side
agent.py
View file @
db5809e5
...
...
@@ -34,8 +34,6 @@ import multiprocessing
from
connectors.tun
import
TunConnector
from
connectors.core
import
CoreConnector
from
connectors.http
import
HTTPConnector
from
connectors.ping
import
PingConnector
from
connectors.zeromq
import
ZMQConnector
from
connectors.serialconn
import
SerialConnector
from
utils
import
arrow_down
,
arrow_up
,
finterop_banner
...
...
@@ -46,7 +44,7 @@ try:
except
ImportError
:
from
urlparse
import
urlparse
__version__
=
(
0
,
0
,
1
)
__version__
=
(
0
,
1
,
0
)
DEFAULT_PLATFORM
=
'f-interop.paris.inria.fr'
LOGGER
=
logging
.
getLogger
(
__name__
)
...
...
@@ -163,8 +161,6 @@ For more information, visit: http://doc.f-interop.eu
self
.
plugins
[
"serial"
]
=
SerialConnector
(
**
data
)
else
:
self
.
plugins
[
"tun"
]
=
TunConnector
(
**
data
)
# self.plugins["zmq"] = ZMQConnector(**data)
# self.plugins["ping"] = PingConnector(**data)
# self.plugins["http"] = HTTPConnector(**data)
for
p
in
self
.
plugins
.
values
():
...
...
connectors/base.py
View file @
db5809e5
...
...
@@ -5,18 +5,15 @@ import json
import
logging
from
multiprocessing
import
Process
from
kombu
import
Connection
from
kombu
import
Exchange
from
kombu
import
Queue
from
utils.messages
import
Message
as
EventBusMessage
,
MsgTestingToolComponentReady
as
EventBusMessageComponentReady
from
kombu
import
Connection
,
Queue
,
Exchange
,
Consumer
from
kombu.mixins
import
ConsumerMixin
DEFAULT_EXCHANGE_NAME
=
"amq.topic"
class
BaseConsumer
(
ConsumerMixin
):
DEFAULT_EXCHANGE_NAME
=
"amq.topic"
def
__init__
(
self
,
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
):
def
__init__
(
self
,
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
,
topics
):
"""
Args:
...
...
@@ -27,6 +24,7 @@ class BaseConsumer(ConsumerMixin):
exchange: RMQ exchange for sending messages
name: Identity of the agent. Used by testing tools to identify/differentiate each agent on the session
consumer_name: Name to easily identify a process consuming.
topics: Topics subscriptions for the consumer
"""
self
.
log
=
logging
.
getLogger
(
__name__
)
self
.
log
.
setLevel
(
logging
.
DEBUG
)
...
...
@@ -53,109 +51,54 @@ class BaseConsumer(ConsumerMixin):
type
=
"topic"
,
durable
=
True
)
self
.
control_queue
=
Queue
(
"control.{consumer_name}@{name}"
.
format
(
name
=
name
,
consumer_name
=
consumer_name
),
exchange
=
self
.
exchange
,
routing_key
=
'control.{consumer_name}.toAgent.{name}'
.
format
(
consumer_name
=
consumer_name
,
name
=
name
),
durable
=
False
,
auto_delete
=
True
)
self
.
data_queue
=
Queue
(
"data.{consumer_name}@{name}"
.
format
(
name
=
name
,
consumer_name
=
consumer_name
),
exchange
=
self
.
exchange
,
routing_key
=
'data.{consumer_name}.toAgent.{name}'
.
format
(
consumer_name
=
consumer_name
,
name
=
name
),
durable
=
False
,
auto_delete
=
True
)
# queues created for topic subscriptions
self
.
queues
=
[]
# handle subscriptions
self
.
subscribe_to_topics
(
topics
)
def
subscribe_to_topics
(
self
,
topic_list
):
for
t
in
topic_list
:
queue
=
Queue
(
name
=
"{name}.{consumer_name}::{rkey}"
.
format
(
name
=
self
.
name
,
consumer_name
=
self
.
consumer_name
,
rkey
=
t
),
exchange
=
self
.
exchange
,
routing_key
=
t
,
durable
=
False
,
auto_delete
=
True
)
self
.
queues
.
append
(
queue
)
def
get_consumers
(
self
,
Consumer
,
channel
):
return
[
Consumer
(
queues
=
[
self
.
control_queue
],
callbacks
=
[
self
.
handle_control
],
no_ack
=
True
,
accept
=
[
'json'
]),
Consumer
(
queues
=
[
self
.
data_queue
],
callbacks
=
[
self
.
handle_data
],
no_ack
=
True
,
accept
=
[
"json"
])
Consumer
(
self
.
queues
,
callbacks
=
[
self
.
on_message
],
accept
=
[
'json'
]),
]
def
on_consume_ready
(
self
,
connection
,
channel
,
consumers
,
wakeup
=
True
,
**
kwargs
):
# control plane info
self
.
log
.
info
(
"{consumer_name} listening to control plane "
.
format
(
consumer_name
=
self
.
consumer_name
))
self
.
log
.
info
(
"Queue: control.{consumer_name}@{name} "
.
format
(
consumer_name
=
self
.
consumer_name
,
name
=
self
.
name
))
self
.
log
.
info
(
"Topic: control.{consumer_name}.toAgent.{name}"
.
format
(
consumer_name
=
self
.
consumer_name
,
name
=
self
.
name
))
# data plane info
self
.
log
.
info
(
"{consumer_name} listening to data plane"
.
format
(
consumer_name
=
self
.
consumer_name
))
self
.
log
.
info
(
"Queue: data.{consumer_name}@{name}"
.
format
(
consumer_name
=
self
.
consumer_name
,
name
=
self
.
name
))
self
.
log
.
info
(
"Topic: data.{consumer_name}.toAgent.{name}"
.
format
(
consumer_name
=
self
.
consumer_name
,
name
=
self
.
name
))
def
handle_control
(
self
,
body
,
message
):
self
.
log
.
debug
(
"DEFAULT HANDLE CONTROL"
)
self
.
log
.
debug
((
"Payload"
,
message
.
payload
))
self
.
log
.
debug
((
"Properties"
,
message
.
properties
))
self
.
log
.
debug
((
"Headers"
,
message
.
headers
))
self
.
log
.
debug
((
"body"
,
message
.
body
))
msg
=
None
try
:
msg
=
json
.
loads
(
body
)
self
.
log
.
debug
(
message
)
except
ValueError
as
e
:
message
.
ack
()
self
.
log
.
error
(
e
)
self
.
log
.
error
(
"Incorrect message: {0}"
.
format
(
body
))
if
msg
is
not
None
:
self
.
log
.
debug
(
"Just received that packet"
)
self
.
log
.
debug
(
msg
)
def
handle_data
(
self
,
body
,
message
):
"""
Args:
msg:
Returns:
def
on_message
(
self
,
body
,
message
):
assert
type
(
body
)
is
dict
# assert that kombu deserialized it already
json_body
=
json
.
dumps
(
body
)
"""
self
.
log
.
debug
(
"DEFAULT HANDLE DATA"
)
self
.
log
.
debug
((
"Payload"
,
message
.
payload
))
self
.
log
.
debug
((
"Properties"
,
message
.
properties
))
self
.
log
.
debug
((
"Headers"
,
message
.
headers
))
self
.
log
.
debug
((
"body"
,
message
.
body
))
# msg = None
# try:
# msg = json.loads(body)
# self.log.debug(message)
# except ValueError as e:
# message.ack()
# self.log.error(e)
# self.log.error("Incorrect message: {0}".format(body))
#
# if msg is not None:
# self.log.debug("Just received that packet")
# self.log.debug(msg)
def
test_connection
(
self
):
"""
Test if a component can talk on the event bus.
self
.
log
.
debug
(
"DEFAULT on_message callback, got: {}"
.
format
(
message
.
delivery_info
.
get
(
'routing_key'
)))
msg
=
EventBusMessage
.
load
(
json_body
,
message
.
delivery_info
.
get
(
'routing_key'
))
self
.
_on_message
(
msg
)
Returns:
def
_on_message
(
self
,
message
):
"Class to be overridden by children calss"
return
NotImplementedError
()
"""
# for key in [control key, data key]:
# log.info("Testing on local routing key: %s" % key)
# self.basic_publish(key, "PING!!")
pass
def
on_consume_ready
(
self
,
connection
,
channel
,
consumers
,
wakeup
=
True
,
**
kwargs
):
# control plane info
for
q
in
self
.
queues
:
self
.
log
.
info
(
"Listening on {queue_name} bound to {rkey} "
.
format
(
queue_name
=
q
.
name
,
rkey
=
q
.
routing_key
)
)
class
BaseController
(
Process
):
...
...
connectors/core.py
View file @
db5809e5
...
...
@@ -3,13 +3,12 @@ Plugin to connect to the F-interop backend
"""
import
json
import
logging
from
utils.messages
import
*
from
kombu
import
Producer
from
.base
import
BaseController
,
BaseConsumer
__version__
=
(
0
,
0
,
1
)
__version__
=
(
0
,
1
,
0
)
log
=
logging
.
getLogger
(
__name__
)
...
...
@@ -20,59 +19,33 @@ class CoreConsumer(BaseConsumer):
"""
def
__init__
(
self
,
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
):
super
(
CoreConsumer
,
self
).
__init__
(
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
)
def
get_consumers
(
self
,
Consumer
,
channel
):
return
[
Consumer
(
queues
=
[
self
.
control_queue
],
callbacks
=
[
self
.
handle_control
],
no_ack
=
True
,
accept
=
[
'json'
]),
Consumer
(
queues
=
[
self
.
data_queue
],
callbacks
=
[
self
.
handle_data
],
no_ack
=
True
,
accept
=
[
"json"
])
]
subscriptions
=
[
MsgTestingToolComponentReady
.
routing_key
]
super
(
CoreConsumer
,
self
).
__init__
(
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
,
subscriptions
)
def
on_consume_ready
(
self
,
connection
,
channel
,
consumers
,
wakeup
=
True
,
**
kwargs
):
log
.
info
(
"Backend ready to consume data"
)
# log.info("-------------------------------------------------")
# log.info("Go to this URL: http://{platform}/session/{session}".format(platform=self.server_url,
# session=self.session))
# log.info("-------------------------------------------------")
# let's send bootstrap message
msg
=
{
'_type'
:
'testingtool.component.ready'
,
'component'
:
self
.
name
,
"description"
:
"Component READY to start test suite."
}
producer
=
Producer
(
connection
,
serializer
=
'json'
)
producer
.
publish
(
msg
,
exchange
=
self
.
exchange
,
routing_key
=
'control.session'
)
def
handle_control
(
self
,
body
,
message
):
log
.
debug
(
"DEFAULT HANDLE CONTROL"
)
log
.
debug
((
"Payload"
,
message
.
payload
))
log
.
debug
((
"Properties"
,
message
.
properties
))
log
.
debug
((
"Headers"
,
message
.
headers
))
log
.
debug
((
"body"
,
message
.
body
))
msg
=
None
try
:
msg
=
json
.
loads
(
body
)
log
.
debug
(
message
)
except
ValueError
as
e
:
message
.
ack
()
log
.
error
(
e
)
log
.
error
(
"Incorrect message: {0}"
.
format
(
body
))
if
msg
is
not
None
:
log
.
debug
(
"Just received that packet"
)
log
.
debug
(
msg
)
msg
=
MsgTestingToolComponentReady
(
component
=
'agent.{}'
.
format
(
self
.
name
),
description
=
"Component READY to start test suite."
)
producer
=
Producer
(
connection
,
serializer
=
'json'
)
producer
.
publish
(
body
=
msg
.
to_dict
(),
exchange
=
self
.
exchange
,
routing_key
=
msg
.
routing_key
)
def
_on_message
(
self
,
message
):
self
.
log
.
debug
(
"Consumer specialized handler <{consumer_name}> got: {message}"
.
format
(
consumer_name
=
self
.
consumer_name
,
message
=
repr
(
message
)
)
)
class
CoreConnector
(
BaseController
):
...
...
connectors/http.py
View file @
db5809e5
import
json
__version__
=
(
0
,
0
,
1
)
__version__
=
(
0
,
1
,
0
)
import
logging
from
requests
import
Request
,
Session
...
...
connectors/ping.py
deleted
100644 → 0
View file @
c5726116
"""
Create ICMP packets to test if all nodes are up
"""
import
json
__version__
=
(
0
,
0
,
1
)
import
logging
from
.base
import
BaseController
,
BaseConsumer
import
subprocess
class
PingConsumer
(
BaseConsumer
):
"""
AMQP helper
"""
def
__init__
(
self
,
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
):
super
(
PingConsumer
,
self
).
__init__
(
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
)
def
handle_control
(
self
,
body
,
message
):
"""
Args:
body:
message:
Returns:
"""
required_keys
=
{
"host"
}
msg
=
None
try
:
msg
=
json
.
loads
(
body
)
self
.
log
.
debug
(
message
)
except
ValueError
as
e
:
message
.
ack
()
self
.
log
.
error
(
e
)
self
.
log
.
error
(
"Incorrect message: {0}"
.
format
(
body
))
if
msg
is
not
None
:
present_keys
=
set
(
msg
.
keys
())
if
not
present_keys
.
issuperset
(
required_keys
):
self
.
log
(
"Error you need those keys: %s"
%
required_keys
)
else
:
# Default version is ping
version
=
{
"IPv6"
:
"ping6"
,
"IPv4"
:
"ping"
}
executable
=
version
.
get
(
msg
[
"version"
],
"ping"
)
# default packet count is 1
count_option
=
msg
.
get
(
"count"
,
1
)
# network interface, default is None
interface_option
=
msg
.
get
(
"interface"
,
""
)
# run the ping
command
=
[
executable
,
msg
[
"host"
],
"-c"
,
str
(
count_option
)]
self
.
log
.
info
(
"command launched: %s"
%
command
)
p
=
subprocess
.
call
(
command
)
self
.
log
.
info
(
"result: %s"
%
p
)
class
PingConnector
(
BaseController
):
NAME
=
"ping"
def
__init__
(
self
,
**
kwargs
):
super
(
PingConnector
,
self
).
__init__
(
name
=
PingConnector
.
NAME
)
kwargs
[
"consumer_name"
]
=
PingConnector
.
NAME
self
.
consumer
=
PingConsumer
(
**
kwargs
)
self
.
consumer
.
log
=
logging
.
getLogger
(
__name__
)
self
.
consumer
.
log
.
setLevel
(
logging
.
DEBUG
)
def
run
(
self
):
self
.
consumer
.
run
()
connectors/serialconn.py
View file @
db5809e5
...
...
@@ -9,19 +9,32 @@ import threading
from
connectors.base
import
BaseController
,
BaseConsumer
from
utils.serial_listener
import
SerialListener
from
utils
import
arrow_down
,
arrow_up
,
finterop_banner
from
utils.messages
import
*
__version__
=
(
0
,
0
,
1
)
__version__
=
(
0
,
1
,
0
)
log
=
logging
.
getLogger
(
__name__
)
class
SerialConsumer
(
BaseConsumer
):
"""
AMQP helper
Serial interface consumer:
- Set up process which communicates with serial interface of mote through USB port
- transmits in and out 802.15.4 packet messages
"""
def
__init__
(
self
,
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
):
super
(
SerialConsumer
,
self
).
__init__
(
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
)
self
.
dispatcher
=
{
MsgPacketInjectRaw
:
self
.
handle_data
,
}
subscriptions
=
[
MsgPacketInjectRaw
.
routing_key
.
replace
(
'*'
,
name
).
replace
(
'ip.tun'
,
'802154.serial'
),
]
super
(
SerialConsumer
,
self
).
__init__
(
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
,
subscriptions
)
self
.
message_count
=
0
self
.
output
=
''
self
.
serial_listener
=
None
...
...
@@ -49,24 +62,15 @@ class SerialConsumer(BaseConsumer):
serial_listener_th
.
daemon
=
True
serial_listener_th
.
start
()
except
KeyError
as
e
:
logging
.
warning
(
'Cannot retrieve environment variables for serial connection: '
'FINTEROP_CONNECTOR_SERIAL_PORT/FINTEROP_CONNECTOR_BAUDRATE '
'If no sniffer/injector needed for test ignore this warning '
)
def
handle_data
(
self
,
body
,
message
):
def
handle_data
(
self
,
message
):
"""
Function that will handle serial management messages
exchange:
- default
example:
{
}
Forwards data packets from AMQP BUS to serial interface
"""
if
self
.
serial_port
is
None
:
...
...
@@ -80,7 +84,7 @@ class SerialConsumer(BaseConsumer):
try
:
self
.
output
=
'c0'
for
c
in
body
[
'
data
'
]
:
for
c
in
message
.
data
:
if
format
(
c
,
'02x'
)
==
'c0'
:
# endslip
self
.
output
+=
'db'
...
...
@@ -100,28 +104,9 @@ class SerialConsumer(BaseConsumer):
print
(
arrow_down
)
log
.
info
(
'
\n
# # # # # # # # # # # # SERIAL INTERFACE # # # # # # # # # # # # '
+
'
\n
data packet EventBus -> Serial'
+
'
\n
'
+
json
.
dumps
(
body
)
+
'
\n
'
+
message
.
to_json
(
)
+
'
\n
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # '
)
message
.
ack
()
def
handle_control
(
self
,
body
,
message
):
msg
=
None
try
:
msg
=
json
.
loads
(
body
)
log
.
debug
(
message
)
except
ValueError
as
e
:
message
.
ack
()
log
.
error
(
e
)
log
.
error
(
"Incorrect message: {0}"
.
format
(
body
))
return
if
msg
[
"_type"
]
in
self
.
dispatcher
.
keys
():
self
.
dispatcher
[
msg
[
"_type"
]](
msg
)
else
:
log
.
warning
(
"Not supported action"
)
class
SerialConnector
(
BaseController
):
...
...
connectors/tun.py
View file @
db5809e5
...
...
@@ -7,48 +7,54 @@ import json
import
logging
import
sys
import
datetime
from
utils.opentun
import
OpenTunLinux
,
OpenTunMACOS
from
utils
import
arrow_down
,
arrow_up
,
finterop_banner
from
utils.messages
import
*
from
kombu
import
Producer
from
connectors.base
import
BaseController
,
BaseConsumer
from
utils
import
arrow_down
,
arrow_up
,
finterop_banner
from
utils.opentun
import
OpenTunLinux
,
OpenTunMACOS
__version__
=
(
0
,
0
,
1
)
__version__
=
(
0
,
1
,
0
)
class
TunConsumer
(
BaseConsumer
):
"""
AMQP helper
Tun interface consumer:
- creates tunnel interface (RAW_IP)
- inyects IPv6 packets comming from event bus into tun interaface
- sniffs and forwards packets from tun to event bus
"""
def
__init__
(
self
,
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
):
super
(
TunConsumer
,
self
).
__init__
(
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
)
self
.
dispatcher
=
{
"tun.start"
:
self
.
handle_start
,
MsgAgentTunStart
:
self
.
handle_start
,
MsgPacketInjectRaw
:
self
.
handle_raw_packet_to_inject
,
}
self
.
tun
=
None
self
.
packet_count
=
0
def
handle_start
(
self
,
msg
):
"""
Function that will handle tun management messages
exchange:
- message
subscriptions
=
[
MsgAgentTunStart
.
routing_key
.
replace
(
'*'
,
name
),
MsgPacketInjectRaw
.
routing_key
.
replace
(
'*'
,
name
)
]
queue:
- tun
super
(
TunConsumer
,
self
).
__init__
(
user
,
password
,
session
,
server
,
exchange
,
name
,
consumer_name
,
subscriptions
)
order:
- start_tun
example:
{
"_type": "tun.start",
"ipv6_host": ":2",
"ipv6_prefix": "cccc"
}
def
_on_message
(
self
,
message
):
msg_type
=
type
(
message
)
assert
msg_type
in
self
.
dispatcher
.
keys
(),
'Event message couldnt be dispatched %s'
%
repr
(
message
)
self
.
log
.
debug
(
"Consumer specialized handler <{consumer_name}> got: {message}"
.
format
(
consumer_name
=
self
.
consumer_name
,
message
=
repr
(
message
)
)
)
self
.
dispatcher
[
msg_type
](
message
)
- stop_tun
def
handle_start
(
self
,
msg
):
"""
Function that will handle tun start event emitted coming from backend
"""
if
self
.
tun
is
not
None
:
self
.
log
.
warning
(
'Received open tun control message, but TUN already created'
)
...
...
@@ -56,16 +62,16 @@ class TunConsumer(BaseConsumer):
else
:
self
.
log
.
info
(
'starting tun interface'
)
try
:
ipv6_host
=
msg
.
get
(
"
ipv6_host
"
,
None
)
ipv6_prefix
=
msg
.
get
(
"
ipv6_prefix
"
,
None
)
ipv6_no_forwarding
=
msg
.
get
(
"
ipv6_no_forwarding
"
,
None
)
ipv4_host
=
msg
.
get
(
"
ipv4_host
"
,
None
)
ipv4_network
=
msg
.
get
(
"
ipv4_network
"
,
None
)
ipv4_netmask
=
msg
.
get
(
"
ipv4_netmask
"
,
None
)
ipv6_host
=
msg
.
ipv6_host
ipv6_prefix
=
msg
.
ipv6_prefix
ipv6_no_forwarding
=
msg
.
ipv6_no_forwarding
ipv4_host
=
msg
.
ipv4_host
ipv4_network
=
msg
.
ipv4_network
ipv4_netmask
=
msg
.
ipv4_netmask
except
AttributeError
as
ae
:
self
.
log
.
error
(
'Wrong message format: {0}'
.
format
(
msg
.
payload
)
'Wrong message format: {0}'
.
format
(
repr
(
msg
)
)
)
return
...
...
@@ -96,80 +102,49 @@ class TunConsumer(BaseConsumer):
self
.
log
.
error
(
'Agent TunTap not yet supported for: {0}'
.
format
(
sys
.
platform
))
sys
.
exit
(
1
)
msg
=
{
"_type"
:
"tun.started"
,
'name'
:
self
.
name
,
'ipv6_host'
:
ipv6_host
,
'ipv6_prefix'
:
ipv6_prefix
,
'ipv4_host'
:
ipv4_host
,
'ipv4_network'
:
ipv4_network
,
'ipv4_netmask'
:
ipv4_netmask
,
'ipv6_no_forwarding'
:
ipv6_no_forwarding
,
}
self
.
log
.
info
(
"Tun started. Publishing msg: %s"
%
json
.
dumps
(
msg
))
msg
=
MsgAgentTunStarted
(
name
=
self
.
name
,
ipv6_host
=
ipv6_host
,
ipv6_prefix
=
ipv6_prefix
,
ipv4_host
=
ipv4_host
,
ipv4_network
=
ipv4_network
,
ipv4_netmask
=
ipv4_netmask
,