Commit 480d9b6f authored by Federico Sismondi's avatar Federico Sismondi

Add() first version of fountain DCA (data collection and agregation) module....

Add() first version of fountain DCA (data collection and agregation) module. For the time being it only dumps to file system received POST'ed data from LoRa server
parent f3d97966
FROM python:3.6
ADD . /code
WORKDIR /code
RUN pip install -r requirements.txt
CMD python app.py
#!/usr/bin/env python3
"""
HTTP server implements logic:
1. listens to LoRa platform notifications (sensor readings, and more),
2. re-formats data (to fit fiware data model)
3. forwards data to IoT platform.
some other things it does:
- initializes Device entities
- updates Device and WaterQuality entities' attributes.
- dumps LoRa readings to disk
# datetime for now should be datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
"""
import re
import os
import json
import base64
import time
import dateutil.parser
import datetime
from geojson import Point
from flask import Flask, jsonify
from flask import request
import requests as client_request
app = Flask(__name__, )
print("Starting to collect..")
# get config from environment
COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='.')
URL_BASE = 'http://{}:1026'.format(os.getenv('ORION_HOST', default='localhost'))
LORA_PLATFORM_URL = 'https://{}'.format(os.getenv('LORA_PLATFORM_SERVER', default='lora-ns.sig-ge.ch:443'))
LORA_PLATFORM_SECRET_FILE = os.getenv('LORA_PLATFORM_SECRET_FILE', 'secret_carouge_lora')
with open(LORA_PLATFORM_SECRET_FILE, 'r') as file:
secret = file.read().replace('\n', '')
assert secret, 'No auth token defined for LoRa platform API'
# AUTH Header for lora platform
h = {'Authorization': secret}
# build header for GET
http_header_get = {
'Fiware-Service': 'carouge',
'Accept': 'application/json',
}
def get_ngsiv2_typed_description(val):
if isinstance(val, Point):
return {'type': 'geo:json', 'value': dict(val)}
elif isinstance(val, int) or isinstance(val, float):
return {'type': 'Number', 'value': val}
elif isinstance(val, str):
pat = r'[<>"\'=;()]' # see https://fiware-orion.readthedocs.io/en/master/user/forbidden_characters/index.html
r = re.compile(pat)
if r.search(val) is None:
new_val = val
else:
new_val = re.sub(pat, '', val)
app.logger.warning("Special char(s) deleted. This <{}> turned into this <{}>".format(val, new_val))
return {'type': 'Text', 'value': new_val}
elif isinstance(val, datetime.datetime):
# replace +00:00 is the same as Z but Orion doesnt like it :/
return {'type': 'DateTime', 'value': str(val.isoformat()).replace("+00:00", "Z")}
elif isinstance(val, dict):
return {'value': val} # bypassed
elif isinstance(val, list):
return {'value': val} # bypassed
elif isinstance(val, type(None)):
return {'type': 'Text', 'value': None}
else:
raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val)))
# = = = = = = = = API HANDLERS = = = = = = = = = = =
class InvalidUsage(Exception):
status_code = 400
def __init__(self, message, status_code=None, payload=None):
Exception.__init__(self)
self.message = message
if status_code is not None:
self.status_code = status_code
self.payload = payload
def to_dict(self):
rv = dict(self.payload or ())
rv['message'] = self.message
return rv
@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
response = jsonify(error.to_dict())
response.status_code = error.status_code
return response
@app.route('/')
@app.route('/healthcheck')
def healthcheck():
return 'This service is up and running!'
# = = = = = = = = OTHER AUX FUNCTIONS = = = = = = = =
def _dump_raw_data_to_filesystem():
app.logger.info('dumping request to FS')
try:
filepath = os.path.join(COLLECTION_DIR, '{}.json'.format(int(time.time())))
with open(filepath, encoding='utf-8', mode='w') as f:
json.dump(
obj=request.json,
fp=f
)
except Exception as e:
app.logger.error(e)
return 'error'
return 'ok'
def _dummy_post_handler():
if request.data:
app.logger.info("Request data: %s" % request.data)
if request.form:
app.logger.info("Request form: %s" % request.form)
return 'Got POST with json %s' % request.json
# = = = = = = = = FLASK ROUTES = = = = = = = =
@app.route('/dca-carouge-fountain-sensed-data/rest/callback/payloads/ul', methods=['POST'])
def post_dca_carouge_fountain_sensed_data_payload():
# get values
#soil_humidity, sensor_eui = _get_reading(request.json['dataFrame']), request.json['deveui']
# log reading
#_log_level_of_moisture(soil_humidity, sensor_eui)
# push data to context broker
resp = {}
resp.update({"fs_dump": _dump_raw_data_to_filesystem()})
#resp.update({"update_flowerbed_entities": _update_flowerbed_entity(soil_humidity, sensor_eui)})
# resp.update({"update_device_entities": _update_device_entity(
# deveui=request.json['deveui'],
# comment=None,
# longitude=None,
# latitude=None,
# last_reception=None,
# dataFrame=request.json['dataFrame'],
# dataFrame_rssi=request.json['rssi'],
# dataFrame_timestamp=request.json['timestamp']
# )})
return resp
@app.route('/dca-carouge-fountain-sensed-data/rest/callback/nodeinfo', methods=['PUT'])
def put_dca_carouge_fountain_sensed_node_info():
_dummy_post_handler()
raise InvalidUsage(message='This is still not implemented', payload=request.json)
@app.route('/dca-carouge-fountain-sensed-data/rest/callback/payloads/ul', methods=['GET'])
def get_dca_carouge_fountain_sensed_data_callback():
raise InvalidUsage(message='This is still not implemented')
# = = = = = = = = MAIN = = = = = = = =
def init():
pass
def main():
app.run(host="0.0.0.0", debug=True, port=80)
if __name__ == "__main__":
#init()
main()
dump dir for collected raw data
flask
requests
unidecode
geojson
python-dateutil
......@@ -42,11 +42,12 @@ for pilot in entities_map.keys():
print(r.json())
else:
print('Entities updated (HTTP response is 2xx)')
for entity in r.json():
entities_map[pilot].append(entity['id'])
except StopIteration:
pass
for entity in r.json():
entities_map[pilot].append(entity['id'])
pprint(entities_map)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment