Compare commits
	
		
			4 Commits
		
	
	
		
			ab1434b7a5
			...
			9898104e25
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 9898104e25 | |||
| 2523a2da6d | |||
| 70eb98da76 | |||
| 0a93abe30f | 
|  | @ -62,14 +62,14 @@ class SyncthingMonitor: | ||||||
|         self.syncthing.disable_announce_discovery_and_relay() |         self.syncthing.disable_announce_discovery_and_relay() | ||||||
|         self.syncthing.sync_config() |         self.syncthing.sync_config() | ||||||
| 
 | 
 | ||||||
|         print("Registering etcd update callback...", flush=True) |  | ||||||
|         self.etcd.register_device_update_handler(self.update_devices) |  | ||||||
| 
 |  | ||||||
|         print("Attempting to add this device to etcd...", flush=True) |         print("Attempting to add this device to etcd...", flush=True) | ||||||
|         self.etcd.add_device_to_cluster(self.my_device_id, self.syncthing_node_name, self.syncthing_publish_address) |         self.etcd.add_device_to_cluster(self.my_device_id, self.syncthing_node_name, self.syncthing_publish_address) | ||||||
|         print("Added this device to cluster with publish address: {0}".format(self.syncthing_publish_address), |         print("Added this device to cluster with publish address: {0}".format(self.syncthing_publish_address), | ||||||
|               flush=True) |               flush=True) | ||||||
| 
 | 
 | ||||||
|  |         print("Registering etcd update callback...", flush=True) | ||||||
|  |         self.etcd.register_device_update_handler(self.update_devices) | ||||||
|  | 
 | ||||||
|         self.update_devices(None) |         self.update_devices(None) | ||||||
| 
 | 
 | ||||||
|         print("Entering loop...", flush=True) |         print("Entering loop...", flush=True) | ||||||
|  | @ -86,11 +86,14 @@ class SyncthingMonitor: | ||||||
|         print("Updating syncthing based on change to cluster info...", flush=True) |         print("Updating syncthing based on change to cluster info...", flush=True) | ||||||
|         print("--> Obtaining updated device list...", flush=True) |         print("--> Obtaining updated device list...", flush=True) | ||||||
|         device_list = self.etcd.get_device_list() |         device_list = self.etcd.get_device_list() | ||||||
|  | 
 | ||||||
|         print("--> Adding new devices to syncthing...", flush=True) |         print("--> Adding new devices to syncthing...", flush=True) | ||||||
|         self.syncthing.add_devices(device_list) |         self.syncthing.add_devices(device_list) | ||||||
|  | 
 | ||||||
|         print("--> Updating shared folder with new devices...", flush=True) |         print("--> Updating shared folder with new devices...", flush=True) | ||||||
|         self.syncthing.create_shared_folder(SHARED_FOLDER_ID, SHARED_FOLDER_LABEL, self.syncthing_data_path, |         self.syncthing.create_shared_folder(SHARED_FOLDER_ID, SHARED_FOLDER_LABEL, self.syncthing_data_path, | ||||||
|                                             device_list) |                                             device_list) | ||||||
|  | 
 | ||||||
|         self.syncthing.sync_config() |         self.syncthing.sync_config() | ||||||
|         self.syncthing.print_config() |         self.syncthing.print_config() | ||||||
|         print("--> Update completed successfully!", flush=True) |         print("--> Update completed successfully!", flush=True) | ||||||
|  |  | ||||||
|  | @ -1,6 +1,7 @@ | ||||||
| import json | import json | ||||||
| 
 | 
 | ||||||
| import etcd3 as etcd3 | import etcd3 as etcd3 | ||||||
|  | from retrying import retry | ||||||
| 
 | 
 | ||||||
| CLUSTER_INFO_KEY = '/syncthing_monitor/cluster_info' | CLUSTER_INFO_KEY = '/syncthing_monitor/cluster_info' | ||||||
| 
 | 
 | ||||||
|  | @ -10,6 +11,7 @@ class EtcdClient: | ||||||
|         self.etcd = etcd3.client(host=host, port=port) |         self.etcd = etcd3.client(host=host, port=port) | ||||||
|         self.key = key |         self.key = key | ||||||
| 
 | 
 | ||||||
|  |     @retry(stop_max_delay=60000, wait_fixed=500) | ||||||
|     def load_cluster_info(self): |     def load_cluster_info(self): | ||||||
|         print("Retrieving cluster info from etcd...", flush=True) |         print("Retrieving cluster info from etcd...", flush=True) | ||||||
|         raw_value = self.etcd.get(self.key)[0] |         raw_value = self.etcd.get(self.key)[0] | ||||||
|  | @ -21,9 +23,19 @@ class EtcdClient: | ||||||
|         with self.etcd.lock('syncthing_monitor'): |         with self.etcd.lock('syncthing_monitor'): | ||||||
|             cluster_info = self.load_cluster_info() |             cluster_info = self.load_cluster_info() | ||||||
| 
 | 
 | ||||||
