Towards Lab IoT Part Two

To test communication we'll set up two clients, each representing an instrument, along with a server that hosts the MQTT broker, the database and the visualization. All three machines run Ubuntu under VirtualBox.
In this case client1 sends measurement1 data, which resets to a random value between 20 and 50 when client1 receives the trigger signal. client2 sends measurement2 data, which resets to 20 when client2 receives the same trigger signal. The controller monitors measurement1 and publishes the trigger when measurement1 is above 50. The controller also writes the measurement1 and measurement2 data to InfluxDB. client1 and client2 are the two clients, each simulating a running instrument, and the server is the controller. The code for each is below.

controller (virtual machine 1 - ip address = 10.0.2.15)

## controller
## 
## this is the controller code that runs in the background
## it subscribes to measurement1 and when that value
## reaches the threshold of 50 it publishes a trigger
## event that client 1 and client 2 both respond to
##
## It also doubles as the InfluxDB writer: every measurement
## it receives is stored in the database. That part could
## instead be run in a separate thread or program
## 
## publish: trigger1
## subscribe: measurement1
## response: if measurement1 > 50 then publish trigger 
 
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import datetime
import logging
import socket
import time
 
# Address of the MQTT broker, identity of this host and InfluxDB settings
MQTT_BROKER = '10.0.2.15'
HOSTNAME = socket.gethostname()
IPADDR = socket.gethostbyname(HOSTNAME)
INSTRUMENT_ID = 'controller'
INFLUX_CLIENT = '10.0.2.15'
INFLUX_DATABASE = 'lab1'

# influxDB connection used by on_message to store the measurements
influx_client = InfluxDBClient(INFLUX_CLIENT, database=INFLUX_DATABASE)

## define logging file
## `d` supplies the clientip / instrumentid fields that FORMAT expects and
## is passed as `extra` on every logging call
FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
logging.basicConfig(
    filename='/home/harvey/controller.log',
    format=FORMAT,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.DEBUG,
)
 
# Define the callback to handle CONNACK from the broker; if the connection was established normally, the value of rc is 0
def on_connect(client, userdata, flags, rc):
    """Log the CONNACK result and (re)subscribe to the sensor topics.

    Args:
        client: the MQTT client instance that received the CONNACK.
        userdata: user data attached to the client (unused).
        flags: response flags sent by the broker (unused).
        rc: connection result code; 0 means the connection succeeded.
    """
    logging.info("Connection returned with result code:" + str(rc), extra = d)

    # Subscribing here, in addition to the one-off subscribe at start-up,
    # renews the subscription whenever the client reconnects to the broker;
    # otherwise the controller could be connected again but receive nothing.
    if rc == 0:
        client.subscribe("sensor/+/+", 0)
 
