125 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			125 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import argparse
 | 
						|
import configparser
 | 
						|
import json
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import threading
 | 
						|
 | 
						|
import paho.mqtt.subscribe as mqtt
 | 
						|
 | 
						|
from torchsub import torch_sub_webserver
 | 
						|
 | 
						|
database_filename = "clients.json"
 | 
						|
database_lock = threading.Lock()
 | 
						|
 | 
						|
 | 
						|
# noinspection PyUnusedLocal
 | 
						|
def update_client_record(client, userdata, message):
    """Store the JSON record carried by an MQTT message in the flat-file database.

    The payload is decoded as UTF-8 JSON and saved under the value of its
    ``clientId`` key, replacing any earlier record stored under that id. The
    database file is read, updated and rewritten in full while
    ``database_lock`` is held.

    Args:
        client: Unused; present only to match the callback signature.
        userdata: Unused; present only to match the callback signature.
        message: Object whose ``payload`` attribute holds the UTF-8 encoded
            JSON bytes of the record.

    Raises:
        ValueError: If the payload is not valid UTF-8 / JSON, or the database
            file does not contain valid JSON.
        KeyError: If the decoded payload has no ``clientId`` key.
        OSError: If the database file cannot be read or written.
    """
    # Parse the message before touching the lock or the file, so a malformed
    # payload is rejected without holding up other writers.
    response = json.loads(message.payload.decode('utf-8'))
    client_id = response['clientId']

    # The context manager guarantees the lock is released even when reading
    # or writing the database raises; a bare acquire()/release() pair would
    # leave the lock held forever after the first failure.
    with database_lock:
        with open(database_filename, 'r') as infile:
            database = json.load(infile)

        database[client_id] = response

        with open(database_filename, 'w') as outfile:
            json.dump(database, outfile)
 | 
						|
 | 
						|
 | 
						|
def subscribe(broker_hostname, broker_port, topic="torch", tls=None):
    """Route messages published on ``topic`` to ``update_client_record``.

    Args:
        broker_hostname: Host name of the MQTT broker to connect to.
        broker_port: Port of the MQTT broker.
        topic: Topic (or topic filter) to subscribe to.
        tls: Optional TLS settings forwarded unchanged to ``mqtt.callback``;
            ``None`` passes no TLS configuration.
    """
    # NOTE(review): mqtt.callback presumably blocks while it processes
    # messages -- confirm against the paho-mqtt documentation.
    connection_options = {
        'hostname': broker_hostname,
        'port': broker_port,
        'tls': tls,
    }
    mqtt.callback(update_client_record, topic, **connection_options)
 | 
						|
 | 
						|
 | 
						|
