FUP-Anemometer/fup_anemometre_daemon.py
Julien Lazarewicz 5590713619 Initial commit
2024-06-12 00:12:41 +02:00

133 lines
3.4 KiB
Python

#!/usr/bin/env python3
"""
OSC/Random Example: send random numbers to OSC.
This example sends a pseudo-random number between 0 and 1024
to the OSC receiver on UDP port 2222.
"""
from pythonosc import udp_client
from pythonosc import osc_message_builder
import time
import signal
import random
import sys
import os
import time
import argparse
import logging
import daemon
from daemon import pidfile
# import urllib library
from urllib.request import urlopen
# import json
import json
debug_p = False
debug_flag = True
debug_working_directory = os.path.dirname(os.path.realpath(__file__))
prod_flag = False
prod_working_directory = "/usr/local/com.fup.anemometre_daemon"
# SIGINT, SIGKILL, SIGTERM, SIGTSTP
def rcv_SIGNAL(signum, frame, logf):
print(signum)
def check_anemometer(logf, inf):
### This does the "work" of the daemon
logger = logging.getLogger('fup_anemo_daemon')
logger.setLevel(logging.INFO)
fh = logging.FileHandler(logf)
fh.setLevel(logging.INFO)
formatstr = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
formatter = logging.Formatter(formatstr)
fh.setFormatter(formatter)
logger.addHandler(fh)
oscSender = udp_client.UDPClient("localhost", 2222)
# store the URL in url as
# parameter for urlopen
url = inf
while True:
# logger.debug("this is a DEBUG message")
# logger.info("this is an INFO message")
# logger.error("this is an ERROR message")
# store the response of URL
response = urlopen(url)
# storing the JSON response
# from url in data
data_json = json.loads(response.read())
# print the json response
# print(data_json)
wind_speed = float(data_json["speed"])
logger.info(wind_speed)
msg = osc_message_builder.OscMessageBuilder(address = "/env_sensors/anemometer")
msg.add_arg(wind_speed)
oscSender.send(msg.build())
time.sleep(5)
def start_daemon(pidf, logf, inf):
### This launches the daemon in its context
global debug_p
if debug_p:
print("fup_anemo_daemon: entered run()")
print("fup_anemo_daemon: pidf = {} logf = {}".format(pidf, logf, inf))
print("fup_anemo_daemon: about to start daemonization")
if debug_flag:
### XXX pidfile is a context
with daemon.DaemonContext(
#working_directory='/var/lib/fup_anemo_daemon',
working_directory=debug_working_directory,
umask=0o002,
pidfile=pidfile.TimeoutPIDLockFile(pidf),
# SIGINT, SIGKILL, SIGTERM, SIGTSTP
signal_map={
signal.SIGTERM: rcv_SIGNAL,
signal.SIGTSTP: rcv_SIGNAL
}
) as context:
check_anemometer(logf, inf)
if prod_flag:
with daemon.DaemonContext(
#working_directory='/var/lib/fup_anemo_daemon',
working_directory=prod_working_directory,
umask=0o002,
pidfile=pidfile.TimeoutPIDLockFile(pidf)#,
# SIGINT, SIGKILL, SIGTERM, SIGTSTP
# signal_map={
# signal.SIGTERM: rcv_SIGNAL,
# signal.SIGTSTP: rcv_SIGNAL,
# signal.SIGINT: rcv_SIGNAL,
# signal.SIGKILL: rcv_SIGNAL
# }
) as context:
check_anemometer(logf, inf)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Example daemon in Python")
parser.add_argument('-p', '--pid-file', default='fup_anemo_daemon.pid')
parser.add_argument('-l', '--log-file', default='fup_anemo_daemon.log')
parser.add_argument('-i', '--input-file', default='http://localhost/anemometer.json')
args = parser.parse_args()
start_daemon(pidf=args.pid_file, logf=args.log_file, inf=args.input_file)