Commit 5d62ab97 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Merge branch 'weather_fixes' into 'master'

Feat: now app collects, transforms and pushes data to platform. It handles...

See merge request !21
parents d8f963f0 c94dae70
......@@ -17,15 +17,19 @@ import base64
import time
import dateutil.parser
import datetime
import logging
from geojson import Point
from flask import Flask, jsonify
from flask import request
import requests as client_request
app = Flask(__name__, )
print("Starting to collect..")
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
# get config from environment
COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='raw_data')
URL_BASE = 'http://{}:1026'.format(os.getenv('ORION_HOST', default='localhost'))
......
......@@ -15,5 +15,5 @@ COPY . /code
RUN unzip /code/data_harmonization_tool.zip
CMD ["echo", "please override this CMD with some .sh or .py script call"]
# docker run -it --rm --name dca-carouge-weather --env ORION_HOST=10.81.6.109 naiades-client ./weather_02_get_all_raw_data.py
# docker run -it --rm --name dca-carouge-weather --env ORION_HOST=10.81.6.109 naiades-client ./security_01_get_token_with_password.sh
#!/usr/bin/env python3
"""
NOTES:
- id and type of message sent should follow convention defined for URN in spreadsheet
https://telecombretagneeu-my.sharepoint.com/:x:/g/personal/federico_sismondi_telecom-bretagne_eu/Ebw_b9iHempGoSPEz__sYCcB_qPu0pwxDAgAeKXFoMu6cA?e=6uuUGD
"""
#!/usr/bin/env python3
import re
import os
import time
import json
import requests as client_request
import logging
import datetime
from dateutil import parser
from geojson import Point
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logger = logging.getLogger()

# --- Configuration (overridable through environment variables) ---

# Public weather service queried for the raw observations.
URL_weather_service = 'https://www.prevision-meteo.ch/services/json/carouge'

# Directory where raw samples are stored, and the file used to merge them
# before pushing them to the historical database.
COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='raw_data')
DEFAULT_MERGED_DATE_FILEPATH = os.path.join(COLLECTION_DIR, "merged.json")

# Host read once and shared by the Orion URL and the QuantumLeap URL below.
_ORION_HOST = os.getenv('ORION_HOST', default='localhost')
URL_BASE = 'http://{}:1026'.format(_ORION_HOST)
POST_DATA_PERIOD = int(os.getenv('POST_DATA_PERIOD', default='1800'))  # in seconds

# URN
URN_WEATHER_ENTITY = "urn:ngsi-ld:WeatherObserved:WeatherObserved"

# NOTIF info for pushing to historical db (QuantumLeap)
NOTIFICATION_URL = 'http://{}:8668/v2/notify'.format(_ORION_HOST)
NOTIFICATION_ID = '5f8f021eda7e7c9ac9757a76'

logger.info('Config: \n\tPOST_DATA_PERIOD: {} seconds \n\tURL: {}'.format(POST_DATA_PERIOD, URL_BASE))

# Identity of the single WeatherObserved entity handled by this app.
fiware_data_header = {
    'id': 'urn:ngsi-ld:WeatherObserved:WeatherObserved',
    'type': 'WeatherObserved'
}

# build header for GET
http_header_get = {
    'Fiware-Service': 'carouge',
    'Accept': 'application/json',
}

# build header for POST
# (an earlier duplicate definition without 'Fiware-ServicePath' was dead code:
# it was unconditionally overwritten by this one)
http_header_post = {
    'Fiware-Service': 'carouge',
    'Fiware-ServicePath': '/',
    'Content-Type': 'application/json',
    'Accept': 'application/json',
}
# Characters that are stripped from Text values, see
# https://fiware-orion.readthedocs.io/en/master/user/forbidden_characters/index.html
_NGSI_FORBIDDEN_CHARS = re.compile(r'[<>"\'=;()]')


