Commit 7ab6545a authored by Luca Lamorte's avatar Luca Lamorte

Create connfiguration for AMQP and new WS messaging exchange

parent df384acc
{"exchange": "default", "orchestrator": "http://locahost:50001", "version": "0.0.1", "connected": false, "url": "amqp://luca:luca@finterop1/session1", "component": "gui_tt"}
\ No newline at end of file
import osimport pikaimport tracebackimport loggingimport errno VERSION = '0.0.1'COMPONENT_ID = "gui_tt" ALLOWED_EXTENSIONS = set(['json', 'pcap']) RABBIT_MQ_CONNECTED = False logger = logging.getLogger(__name__)logger.setLevel(logging.DEBUG) project_dir = os.path.abspath(os.path.join(os.path.realpath(__file__), os.pardir)) if '/testing_tool_gui' in project_dir: project_dir = os.path.abspath(os.path.join(project_dir, os.pardir)) TMPDIR = os.path.join(project_dir, 'tmp')DATADIR = os.path.join(project_dir, 'data') UPLOAD_FOLDER = TMPDIR # generate dirs for d in TMPDIR, DATADIR: try: os.makedirs(d) except OSError as e: if e.errno != errno.EEXIST: raise try: AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE']) except KeyError as e: print('couldnt import AMQP_EXCHANGE from environment') AMQP_EXCHANGE = "default" try: AMQP_URL = str(os.environ['AMQP_URL']) except KeyError as e: print('couldnt import AMQP_URL from environment') # TODO: REMOVE PERSONAL CONFIGURATION ## AMQP_URL = "amqp://guest:guest@localhost/" AMQP_URL = str(os.getenv('AMQP_URL', 'amqp://luca:luca@finterop1/session1')) def get_connection(AMQP_URL): try: print('Setting up AMQP connection... %s' % AMQP_URL) # setup AMQP connection return pika.BlockingConnection(pika.URLParameters(AMQP_URL)) except pika.exceptions.ConnectionClosed as cc: print(' AMQP cannot be established, is message broker up? \n More: %s' % traceback.format_exc()) return None def get_configuration(status = None): global RABBIT_MQ_CONNECTED if status: RABBIT_MQ_CONNECTED = status cfg = dict() cfg["url"] = AMQP_URL cfg["exchange"] = AMQP_EXCHANGE cfg["version"] = VERSION cfg["component"] = COMPONENT_ID cfg["connected"] = RABBIT_MQ_CONNECTED return cfg
\ No newline at end of file
import loggingimport errnoimport os VERSION = '0.0.1'COMPONENT_ID = "gui_tt" ALLOWED_EXTENSIONS = set(['json', 'pcap'])RABBIT_MQ_CONNECTED = False logger = logging.getLogger(__name__)logger.setLevel(logging.DEBUG)sh = logging.StreamHandler()sh.setFormatter(logging.Formatter('%(asctime)s - %(name)-1s:%(lineno)d\t - %(levelname)s - %(message)s'))sh.setLevel(logging.DEBUG)logger.addHandler(sh) project_dir = os.path.abspath(os.path.join(os.path.realpath(__file__), os.pardir)) # Set WS LOGs level ERROR werklog = logging.getLogger('werkzeug') werklog.setLevel(logging.ERROR) if '/testing_tool_gui' in project_dir: project_dir = os.path.abspath(os.path.join(project_dir, os.pardir)) TMPDIR = os.path.join(project_dir, 'tmp') DATADIR = os.path.join(project_dir, 'data') UPLOAD_FOLDER = TMPDIR # generate dirs for d in TMPDIR, DATADIR: try: os.makedirs(d) except OSError as e: if e.errno != errno.EEXIST: raise try: AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE']) except KeyError as e: logger.debug('Couldnt import AMQP_EXCHANGE from environment') AMQP_EXCHANGE = "default" try: AMQP_URL = str(os.environ['AMQP_URL']) except KeyError as e: print('Couldnt import AMQP_URL from environment') # TODO: REMOVE PERSONAL CONFIGURATION ## AMQP_URL = "amqp://guest:guest@localhost/" ## AMQP_URL = str(os.getenv('AMQP_URL', 'amqp://luca:luca@finterop1/session1'))
\ No newline at end of file
......
......@@ -9,36 +9,42 @@ from flask_socketio import SocketIO
from testing_tool_gui.utils.amqp_synch_call import publish_message
from testing_tool_gui.utils.messages import *
from testing_tool_gui.gui.amqp_sniffer import *
from testing_tool_gui.gui import *
from werkzeug.utils import secure_filename
from testing_tool_gui.gui.amqp_sniffer import AmqpSniffer
from testing_tool_gui.gui.utils import *
from testing_tool_gui.gui import UPLOAD_FOLDER, ALLOWED_EXTENSIONS
from werkzeug.utils import secure_filename
amqp_connection = get_connection(AMQP_URL)
logger = logging.getLogger(__name__)
if amqp_connection:
my_channel = amqp_connection.channel()
get_configuration(status=True)
my_channel = None
amqp_listener = None
thread = None
# WEB SOCKET
NAMESPACE = "/ws"
async_modes = ['threading', 'eventlet', 'gevent','gevent_uwsgi']
async_mode = async_modes[0]
# FLASK API
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['SECRET_KEY'] = 'secret!'
# START SOCKETIO
socketio = SocketIO(app, async_mode=async_mode)
thread = None
ALLOWED_EXTENSIONS = set(['pcap'])
#Read Configuration
app_cfg = Configuration(socketio)
def start_amqp_sniffer():
amqp_listener = AmqpSniffer(amqp_connection, socketio)
global amqp_listener
global app_cfg
amqp_listener = AmqpSniffer(app_cfg, socketio)
amqp_listener.start()
socketio.emit("change_cfg", json.dumps(app_cfg.get()), namespace=NAMESPACE)
def allowed_file(filename):
......@@ -50,35 +56,56 @@ def allowed_file(filename):
@socketio.on('send_amqp_msg', namespace=NAMESPACE)
def test_message(message):
print("SEND MESSAGE %s" % message["msg"])
if my_channel:
try:
global app_cfg
logger.info("SEND MESSAGE %s" % message["msg"])
try:
if app_cfg.is_connected:
msg = message_types_dict[message["msg"]]()
publish_message(my_channel, msg)
except Exception as ex:
print("EXCEPTION %s!"%ex)
else:
print("Channel is not available")
publish_message(app_cfg.get_channel(), msg)
logger.debug("MSG Sent!")
else:
socketio.emit("info",
"warning#Connection with RabbitMQ is not available. Please try later or change settings!.",
namespace=NAMESPACE)
except Exception as ex:
logger.error("EXCEPTION %s!" %ex)
socketio.emit("info",
"danger# Exception while sending message: %s" %ex,
namespace=NAMESPACE)
@socketio.on('change_config', namespace=NAMESPACE)
def test_message(config):
global amqp_listener
global app_cfg
try:
logger.info(" Change configuration: %s"+json.dumps(config))
if amqp_listener:
amqp_listener.stop()
logger.info("CDF: %s" % config)
cfg = config
app_cfg.update(url=cfg["url"],
exchange=cfg["exchange"],
orchestrator=cfg["orchestrator"])
logger.info("Try to connect sniffer...")
socketio.emit("change_cfg", "warning#Configuration saved. Trying to connect ...", namespace=NAMESPACE)
socketio.start_background_task(target=start_amqp_sniffer)
except Exception as error:
logger.error("Exception in %s" % error)
socketio.emit("info",
"danger# Exception while saving config: %s" % error,
namespace=NAMESPACE)
print(" Change configuration: %s"+json.dumps(config))
# global AMQP_EXCHANGE
# global AMQP_URL
# global amqp_connection
#
# AMQP_URL = config["url"]
# AMQP_EXCHANGE = config["exchange"]
# amqp_connection = get_connection(AMQP_URL)
# socketio.start_background_task(target=start_amqp_sniffer)
@socketio.on('connect', namespace=NAMESPACE)
def test_connect():
print("WS Connected")
logger.info("WS Connected")
global thread
if thread is None:
......@@ -87,7 +114,7 @@ def test_connect():
@socketio.on('disconnect', namespace=NAMESPACE)
def test_disconnect():
print('WS disconnected')
logger.info('WS disconnected')
# ================= WEB ROUTES ===================== #
......@@ -99,7 +126,7 @@ def main_page():
@app.route('/html/<template>')
def get_html_template(template):
print("request for %s template" % template)
logger.info("request for %s template" % template)
try:
return render_template('%s.html' % template)
except TemplateNotFound:
......@@ -125,10 +152,10 @@ def configuration():
pass
else:
cfg = get_configuration()
print(json.dumps(cfg))
global app_cfg
logger.info(" [GET] CONFIGUTATION : %s" %repr(app_cfg))
response = app.response_class(
response=json.dumps(cfg),
response=json.dumps(app_cfg.get()),
status=200,
mimetype='application/json'
)
......@@ -171,18 +198,17 @@ def upload():
msg = MsgPrivacyAnalyze(
filename='%s' % filename,
value=enc.decode("utf-8")
)
logger.info("Message ready, PCAP bytes: \n" + repr(msg))
logger.info("Sending Request through AMQP interface ...")
publish_message(my_channel, msg)
else:
return "HAI MANDATO GET"
return redirect("/", code=302)
return redirect("/", code=302)
return redirect("/", code=404)
if __name__ == '__main__':
print("Start App listening on port 8080")
logger.debug("Start App listening on port 8080")
socketio.run(app, port=8080)
import threading
import uuid
import json
from collections import OrderedDict
import datetime
import json
import pika
from testing_tool_gui.gui import *
logger = logging.getLogger(__name__)
class AmqpSniffer(threading.Thread):
def __init__(self, conn, socketio):
def __init__(self, configuration, socketio):
threading.Thread.__init__(self)
# queues & default exchange declaration
self.message_count = 0
self.connection = conn
self.cfg = configuration
self.socketio = socketio
self.channel = self.connection.channel()
self.channel = self.cfg.get_amqp_channel()
services_queue_name = 'services_queue@%s' % COMPONENT_ID
self.channel.queue_declare(queue=services_queue_name)
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
self.channel.queue_bind(exchange=self.cfg.get_amqp_exchange(),
queue=services_queue_name,
routing_key='#')
# Hello world message
......@@ -44,6 +47,7 @@ class AmqpSniffer(threading.Thread):
self.channel.stop_consuming()
services_queue_name = 'services_queue@%s' % COMPONENT_ID
self.channel.queue_delete(queue=services_queue_name)
logger.debug("Sniffer stopped!")
def on_request(self, ch, method, props, body):
# obj hook so json.loads respects the order of the fields sent -just for visualization purposeses-
......@@ -52,7 +56,7 @@ class AmqpSniffer(threading.Thread):
# return
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
logging.info("Message sniffed: %s, body: %s" % (json.dumps(req_body_dict), str(body)))
logger.info("Message sniffed: %s, body: %s" % (json.dumps(req_body_dict), str(body)))
self.message_count += 1
props_dict={
......@@ -73,19 +77,19 @@ class AmqpSniffer(threading.Thread):
#let's get rid of values which are empty
props_dict_only_non_empty_values = {k: v for k, v in props_dict.items() if v is not None}
print('\n* * * * * * MESSAGE SNIFFED (%s) * * * * * * *'%self.message_count)
print("TIME: %s"%datetime.datetime.time(datetime.datetime.now()))
print(" - - - ")
print("ROUTING_KEY: %s" % method.routing_key)
print(" - - - ")
print("HEADERS: %s" % props.headers)
print(" - - - ")
print("PROPS: %s" %json.dumps(props_dict_only_non_empty_values))
print(" - - - ")
print('BODY %s' % json.dumps(req_body_dict))
print(" - - - ")
logger.debug('* * * * * * MESSAGE SNIFFED (%s) * * * * * * *'%self.message_count)
logger.debug("TIME: %s"%datetime.datetime.time(datetime.datetime.now()))
logger.debug(" - - - ")
logger.debug("ROUTING_KEY: %s" % method.routing_key)
logger.debug(" - - - ")
logger.debug("HEADERS: %s" % props.headers)
logger.debug(" - - - ")
logger.debug("PROPS: %s" %json.dumps(props_dict_only_non_empty_values))
logger.debug(" - - - ")
logger.debug('BODY %s' % json.dumps(req_body_dict))
logger.debug(" - - - ")
#print("ERRORS: %s" % )
print('* * * * * * * * * * * * * * * * * * * * * \n')
logger.debug('* * * * * * * * * * * * * * * * * * * * * \n')
amqp_msg = dict()
......@@ -98,18 +102,18 @@ class AmqpSniffer(threading.Thread):
try:
self.socketio.emit("amqp_msg", json.dumps(amqp_msg), namespace="/ws")
except Exception as err:
print(" print error: %s" %err)
logger.error(" print error: %s" %err)
if props.content_type != "application/json":
print('* * * * * * API VALIDATION WARNING * * * * * * * ')
print("props.content_type : " + str(props.content_type))
print("application/json was expected")
print('* * * * * * * * * * * * * * * * * * * * * \n')
logger.debug('* * * * * * API VALIDATION WARNING * * * * * * * ')
logger.debug("props.content_type : " + str(props.content_type))
logger.debug("application/json was expected")
logger.debug('* * * * * * * * * * * * * * * * * * * * *')
if '_type' not in req_body_dict.keys():
print('* * * * * * API VALIDATION WARNING * * * * * * * ')
print("no < _type > field found")
print('* * * * * * * * * * * * * * * * * * * * * \n')
logger.debug('* * * * * * API VALIDATION WARNING * * * * * * * ')
logger.debug("no < _type > field found")
logger.debug('* * * * * * * * * * * * * * * * * * * * * ')
def get_message(self):
return self.messages
......@@ -125,9 +129,9 @@ class AmqpSniffer(threading.Thread):
def run(self):
print("Starting thread listening on the event bus")
logger.debug("Starting thread listening on the event bus")
self.channel.start_consuming()
print('Bye byes!')
logger.debug('Bye byes!')
def publish_event(channel, message):
......
......@@ -27,12 +27,28 @@ $(document).ready(function() {
$("#total_msg").html(parseInt($("#total_msg").html())+1)
});
socket.on('info', function (msg) {
var info = msg.split("#")
send_alert("INFO:", info[1], info[0])
});
socket.on('configuration', function (data) {
console.log("Received Configuration: "+data)
socket.on("change_cfg", function (data) {
console.log("Change config");
var msg = JSON.parse(data)
console.log("Connected? " + msg["connected"]);
if (msg["connected"] == true){
$('#amqp_icon').removeClass("glyphicon-remove").addClass("glyphicon-ok");
$('#amqp_status').text("Sniffer ON").css('color', 'greenyellow');
send_alert("AMQP", "Connection ennstablished with RabbitMQ", "info")
}
else {
$('#amqp_icon').removeClass("glyphicon-ok").addClass("glyphicon-remove");
$('#amqp_status').text("Sniffer OFF").css('color', 'grey');
send_alert("AMQP", "Connection refused with AMQP_URL: "+msg["url"], "danger")
}
});
$('#filter_rk').keyup(function() {
filterByRoutingKey()
});
......@@ -40,6 +56,7 @@ $(document).ready(function() {
$.get('/amqp_msg_list', function(data){
read_message_list(data)
});
$.get('/configuration', function(data){
read_configuration(data)
});
......@@ -78,8 +95,13 @@ function showDetails(value){
function change_cfg(){
$('#amqp_icon').removeClass("glyphicon-ok").addClass("glyphicon-remove");
$('#amqp_status').text("Sniffer OFF").css('color', 'grey');
console.log("send configuration to server")
socket.emit('change_config', { 'url' : $("#amqp_url").val(), "exchange": $("#amqp_exchange").val()});
socket.emit('change_config', { 'url' : $("#amqp_url").val(),
"exchange": $("#amqp_exchange").val(),
"orchestrator": $("#orcherstrator_url").val()});
}
......@@ -90,10 +112,11 @@ function send_amqp_message(message){
function read_configuration(cfg){
// console.log(data)
//console.log(data)
// var cfg = JSON.parse(data)
$("#amqp_url").val(cfg["url"])
$("#amqp_exchange").val(cfg["exchange"])
$("#orcherstrator_url").val(cfg["orchestrator"])
if (cfg["connected"] ){
$('#amqp_icon').removeClass("glyphicon-remove").addClass("glyphicon-ok");
$('#amqp_status').text("Sniffer ON").css('color', 'greenyellow');
......
......@@ -127,7 +127,6 @@
<li><a href="#">Get Status</a></li>
</ul>
</li>
<li><a href="coap.html">CoAP TT</a></li>
<li><a href="#">Orchestrator API</a></li>
<li><a href="http://doc.f-interop.eu/">Docs</a></li>
</ul>
......@@ -151,7 +150,7 @@
<div class="col-lg-8">
<p class="text-left"><strong>Current GUI Configuration</strong></p>
<p class="text-left">
<a href="#" class="btn btn-primary btn-block btn-sm" onclick="send_alert('TBD!','Not Implemented yet', 'warning')">Save</a>
<a href="#" class="btn btn-primary btn-block btn-sm" onclick="change_cfg()">Save</a>
</p>
</div>
</div>
......
import pika
import logging
import json
import os
from testing_tool_gui.gui import DATADIR, VERSION, COMPONENT_ID
logger = logging.getLogger(__name__)
try:
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
except KeyError as e:
logger.debug('Couldnt import AMQP_EXCHANGE from environment')
AMQP_EXCHANGE = "default"
try:
AMQP_URL = str(os.environ['AMQP_URL'])
except KeyError as e:
print('Couldnt import AMQP_URL from environment')
# TODO: REMOVE PERSONAL CONFIGURATION
AMQP_URL = "amqp://guest:guest@localhost/"
## AMQP_URL = str(os.getenv('AMQP_URL', 'amqp://luca:luca@finterop1/session1'))
class Configuration:
FILENAME = 'configuration.cfg'
def __init__(self, url=AMQP_URL, exchange=AMQP_EXCHANGE, orchestrator="", connected=False):
global logger
self.logger = logger
self.url = url
self.exchange = exchange
self.orchestrator_url = orchestrator
self.connected = connected
self.amqp_channel = None
self.amqp_connection = None
self.read_file()
def get_amqp_channel(self, create_new=False):
if self.amqp_connection is None or create_new:
try:
self.logger.debug('Setting up AMQP connection... %s' % self.url)
self.amqp_connection = pika.BlockingConnection(pika.URLParameters(self.url))
self.logger.debug('Connected with RabbitMQ!')
except pika.exceptions.ConnectionClosed as cc:
self.logger.error('Cannot establish a communication with RabbitMQ using this URL: %s' % self.url)
self.logger.error(cc)
return None
if self.amqp_channel is None or create_new:
self.amqp_channel = self.amqp_connection.channel()
self.connected = True
return self.amqp_channel
def update(self, url, exchange, orchestrator):
logger.info("Update configuration ...")
self.url = url
self.exchange = exchange
self.orchestrator_url = orchestrator
self.connected = False
self.amqp_channel = None
self.amqp_connection = None
self.logger.debug("CFG Update with %s %s %s" % (self.url, self.orchestrator_url, self.exchange))
self.to_file()
def get_channel(self):
return self.amqp_channel
def get_amqp_url(self):
return self.url
def get_amqp_exchange(self):
return self.exchange
def get_orchestrator_url(self):
return self.orchestrator_url
def is_connected(self):
return self.connected
def to_file(self, filename=FILENAME):
cfg_file = os.path.join(DATADIR, filename)
self.logger.debug("Write configuration to file %s" % cfg_file)
with open(cfg_file, 'w+') as outfile:
json.dump(self.get(), outfile)
self.logger.debug("Create a new configuration file: %s " % cfg_file)
self.logger.debug("CFG: %s " % repr(self))
def read_file(self, filename=FILENAME):
cfg_file = os.path.join(DATADIR, filename)
if os.path.exists(cfg_file):
self.logger.debug("Read configuration from file: %s" % cfg_file)
with open(cfg_file) as data_file:
data = json.load(data_file)
self.url = data["url"]
self.orchestrator_url = data["orchestrator"]
self.exchange = data["exchange"]
self.logger.debug("CFG update from file: %s" % cfg_file)
self.logger.debug("CFG: %s"% repr(self))
else:
self.logger.error("File %s does not exists!" % cfg_file)
def get(self):
cfg = dict()
cfg["url"] = self.url
cfg["exchange"] = self.exchange
cfg["orchestrator"] = self.orchestrator_url
cfg["connected"] = self.connected
cfg["version"] = VERSION
cfg["component"] = COMPONENT_ID
return cfg
def __repr__(self):
return json.dumps(self.get())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment