Commit dcc0eb0d authored by Federico Sismondi

Merge branch 'scripts_for_pushing_local_data_to_cloud' into 'master'

Scripts for pushing local data to cloud

See merge request !18
parents 5fb0753e d69679d0
......@@ -18,6 +18,7 @@ import os
import json
import base64
import time
import requests
import dateutil.parser
import datetime
......@@ -29,42 +30,96 @@ import requests as client_request
app = Flask(__name__, )
# get config from environment
COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='.')
COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='raw_data')
URL_BASE = 'http://{}:1026'.format(os.getenv('ORION_HOST', default='localhost'))
# QuantumLeap subscription (for pushing local data to historical DB)
NOTIFICATION_ID = "5f60dcf399fc2ae29f141b92"
NOTIFICATION_URL = 'http://{}:8668/v2/notify'.format(os.getenv('ORION_HOST', default='localhost'))
LORA_PLATFORM_URL = os.getenv('LORA_PLATFORM_URL')
LORA_PLATFORM_SECRET_FILE = os.getenv('LORA_PLATFORM_SECRET_FILE', 'secret_carouge_lora')
# URN schema for Device entities
# URN_BASE_DEVICE_ENTITY = "urn:ngsi-ld:Device:Device-"
URN_FOUNTAIN_ENTITY = "urn:ngsi-ld:WaterQualityObserved:Fountain-1"
# build header for POST
http_header_post = {
'Fiware-Service': 'carouge',
'Fiware-ServicePath': '/',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
def get_ngsiv2_typed_description(val, force_date_modified=None):
    """
    Wrap a Python value into an NGSI-v2 attribute description.

    :param val: Value to be added to the entity as an attribute. Supported
        types are Point, int, float, str, datetime.datetime, dict, list and None.
    :param force_date_modified: Optional datetime.datetime or string used as the
        ``dateModified`` metadata of the attribute. A string must contain "Z".
        Falsy values, and values of any other type, add no metadata.
    :return: Dictionary with the attribute typed and formatted as NGSI-v2
        (``value``, usually ``type``, and ``metadata`` when a date was given).
    :raises AssertionError: if ``force_date_modified`` is a string without "Z".
    :raises NotImplementedError: if ``val`` is of an unsupported type.
    """
    ret = {}

    if force_date_modified:
        date_modified = None
        if isinstance(force_date_modified, datetime.datetime):
            # Orion does not accept the "+00:00" offset notation, only "Z"
            date_modified = force_date_modified.isoformat().replace("+00:00", "Z")
        elif isinstance(force_date_modified, str):
            # raised explicitly so the check still runs under `python -O`
            if "Z" not in force_date_modified:
                raise AssertionError("Only Zulu time is accepted")
            date_modified = force_date_modified

        if date_modified is not None:
            ret["metadata"] = {
                "dateModified": {"type": "DateTime", "value": date_modified}
            }

    # Point is tested before dict on purpose: it is converted with dict(val)
    # and must not fall through to the untyped dict branch.
    if isinstance(val, Point):
        ret.update({'type': 'geo:json', 'value': dict(val)})
    elif isinstance(val, (int, float)):
        ret.update({'type': 'Number', 'value': val})
    elif isinstance(val, str):
        # see https://fiware-orion.readthedocs.io/en/master/user/forbidden_characters/index.html
        pat = r'[<>"\'=;()]'
        new_val, removed = re.subn(pat, '', val)
        if removed:
            app.logger.warning("Special char(s) deleted. This <{}> turned into this <{}>".format(val, new_val))
        ret.update({'type': 'Text', 'value': new_val})
    elif isinstance(val, datetime.datetime):
        # "+00:00" is equivalent to "Z", but Orion does not accept it
        ret.update({'type': 'DateTime', 'value': val.isoformat().replace("+00:00", "Z")})
    elif isinstance(val, (dict, list)):
        ret.update({'value': val})  # bypassed: passed through without a type
    elif val is None:
        ret.update({'type': 'Text', 'value': None})
    else:
        raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val)))

    return ret