def get_ngsiv2_typed_description(val, force_date_modified=None):
    """Wrap a Python value into an NGSI-v2 attribute description.

    (!) there are several copies of this function, please keep them all updated
    or else start importing or create a package and install it properly

    :param val: Value to be added to entity as attribute. Supported: bool,
        int, float, str, datetime, None, list, geojson Point and dict.
    :param force_date_modified: Optional datetime (or string containing "Z")
        used as the ``dateModified`` metadata of the attribute
    :return: Dictionary of attribute typed and formatted as NGSI-v2
    :raises AssertionError: if ``force_date_modified`` is a string without "Z"
    :raises NotImplementedError: if the type of ``val`` is not supported
    """
    ret = {}

    # Optional dateModified metadata.
    if force_date_modified and isinstance(force_date_modified, datetime.datetime):
        # "+00:00" is the same as "Z" but Orion doesn't like it
        stamp = force_date_modified.isoformat().replace("+00:00", "Z")
        ret["metadata"] = {"dateModified": {"type": "DateTime", "value": stamp}}
    elif force_date_modified and isinstance(force_date_modified, str):
        # NOTE(review): assert is skipped under `python -O`; kept so that the
        # exception type seen by existing callers does not change.
        assert "Z" in force_date_modified, "Only Zulu time is accepted"
        ret["metadata"] = {"dateModified": {"type": "DateTime", "value": force_date_modified}}

    # Built-in types are tested first; Point only has to be tested before
    # dict, which is the only branch it could otherwise fall into.
    if isinstance(val, bool):
        # bool is a subclass of int: it must be caught before the Number
        # branch, otherwise True/False would be published as Number.
        ret.update({'type': 'Boolean', 'value': val})
    elif isinstance(val, (int, float)):
        ret.update({'type': 'Number', 'value': val})
    elif isinstance(val, str):
        new_val = _NGSI_FORBIDDEN_CHARS.sub('', val)
        if new_val != val:
            print("Special char(s) deleted. This <{}> turned into this <{}>".format(val, new_val))
        ret.update({'type': 'Text', 'value': new_val})
    elif isinstance(val, datetime.datetime):
        # replace +00:00 is the same as Z but Orion doesnt like it :/
        ret.update({'type': 'DateTime', 'value': val.isoformat().replace("+00:00", "Z")})
    elif val is None:
        ret.update({'type': 'Text', 'value': None})
    elif isinstance(val, list):
        ret.update({'value': val})  # bypassed: no explicit type is set
    elif isinstance(val, Point):
        ret.update({'type': 'geo:json', 'value': dict(val)})
    elif isinstance(val, dict):
        ret.update({'value': val})  # bypassed: no explicit type is set
    else:
        raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val)))
    return ret
def update_weather_observed_entity(entity):
    """Patch the attributes of the WeatherObserved entity in the Context Manager (Orion).

    :param entity: WeatherObserved entity as dict; it is not modified
    :return: "ok" on a 2xx response, otherwise "Got error <reason>"
    """
    # id and type attrs are not needed when patching entity; they are left out
    # of a copy so that the caller's dict stays intact (and a dict without
    # them no longer raises KeyError).
    payload = {key: value for key, value in entity.items() if key not in ('id', 'type')}

    url = "{}/v2/entities/{}/attrs".format(URL_BASE, URN_WEATHER_ENTITY)
    r = client_request.patch(
        url=url,
        headers=http_header_post,
        json=payload
    )

    if 200 <= r.status_code < 300:
        logger.info('HTTP response is {} - Entity updated'.format(r.status_code))
        return "ok"

    logger.error('HTTP status code not 2xx, something went wrong..')
    logger.error(r.status_code)
    logger.error(r.reason)
    try:
        error_body = r.json()
    except ValueError:
        # error responses are not guaranteed to carry a JSON body
        error_body = r.text
    logger.error(error_body)
    return "Got error {}".format(r.reason)
def write_weather_observed_entity_to_historical_db(data):
    """Post data to the Quantum Leap API directly as a notification
    (without passing through the Context Manager).

    :param data: list of entities, sent as the "data" field of the notification
    :return: None; the outcome is only logged
    """
    r = client_request.post(
        url=NOTIFICATION_URL,
        headers=http_header_post,
        json={"subscriptionId": NOTIFICATION_ID, "data": data}
    )

    if 200 <= r.status_code < 300:
        logger.info('History has been rewritten! (HTTP response is 2xx)')
        return

    logger.error('HTTP status code not 2xx, something went wrong..')
    logger.error(r.status_code)
    logger.error(r.reason)
    try:
        error_body = r.json()
    except ValueError:
        # error responses are not guaranteed to carry a JSON body
        error_body = r.text
    logger.error(error_body)
