import json import ssl import threading import time import unittest from datetime import datetime import paho.mqtt.client as mqtt from torch_sub import torch_sub host = "mqtt.example.com" port = 8883 agent_config_path = "agent-config/" mqtt_ca_file = agent_config_path + "ca.crt" mqtt_cert_file = agent_config_path + "vagrant.crt" mqtt_key_file = agent_config_path + "vagrant.key" subscriber_config_path = "subscriber-config/" subscriber_ca_file = subscriber_config_path + "ca.crt" subscriber_cert_file = subscriber_config_path + "subscriber.crt" subscriber_key_file = subscriber_config_path + "subscriber.key" def agent_connect(): client = mqtt.Client() client.tls_set( ca_certs=mqtt_ca_file, certfile=mqtt_cert_file, keyfile=mqtt_key_file, cert_reqs=ssl.CERT_REQUIRED) client.connect(host, port, 60) return client def agent_publish(client_id, onion_hostname): payload = { 'clientId': client_id, 'timestamp': datetime.now().strftime("%d-%b-%Y (%H:%M:%S.%f)"), 'onionAddress': onion_hostname, 'sshPort': 22 } client = agent_connect() topic = "torch/" + client_id + "/wake" client.publish(topic, json.dumps(payload)) print("Debug: Connected to MQTT Broker at %s://%s:%s/%s" % ("mqtts", host, port, topic)) print("Debug: Published payload: " + json.dumps(payload)) client.disconnect() print("Debug: Disconnected from MQTT Broker") pass def subscriber_thread(host, port, filename, topic, cafile, certfile, keyfile): torch_sub.attach(host, port, filename, topic=topic, mqtt_ca_file=cafile, mqtt_cert_file=certfile, mqtt_key_file=keyfile) class GivenBrokerAndTorchAgent(unittest.TestCase): def test_when_agent_publishes_should_get_hostname_from_subscriber(self): outfile = "clients.json" threading.Thread(target=subscriber_thread, args=(host, port, outfile, "torch/+/wake", subscriber_ca_file, subscriber_cert_file, subscriber_key_file), daemon=True).start() time.sleep(0.5) agent_publish("client1", "crazyonion.onion") time.sleep(0.5) file = open(outfile, "r") response = json.load(file) self.assertEqual(response['client1']['onionAddress'], "crazyonion.onion") if __name__ == '__main__': unittest.main()