Compare commits
No commits in common. "0cd8bd2b7253a7896c7ec2445c5f57e965bded6b" and "e061b64ec1605cd4c743030c4fc6626a6f4e3aff" have entirely different histories.
0cd8bd2b72
...
e061b64ec1
|
@ -1,4 +0,0 @@
|
||||||
listener 8883
|
|
||||||
connection_messages true
|
|
||||||
log_type all
|
|
||||||
websockets_log_level 9
|
|
|
@ -1,3 +1 @@
|
||||||
@echo off
|
docker run --rm --name mosquitto -p 8883:8883 -v %cd%\test\broker-config:/mosquitto/config eclipse-mosquitto
|
||||||
if %1%==secure (set config=broker-tls-config) else (set config=broker-no-tls-config)
|
|
||||||
docker run --rm -d --name mosquitto -p 8883:8883 -v "%cd%\test\%config%:/mosquitto/config" eclipse-mosquitto
|
|
||||||
|
|
|
@ -1,14 +1,8 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
CONFIG=broker-no-tls-config
|
docker run -it --rm \
|
||||||
|
|
||||||
if [[ $1 == "secure" ]]; then
|
|
||||||
CONFIG=broker-tls-config
|
|
||||||
fi
|
|
||||||
|
|
||||||
docker run --rm -d \
|
|
||||||
--user "$UID" \
|
--user "$UID" \
|
||||||
-p 8883:8883 \
|
-p 8883:8883 \
|
||||||
-v "$(pwd)/test/$CONFIG:/mosquitto/config" \
|
-v "$(pwd)/test/broker-config:/mosquitto/config" \
|
||||||
--name mosquitto \
|
--name mosquitto \
|
||||||
eclipse-mosquitto
|
eclipse-mosquitto
|
|
@ -1,167 +0,0 @@
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import ssl
|
|
||||||
import sys
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
import unittest
|
|
||||||
from datetime import datetime
|
|
||||||
from unittest.case import TestCase
|
|
||||||
|
|
||||||
import paho.mqtt.client as mqtt
|
|
||||||
|
|
||||||
from torch_sub import torch_sub
|
|
||||||
|
|
||||||
broker_hostname = "mqtt.example.com"
|
|
||||||
broker_port = 8883
|
|
||||||
|
|
||||||
agent_config_path = "test/agent-config/"
|
|
||||||
mqtt_ca_file = agent_config_path + "ca.crt"
|
|
||||||
mqtt_cert_file = agent_config_path + "vagrant.crt"
|
|
||||||
mqtt_key_file = agent_config_path + "vagrant.key"
|
|
||||||
|
|
||||||
subscriber_config_path = "test/subscriber-config/"
|
|
||||||
subscriber_ca_file = subscriber_config_path + "ca.crt"
|
|
||||||
subscriber_cert_file = subscriber_config_path + "subscriber.crt"
|
|
||||||
subscriber_key_file = subscriber_config_path + "subscriber.key"
|
|
||||||
|
|
||||||
|
|
||||||
def agent_connect(use_tls=True):
|
|
||||||
client = mqtt.Client()
|
|
||||||
if use_tls:
|
|
||||||
client.tls_set(
|
|
||||||
ca_certs=mqtt_ca_file,
|
|
||||||
certfile=mqtt_cert_file,
|
|
||||||
keyfile=mqtt_key_file,
|
|
||||||
cert_reqs=ssl.CERT_REQUIRED)
|
|
||||||
client.connect(broker_hostname, broker_port, 60)
|
|
||||||
return client
|
|
||||||
|
|
||||||
|
|
||||||
def publish(client_id, onion_hostname, use_tls=True):
|
|
||||||
payload = {
|
|
||||||
'clientId': client_id,
|
|
||||||
'timestamp': datetime.now().strftime("%d-%b-%Y (%H:%M:%S.%f)"),
|
|
||||||
'onionAddress': onion_hostname,
|
|
||||||
'sshPort': 22
|
|
||||||
}
|
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
client = agent_connect(use_tls=use_tls)
|
|
||||||
client.publish("torch/" + client_id + "/wake", json.dumps(payload))
|
|
||||||
client.disconnect()
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
|
|
||||||
class GivenBroker(TestCase):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
if os.path.exists(torch_sub.database_file):
|
|
||||||
os.remove(torch_sub.database_file)
|
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
os.system("docker container stop mosquitto")
|
|
||||||
if os.path.exists(torch_sub.database_file):
|
|
||||||
os.remove(torch_sub.database_file)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def run_broker(tls):
|
|
||||||
cli = "test/run-broker.sh " + tls
|
|
||||||
if sys.platform.startswith('win32'):
|
|
||||||
cli = "test\\run-broker.bat " + tls
|
|
||||||
os.system(cli)
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def run_subscriber():
|
|
||||||
threading.Thread(target=torch_sub.subscribe,
|
|
||||||
args=(broker_hostname,
|
|
||||||
broker_port,
|
|
||||||
"torch/+/wake",
|
|
||||||
{
|
|
||||||
'ca_certs': subscriber_ca_file,
|
|
||||||
'certfile': subscriber_cert_file,
|
|
||||||
'keyfile': subscriber_key_file
|
|
||||||
}),
|
|
||||||
daemon=True).start()
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def run_insecure_subscriber():
|
|
||||||
threading.Thread(target=torch_sub.subscribe,
|
|
||||||
args=(broker_hostname,
|
|
||||||
broker_port,
|
|
||||||
"torch/+/wake",
|
|
||||||
None),
|
|
||||||
daemon=True).start()
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def loadDatabase():
|
|
||||||
with open(torch_sub.database_file, "r") as database_file:
|
|
||||||
return json.load(database_file)
|
|
||||||
|
|
||||||
|
|
||||||
class GivenTlsBroker(GivenBroker):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
self.run_broker("secure")
|
|
||||||
self.run_subscriber()
|
|
||||||
|
|
||||||
def test_when_agent_publishes_should_get_hostname_from_subscriber(self):
|
|
||||||
publish("client1", "crazy_onion.onion")
|
|
||||||
|
|
||||||
database = self.loadDatabase()
|
|
||||||
self.assertEqual(database['client1']['onionAddress'], "crazy_onion.onion")
|
|
||||||
|
|
||||||
def test_when_agent_publishes_should_get_hostname_from_subscriber2(self):
|
|
||||||
publish("client2", "crazy_onion2.onion")
|
|
||||||
|
|
||||||
database = self.loadDatabase()
|
|
||||||
self.assertEqual(database['client2']['onionAddress'], "crazy_onion2.onion")
|
|
||||||
|
|
||||||
def test_when_agent_publishes_multiple_hosts_should_provide_latest(self):
|
|
||||||
publish("client2", "crazy_onion2-34.onion")
|
|
||||||
publish("client3", "crazy_onion3.onion")
|
|
||||||
publish("client1", "crazy_onion1.onion")
|
|
||||||
publish("client2", "crazy_onion2-56.onion")
|
|
||||||
publish("client3", "crazy_onion3.onion")
|
|
||||||
|
|
||||||
database = self.loadDatabase()
|
|
||||||
self.assertEqual(database['client1']['onionAddress'], "crazy_onion1.onion")
|
|
||||||
self.assertEqual(database['client2']['onionAddress'], "crazy_onion2-56.onion")
|
|
||||||
self.assertEqual(database['client3']['onionAddress'], "crazy_onion3.onion")
|
|
||||||
|
|
||||||
|
|
||||||
class GivenNonTlsBroker(GivenBroker):
|
|
||||||
def setUp(self) -> None:
|
|
||||||
self.run_broker("insecure")
|
|
||||||
self.run_insecure_subscriber()
|
|
||||||
|
|
||||||
def test_when_agent_publishes_should_get_hostname_from_subscriber(self):
|
|
||||||
self.insecure_publish("client1", "crazy_onion.onion")
|
|
||||||
|
|
||||||
database = self.loadDatabase()
|
|
||||||
self.assertEqual(database['client1']['onionAddress'], "crazy_onion.onion")
|
|
||||||
|
|
||||||
def test_when_agent_publishes_should_get_hostname_from_subscriber2(self):
|
|
||||||
self.insecure_publish("client2", "crazy_onion2.onion")
|
|
||||||
|
|
||||||
database = self.loadDatabase()
|
|
||||||
self.assertEqual(database['client2']['onionAddress'], "crazy_onion2.onion")
|
|
||||||
|
|
||||||
def test_when_agent_publishes_multiple_hosts_should_provide_latest(self):
|
|
||||||
self.insecure_publish("client2", "crazy_onion2-34.onion")
|
|
||||||
self.insecure_publish("client3", "crazy_onion3.onion")
|
|
||||||
self.insecure_publish("client1", "crazy_onion1.onion")
|
|
||||||
self.insecure_publish("client2", "crazy_onion2-56.onion")
|
|
||||||
self.insecure_publish("client3", "crazy_onion3.onion")
|
|
||||||
|
|
||||||
database = self.loadDatabase()
|
|
||||||
self.assertEqual(database['client1']['onionAddress'], "crazy_onion1.onion")
|
|
||||||
self.assertEqual(database['client2']['onionAddress'], "crazy_onion2-56.onion")
|
|
||||||
self.assertEqual(database['client3']['onionAddress'], "crazy_onion3.onion")
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def insecure_publish(client_id, onion_address):
|
|
||||||
publish(client_id, onion_address, use_tls=False)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
113
test/test_integration_pub_sub.py
Normal file
113
test/test_integration_pub_sub.py
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import ssl
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
|
from torch_sub import torch_sub
|
||||||
|
|
||||||
|
broker_hostname = "mqtt.example.com"
|
||||||
|
broker_port = 8883
|
||||||
|
|
||||||
|
agent_config_path = "test/agent-config/"
|
||||||
|
mqtt_ca_file = agent_config_path + "ca.crt"
|
||||||
|
mqtt_cert_file = agent_config_path + "vagrant.crt"
|
||||||
|
mqtt_key_file = agent_config_path + "vagrant.key"
|
||||||
|
|
||||||
|
subscriber_config_path = "test/subscriber-config/"
|
||||||
|
subscriber_ca_file = subscriber_config_path + "ca.crt"
|
||||||
|
subscriber_cert_file = subscriber_config_path + "subscriber.crt"
|
||||||
|
subscriber_key_file = subscriber_config_path + "subscriber.key"
|
||||||
|
|
||||||
|
|
||||||
|
def agent_connect():
|
||||||
|
client = mqtt.Client()
|
||||||
|
client.tls_set(
|
||||||
|
ca_certs=mqtt_ca_file,
|
||||||
|
certfile=mqtt_cert_file,
|
||||||
|
keyfile=mqtt_key_file,
|
||||||
|
cert_reqs=ssl.CERT_REQUIRED)
|
||||||
|
client.connect(broker_hostname, broker_port, 60)
|
||||||
|
return client
|
||||||
|
|
||||||
|
|
||||||
|
def agent_publish(client_id, onion_hostname):
|
||||||
|
payload = {
|
||||||
|
'clientId': client_id,
|
||||||
|
'timestamp': datetime.now().strftime("%d-%b-%Y (%H:%M:%S.%f)"),
|
||||||
|
'onionAddress': onion_hostname,
|
||||||
|
'sshPort': 22
|
||||||
|
}
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
client = agent_connect()
|
||||||
|
client.publish("torch/" + client_id + "/wake", json.dumps(payload))
|
||||||
|
client.disconnect()
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
class GivenBrokerAndTorchAgent(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self) -> None:
|
||||||
|
cli = "test/run-broker.sh"
|
||||||
|
if sys.platform.startswith('win32'):
|
||||||
|
cli = "test\\run-broker.bat"
|
||||||
|
threading.Thread(target=os.system, args=(cli,), daemon=True).start()
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
if os.path.exists(torch_sub.database_file):
|
||||||
|
os.remove(torch_sub.database_file)
|
||||||
|
|
||||||
|
def tearDown(self) -> None:
|
||||||
|
os.system("docker container stop mosquitto")
|
||||||
|
if os.path.exists(torch_sub.database_file):
|
||||||
|
os.remove(torch_sub.database_file)
|
||||||
|
|
||||||
|
def test_when_agent_publishes_should_get_hostname_from_subscriber(self):
|
||||||
|
self.run_subscriber()
|
||||||
|
agent_publish("client1", "crazy_onion.onion")
|
||||||
|
database = self.loadDatabase()
|
||||||
|
self.assertEqual(database['client1']['onionAddress'], "crazy_onion.onion")
|
||||||
|
|
||||||
|
def test_when_agent_publishes_should_get_hostname_from_subscriber2(self):
|
||||||
|
self.run_subscriber()
|
||||||
|
agent_publish("client2", "crazy_onion2.onion")
|
||||||
|
database = self.loadDatabase()
|
||||||
|
self.assertEqual(database['client2']['onionAddress'], "crazy_onion2.onion")
|
||||||
|
|
||||||
|
def test_when_agent_publishes_multiple_hosts_should_provide_latest(self):
|
||||||
|
self.run_subscriber()
|
||||||
|
agent_publish("client2", "crazy_onion2-34.onion")
|
||||||
|
agent_publish("client3", "crazy_onion3.onion")
|
||||||
|
agent_publish("client1", "crazy_onion1.onion")
|
||||||
|
agent_publish("client2", "crazy_onion2-56.onion")
|
||||||
|
agent_publish("client3", "crazy_onion3.onion")
|
||||||
|
database = self.loadDatabase()
|
||||||
|
self.assertEqual(database['client1']['onionAddress'], "crazy_onion1.onion")
|
||||||
|
self.assertEqual(database['client2']['onionAddress'], "crazy_onion2-56.onion")
|
||||||
|
self.assertEqual(database['client3']['onionAddress'], "crazy_onion3.onion")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def run_subscriber():
|
||||||
|
threading.Thread(target=torch_sub.subscribe,
|
||||||
|
args=(broker_hostname,
|
||||||
|
broker_port,
|
||||||
|
"torch/+/wake",
|
||||||
|
subscriber_ca_file,
|
||||||
|
subscriber_cert_file,
|
||||||
|
subscriber_key_file),
|
||||||
|
daemon=True).start()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def loadDatabase():
|
||||||
|
with open(torch_sub.database_file, "r") as database_file:
|
||||||
|
return json.load(database_file)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
|
@ -14,7 +14,7 @@ def update_client_record(client, userdata, message):
|
||||||
database_blank.write("{}")
|
database_blank.write("{}")
|
||||||
|
|
||||||
with open(database_file, 'r') as infile:
|
with open(database_file, 'r') as infile:
|
||||||
database = json.loads(infile.read())
|
database = json.load(infile)
|
||||||
|
|
||||||
payload = message.payload.decode('utf-8')
|
payload = message.payload.decode('utf-8')
|
||||||
response = json.loads(payload)
|
response = json.loads(payload)
|
||||||
|
@ -25,10 +25,13 @@ def update_client_record(client, userdata, message):
|
||||||
json.dump(database, outfile)
|
json.dump(database, outfile)
|
||||||
|
|
||||||
|
|
||||||
def subscribe(broker_hostname, broker_port, topic="torch", tls=None, auth=None):
|
def subscribe(broker_hostname, broker_port, topic="torch", ca_file=None, cert_file=None, key_file=None):
|
||||||
mqtt.callback(update_client_record,
|
mqtt.callback(update_client_record,
|
||||||
topic,
|
topic,
|
||||||
hostname=broker_hostname,
|
hostname=broker_hostname,
|
||||||
port=broker_port,
|
port=broker_port,
|
||||||
tls=tls,
|
tls={
|
||||||
auth=auth)
|
'ca_certs': ca_file,
|
||||||
|
'certfile': cert_file,
|
||||||
|
'keyfile': key_file
|
||||||
|
})
|
||||||
|
|
Loading…
Reference in New Issue
Block a user