def convert_raw_to_weather_data_model_with_disy_module(input_raw_filepath, output_filepath):
    """Run the DISY data harmonization tool: weather raw file -> WeatherObserved data.

    Relative paths need to be passed; they are resolved against the current
    working directory before being handed to the tool.

    :param input_raw_filepath: relative path of the raw json file to convert
    :param output_filepath: relative path of the json file the tool writes
    :return: exit status of the shell command, as returned by os.system
    """
    import shlex  # local import: only needed by this function

    # get data from file
    with open(input_raw_filepath, "r") as f:
        json_data = json.load(f)

    # A file holding e.g. `null` is reported like any other empty sample
    # instead of raising TypeError on the membership test.
    if not isinstance(json_data, dict) or not json_data.get('current_condition'):
        logger.error("Found emtpy data in raw json file {}".format(input_raw_filepath))
        # NOTE(review): the tool is still invoked on an empty sample, as
        # before -- confirm whether it should be skipped instead.

    # Paths are quoted so that spaces or shell metacharacters in them cannot
    # break (or extend) the command line; plain paths are left unchanged.
    cmd = "sh data_harmonization_tool/data_harmonization_workflow/data_harmonization_workflow_run.sh " \
          "--context_param use_case=carouge " \
          "--context_param input_json={input} " \
          "--context_param output_json={output} " \
          "--context_param id=urn:ngsi-ld:WeatherObserved:WeatherObserved " \
          "--context_param datamodel=wo".format(
              input=shlex.quote(os.path.join(os.getcwd(), input_raw_filepath)),
              output=shlex.quote(os.path.join(os.getcwd(), output_filepath))
          )
    print(cmd)
    return os.system(cmd)
def convert_raw_to_weather_data_model(input_raw_filepath):
    """Convert a weather raw data file into a WeatherObserved entity (NGSI-v2).

    relative path needs to be passed

    input file data:
    {
        "city_info": {
            "name": "Carouge",
            "country": "Suisse",
            "latitude": "46.1838613",
            "longitude": "6.1385878",
            "elevation": "388",
            "sunrise": "06:07",
            "sunset": "21:16"
        },
        "current_condition": {
            "date": "24.07.2020",
            "hour": "03:00",
            "tmp": 19,
            "wnd_spd": 8,
            "wnd_gust": 13,
            "wnd_dir": "S",
            "pressure": 1010.7,
            "humidity": 74,
            "condition": "Nuit nuageuse",
            "condition_key": "nuit-nuageuse",
            "icon": "https://prevision-meteo.ch/style/images/icon/nuit-nuageuse.png",
            "icon_big": "https://prevision-meteo.ch/style/images/icon/nuit-nuageuse-big.png"
        }
    }

    output (every attribute is wrapped by get_ngsiv2_typed_description and
    carries the observation time as dateModified metadata):
    {
        "id": "urn:ngsi-ld:WeatherObserved:WeatherObserved",
        "type": "WeatherObserved",
        "temperature": {
            "metadata": {"dateModified": {"type": "DateTime", "value": "2020-07-24T03:00:00"}},
            "type": "Number",
            "value": 19
        },
        ... same shape for location, dateObserved, source, relativeHumidity,
        precipitation, windSpeed and atmosphericPressure
    }

    :param input_raw_filepath: path of the raw json file to convert
    :return: WeatherObserved data as dict
    :raises ValueError: if the file does not hold a json object with non-empty
        'city_info' and 'current_condition', or if date/hour cannot be parsed
    :raises KeyError: if an expected field is missing inside those sections
    """
    # precipitation params available at:
    # https://www.prevision-meteo.ch/uploads/pdf/recuperation-donnees-meteo.pdf
    #
    # estimation is based on:
    # https://www.pprune.org/tech-log/580855-defining-rainfall-light-moderate-heavy.html
    #
    # NOTE(review): plain rain keys such as 'pluie-moderee' are not listed and
    # therefore map to 0 -- confirm whether that is intended.
    condition_to_precipitation_mapping = {
        'neige-faible': 1,
        'neige-modere': 4,
        'neige-forte': 10,
        'averses-de-pluie-faible': 2,
        'averses-de-pluie-moderee': 8,
        'averses-de-pluie-forte': 20,
        'pluie-et-neige-melee-faible': 2,
        'pluie-et-neige-melee-moderee': 8,
        'pluie-et-neige-melee-forte': 20,
    }

    # get data from file
    with open(input_raw_filepath, "r") as f:
        json_data = json.load(f)

    # validate raw data: fail with one clear error instead of logging and then
    # crashing on the first lookup (a failed collection is dumped as `null`)
    for section in ('current_condition', 'city_info'):
        if not isinstance(json_data, dict) or not json_data.get(section):
            logger.error("Found emtpy data in raw json file {}".format(input_raw_filepath))
            raise ValueError("Missing or empty '{}' in raw json file {}".format(section, input_raw_filepath))

    city_info = json_data['city_info']
    current_condition = json_data['current_condition']

    # convert data format: "DD.MM.YYYY" + "HH:MM" -> datetime
    match_date = re.match(r'(\d{2})\.(\d{2})\.(\d{4})', current_condition['date'])
    match_time = re.match(r'(\d{2}):(\d{2})', current_condition['hour'])
    if match_date is None or match_time is None:
        raise ValueError("Unexpected date/hour format <{}> <{}> in raw json file {}".format(
            current_condition['date'], current_condition['hour'], input_raw_filepath))
    day, month, year = (int(group) for group in match_date.groups())
    hour, minute = (int(group) for group in match_time.groups())
    # NOTE(review): this datetime is naive (no timezone); presumably the
    # service reports local time -- confirm before attaching a tzinfo.
    meta_datetime = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute)

    # parsing data
    precipitation = condition_to_precipitation_mapping.get(current_condition['condition_key'], 0)
    # GeoJSON positions are ordered [longitude, latitude] (RFC 7946, 3.1.1)
    location = Point([float(city_info['longitude']), float(city_info['latitude'])])
    date_observed = meta_datetime
    source = 'https://www.prevision-meteo.ch/services/json/carouge'
    temperature = current_condition['tmp']
    humidity = current_condition['humidity']
    atmospheric_pressure = current_condition['pressure']
    wind_speed = current_condition['wnd_spd']

    # formatting and returning data
    return {
        "id": "urn:ngsi-ld:WeatherObserved:WeatherObserved",
        "type": "WeatherObserved",
        "location": get_ngsiv2_typed_description(location, meta_datetime),
        "dateObserved": get_ngsiv2_typed_description(date_observed, meta_datetime),
        "source": get_ngsiv2_typed_description(source, meta_datetime),
        "temperature": get_ngsiv2_typed_description(temperature, meta_datetime),
        "relativeHumidity": get_ngsiv2_typed_description(humidity, meta_datetime),
        "precipitation": get_ngsiv2_typed_description(precipitation, meta_datetime),
        "windSpeed": get_ngsiv2_typed_description(wind_speed, meta_datetime),
        "atmosphericPressure": get_ngsiv2_typed_description(atmospheric_pressure, meta_datetime),
    }
