Update fup-anemomoter-user.py

Remove unused debug var : debug_p & debug_flag
Add debug var : debug_json
Check if http authentification is true
This commit is contained in:
Julien Lazarewicz 2024-06-21 19:40:15 +02:00
parent aaddd30df5
commit b53a7b615b

View File

@ -21,10 +21,8 @@ import json
# import argparse # import argparse
import argparse import argparse
debug_p = False
debug_flag = True
debug_args = True debug_args = True
debug_json = True
debug_working_directory = os.path.dirname(os.path.realpath(__file__)) debug_working_directory = os.path.dirname(os.path.realpath(__file__))
prod_flag = False prod_flag = False
@ -34,90 +32,94 @@ meteobridge_template = "cgi-bin/template.cgi?template={%22time%22:%22[DD][MM][YY
def main(meteobridge_url, meteobridge_user, meteobridge_passwd, max_host, max_port): def main(meteobridge_url, meteobridge_user, meteobridge_passwd, max_host, max_port):
http_auth_status = False
oscSender = udp_client.UDPClient(max_host, max_port) oscSender = udp_client.UDPClient(max_host, max_port)
# http log to meteobridge
passman = urllib.request.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, meteobridge_url, meteobridge_user, meteobridge_passwd)
authhandler = urllib.request.HTTPBasicAuthHandler(passman)
opener = urllib.request.build_opener(authhandler)
urllib.request.install_opener(opener)
try:
res = urllib.request.urlopen(meteobridge_url)
except urllib.error.HTTPError as e:
print('HTTP Error code: ', e.code)
raise SystemExit(e)
except urllib.error.URLError as e:
print('URL Error Reason: ', e.reason)
raise SystemExit(e)
else:
res_body = res.read()
print(res_body.decode('utf-8'))
meteobridge_sensor_req = meteobridge_url + meteobridge_template meteobridge_sensor_req = meteobridge_url + meteobridge_template
while True: while True:
# logger.debug("this is a DEBUG message") if http_auth_status == False:
# logger.info("this is an INFO message") # http log to meteobridge
# logger.error("this is an ERROR message") passman = urllib.request.HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, meteobridge_url, meteobridge_user, meteobridge_passwd)
authhandler = urllib.request.HTTPBasicAuthHandler(passman)
opener = urllib.request.build_opener(authhandler)
urllib.request.install_opener(opener)
# store the response of URL try:
try: res = urllib.request.urlopen(meteobridge_url)
response = urllib.request.urlopen(meteobridge_sensor_req)
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
print('HTTP Error code: ', e.code) print('HTTP Error code: ', e.code)
raise SystemExit(e) raise SystemExit(e)
except urllib.error.URLError as e: except urllib.error.URLError as e:
print('URL Error Reason: ', e.reason) print('URL Error Reason: ', e.reason)
raise SystemExit(e) raise SystemExit(e)
# storing the JSON response else:
# from url in data res_body = res.read()
try: print(res_body.decode('utf-8'))
data_json = json.loads(response.read()) http_auth_status = True
print(data_json)
wind_speed = float(data_json["speed"])
wind_lastspeed = float(data_json["lastspeed"])
wind_batt = int(data_json["battery"])
wind_age = int(data_json["wind-age"])
wind_signal = int(data_json["signal"])
except json.JSONDecodeError as e:
print("Invalid JSON syntax:", e)
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(1)
oscSender.send(msg.build())
# print the json response
# print(data_json)
else: else:
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/speed") try:
msg.add_arg(wind_lastspeed) response = urllib.request.urlopen(meteobridge_sensor_req)
oscSender.send(msg.build())
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/battery") except urllib.error.HTTPError as e:
msg.add_arg(wind_batt) print('HTTP Error code: ', e.code)
oscSender.send(msg.build()) msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(1)
oscSender.send(msg.build())
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/age") except urllib.error.URLError as e:
msg.add_arg(wind_age) print('URL Error Reason: ', e.reason)
oscSender.send(msg.build()) msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(1)
oscSender.send(msg.build())
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/signal") # storing the JSON response
msg.add_arg(wind_signal) # from url in data
oscSender.send(msg.build()) try:
data_json = json.loads(response.read())
if debug_json == True:
print(data_json)
wind_speed = float(data_json["speed"])
wind_lastspeed = float(data_json["lastspeed"])
wind_batt = int(data_json["battery"])
wind_age = int(data_json["wind-age"])
wind_signal = int(data_json["signal"])
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error") except json.JSONDecodeError as e:
msg.add_arg(0) print("Invalid JSON syntax:", e)
oscSender.send(msg.build()) msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(1)
oscSender.send(msg.build())
# print the json response
# print(data_json)
else:
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/speed")
msg.add_arg(wind_lastspeed)
oscSender.send(msg.build())
time.sleep(1) msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/battery")
msg.add_arg(wind_batt)
oscSender.send(msg.build())
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/age")
msg.add_arg(wind_age)
oscSender.send(msg.build())
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/signal")
msg.add_arg(wind_signal)
oscSender.send(msg.build())
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(0)
oscSender.send(msg.build())
time.sleep(1)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="FUP anemometer for OSC") parser = argparse.ArgumentParser(description="FUP anemometer for OSC")