# Define the callback to handle publish from broker
def on_message(client, userdata, msg):
    """Log a received message, store it in InfluxDB and fire the trigger.

    The third level of the topic (e.g. ``measurement1`` in
    ``sensor/expt1/measurement1``) is used as the InfluxDB measurement name
    and the integer payload as its ``value`` field.  When measurement1 is
    above 50 its value is re-published on ``sensor/trigger/client1``.

    Args:
        client: the MQTT client instance; used to publish the trigger.
        userdata: user data attached to the client (unused).
        msg: received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    # log the response
    logging.info("Received message, topic:" + msg.topic + "  payload:" + str(msg.payload), extra = d)

    topic = msg.topic.split('/')

    # The subscription "sensor/+/+" also matches "sensor/trigger/client1",
    # so the controller receives the triggers it publishes itself.  They are
    # not measurements: skip them instead of writing them to InfluxDB.
    if len(topic) < 3 or topic[1] == 'trigger':
        return

    # Parse the payload once; a malformed payload must not raise out of the
    # callback.
    try:
        value = int(msg.payload)
    except ValueError:
        logging.warning("Ignoring non-integer payload on topic:" + msg.topic, extra = d)
        return

    # push to influxDB
    current_time = datetime.datetime.utcnow().isoformat()
    measurement_type = topic[2]
    json_body = [
        {
            "measurement": measurement_type,
            "tags": {},
            "time": current_time,
            "fields": {
                "value": value
            }
        }
    ]
    influx_client.write_points(json_body)

    # fire the trigger if measurement1 > 50
    if msg.topic == 'sensor/expt1/measurement1' and value > 50:
        # Send the number itself: str() of the raw bytes payload would
        # produce "b'51'" on Python 3 rather than "51".
        client.publish("sensor/trigger/client1", payload = str(value))
 
# Callback invoked on disconnection from the broker; records the rc value
def on_disconnect(client, userdata, rc):
    """Write the disconnection result code to the controller log."""
    report = "Disconnection returned with result code:" + str(rc)
    logging.info(report, extra=d)
 
def main():
    """Connect to the broker, subscribe to the sensor topics and run the loop."""

    # Create an instance of `Client` and attach the three callbacks
    controller = mqtt.Client()
    controller.on_message = on_message
    controller.on_connect = on_connect
    controller.on_disconnect = on_disconnect

    # Connect to broker
    controller.connect(MQTT_BROKER, port=1883, keepalive=60)

    # Subscribe to every three-level topic under "sensor" and start the
    # loop; loop_forever() blocks, so nothing after it would run
    controller.subscribe("sensor/+/+", qos=0)
    controller.loop_forever()
 
if __name__ == '__main__':
    # Record start-up in the log file before entering the MQTT loop
    startup_note = 'Starting Broadcast from ' + INSTRUMENT_ID
    logging.info(startup_note, extra=d)
    main()

client1 (virtual machine 2 - ip address = 10.0.2.4)

## client 1 communications
## 
## this is an MQTT client code that runs in the background
## of client 1
## publish: measurement1
## subscribe: trigger1
## response: if trigger1 then reset measurement1 to a 
## random value between 20 and 50
 
import paho.mqtt.client as mqtt
import logging
import socket
import random
import time
 
# Address of the MQTT broker and identity of this host
MQTT_BROKER = '10.0.2.15'
HOSTNAME = socket.gethostname()
IPADDR = socket.gethostbyname(HOSTNAME)
INSTRUMENT_ID = 'machine 1'

## define logging file
## `d` supplies the clientip / instrumentid fields that FORMAT expects and
## is passed as `extra` on every logging call
FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
logging.basicConfig(
    filename='/home/harvey/client1.log',
    format=FORMAT,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.DEBUG,
)
 
# Define the callback to handle CONNACK from the broker; if the connection was established normally, the value of rc is 0
def on_connect(client, userdata, flags, rc):
    """Log the CONNACK result and (re)subscribe to the trigger topic.

    Args:
        client: the MQTT client instance that received the CONNACK.
        userdata: user data attached to the client (unused).
        flags: response flags sent by the broker (unused).
        rc: connection result code; 0 means the connection succeeded.
    """
    logging.info("Connection returned with result code:" + str(rc), extra = d)

    # Subscribing here, in addition to the one-off subscribe at start-up,
    # renews the subscription whenever the client reconnects to the broker;
    # otherwise the instrument could keep publishing but miss every trigger.
    if rc == 0:
        client.subscribe("sensor/trigger/client1", 0)
 
# Define the callback to handle publish from broker
def on_message(client, userdata, msg):
    """Log the received message and reset `measurement` on a trigger.

    A message on 'sensor/trigger/client1' replaces the module-level
    `measurement` with random.randrange(20, 50); any other topic is only
    logged.
    """
    global measurement

    received = "Received message, topic:" + msg.topic + "  payload:" + str(msg.payload)
    logging.info(received, extra=d)

    # Nothing more to do unless this is the trigger topic
    if msg.topic != 'sensor/trigger/client1':
        return

    measurement = random.randrange(20, 50)
    logging.info("Trigger 1: measurement reset to " + str(measurement), extra=d)
 
# Callback invoked on disconnection from the broker; records the rc value
def on_disconnect(client, userdata, rc):
    """Write the disconnection result code to the client log."""
    report = "Disconnection returned with result code:" + str(rc)
    logging.info(report, extra=d)
 
def main():
    """Publish measurement1 once a second until the program is interrupted.

    Connects to the broker, starts the background network loop, subscribes
    to the trigger topic and then publishes the module-level `measurement`
    on 'sensor/expt1/measurement1', incrementing it after every one-second
    sleep.  on_message may replace `measurement` when a trigger arrives.
    """
    global measurement

    # Create an instance of `Client`
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_disconnect= on_disconnect
    client.on_message = on_message

    # Connect to broker
    client.connect(MQTT_BROKER, 1883, 60)

    # initial value of measurement
    measurement = 0

    ## start MQTT client loop
    client.loop_start()

    # Subscribe to a topic
    client.subscribe("sensor/trigger/client1", 0)

    ## continuously update measurement value every second
    ## simulates instrument operation
    try:
        while True:
            client.publish("sensor/expt1/measurement1", payload = measurement)
            time.sleep(1)
            measurement += 1
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the loop; treat it as a normal stop
        logging.info('Stopping Broadcast from ' + INSTRUMENT_ID, extra = d)
    finally:
        # Disconnection: the clean-up used to sit after the infinite loop
        # where it could never execute; `finally` makes it run on any exit.
        time.sleep(1) # wait till all messages are processed
        client.loop_stop()
        client.disconnect()
 
if __name__ == '__main__':
    # Record start-up in the log file before entering the publish loop
    startup_note = 'Starting Broadcast from ' + INSTRUMENT_ID
    logging.info(startup_note, extra=d)
    main()

client2 (virtual machine 3 - ip address = 10.0.2.5)

## client 2 communications
## 
## this is an MQTT client code that runs in the background
## of client 2
## publish: measurement2
## subscribe: trigger1
## response: if trigger1 then reset measurement2 to 20 
 
import paho.mqtt.client as mqtt
import logging
import socket
import time
 
# Address of the MQTT broker and identity of this host
MQTT_BROKER = '10.0.2.15'
HOSTNAME = socket.gethostname()
IPADDR = socket.gethostbyname(HOSTNAME)
INSTRUMENT_ID = 'machine 2'

## define logging file
## `d` supplies the clientip / instrumentid fields that FORMAT expects and
## is passed as `extra` on every logging call
FORMAT = '%(asctime)-15s %(clientip)s %(instrumentid)-10s %(message)s'
d = {'clientip': IPADDR, 'instrumentid': INSTRUMENT_ID}
logging.basicConfig(
    filename='/home/harvey/client2.log',
    format=FORMAT,
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.DEBUG,
)
 
# Define the callback to handle CONNACK from the broker; if the connection was established normally, the value of rc is 0
def on_connect(client, userdata, flags, rc):
    """Log the CONNACK result and (re)subscribe to the trigger topic.

    Args:
        client: the MQTT client instance that received the CONNACK.
        userdata: user data attached to the client (unused).
        flags: response flags sent by the broker (unused).
        rc: connection result code; 0 means the connection succeeded.
    """
    logging.info("Connection returned with result code:" + str(rc), extra = d)

    # Subscribing here, in addition to the one-off subscribe at start-up,
    # renews the subscription whenever the client reconnects to the broker;
    # otherwise the instrument could keep publishing but miss every trigger.
    if rc == 0:
        client.subscribe("sensor/trigger/client1", 0)
 
# Define the callback to handle publish from broker
def on_message(client, userdata, msg):
    """Log the received message and reset `measurement` on a trigger.

    A message on 'sensor/trigger/client1' sets the module-level
    `measurement` back to 20; any other topic is only logged.
    """
    global measurement

    received = "Received message, topic:" + msg.topic + "  payload:" + str(msg.payload)
    logging.info(received, extra=d)

    # Nothing more to do unless this is the trigger topic
    if msg.topic != 'sensor/trigger/client1':
        return

    measurement = 20
    logging.info("Trigger 1: measurement reset to " + str(measurement), extra=d)
 
# Callback invoked on disconnection from the broker; records the rc value
def on_disconnect(client, userdata, rc):
    """Write the disconnection result code to the client log."""
    report = "Disconnection returned with result code:" + str(rc)
    logging.info(report, extra=d)
 
def main():
    """Publish measurement2 once a second until the program is interrupted.

    Connects to the broker, starts the background network loop, subscribes
    to the trigger topic and then publishes the module-level `measurement`
    on 'sensor/expt1/measurement2', incrementing it after every one-second
    sleep.  on_message may replace `measurement` when a trigger arrives.
    """
    global measurement

    # initial value of measurement; assigned before the network loop is
    # started so that it cannot overwrite a reset already applied by
    # on_message for an early trigger
    measurement = 0

    # Create an instance of `Client`
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_disconnect= on_disconnect
    client.on_message = on_message

    # Connect to broker
    client.connect(MQTT_BROKER, 1883, 60)

    ## start MQTT client loop
    client.loop_start()

    # Subscribe to a topic
    client.subscribe("sensor/trigger/client1", 0)

    ## continuously update measurement value every second
    ## simulates instrument operation
    try:
        while True:
            client.publish("sensor/expt1/measurement2", payload = measurement)
            time.sleep(1)
            measurement += 1
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the loop; treat it as a normal stop
        logging.info('Stopping Broadcast from ' + INSTRUMENT_ID, extra = d)
    finally:
        # Disconnection: the clean-up used to sit after the infinite loop
        # where it could never execute; `finally` makes it run on any exit.
        time.sleep(1) # wait till all messages are processed
        client.loop_stop()
        client.disconnect()
 
if __name__ == '__main__':
    # Record start-up in the log file before entering the publish loop
    startup_note = 'Starting Broadcast from ' + INSTRUMENT_ID
    logging.info(startup_note, extra=d)
    main()

Visualization

Running all three programs produces continuous output that updates every second. measurement1 and measurement2 are stored in InfluxDB in the database lab1, which can be visualized using Grafana (default = localhost, port 3000).
measurement1 and measurement2 increment every second. When measurement1 exceeds 50 the controller publishes the trigger: client1 then resets measurement1 to a random value between 20 and 50, and client2 resets measurement2 to 20.


Harvey Lieberman

Written by

Updated