def create_data_from_locally_collected_for_historical_db(raw_data_directory=COLLECTION_DIR,
                                                         merged_data_temp_filename = DEFAULT_MERGED_DATE_FILEPATH):
    """Merge all locally collected raw files into one json file of WeatherObserved entities.

    Used for pushing locally collected data to historical API service

    :param raw_data_directory: directory holding the raw json files
    :param merged_data_temp_filename: path of the json file to write
    :return: None
    """
    # sorted: os.listdir order is arbitrary; sorting makes the merged file
    # deterministic
    raw_json_files = sorted(
        os.path.join(raw_data_directory, x) for x in os.listdir(raw_data_directory)
        if x.endswith("json") and 'merged.json' not in x
    )

    data = []
    # Generate big fat http data for rewriting the history of WeatherObserved
    for raw in raw_json_files:
        try:
            data.append(convert_raw_to_weather_data_model(raw))
        except (ValueError, KeyError, TypeError) as err:
            # e.g. a `null` dump written when the weather service was
            # unreachable: skip it instead of aborting the whole merge
            logger.error("Skipping raw file {}: {!r}".format(raw, err))

    with open(merged_data_temp_filename, encoding='utf-8', mode='w') as f:
        json.dump(data, f)
def get_data_from_weather_service():
    """Query the weather service and keep only the sections this app uses.

    :return: dict holding the 'city_info' and 'current_condition' sections of
        the response (when present), or None if the service could not be
        reached or did not answer with a json object
    """
    dump_keys = ('city_info', 'current_condition')
    logger.info('Querying weather service..')

    try:
        r = client_request.get(
            url=URL_weather_service,
            timeout=90,
        )
    except client_request.exceptions.RequestException as err:
        # RequestException is the base class of the timeout and connection
        # errors, so every transport failure ends up here
        logger.info('Could not reach weather service: {!r}'.format(err))
        return None

    try:
        payload = r.json()
    except ValueError:
        # every JSON decode error raised by requests derives from ValueError
        logger.error("Got something that doesnt look like a json: {}".format(r.content))
        return None

    if not isinstance(payload, dict):
        # valid json, but not an object: there are no sections to extract
        logger.error("Got something that doesnt look like a json: {}".format(r.content))
        return None

    return {k: v for k, v in payload.items() if k in dump_keys}