|             if any(device_id in device['id'] for device in cluster_info['devices']): |             print("Checking for existing device with node name '{0}'...".format(node_name), flush=True) | ||||||
|  |             existing_device = next( | ||||||
|  |                 (device for device in cluster_info['devices'] if node_name in device['node_name']), | ||||||
|  |                 None) | ||||||
|  | 
 | ||||||
|  |             if existing_device is not None: | ||||||
|  |                 self.update_existing_device(cluster_info, existing_device, device_id, node_name, address) | ||||||
|                 return |                 return | ||||||
| 
 | 
 | ||||||
|  |             print( | ||||||
|  |                 "Existing device not found; Creating new cluster_info entry for node name: '{0}'...".format(node_name), | ||||||
|  |                 flush=True) | ||||||
|  | 
 | ||||||
|             new_device = { |             new_device = { | ||||||
|                 'id': device_id, |                 'id': device_id, | ||||||
|                 'node_name': node_name, |                 'node_name': node_name, | ||||||
|  | @ -32,6 +44,22 @@ class EtcdClient: | ||||||
| 
 | 
 | ||||||
|             cluster_info['devices'].append(new_device) |             cluster_info['devices'].append(new_device) | ||||||
| 
 | 
 | ||||||
|  |             self.do_cluster_info_update(cluster_info) | ||||||
|  | 
 | ||||||
|  |     def update_existing_device(self, cluster_info, existing_device, device_id, node_name, address): | ||||||
|  | 
 | ||||||
|  |         if device_id == existing_device['id']: | ||||||
|  |             print("Existing device contains up-to-date ID!  No need to update.".format(node_name), flush=True) | ||||||
|  |             return | ||||||
|  | 
 | ||||||
|  |         print("Existing device with my node name '{0}' contains stale info!  Updating...".format(node_name), flush=True) | ||||||
|  |         existing_device['id'] = device_id | ||||||
|  |         existing_device['address'] = address | ||||||
|  | 
 | ||||||
|  |         self.do_cluster_info_update(cluster_info) | ||||||
|  | 
 | ||||||
|  |     @retry(stop_max_delay=60000, wait_fixed=500) | ||||||
|  |     def do_cluster_info_update(self, cluster_info): | ||||||
|         print("Updating etcd value for '{0}': {1}".format(self.key, json.dumps(cluster_info)), flush=True) |         print("Updating etcd value for '{0}': {1}".format(self.key, json.dumps(cluster_info)), flush=True) | ||||||
|         self.etcd.put(self.key, json.dumps(cluster_info)) |         self.etcd.put(self.key, json.dumps(cluster_info)) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -1,6 +1,7 @@ | ||||||
| import json | import json | ||||||
| 
 | 
 | ||||||
| import requests | import requests | ||||||
|  | from retrying import retry | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class SyncthingClient: | class SyncthingClient: | ||||||
|  | @ -137,11 +138,14 @@ class SyncthingClient: | ||||||
|     def make_url(self, endpoint): |     def make_url(self, endpoint): | ||||||
|         return "http://{0}:{1}{2}".format(self.host, self.port, endpoint) |         return "http://{0}:{1}{2}".format(self.host, self.port, endpoint) | ||||||
| 
 | 
 | ||||||
|  |     @retry(stop_max_delay=60000, wait_fixed=500) | ||||||
|     def get(self, endpoint): |     def get(self, endpoint): | ||||||
|         return requests.get(self.make_url(endpoint), headers=self.headers) |         return requests.get(self.make_url(endpoint), headers=self.headers) | ||||||
| 
 | 
 | ||||||
|  |     @retry(stop_max_delay=60000, wait_fixed=500) | ||||||
|     def post(self, endpoint, data): |     def post(self, endpoint, data): | ||||||
|         return requests.post(self.make_url(endpoint), headers=self.headers, data=data) |         return requests.post(self.make_url(endpoint), headers=self.headers, data=data) | ||||||
| 
 | 
 | ||||||
|  |     @retry(stop_max_delay=60000, wait_fixed=500) | ||||||
|     def patch(self, endpoint, data): |     def patch(self, endpoint, data): | ||||||
|         return requests.patch(self.make_url(endpoint), headers=self.headers, data=data) |         return requests.patch(self.make_url(endpoint), headers=self.headers, data=data) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue
	
	Block a user