Commit e2d51527, authored Aug 27, 2021 by Cédric Crettaz 🖥
Upload New File.
Parent: 82c310d9
WaterQualityPredictions/waterQualityForecast.py (new file, 0 → 100644)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Dec 7 07:18:59 2020
@author: juanfernandez
"""
import pandas as pd
import requests
import matplotlib.pylab as plt
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, explained_variance_score, r2_score
import numpy as np
from sklearn.preprocessing import MinMaxScaler, Normalizer, RobustScaler, StandardScaler
#import tensorflow as tf
# from tensorflow import keras
from sklearn.linear_model import LinearRegression
import joblib
from datetime import datetime, timedelta
from pytz import timezone
import ast

# Imports for webserver
from flask import Flask
from flask import request
import logging
app = Flask(__name__)
logger = logging.getLogger(__name__)
def subscribe_Fountains():
    # url = "http://5.53.108.182:1026"
    url = "http://5.53.108.182/context-api"
    http_header_post = {
        'Fiware-Service': 'carouge',
        'Content-Type': 'application/json',
    }
    # !!! Set the localhost IP when we ship the Docker container to the platform
    # Set my IP
    # SUBSCRIPTION_URL = 'http://170.253.23.106:5000/fountains_water_quality'
    SUBSCRIPTION_URL = 'http://170.253.17.217:5000/fountains_water_quality'
    data = '''{
        "description": "A subscription to subscribe to WaterQualityObserved:WaterQualityObserved",
        "subject": {
            "entities": [
                {
                    "idPattern": ".*",
                    "type": "WaterQualityObserved"
                }
            ],
            "condition": {
                "attrs": []
            }
        },
        "notification": {
            "http": {
                "url": "''' + SUBSCRIPTION_URL + '''"
            },
            "attrs": []
        }
    }'''
    response = requests.post(url=url + '/v2/subscriptions/', headers=http_header_post, data=data)
    return response
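
# --- Illustrative sketch (added for clarity, not part of the original file) ---
# One way the subscription above could be exercised from a Python shell. The
# status-code handling is an assumption based on standard NGSIv2 behaviour
# (Orion replies 201 Created and puts the new subscription's path in the
# Location header):
#
#   resp = subscribe_Fountains()
#   if resp.status_code == 201:
#       print('Subscribed:', resp.headers.get('Location'))
#   else:
#       print('Subscription failed:', resp.status_code, resp.text)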
# Function to check that the predictions are pushed correctly to the Context Manager
def read_updated_fountains_predictions():
    # url = "http://5.53.108.182:1026"
    url = "http://5.53.108.182/context-api"
    http_header_post = {
        'Fiware-Service': 'carouge',
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        # 'X-Auth-Token': token,  # The partners decided not to use tokens from inside the platform
    }
    response = requests.get(url=url + '/v2/entities/' + 'urn:ngsi-ld:WaterQualityForecast:Fountain-1',
                            headers=http_header_post)
    print(response.content)
# Reads the last 'samples' of water quality parameters measured at the fountain
def read_historical_fountains(samples):
    # To read data initially
    # url = "http://5.53.108.182:8668"
    url = "http://5.53.108.182/time-series-api"
    http_header_post = {
        'Fiware-Service': 'carouge',
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Fiware-ServicePath': '/',
    }
    # build header for GET
    http_header_get = http_header_post.copy()
    http_header_get.pop('Content-Type')
    # response = requests.get(url=url + '/v2/entities/urn:ngsi-ld:WaterQualityObserved:Fountain-1?fromDate=2020-10-15T15:35:00&toDate=2020-11-05T15:35:00', headers=http_header_get)
    response = requests.get(url=url + '/v2/entities/urn:ngsi-ld:WaterQualityObserved:Fountain-1?fromDate=2020-11-05T15:35:00&toDate=2020-12-15T15:35:00',
                            headers=http_header_get)
    data = response.json()
    df = pd.DataFrame([x['values'] for x in data['attributes']])
    df = df.T
    df.columns = [x['attrName'] for x in data['attributes']]
    # To prepare data initially
    # index = [i for i in range(len(df.dateObserved)) if i%2 == 0]
    # df_training = df_training.iloc[index,:]
    values = ['freeChlorine', 'pH', 'temperature', 'totalChlorine', 'turbidity', 'redox', 'chlorateEstimation']
    df_data = df[values].astype('float')
    df_data = df_data.dropna()
    df_data.index = df['dateObserved']
    # madrid = timezone('Europe/Madrid')
    # timestamp = madrid.localize(datetime.strptime(df['dateObserved'][0],'%Y-%m-%dT%H:%M:%S.%f')).astimezone(timezone('utc')).strftime('%Y-%m-%dT%H:%M:%SZ')
    # madrid.localize(datetime.strptime(Turbidity_IN.iloc[0]['Timestamp'],'%Y-%m-%d %H:%M:%S')).astimezone(timezone('utc')).strftime('%Y-%m-%dT%H:%M:%SZ')
    return df_data
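
# --- Illustrative sketch (added for clarity, not part of the original file) ---
# Shape of the time-series response the function above expects, and how the
# DataFrame is assembled from it. The sample payload is hypothetical:
#
#   data = {'attributes': [
#       {'attrName': 'freeChlorine', 'values': [0.31, 0.29]},
#       {'attrName': 'pH',           'values': [7.6, 7.5]},
#   ]}
#   df = pd.DataFrame([x['values'] for x in data['attributes']]).T
#   df.columns = [x['attrName'] for x in data['attributes']]
#   # -> two rows (one per sample), one column per attribute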
def token_request():
    #### The token is only requested once
    # url = 'http://5.53.108.182:3005'
    url = 'http://5.53.108.182/identity-api'
    http_token_post = {
        'Accept': 'application/json',
        'Authorization': 'Basic NDU3ODhiM2YtMzRjNy00YThlLTkwZGMtZGZiODdlOGFkMGNjOjVmMmI0YTQ5LTJkMDUtNDQ2Ny04NDQ4LTI1ZDA0OWQwMzQ5OQ==',
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    # User and password are required
    # Currently we use the standard credentials, but new ones are required
    user = "city-pilot-1@example.com"
    pwd = "test"
    result = requests.post(url=url + '/oauth2/token', headers=http_token_post,
                           data="username=" + user + "&password=" + pwd + "&grant_type=password")
    sol = result.content
    sol = ast.literal_eval(sol.decode('utf-8'))
    token = sol['access_token']
    return token
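
# --- Illustrative sketch (added for clarity, not part of the original file) ---
# The body parsed above is assumed to be a standard OAuth2 password-grant
# response, along the lines of:
#
#   b'{"access_token": "abc123...", "token_type": "Bearer", "expires_in": 3600}'
#
# json.loads would be the more conventional parser than ast.literal_eval:
#
#   import json
#   token = json.loads(result.content.decode('utf-8'))['access_token']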
def data_models(dateObserved, freeChlorine, pH, temperature, totalChlorine):  #, turbidity):
    # Change all dates to UTC
    madrid = timezone('Europe/Madrid')
    freeChlorine = pd.DataFrame(freeChlorine, columns=['freeChlorinePrediction'], index=['value'])
    pH = pd.DataFrame(pH, columns=['pHPrediction'], index=['value'])
    # temperature = pd.DataFrame(temperature, columns = ['temperature'], index = ['value'])
    totalChlorine = pd.DataFrame(totalChlorine, columns=['totalChlorinePrediction'], index=['value'])
    data = freeChlorine.join([pH, totalChlorine])
    print('Date ')
    print(dateObserved)
    # dateObservedUTC = madrid.localize(datetime.strptime(dateObserved,'%Y-%m-%dT%H:%M:%S.%fZ')).astimezone(timezone('utc'))
    dateObservedUTC = datetime.strptime(dateObserved, '%Y-%m-%dT%H:%M:%S.%fZ')
    dateObservedUTC_1 = (dateObservedUTC + timedelta(days=1)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    print('Date2- today ')
    print(dateObservedUTC.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
    print('Date2+1 ')
    print(dateObservedUTC_1)
    data['validFrom'] = dateObservedUTC.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    data['validTo'] = dateObservedUTC_1
    data.loc['type', :] = np.nan
    data.loc['type', 'validFrom'] = 'DateTime'
    data.loc['type', 'validTo'] = 'DateTime'
    # Reorder the data
    # data = data[['dateObserved','freeChlorine','pH','temperature','totalChlorine']]
    print(data)
    # data.loc['metadata'] = data.loc['metadata'].apply(lambda x: {'timestamp':{"value": x, "type":"DateTime"}})
    # data = data.drop('unit',axis = 0)
    # data = data.drop('conductivity', axis = 1)
    dataJSON = data.apply(lambda x: [x.dropna()], axis=0).to_json()
    dataJSON = dataJSON.replace('}]', '}')
    dataJSON = dataJSON.replace('[{', '{')
    print(dataJSON)
    return dataJSON, data
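
# --- Illustrative sketch (added for clarity, not part of the original file) ---
# Approximate shape of the NGSIv2 payload that data_models() emits once the
# '[{' / '}]' replacements have stripped the list wrappers (attribute values
# are hypothetical):
#
#   {"freeChlorinePrediction":  {"value": 0.31},
#    "pHPrediction":            {"value": 7.6},
#    "totalChlorinePrediction": {"value": 0.42},
#    "validFrom": {"value": "2020-12-07T07:00:00.000000Z", "type": "DateTime"},
#    "validTo":   {"value": "2020-12-08T07:00:00.000000Z", "type": "DateTime"}}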
# def send_context(token, JSONdata, entityID):  # The partners decided not to use tokens from inside the platform
def send_context(JSONdata, entityID):
    # url = "http://5.53.108.182:1026"
    # Send directly to the context broker
    url = "http://5.53.108.182/context-api"
    # Send through the data validator
    # url = "http://5.53.108.182:5002/validation"
    http_header_post = {
        'Fiware-Service': 'carouge',
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        # 'X-Auth-Token': token,  # The partners decided not to use tokens from inside the platform
    }
    response = requests.patch(url=url + '/v2/entities/' + entityID + '/attrs',
                              headers=http_header_post, data=JSONdata)
    # Communication state
    print(response.content)
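
# --- Illustrative sketch (added for clarity, not part of the original file) ---
# curl equivalent of the request send_context() makes (hypothetical payload;
# in standard NGSIv2 a successful PATCH on /v2/entities/<id>/attrs returns
# 204 No Content):
#
#   curl -X PATCH 'http://5.53.108.182/context-api/v2/entities/urn:ngsi-ld:WaterQualityForecast:Fountain-1/attrs' \
#        -H 'Fiware-Service: carouge' -H 'Content-Type: application/json' \
#        -d '{"pHPrediction": {"value": 7.6}}'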
def main(datas):
    # Load model and scaler
    Modelname = 'models/fountains/model_linear_regression_201207.sav'
    model = joblib.load(Modelname)
    xscalername = 'models/fountains/xscaler_linear_regression_201207.sav'
    scaler = joblib.load(xscalername)

    # Subscribe for input data
    # On this occasion we just read the desired values from the historical database,
    # since no new data is being uploaded to the platform
    # data = subscribe_Fountains()
    x = scaler.transform(datas)
    # For now we ignore turbidity since that value is not yet measured
    x = x[:, :-1]
    prediction = model.predict(x)
    freeChlorine = prediction[:, 0]
    pH = prediction[:, 1]
    temperature = prediction[:, 1]  # reuses the pH column; temperature is not used downstream by data_models()
    totalChlorine = prediction[:, 2]
    # turbidity = prediction[:,3]
    print(totalChlorine)
    # In a future version we will check whether it is possible to send the
    # reliability of the measurement (%)
    # probabilities = model.predict_proba(x)

    # Send data to the platform
    # First, the data model is created
    JSONdata, datos = data_models(datas.index[0], freeChlorine, pH, temperature, totalChlorine)  #, turbidity)
    # Second, the token is created
    # token = token_request()  # The partners decided not to use tokens from inside the platform
    # Then it is sent to the context broker
    entityID = 'urn:ngsi-ld:WaterQualityForecast:Fountain-1'
    # send_context(token, JSONdata, entityID)  # The partners decided not to use tokens from inside the platform
    send_context(JSONdata, entityID)
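
# --- Illustrative sketch (added for clarity, not part of the original file) ---
# Offline invocation of main() with a hand-built one-row frame matching the
# five columns the scaler receives from the webhook (values are hypothetical):
#
#   datas = pd.DataFrame(
#       [[0.31, 7.6, 12.4, 0.42, 0.8]],
#       columns=['freeChlorine', 'pH', 'temperature', 'totalChlorine', 'turbidity'],
#       index=['2020-12-07T07:00:00.000Z'])
#   main(datas)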
@app.route('/')
@app.route('/healthcheck')
def healthcheck():
    return 'This service is up and running!'
@app.route('/fountains_water_quality', methods=['POST'])
def fountains_water_quality():
    # if request.data:
    #     app.logger.info("Request data: %s" % request.data)
    if request.json:
        wqo = request.json['data']
        print('this ')
        df = pd.DataFrame(wqo[0])
        wqo_df = df.loc[['value'], ['freeChlorine', 'pH', 'temperature', 'totalChlorine', 'turbidity']]
        wqo_df2 = df.loc[['value'], ['freeChlorine', 'pH', 'temperature', 'totalChlorine', 'turbidity', 'redox', 'chlorateEstimation']]
        wqo_df.index = [df.loc['value', 'dateObserved']]
        wqo_df2.index = [df.loc['value', 'dateObserved']]
        print(wqo_df)
        print('taquito')
        # print(wqo_df2)
        print(wqo_df2.iloc[:, :-2])
        print('taqui')
        # app.logger.info("Got new water quality observed value from Carouge Fountain: %s" % wqo)
        main(wqo_df2.iloc[:, :-2])
    return 'Got POST for /fountains_water_quality, with body %s' % request.form
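
# --- Illustrative sketch (added for clarity, not part of the original file) ---
# Minimal notification a test client could POST to this endpoint, mimicking
# the Orion subscription callback (attribute values are hypothetical):
#
#   requests.post('http://localhost:5000/fountains_water_quality', json={'data': [{
#       'dateObserved':       {'value': '2020-12-07T07:00:00.000Z'},
#       'freeChlorine':       {'value': 0.31},
#       'pH':                 {'value': 7.6},
#       'temperature':        {'value': 12.4},
#       'totalChlorine':      {'value': 0.42},
#       'turbidity':          {'value': 0.8},
#       'redox':              {'value': 650},
#       'chlorateEstimation': {'value': 0.05},
#   }]})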
if __name__ == '__main__':
    app.run(host="0.0.0.0", debug=True, port=5000)
# # Load model and scaler
# Modelname = 'models/fountains/model_linear_regression_201207.sav'
# model = joblib.load(Modelname)
# xscalername = 'models/fountains/xscaler_linear_regression_201207.sav'
# scaler = joblib.load(xscalername)
#
# # Subscribe for input data
# # On this occasion we just read the desired values from the historical database,
# # since no new data is being uploaded to the platform
# # data = subscribe_Fountains()
# datas = read_historical_fountains(1)
# x = scaler.transform(datas)
#
# # For now we ignore turbidity since that value is not yet measured
# x = x[:,:-1]
#
# prediction = model.predict(x)
#
# freeChlorine = prediction[:,0]
# pH = prediction[:,1]
# temperature = prediction[:,1]
# totalChlorine = prediction[:,2]
# # In a future version we will check whether it is possible to send the reliability
# # of the measurement (%)
# # probabilities = model.predict_proba(x)
# # turbidity = prediction[:,3]
# # Send data to the platform
# # First, the data model is created
# JSONdata, datos = data_models(datas.index[0],freeChlorine, pH, temperature, totalChlorine)#turbidity)
# # Second, the token is created
# token = token_request()
# # Then it is sent to the context broker
# entityID='urn:ngsi-ld:WaterQualityForecast:Fountain-1'
# send_context(token,JSONdata,entityID)