Update fup-anemomoter-user.py

Add OSC errors codes.
This commit is contained in:
Julien Lazarewicz 2024-06-21 20:31:35 +02:00
parent b53a7b615b
commit 876a7da730

View File

@ -21,10 +21,23 @@ import json
# import argparse # import argparse
import argparse import argparse
debug_args = True # Errors codes
debug_json = True OSC_no_error = 0
OSC_error = 1
OSC_meteobridge_auth_error = 10
OSC_meteobridge_url_error = 11
OSC_meteobridge_http_error = 12
OSC_meteobridge_nosensor_error = 20
debug_args_print = True
debug_json_print = True
debug_urllib_print = True
debug_working_directory = os.path.dirname(os.path.realpath(__file__)) debug_working_directory = os.path.dirname(os.path.realpath(__file__))
# define urllib timeout in seconds
url_timeout = 10
prod_flag = False prod_flag = False
prod_working_directory = "/usr/local/com.fup.anemometre_daemon" prod_working_directory = "/usr/local/com.fup.anemometre_daemon"
@ -48,42 +61,63 @@ def main(meteobridge_url, meteobridge_user, meteobridge_passwd, max_host, max_po
urllib.request.install_opener(opener) urllib.request.install_opener(opener)
try: try:
res = urllib.request.urlopen(meteobridge_url) res = urllib.request.urlopen(meteobridge_url, timeout = url_timeout)
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
if debug_urllib_print:
print('HTTP Error code: ', e.code) print('HTTP Error code: ', e.code)
raise SystemExit(e) msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(OSC_meteobridge_http_error)
oscSender.send(msg.build())
# Sleep 5 seconds if error
time.sleep(5)
# raise SystemExit(e)
except urllib.error.URLError as e: except urllib.error.URLError as e:
if debug_urllib_print:
print('URL Error Reason: ', e.reason) print('URL Error Reason: ', e.reason)
raise SystemExit(e) msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(OSC_meteobridge_url_error)
oscSender.send(msg.build())
# Sleep 5 seconds if error
time.sleep(5)
# raise SystemExit(e)
else: else:
res_body = res.read() res_body = res.read()
if debug_urllib_print:
print(res_body.decode('utf-8')) print(res_body.decode('utf-8'))
http_auth_status = True http_auth_status = True
# Sleep 2 seconds if no errors
time.sleep(2)
else: else:
try: try:
response = urllib.request.urlopen(meteobridge_sensor_req) response = urllib.request.urlopen(meteobridge_sensor_req, timeout = url_timeout)
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
if debug_urllib_print:
print('HTTP Error code: ', e.code) print('HTTP Error code: ', e.code)
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error") msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(1) msg.add_arg(OSC_meteobridge_http_error)
oscSender.send(msg.build()) oscSender.send(msg.build())
# Sleep 5 seconds if error
time.sleep(5)
except urllib.error.URLError as e: except urllib.error.URLError as e:
if debug_urllib_print:
print('URL Error Reason: ', e.reason) print('URL Error Reason: ', e.reason)
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error") msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(1) msg.add_arg(OSC_meteobridge_url_error)
oscSender.send(msg.build()) oscSender.send(msg.build())
# Sleep 5 seconds if error
time.sleep(5)
# storing the JSON response else:
# from url in data
try: try:
data_json = json.loads(response.read()) data_json = json.loads(response.read())
if debug_json == True: if debug_json_print == True:
print(data_json) print(data_json)
wind_speed = float(data_json["speed"]) wind_speed = float(data_json["speed"])
wind_lastspeed = float(data_json["lastspeed"]) wind_lastspeed = float(data_json["lastspeed"])
@ -92,12 +126,14 @@ def main(meteobridge_url, meteobridge_user, meteobridge_passwd, max_host, max_po
wind_signal = int(data_json["signal"]) wind_signal = int(data_json["signal"])
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
if debug_json_print:
print("Invalid JSON syntax:", e) print("Invalid JSON syntax:", e)
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error") msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(1) msg.add_arg(OSC_meteobridge_nosensor_error)
oscSender.send(msg.build()) oscSender.send(msg.build())
# print the json response # Sleep 5 seconds if error
# print(data_json) time.sleep(5)
else: else:
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/speed") msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/speed")
msg.add_arg(wind_lastspeed) msg.add_arg(wind_lastspeed)
@ -116,10 +152,10 @@ def main(meteobridge_url, meteobridge_user, meteobridge_passwd, max_host, max_po
oscSender.send(msg.build()) oscSender.send(msg.build())
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error") msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer/error")
msg.add_arg(0) msg.add_arg(OSC_no_error)
oscSender.send(msg.build()) oscSender.send(msg.build())
# Sleep 2 seconds if no errors
time.sleep(1) time.sleep(2)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="FUP anemometer for OSC") parser = argparse.ArgumentParser(description="FUP anemometer for OSC")
@ -130,8 +166,8 @@ if __name__ == "__main__":
parser.add_argument('-mp','--max-port', default=2222) parser.add_argument('-mp','--max-port', default=2222)
args = parser.parse_args() args = parser.parse_args()
# if debug_args is True, print arguments # if debug_args_print is True, print arguments
if debug_args: if debug_args_print:
print('Meteobridge URL:', args.url_meteobridge) print('Meteobridge URL:', args.url_meteobridge)
print('Meteobridge User:', args.user_meteobridge) print('Meteobridge User:', args.user_meteobridge)
print('Meteobridge Password:', args.passwd_meteobridge) print('Meteobridge Password:', args.passwd_meteobridge)