# = = = = = = = = API HANDLERS = = = = = = = = = = =
class InvalidUsage(Exception):
status_code = 400
......@@ -94,6 +149,81 @@ def healthcheck():
return 'This service is up and running!'
# = = = = = = = = DATA CONVERSION = = = = = = = = =
def get_model_water_quality_observed(
        dateObserved=None,
        location=None,
        temperature=None,
        pH=None,
        turbidity=None,
        redox=None,
        freeChlorine=None,
        totalChlorine=None,
        chlorateEstimation=None,
        meta_datetime=None,
):
    """
    Build the WaterQualityObserved entity of the fountain as an NGSI-v2 dict.

    Arguments left as None are omitted from the entity, except ``location``,
    which falls back to the coordinates of Fontaine des Tours.
    If meta_datetime is provided then all the attrs are assigned meta_datetime
    as their dateModified metadata.

    :param dateObserved:
    :param location:
    :param temperature:
    :param pH:
    :param turbidity:
    :param redox:
    :param freeChlorine:
    :param totalChlorine:
    :param chlorateEstimation:
    :param meta_datetime:
    :return: dict with 'id', 'type' and one typed entry per provided attribute

    comments by IBATECH:
    --------------------
    DIS1 and DIS2 belong to Free Chlorine and Total Chlorine respectively. Combined Chlorine is the subtraction between both.
    Normally (in drinking waters) such values are close to 0. According to our tests, Drinking waters (no public fountains) of Madrid are about:
    · Free Chlorine about 0 - 0.03 mg/L
    · Total Chlorine about 0 - 0.20 mg/L.
    In public fountains such values must be higher when disinfection works take place.
    Chlorates measurement (in real time) is an indirect measurement carried out by the combination of the measurements of all sensors. I can advance you (if it helps for data models) that according to our preliminary tests the regression formula for Chlorates is the following:
    Clo3- = 712,251243429412-44,82636718345*Ph-66,3367276422549*Free Cl2 (mg/L)+372,454561380321*Total Cl2 (mg/L)-0,875793659900675*ORP (mV)-4,51465618459959*Tº (ºC)+1,17321348815874*Turbidity (NTU).
    Anyway, it is only a preliminary formula that needs to be further studied when Monitoring water station comes back to IBATECH.
    """
    if location is None:
        location = Point([6.137187971686302, 46.18405327027239])  # GPS coordinates of Fontaine des Tours

    ret = {
        'id': URN_FOUNTAIN_ENTITY,
        'type': "WaterQualityObserved",
    }

    # Explicit (attribute name, value) pairs instead of a locals() lookup, so
    # the mapping does not depend on local variable names. Order is kept.
    attributes = (
        ('temperature', temperature),
        ('pH', pH),
        ('turbidity', turbidity),
        ('redox', redox),
        ('freeChlorine', freeChlorine),
        ('totalChlorine', totalChlorine),
        ('chlorateEstimation', chlorateEstimation),
        ('dateObserved', dateObserved),
        ('location', location),
    )
    for key, val in attributes:
        if val is None:
            continue  # attribute not provided: leave it out of the entity
        ret[key] = get_ngsiv2_typed_description(val, meta_datetime)

    return ret