def collect():
    """Query the weather service, dump the raw data in FS and return the path to the file.

    The file is written even when the service returned nothing; it then
    contains json `null`.

    :return: path of the written raw data file
    """
    logger.info("Querying weather service..")
    weather_data = get_data_from_weather_service()

    # make sure the target directory exists instead of failing on open()
    os.makedirs(COLLECTION_DIR, exist_ok=True)

    # file name carries the collection time (epoch seconds)
    filepath = os.path.join(COLLECTION_DIR, '{}_weather_raw_data.json'.format(int(time.time())))
    with open(filepath, encoding='utf-8', mode='w') as f:
        json.dump(
            obj=weather_data,
            fp=f
        )
    logger.info("Dumping to FS {}".format(filepath))
    return filepath
def main():
    """Collect, convert and push weather data every POST_DATA_PERIOD seconds, forever."""
    # Send updates in a loop
    while True:
        filepath = collect()
        try:
            data = convert_raw_to_weather_data_model(filepath)
        except (ValueError, KeyError, TypeError) as err:
            # an unusable sample (e.g. weather service unreachable) must not
            # stop the collector: skip this round and retry after the period
            logger.error("Skipping update, could not convert raw file {}: {!r}".format(filepath, err))
        else:
            result = update_weather_observed_entity(data)
            # report success only when the platform actually accepted the update
            if result == "ok":
                logger.info("Updated successfully WeatherObserved data in IoT Platform")
            else:
                logger.error("WeatherObserved data was not updated in IoT Platform: {}".format(result))
        time.sleep(POST_DATA_PERIOD)
