torch-subscriber-simple/torch_sub/test/integration_pub_sub.py

68 lines
1.6 KiB
Python

import json
import ssl
import unittest
from datetime import datetime
import paho.mqtt.client as mqtt
from torch_sub import torch_sub
host = "mqtt.example.com"
port = 8883
config_path = "agent-config/"
mqtt_ca_file = config_path + "ca.crt"
mqtt_cert_file = config_path + "vagrant.crt"
mqtt_key_file = config_path + "vagrant.key"
def agent_connect():
client = mqtt.Client()
client.tls_set(
ca_certs=mqtt_ca_file,
certfile=mqtt_cert_file,
keyfile=mqtt_key_file,
cert_reqs=ssl.CERT_REQUIRED)
client.connect(host, port, 60)
return client
def agent_publish(client_id, onion_hostname):
payload = {
'clientId': client_id,
'timestamp': datetime.now().strftime("%d-%b-%Y (%H:%M:%S.%f)"),
'onionAddress': onion_hostname,
'sshPort': 22
}
client = agent_connect()
topic = "torch/" + client_id + "/wake"
client.publish(topic, json.dumps(payload))
print("Debug: Connected to MQTT Broker at %s://%s:%s/%s" % ("mqtts", host, port, topic))
print("Debug: Published payload: " + json.dumps(payload))
client.disconnect()
print("Debug: Disconnected from MQTT Broker")
pass
class GivenBrokerAndTorchAgent(unittest.TestCase):
def test_when_agent_publishes_should_get_hostname_from_subscriber(self):
clients_json = "clients.json"
torch_sub.attach(host, port, clients_json)
agent_publish("client1", "crazyonion.onion")
file = open(clients_json, "r")
response = json.load(file)
self.assertEqual(response['results']['client1']['onionAddress'], "crazyonion.onion")
if __name__ == '__main__':
unittest.main()