# = = = = = = = = OTHER AUX FUNCTIONS = = = = = = = =
def _dump_raw_data_to_filesystem():
app.logger.info('dumping request to FS')
......@@ -132,16 +262,21 @@ def get_values_from_raw_reading(message_data):
03 |-> 856
58 _|
Device 1:
units:
ph *100 ,
T celcius * 10,
DIS1 *100
DIS2 *100
DIS1 *100 -> free Cl
DIS2 *100 -> total Cl
REDOX mV
Device 2 (turbidity) still not tested as is not currently in used
:param message_data: modbus base64 encoded message
:return: (ph, T , DIS1, DIS2, redox)
:return: [ph, T , DIS1, DIS2, redox] or [turbidity]
"""
resp = []
reading_hex = base64.standard_b64decode(message_data).hex()
......@@ -159,27 +294,22 @@ def get_values_from_raw_reading(message_data):
app.logger.info("modbus / LoRa registers status : {}".format("{0:b}".format(int(status_registry, 16))))
app.logger.info("modbus / LoRa registers values : {}".format(values_registry))
print("modbus / LoRa registers status : {}".format("{0:b}".format(int(status_registry, 16))))
print("modbus / LoRa registers values : {}".format(values_registry))
# print("modbus / LoRa registers status : {}".format("{0:b}".format(int(status_registry, 16))))
# print("modbus / LoRa registers values : {}".format(values_registry))
return [v for v in resp if v] # clean emtpy values (modbus module always the 16 channels in the logger)
return resp
# = = = = = = = = FLASK ROUTES = = = = = = = =
# = = = = = = = = HTTP SERVER - FLASK ROUTES = = = = = = = =
@app.route('/dca-carouge-fountain-sensed-data/rest/callback/payloads/ul', methods=['POST'])
def post_dca_carouge_fountain_sensed_data_payload():
# get values
# soil_humidity, sensor_eui = get_values_from_raw_reading(request.json['dataFrame']), request.json['deveui']
# log reading
# _log_level_of_moisture(soil_humidity, sensor_eui)
if request.json['deveui'] == '70b3d5078000066e':
app.logger.info('Got message from probe LoRa device, hence sensors have LoRa coverage')
return {'status': 'ok'}
if not request.json['dataFrame'] or request.json['dataFrame'] == '' :
app.logger.error('Message with empty <data frame> field from sensor {}'.format(request.json['deveui'] ))
if not request.json['dataFrame'] or request.json['dataFrame'] == '':
app.logger.error('Message with empty <data frame> field from sensor {}'.format(request.json['deveui']))
raise InvalidUsage(message='Empty data frame field received', payload=request.json)
app.logger.info("Got message from FOUNTAIN devices: {}".format(request.json))
......@@ -187,36 +317,122 @@ def post_dca_carouge_fountain_sensed_data_payload():
# push data to context broker
resp = {}
resp.update({"fs_dump": _dump_raw_data_to_filesystem()})
# resp.update({"update_flowerbed_entities": _update_flowerbed_entity(soil_humidity, sensor_eui)})
# resp.update({"update_device_entities": _update_device_entity(
# deveui=request.json['deveui'],
# comment=None,
# longitude=None,
# latitude=None,
# last_reception=None,
# dataFrame=request.json['dataFrame'],
# dataFrame_rssi=request.json['rssi'],
# dataFrame_timestamp=request.json['timestamp']
# )})
resp.update({"update_entities": _update_fountain_entity(
request.json['dataFrame'],
request.json['timestamp'])
})
return resp
@app.route('/dca-carouge-fountain-sensed-data/rest/callback/nodeinfo', methods=['PUT'])
def put_dca_carouge_fountain_sensed_node_info():
app.logger.info("Got FOUNTAINS message: {}".format(request.json))
# = = = = = = POST to IoT Platform = = = = = =
def _update_fountain_entity(sensor_dataFrame, timestamp):
    """Posts LoRa sensor data to Context Manager (Orion)

    Decodes the data frame, builds the WaterQualityObserved attributes and
    PATCHes them onto the fountain entity.

    :param sensor_dataFrame: data frame as received from the LoRa platform,
        passed unchanged to get_values_from_raw_reading
    :param timestamp: value stored as the dateObserved attribute
    :return: "ok" on a 2xx response, otherwise "Got error <reason>"
    :raises InvalidUsage: if the frame does not decode to exactly 1 or 5 values
    """
    values = get_values_from_raw_reading(sensor_dataFrame)

    # NOTE(review): the `if values[i]` guards below also turn a genuine reading
    # of 0 into None (attribute omitted) -- confirm that is intended.
    if len(values) == 1:  # this is LoRa Device w/ the turbidity sensor
        entity = get_model_water_quality_observed(
            turbidity=values[0],
            dateObserved=timestamp,
        )
    elif len(values) == 5:  # this is the device with 5 sensors
        entity = get_model_water_quality_observed(
            pH=values[0] / 100 if values[0] else None,  # in pH standard unit
            temperature=values[1] / 10 if values[1] else None,  # in grad C
            freeChlorine=values[2] / 100 if values[2] else None,  # in mg/L
            totalChlorine=values[3] / 100 if values[3] else None,  # in mg/L
            redox=values[4] / 1000 if values[4] else None,  # in volts
            dateObserved=timestamp,
        )
    else:
        raise InvalidUsage(
            message='Device didnt send amount of values expected (1 or 5), but instead {} '.format(values),
            payload="Data frame was {}".format(sensor_dataFrame)
        )

    # id and type attrs are not needed when patching entity
    entity.pop('id')
    entity.pop('type')

    url = "{}/v2/entities/{}/attrs".format(URL_BASE, URN_FOUNTAIN_ENTITY)
    r = client_request.patch(
        url=url,
        headers=http_header_post,
        json=entity
    )

    if not (200 <= r.status_code < 300):
        app.logger.error('HTTP status code not 2xx, something went wrong..')
        app.logger.error(r.status_code)
        app.logger.error(r.reason)
        # log the raw body: r.json() raises when the error body is not JSON,
        # which would hide the actual HTTP failure
        app.logger.error(r.text)
        return "Got error {}".format(r.reason)

    app.logger.info('HTTP response is {} - Entity updated'.format(r.status_code))
    return "ok"
if request.data:
app.logger.info("\t Request data: %s" % request.data)
if request.form:
app.logger.info("\t Request form: %s" % request.form)
raise InvalidUsage(message='This is still not implemented', payload=request.json)
def _update_historical_db_from_local_raw_files():
    """Posts data to Quantum Leap API directly as notification (without passing through Context Manager)

    Used for pushing locally collected data to historical API service.

    Reads every file in COLLECTION_DIR whose name ends with "json", converts
    each reading into a WaterQualityObserved entity, writes the merged list to
    "merged.json.output" in COLLECTION_DIR and POSTs it to NOTIFICATION_URL as
    one notification of subscription NOTIFICATION_ID. Files with an empty data
    frame or an unexpected number of values are reported and skipped.
    """
    json_files = [os.path.join(COLLECTION_DIR, x) for x in os.listdir(COLLECTION_DIR) if x.endswith("json")]
    data = []

    for json_file in json_files:
        # get data from file
        with open(json_file, "r") as f:
            json_data = json.load(f)

        if not json_data['dataFrame']:
            print("(!) Warning found emtpy data frame from device {}".format(json_data['deveui']))
            continue  # nothing to decode, skip instead of parsing an empty frame

        values = get_values_from_raw_reading(json_data['dataFrame'])

        # NOTE(review): the `if values[i]` guards below also turn a genuine
        # reading of 0 into None (attribute omitted) -- confirm that is intended.
        if len(values) == 1:  # this is LoRa Device w/ the turbidity sensor
            entity = get_model_water_quality_observed(
                turbidity=values[0],
                dateObserved=json_data['timestamp'],
                meta_datetime=json_data['timestamp'],  # only needed for historical data collected offline
            )
        elif len(values) == 5:  # this is the device with 5 water quality sensors
            entity = get_model_water_quality_observed(
                pH=values[0] / 100 if values[0] else None,  # in pH standard unit
                temperature=values[1] / 10 if values[1] else None,  # in grad C
                freeChlorine=values[2] / 100 if values[2] else None,  # in mg/L
                totalChlorine=values[3] / 100 if values[3] else None,  # in mg/L
                redox=values[4] / 1000 if values[4] else None,  # in volts
                dateObserved=json_data['timestamp'],
                meta_datetime=json_data['timestamp'],
            )
        else:
            print("(!) WARNING device values are not what was expected in file {}, "
                  "expected either 1 or 5 values from device, but got {}".format(json_file, values))
            continue

        data.append(entity)

    # keep a local copy of exactly what is about to be pushed
    with open(os.path.join(COLLECTION_DIR, "merged.json.output"), "w") as f:
        json.dump(data, f)

    r = requests.post(
        url=NOTIFICATION_URL,
        headers=http_header_post,
        json={"subscriptionId": NOTIFICATION_ID, "data": data}
    )

    if not (200 <= r.status_code < 300):
        print('HTTP status code not 2xx, something went wrong..')
        print(r.status_code)
        print(r.reason)
        # print the raw body: r.json() raises when the error body is not JSON
        print(r.text)
    else:
        print('History has been rewritten! (HTTP response is 2xx)')
# = = = = = = = = MAIN = = = = = = = =
......@@ -230,6 +446,15 @@ def main():
if __name__ == "__main__":
import sys
if 'push-locally-collected' in sys.argv: # call `python3 app.py push-locally-collected`
print("Pushing locally collected data at {} to IoT Platform historical component".format(COLLECTION_DIR))
_update_historical_db_from_local_raw_files()
sys.exit(0)
print("Running app as collector and push to IoT Platform")
# configs
with open(LORA_PLATFORM_SECRET_FILE, 'r') as file:
secret = file.read().replace('\n', '')
assert secret, 'No auth token defined for LoRa platform API'
......@@ -243,7 +468,4 @@ if __name__ == "__main__":
'Accept': 'application/json',
}
print("Starting to collect..")
# init()
main()
......@@ -370,20 +370,23 @@ def post_dca_carouge_watering_sensed_data_payload():
# log reading
_log_level_of_moisture(soil_humidity, sensor_eui)
# push data to context broker
# saves data locally and pushes it to FlowerBed and Device entities
resp = {}
resp.update({"fs_dump": _dump_raw_data_to_filesystem()})
resp.update({"update_flowerbed_entities": _update_flowerbed_entity(soil_humidity, sensor_eui)})
# resp.update({"update_device_entities": _update_device_entity(
# deveui=request.json['deveui'],
# comment=None,
# longitude=None,
# latitude=None,
# last_reception=None,
# dataFrame=request.json['dataFrame'],
# dataFrame_rssi=request.json['rssi'],
# dataFrame_timestamp=request.json['timestamp']
# )})
resp.update({"update_flowerbed_entities": _update_flowerbed_entity(
soil_humidity,
sensor_eui
)})
resp.update({"update_device_entities": _update_device_entity(
deveui=request.json['deveui'],
comment=None,
longitude=None,
latitude=None,
last_reception=None,
dataFrame=request.json['dataFrame'],
dataFrame_rssi=request.json['rssi'],
dataFrame_timestamp=request.json['timestamp']
)})
return resp
......
......@@ -3,7 +3,7 @@
FROM python:3.8-slim-buster
MAINTAINER Federico Sismondi <fsismondi@udgalliance.org>
RUN apt-get update
RUN apt-get update && apt install unzip
RUN mkdir /code
WORKDIR /code
......@@ -12,6 +12,7 @@ COPY requirements.txt requirements.txt
RUN pip install --timeout 300 -r requirements.txt
COPY . /code
RUN unzip /code/data_harmonization_tool.zip
CMD ["echo", "please override this CMD with some .sh or .py script call"]
# docker run -it --rm --name dca-carouge-weather --env ORION_HOST=10.81.6.109 naiades-client ./weather_02_get_all_raw_data.py
......
......@@ -41,7 +41,9 @@ def get_ngsiv2_typed_description(val):
return {'type': 'Text', 'value': new_val}
elif isinstance(val, datetime):
# "+00:00" is equivalent to "Z", but Orion doesn't accept it, so rewrite it :/
return {'type': 'DateTime', 'value': str(val.isoformat()).replace("+00:00", "Z")}
timestamp = str(val.isoformat()).replace("+00:00", "Z")
print("rewriting timestamp to {}".format(timestamp))
return {'type': 'DateTime', 'value': timestamp}
elif isinstance(val, dict):
return {'value': val} # bypassed
elif isinstance(val, list):
......@@ -116,8 +118,8 @@ attrs = {
entities = [
# prediction use case - weather observed (current)
('urn:ngsi-ld:WeatherObserved:WeatherObserved', ['carouge', 'braila']),
('urn:ngsi-ld:WeatherObserved:WeatherObserved-1', ['alicante']),
('urn:ngsi-ld:WeatherObserved:WeatherObserved-2', ['alicante']),
#('urn:ngsi-ld:WeatherObserved:WeatherObserved-1', ['alicante']),
#('urn:ngsi-ld:WeatherObserved:WeatherObserved-2', ['alicante']),
("urn:ngsi-ld:WaterQualityObserved:Fountain-1", ['carouge']),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment