1.5.29. fejezet, MQTT

Kapcsolódó hivatkozások

import random
import threading
from concurrent.futures import ThreadPoolExecutor

import paho.mqtt.client as mqtt
 
# Broker endpoint used by connect_mqtt().
broker = "mqserver.me.local"
port = 1883

# Topic that publish() sends to, and topic that subscribe() listens on.
serial_in_topic = "TOC_IN"
serial_out_topic = "TOC_OUT"

# Randomised identifier; computed once at import time.
# NOTE(review): nothing in the visible code passes this to mqtt.Client.
client_id = "python-mqtt-" + str(random.randint(0, 1000))
 
def connect_mqtt():
    """Create an MQTT client and connect it to ``broker`` on ``port``.

    The client is built with callback API version 2 and gets an
    ``on_connect`` callback that prints whether the connection succeeded.
    The network loop is NOT started here; the caller decides between
    ``loop_start()`` and ``loop_forever()``.

    Returns:
        The client object, after ``connect()`` has been called on it.
    """
    def on_connect(client, userdata, flags, rc, properties):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # Interpolate the code into the message; passing it as a second
            # print() argument left the raw "%d" placeholder in the output.
            print(f"Failed to connect, return code {rc}\n")

    # No client id is passed here, so the module-level ``client_id`` is unused.
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client
 
def publish(client):
    """Publish the fixed ``BALANCE`` message to ``serial_in_topic``.

    Prints a confirmation when the first element of the publish result is 0,
    and a failure notice otherwise. Returns nothing.
    """
    payload = "BALANCE"
    info = client.publish(serial_in_topic, payload)

    # Element 0 of the publish result is the status code; 0 means success.
    if info[0] != 0:
        print(f"Failed to send message to topic {serial_in_topic}")
        return
    print(f"Send `{payload}` to topic `{serial_in_topic}`")
 
def subscribe(client: mqtt.Client):
    """Subscribe ``client`` to ``serial_out_topic`` and print what arrives.

    Installs an ``on_message`` callback that prints each message's decoded
    payload together with its topic, then issues the subscription.
    """
    def on_message(client, userdata, msg):
        # Undecodable bytes are replaced rather than raising inside the
        # network-loop callback.
        text = msg.payload.decode(errors="replace")
        print(f"Received `{text}` from `{msg.topic}` topic")

    # Install the handler first so it is in place before any message can be
    # delivered for the subscription.
    client.on_message = on_message
    client.subscribe(serial_out_topic)
 
def run_publish():
    """Connect, publish one message via ``publish()``, and shut the client down.

    The network loop runs in the background only for the duration of the
    publish. The loop is stopped and the client disconnected even when
    ``publish()`` raises, so the connection is not left open.
    """
    client = connect_mqtt()
    client.loop_start()
    try:
        publish(client)
    finally:
        # Stop the background loop first (as the original did), then close
        # the session explicitly instead of abandoning the connection.
        client.loop_stop()
        client.disconnect()
 
def run_subscribe():
    """Connect, subscribe via ``subscribe()``, and process messages.

    Hands control to the client's ``loop_forever()``, so this function does
    not return until that loop ends.
    """
    subscriber = connect_mqtt()
    subscribe(subscriber)
    subscriber.loop_forever()
 
 
# Run the subscriber on a daemon thread: run_subscribe() blocks in
# loop_forever(), so waiting for it (as leaving an executor ``with`` block
# does) would keep the ENTER prompt below from ever being reached. A daemon
# thread also does not keep the process alive once the prompt is answered.
subscriber_thread = threading.Thread(
    target=run_subscribe, name="Worker-subscribe", daemon=True
)
subscriber_thread.start()

# Publish from the main thread so a connection or publish error is raised
# here instead of being stored in a future that nobody inspects.
run_publish()

wait = input("Press ENTER to exit...\n\n")