import json

import etcd3 as etcd3
from retrying import retry

# Default etcd key under which the JSON-encoded cluster info document is stored.
CLUSTER_INFO_KEY = '/syncthing_monitor/cluster_info'


class EtcdClient:
    """Read and update the shared cluster info document stored in etcd.

    The document is a JSON object with a ``devices`` list; each entry is a
    dict with the keys ``id``, ``node_name`` and ``address``.
    """

    def __init__(self, host, port, key=CLUSTER_INFO_KEY):
        """Connect to etcd.

        Args:
            host: etcd host, passed straight to ``etcd3.client``.
            port: etcd port, passed straight to ``etcd3.client``.
            key: etcd key holding the cluster info document.
        """
        self.etcd = etcd3.client(host=host, port=port)
        self.key = key

    # Retried on any exception (including JSON decode errors) every 500 ms
    # for up to 60 s before the last exception is re-raised.
    @retry(stop_max_delay=60000, wait_fixed=500)
    def load_cluster_info(self):
        """Return the cluster info dict stored under ``self.key``.

        Returns:
            The decoded JSON document, or ``{'devices': []}`` when the key
            has no value yet.
        """
        print("Retrieving cluster info from etcd...", flush=True)
        # etcd.get() yields a (value, metadata) pair; only the value is used.
        raw_value = self.etcd.get(self.key)[0]
        if raw_value is None:
            return {'devices': []}
        return json.loads(raw_value)

    def add_device_to_cluster(self, device_id, node_name, address):
        """Register this node's device, or refresh its existing entry.

        The read-modify-write cycle runs under the ``syncthing_monitor``
        etcd lock so concurrent registrations do not overwrite each other.

        Args:
            device_id: Device ID to store for the node.
            node_name: Name of the node; identifies the entry.
            address: Address to store for the node.
        """
        # NOTE(review): the nested calls may retry for up to 60 s each, which
        # could outlive the lock's TTL -- confirm against the etcd3 default.
        with self.etcd.lock('syncthing_monitor'):
            cluster_info = self.load_cluster_info()
            print("Checking for existing device with node name '{0}'...".format(node_name), flush=True)
            # Compare for equality: a substring test would let e.g. "node-1"
            # match (and then overwrite) the entry that belongs to "node-10".
            existing_device = next(
                (device for device in cluster_info['devices']
                 if device.get('node_name') == node_name),
                None)
            if existing_device is not None:
                self.update_existing_device(cluster_info, existing_device, device_id, node_name, address)
                return
            print(
                "Existing device not found; Creating new cluster_info entry for node name: '{0}'...".format(node_name),
                flush=True)
            new_device = {
                'id': device_id,
                'node_name': node_name,
                'address': address,
            }
            cluster_info['devices'].append(new_device)
            self.do_cluster_info_update(cluster_info)

    def update_existing_device(self, cluster_info, existing_device, device_id, node_name, address):
        """Refresh ``existing_device`` in place and persist it if its ID changed.

        Args:
            cluster_info: Full cluster info dict that contains ``existing_device``.
            existing_device: The entry (from ``cluster_info['devices']``) to refresh.
            device_id: Current device ID of the node.
            node_name: Name of the node; used for logging only.
            address: Current address of the node.
        """
        # Only the ID decides staleness; a changed address with an unchanged
        # ID is not written back.
        if device_id == existing_device['id']:
            print("Existing device contains up-to-date ID! No need to update.", flush=True)
            return
        print("Existing device with my node name '{0}' contains stale info! Updating...".format(node_name), flush=True)
        existing_device['id'] = device_id
        existing_device['address'] = address
        self.do_cluster_info_update(cluster_info)

    # Same retry policy as load_cluster_info: every 500 ms for up to 60 s.
    @retry(stop_max_delay=60000, wait_fixed=500)
    def do_cluster_info_update(self, cluster_info):
        """Write ``cluster_info`` to etcd as JSON under ``self.key``.

        Args:
            cluster_info: JSON-serialisable cluster info dict.
        """
        # Serialise once so the logged value is exactly what gets stored.
        serialized = json.dumps(cluster_info)
        print("Updating etcd value for '{0}': {1}".format(self.key, serialized), flush=True)
        self.etcd.put(self.key, serialized)

    def get_device_list(self):
        """Return the ``devices`` list from the current cluster info."""
        device_list = self.load_cluster_info()['devices']
        print("Obtained device_list devices from etcd: {0}".format(json.dumps(device_list)), flush=True)
        return device_list

    def register_device_update_handler(self, device_update_handler):
        """Register ``device_update_handler`` as an etcd watch callback on ``self.key``.

        Args:
            device_update_handler: Callable handed to ``add_watch_callback``.
        """
        self.etcd.add_watch_callback(self.key, device_update_handler)