class Config:
    """Subscriber settings gathered from the command line, environment and config file.

    Constructing an instance parses ``sys.argv``, resolves the configuration
    directory and reads the configuration file. The resulting settings are
    exposed as plain attributes:

    * ``broker_hostname``, ``broker_port``, ``topic``, ``tls`` -- MQTT settings
    * ``database_file`` -- path of the JSON database file
    * ``web_port`` -- port for the web server
    """

    # Shared by the argument parser (as both ``default`` and ``const``) and by
    # get_config_path, so a missing value always resolves to the same place.
    _DEFAULT_CONFIG_DIR = '/etc/torch'
    _DEFAULT_CONFIG_FILENAME = 'torch-sub.conf'

    def __init__(self):
        """Parse the CLI arguments and load the configuration file.

        Exits the process with status 1 if the configuration file is missing
        (see ``read_configuration_file``).
        """
        self.broker_hostname = None
        self.broker_port = None
        self.topic = None
        self.tls = None
        self.database_file = database_filename
        self.web_port = 8080
        parser = self.do_cli_argument_parsing()
        (config_path, config_filename) = self.get_config_path(parser.parse_args())
        print("Using torch configuration path: " + config_path)
        self.read_configuration_file(config_path, config_filename)

    @staticmethod
    def do_cli_argument_parsing():
        """Build the command-line parser.

        Returns:
            An ``argparse.ArgumentParser`` that accepts ``--config-dir`` and
            ``--config-filename``.
        """
        parser = argparse.ArgumentParser(description='Simple Torch Flat-file Database Subscriber')
        # With nargs='?' a flag given without a value stores ``const``; setting
        # it to the default keeps the result a usable string instead of None.
        parser.add_argument('--config-dir',
                            nargs='?',
                            dest='configPath',
                            const=Config._DEFAULT_CONFIG_DIR,
                            default=Config._DEFAULT_CONFIG_DIR,
                            help='configuration directory (default: /etc/torch)')
        parser.add_argument('--config-filename',
                            nargs='?',
                            dest='configFilename',
                            const=Config._DEFAULT_CONFIG_FILENAME,
                            default=Config._DEFAULT_CONFIG_FILENAME,
                            help='configuration filename (default: torch-sub.conf)')
        return parser

    @staticmethod
    def get_config_path(args):
        """Resolve the configuration directory and filename.

        The ``TORCH_CONFIG_DIR`` environment variable, when set, takes
        precedence over the ``--config-dir`` argument.

        Args:
            args: Parsed arguments providing ``configPath`` and
                ``configFilename``; a ``None`` value falls back to the
                built-in default.

        Returns:
            A ``(config_dir, config_filename)`` tuple where ``config_dir``
            always ends with ``/``.
        """
        config_path = args.configPath
        if config_path is None:
            config_path = Config._DEFAULT_CONFIG_DIR
        if "TORCH_CONFIG_DIR" in os.environ:
            config_path = os.environ.get("TORCH_CONFIG_DIR")
        if not config_path.endswith("/"):
            config_path = config_path + "/"

        config_filename = args.configFilename
        if config_filename is None:
            config_filename = Config._DEFAULT_CONFIG_FILENAME
        return config_path, config_filename

    def read_configuration_file(self, config_dir, config_filename):
        """Load settings from ``config_dir``/``config_filename`` into this instance.

        The ``mqtt``, ``database`` and ``web`` sections are all optional;
        options that are not present keep their fallback values.

        Args:
            config_dir: Directory holding the configuration file. Certificate
                paths are built by appending the configured file name to this
                string, so it is expected to end with ``/``.
            config_filename: Name of the configuration file inside
                ``config_dir``.

        Exits the process with status 1 if the file does not exist.
        """
        config_file_path = os.path.join(config_dir, config_filename)
        if not os.path.exists(config_file_path):
            print("Unable to locate config file at '%s'" % config_file_path)
            sys.exit(1)
        config = configparser.ConfigParser()
        config.read(config_file_path)

        # Every mqtt option has a fallback, so treat a missing section like an
        # empty one instead of failing with KeyError.
        if not config.has_section('mqtt'):
            config.add_section('mqtt')
        mqtt_config = config['mqtt']
        self.broker_hostname = mqtt_config.get('BrokerHost', fallback="localhost")
        self.broker_port = mqtt_config.getint('BrokerPort', fallback=1883)
        self.topic = mqtt_config.get('Topic', fallback="torch/+/onion_url")
        require_certificate = mqtt_config.getboolean('RequireCertificate', fallback=False)

        # Certificate files are looked up inside the configuration directory.
        # An option that is not set yields the bare directory path.
        ca_file = config_dir + mqtt_config.get('CaFile', fallback="")
        cert_file = config_dir + mqtt_config.get('CertFile', fallback="")
        key_file = config_dir + mqtt_config.get('KeyFile', fallback="")
        self.tls = None
        if require_certificate:
            self.tls = {
                'ca_certs': ca_file,
                'certfile': cert_file,
                'keyfile': key_file
            }

        if config.has_section('database'):
            # Fall back to the value already set on the instance, mirroring
            # how web_port is handled below.
            self.database_file = config['database'].get('Filename', fallback=self.database_file)
        if config.has_section('web'):
            self.web_port = config['web'].getint('Port', fallback=self.web_port)
 | 
						|
 | 
						|
 | 
						|
def main():
    """Load the configuration, prepare the database file and start both services.

    The module-level ``database_filename`` is rebound to the configured path
    so that ``update_client_record`` writes to the right file. A web server
    is started on a daemon thread, then the MQTT subscription is set up on
    the calling thread.
    """
    global database_filename

    settings = Config()
    database_filename = settings.database_file

    # Seed a new database with an empty JSON object so the first update has
    # something to load.
    if not os.path.exists(database_filename):
        with open(database_filename, 'w') as seed_file:
            seed_file.write(json.dumps({}))

    # Daemon thread: it does not keep the process alive on its own.
    web_thread = threading.Thread(target=torch_sub_webserver.app.run,
                                  args=("localhost", settings.web_port),
                                  daemon=True)
    web_thread.start()

    subscribe(settings.broker_hostname,
              settings.broker_port,
              settings.topic,
              settings.tls)
 |