if __name__ == "__main__":
    import sys

    # One-shot mode, selected with `python3 app.py push-locally-collected`:
    # merge the local raw files, send them to the historical component, exit.
    replay_requested = 'push-locally-collected' in sys.argv
    if replay_requested:
        start_msg = "Pushing locally collected data at {} to IoT Platform historical component"
        logger.info(start_msg.format(COLLECTION_DIR))

        create_data_from_locally_collected_for_historical_db()

        # read back the merged file that was just written
        with open(DEFAULT_MERGED_DATE_FILEPATH, "r") as merged_file:
            merged_entities = json.load(merged_file)
        write_weather_observed_entity_to_historical_db(merged_entities)

        done_msg = "Historical data pushed to to IoT Platform historical component from {}"
        logger.info(done_msg.format(COLLECTION_DIR))
        sys.exit(0)

    # Default mode: collect and push in an endless loop.
    logger.info("Running app as collector and push to IoT Platform")

    # Token authentication sketch -- this has never been tested, see postman
    # collection for example on getting token:
    # with open(IOT_PLATFORM_SECRET_FILE, 'r') as file:
    #     secret = file.read().replace('\n', '')
    #
    # assert secret, 'No auth token defined for platform API'
    #
    # http_header_get = {
    #     'Fiware-Service': 'carouge',
    #     'Accept': 'application/json',
    #     'Authorization': secret,
    # }
    main()
{
"city_info": {
"name": "Carouge",
"country": "Suisse",
"latitude": "46.1838613",
"longitude": "6.1385878",
"elevation": "388",
"sunrise": "05:57",
"sunset": "21:25"
},
"current_condition": {
"date": "15.07.2020",
"hour": "00:00",
"tmp": 17,
"wnd_spd": 7,
"wnd_gust": 11,
"wnd_dir": "SE",
"pressure": 1011.9,
"humidity": 77,
"condition": "Fortement nuageux",
"condition_key": "fortement-nuageux",
"icon": "https://prevision-meteo.ch/style/images/icon/fortement-nuageux.png",
"icon_big": "https://prevision-meteo.ch/style/images/icon/fortement-nuageux-big.png"
}
}
{
"city_info": {
"name": "Carouge",
"country": "Suisse",
"latitude": "46.1838613",
"longitude": "6.1385878",
"elevation": "388",
"sunrise": "08:02",
"sunset": "16:51"
},
"current_condition": {
"date": "07.12.2020",
"hour": "13:00",
"tmp": 3,
"wnd_spd": 6,
"wnd_gust": 16,
"wnd_dir": "O",
"pressure": 997.5,
"humidity": 83,
"condition": "Eclaircies",
"condition_key": "eclaircies",
"icon": "https://prevision-meteo.ch/style/images/icon/eclaircies.png",
"icon_big": "https://prevision-meteo.ch/style/images/icon/eclaircies-big.png"
}
}
{
"city_info": {
"name": "Carouge",
"country": "Suisse",
"latitude": "46.1838613",
"longitude": "6.1385878",
"elevation": "388",
"sunrise": "07:56",
"sunset": "16:53"
},
"current_condition": {
"date": "01.12.2020",
"hour": "06:00",
"tmp": 3,
"wnd_spd": 16,
"wnd_gust": 25,
"wnd_dir": "S",
"pressure": 1017.3,
"humidity": 83,
"condition": "Pluie mod\u00e9r\u00e9e",
"condition_key": "pluie-moderee",
"icon": "https://prevision-meteo.ch/style/images/icon/pluie-moderee.png",
"icon_big": "https://prevision-meteo.ch/style/images/icon/pluie-moderee-big.png"
}
}
This diff is collapsed.
{"raw_data": "b'eyJjaXR5X2luZm8iOiB7Im5hbWUiOiAiQ2Fyb3VnZSIsICJjb3VudHJ5IjogIlN1aXNzZSIsICJsYXRpdHVkZSI6ICI0Ni4xODM4NjEzIiwgImxvbmdpdHVkZSI6ICI2LjEzODU4NzgiLCAiZWxldmF0aW9uIjogIjM4OCIsICJzdW5yaXNlIjogIjA1OjU1IiwgInN1bnNldCI6ICIyMTowOCJ9LCAiY3VycmVudF9jb25kaXRpb24iOiB7ImRhdGUiOiAiMTkuMDUuMjAyMCIsICJob3VyIjogIjE1OjAwIiwgInRtcCI6IDIzLCAid25kX3NwZCI6IDM2LCAid25kX2d1c3QiOiA1OCwgInduZF9kaXIiOiAiTkUiLCAicHJlc3N1cmUiOiAxMDEyLjksICJodW1pZGl0eSI6IDQ2LCAiY29uZGl0aW9uIjogIkVuc29sZWlsbFx1MDBlOSIsICJjb25kaXRpb25fa2V5IjogImVuc29sZWlsbGUiLCAiaWNvbiI6ICJodHRwczovL3d3dy5wcmV2aXNpb24tbWV0ZW8uY2gvc3R5bGUvaW1hZ2VzL2ljb24vZW5zb2xlaWxsZS5wbmciLCAiaWNvbl9iaWciOiAiaHR0cHM6Ly93d3cucHJldmlzaW9uLW1ldGVvLmNoL3N0eWxlL2ltYWdlcy9pY29uL2Vuc29sZWlsbGUtYmlnLnBuZyJ9fQ=='", "source": "https://www.prevision-meteo.ch/services/json/carouge"}
\ No newline at end of file
#!/usr/bin/env python3
"""
Dumps weather data to disk every POST_DATA_PERIOD seconds
"""
import os
import json
import time
import logging
from weather_api import *
# Directory the raw samples are written to; defaults to the current directory.
COLLECTION_DIR = os.environ.get('COLLECTION_DIR', '.')

# Root logger, with its level set to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.info("Starting to collect..")
def collect():
    """Fetch one sample from the weather service and dump it as json in COLLECTION_DIR."""
    logger.info("Querying service..")
    sample = get_data_from_weather_service()

    # the file name carries the collection time (epoch seconds)
    filename = '{}_weather_raw_data.json'.format(int(time.time()))
    filepath = os.path.join(COLLECTION_DIR, filename)

    with open(filepath, encoding='utf-8', mode='w') as out:
        json.dump(sample, out)
    logger.info("Dumping to FS {}".format(filepath))
# Collect right away, then again after every POST_DATA_PERIOD seconds, forever.
while True:
    collect()
    time.sleep(POST_DATA_PERIOD)
#!/usr/bin/env python3
"""
Creates the entity in the IoT platform, then loops forever sending updates of the weather data for Carouge
"""
import time
import logging
from weather_api import *