Adding MQTT to the IoT Gateway

The primary function of an IoT Gateway is moving data from an input source (often sensor data) to an output destination (like a control algorithm), with some data conversion and storage in between. Our first milestone in building the IoT Gateway is reading data from MQTT, then holding it in RAM for the REST API we will build in Milestone 2.


This page marks the beginning of the Organic Gardner (OG) IoT Project Milestone 1 development! If you want to program along but have not yet worked with the Go programming language, check this intro: Getting ready to Go.

A Brief About MQTT

MQTT is the messaging protocol that a Collector will use to periodically publish sensor data (like temperature and humidity) to the IoT Gateway. MQTT will also carry the commands that tell the Control Station (or Controller) to turn a sprinkler pump on or off.

Why MQTT

MQTT fits nicely into compact hardware and embedded systems where RAM and compute power are limited, which is one reason why it is ubiquitous in IoT applications.

MQTT is easy to use for both programming and operations. Client libraries compile into small, fast binaries. MQTT is built atop TCP, the Internet’s workhorse protocol, which means delivery is reliable and the protocol adapts well to busy or low bandwidth networks.


MQTT Architecture



MQTT has three primary components: a broker, publishers and subscribers. Publishers send messages to the Broker, and the Broker then forwards each message to the Subscribers.

Messages are segregated by Topics that resemble the path hierarchy of a file-system. For example, the Collector sends the current temperature to the topic data/temperature, and the topic data/soil-moisture is used for moisture.

Similarly, the pump for a sprinkler system would subscribe to the topic ctl/sprinkler, waiting for commands to turn a sprinkler on and off.
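To make the publish, broker, subscribe flow concrete, here is a toy in-process broker. It is only an illustration of the pattern, not how a real MQTT broker is implemented: the Broker and Handler types and the payloads "on" and "72.5" are made up for this sketch, and only exact topic matches are supported.

```go
package main

import "fmt"

// Handler is called for every message delivered to a subscriber.
type Handler func(topic string, payload []byte)

// Broker is a toy, in-process stand-in for an MQTT broker. It only
// supports exact topic matches; a real broker also handles wildcards,
// QoS levels, retained messages and network connections.
type Broker struct {
	subscribers map[string][]Handler
}

// NewBroker returns an empty Broker.
func NewBroker() *Broker {
	return &Broker{subscribers: make(map[string][]Handler)}
}

// Subscribe registers h to receive every message published to topic.
func (b *Broker) Subscribe(topic string, h Handler) {
	b.subscribers[topic] = append(b.subscribers[topic], h)
}

// Publish forwards payload to every subscriber of topic and returns
// the number of subscribers that received it.
func (b *Broker) Publish(topic string, payload []byte) int {
	handlers := b.subscribers[topic]
	for _, h := range handlers {
		h(topic, payload)
	}
	return len(handlers)
}

func main() {
	broker := NewBroker()

	// The sprinkler pump subscribes to its control topic.
	broker.Subscribe("ctl/sprinkler", func(topic string, payload []byte) {
		fmt.Printf("pump received %q on %s\n", payload, topic)
	})

	// A publisher sends a command; the broker forwards it to the pump.
	broker.Publish("ctl/sprinkler", []byte("on"))

	// Nobody subscribed to this topic, so the message is dropped.
	broker.Publish("data/temperature", []byte("72.5"))
}
```

Note that the publisher never talks to the pump directly. Both sides only know the topic name, which is what lets us add stations and consumers later without changing existing code.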

That was just enough description of MQTT to get us started. As the project progresses we will dive into more detail on MQTT particulars as they affect our project. Now let’s actually add MQTT to the IoT Gateway as required by the first Milestone.

Import the Paho MQTT Library

The third party package Paho MQTT Go is a nice little library that is going to make it easy for us to subscribe to the appropriate MQTT topics as well as enabling us to publish commands for the sprinkler controller.

The first thing we need to do is fetch the package directly from its repository, and with Go nothing could be easier! Just run the following command from the command line.

% go get github.com/eclipse/paho.mqtt.golang

Second, the package must be imported into the application source code with the following line of code.

import mqtt "github.com/eclipse/paho.mqtt.golang"

Importing the MQTT package and connecting to the MQTT broker takes only a few lines of code.

We can now connect to an MQTT Broker, which by default will be located on the same host running our IoT-Gateway (i.e. localhost). However, that may not always be the case; the gateway may need to connect to an external broker.

For this reason we are going to make the broker’s address configurable. This leads us to a brief introduction to Go’s built-in flag package, which lets us easily create a command line argument that sets the broker configuration variable.

The Configuration Struct

I typically create a struct called Configuration and a single global variable (singleton) called config to house all of the program’s configuration variables. Like so:

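package main

import (
    "flag"
)

// Configuration holds every setting the gateway can be started with.
type Configuration struct {
    Broker string `json:"broker"` // host name or address of the MQTT broker
}

// config is the single, program-wide configuration instance.
var (
    config Configuration
)

func init() {
    // Register -broker as a command line flag that defaults to localhost.
    flag.StringVar(&config.Broker, "broker", "localhost", "Set the MQTT Broker")
}

func main() {
    flag.Parse()

    // mqtt_init (not shown here) connects to the configured broker.
    mqtt_init(config.Broker)
}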

If you would like to read more about the configuration struct, including a discussion of saving and reading the configuration structure from a file and a quick introduction to Go’s twist on “Object Oriented” programming, check out this article on the Go Config Struct.

Now we have turned the config.Broker variable into a command line argument that defaults to localhost. This means that if we run the command as:

% ./iot-gateway

It will automatically connect to the MQTT broker running on localhost. Otherwise we can have our IoT Gateway connect to a public MQTT broker, for example:

% ./iot-gateway -broker mqtt.eclipse.org

With the command above, all of the data from the topics we subscribe to will be delivered by the public broker mqtt.eclipse.org.
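The effect of the -broker flag can be tried in isolation with the sketch below. The helper brokerURL is hypothetical and not part of the gateway; it uses its own flag.FlagSet so it can be called with any argument list. The tcp:// scheme and port 1883 (the standard unencrypted MQTT port) are assumptions about how the host name would be turned into a broker address.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// brokerURL parses command line arguments and returns the URL of the
// MQTT broker to connect to. The tcp:// scheme and port 1883 are
// assumptions of this sketch.
func brokerURL(args []string) (string, error) {
	fs := flag.NewFlagSet("iot-gateway", flag.ContinueOnError)
	broker := fs.String("broker", "localhost", "Set the MQTT Broker")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return fmt.Sprintf("tcp://%s:1883", *broker), nil
}

func main() {
	url, err := brokerURL(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Println("connecting to", url)
}
```

Run without arguments this prints "connecting to tcp://localhost:1883"; run with -broker mqtt.eclipse.org it prints "connecting to tcp://mqtt.eclipse.org:1883".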

MQTT Topics for OG

The IoT-Gateway in its first version will of course collect environmental data including temperature, humidity, soil moisture and luminescence from various sensors scattered about.

As mentioned earlier, MQTT topics are strings with a hierarchical structure very similar to a file-path. We are going to take advantage of this fact and structure our topics such that we can extract the StationID and SensorID directly from the topic itself.

ss/data/{:stationid}/{:sensorid}

Where the :stationid and :sensorid are variables to be replaced by the values extracted from the topic path-like strings.

For example, a station with the ID 10.11.4.22 that publishes the temperature in Fahrenheit from a sensor with the ID tempf will use the data topic

ss/data/10.11.4.22/tempf
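Extracting the two IDs from a topic of this shape could look like the following sketch. The function name parseTopic and its error handling are assumptions made for illustration; the gateway’s own callback does the split inline.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTopic splits a topic of the form ss/data/{stationid}/{sensorid}
// into its station and sensor IDs. It returns an error for any topic
// that does not have exactly that shape.
func parseTopic(topic string) (station, sensor string, err error) {
	parts := strings.Split(topic, "/")
	if len(parts) != 4 || parts[0] != "ss" || parts[1] != "data" {
		return "", "", fmt.Errorf("unexpected topic %q", topic)
	}
	if parts[2] == "" || parts[3] == "" {
		return "", "", fmt.Errorf("empty station or sensor in %q", topic)
	}
	return parts[2], parts[3], nil
}

func main() {
	station, sensor, err := parseTopic("ss/data/10.11.4.22/tempf")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(station, sensor) // prints "10.11.4.22 tempf"
}
```

The dots in the station ID are not a problem because only the / character separates topic levels.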

MQTT and Wildcards

Lucky for us, MQTT topics can be subscribed to using the ‘+’ wildcard symbol. This lets us capture the StationID and SensorID even though the hub did not know about any of these stations or sensors before the Collection Station published them.

By subscribing to a single Topic using wildcards we can be assured of receiving data from all stations and sensors with the single call to hub.Subscribe() below:

	hub.Subscribe("data", "ss/data/+/+", dataCB)

The call above gives our subscription the name “data”. The path ss/data/+/+ contains two wildcards, each represented by the + character, and each + matches exactly one level of the topic. The third argument is the callback that will be invoked every time a value is published to a matching topic.
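The broker performs the wildcard matching for us, but the rule is simple enough to sketch. The function topicMatches below is written for illustration only and is not part of the Paho library. It handles the single-level wildcard + used above as well as MQTT’s multi-level wildcard #, and it ignores special cases such as topics beginning with $.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether topic matches the subscription filter.
// "+" matches exactly one topic level and "#" (only valid as the last
// level of a filter) matches all remaining levels.
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	filter := "ss/data/+/+"
	for _, topic := range []string{
		"ss/data/10.11.4.22/tempf", // station and sensor present
		"ss/data/10.11.4.22",       // sensor level missing
		"ss/ctl/10.11.4.22/pump",   // different category
	} {
		fmt.Println(topic, topicMatches(filter, topic))
	}
}
```

Only the first topic in the example matches: the second is one level short, and the third differs in a level that is not a wildcard.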

Here is the Subscribe() function from the IoT gateway:

// Subscribe registers the handler f under the name id and subscribes
// to the given topic path on the MQTT broker.
func (s *Hub) Subscribe(id string, path string, f mqtt.MessageHandler) {
	sub := &Subscriber{id, path, f, nil}
	s.Subscribers[id] = sub

	// QoS 0: each message is delivered at most once.
	qos := 0
	token := mqttc.Subscribe(path, byte(qos), f)
	if token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	if config.Verbose {
		log.Printf("subscribe token: %v", token)
	}
	log.Println(id, "subscribed to", path)
}

and the callback function that is invoked every time new data arrives:

// dataCB is invoked for every message published to a subscribed topic.
// It extracts the station and sensor from the topic and forwards the
// data to every consumer registered for the topic's category.
func dataCB(mc mqtt.Client, mqttmsg mqtt.Message) {
	topic := mqttmsg.Topic()

	// Topics look like ss/data/{stationid}/{sensorid}
	paths := strings.Split(topic, "/")
	if len(paths) < 4 {
		log.Println("Warning: malformed topic", topic)
		return
	}
	category := paths[1]
	station := paths[2]
	sensor := paths[3]
	payload := mqttmsg.Payload()

	consumers := hub.GetConsumers(category)
	if consumers == nil {
		log.Println("DataCB no consumers for", topic)
		return // nobody is listening
	}

	log.Printf("MQTT Message topic %s - value %s\n", topic, string(payload))
	switch category {
	case "data":
		msg := Msg{}
		msg.Station = station
		msg.Sensor = sensor
		msg.Data = payload
		msg.Time = time.Now().Unix()
		for _, consumer := range consumers {
			consumer.GetRecvQ() <- msg
		}

	default:
		log.Println("Warning: do not know how to handle", topic)
	}
}

MQTT Handling Incoming Data

The callback shown above is pretty simple:

  1. Extract the StationID and SensorID from the MQTT topic
  2. Extract the value delivered
  3. Save the timestamp for when the data was received
  4. Use the StationID and SensorID to index the RAM Cache
  5. Send the {timestamp, value} tuple to the RAM Cache consumer.

The RAM Cache consumer is a goroutine that receives the incoming Msg over a channel. We’ll talk more about goroutines and channels, the concurrency mechanisms supplied by Go, when we add Websockets during the 4th milestone.
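As a rough sketch of that hand-off, the program below runs a consumer in its own goroutine and feeds it two messages over a channel. The consume function, the "station/sensor" map key and the sample readings are assumptions made for this example; the Msg fields follow the names used in the callback, and the type is redefined here only to keep the sketch self-contained.

```go
package main

import "fmt"

// Msg carries one reading from the MQTT callback to a consumer.
type Msg struct {
	Station string
	Sensor  string
	Time    int64
	Data    interface{}
}

// consume reads messages from recvQ until the channel is closed and
// groups them by a "station/sensor" key.
func consume(recvQ <-chan Msg) map[string][]Msg {
	cache := make(map[string][]Msg)
	for msg := range recvQ {
		key := msg.Station + "/" + msg.Sensor
		cache[key] = append(cache[key], msg)
	}
	return cache
}

func main() {
	recvQ := make(chan Msg)
	result := make(chan map[string][]Msg)

	// The consumer runs in its own goroutine.
	go func() { result <- consume(recvQ) }()

	// In the gateway the MQTT callback would be the sender.
	recvQ <- Msg{Station: "10.11.4.22", Sensor: "tempf", Time: 1, Data: []byte("98.6")}
	recvQ <- Msg{Station: "10.11.4.22", Sensor: "tempf", Time: 2, Data: []byte("98.7")}
	close(recvQ)

	cache := <-result
	fmt.Println(len(cache["10.11.4.22/tempf"]), "data points cached")
}
```

Because only the consumer goroutine touches the map, no mutex is needed; the channel is the only point of contact between the callback and the cache.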

Controlling Memory Usage

Following this algorithm our memory usage is going to increase in direct proportion to the number of stations, sensors and frequency of data publications.

Todo: in the future we’ll add configurations that will allow us control over how much data to keep in RAM and how long to keep it.

Controlling memory in sophisticated ways is an exercise for later. Until then we’ll just put a limit on the number of data points that can be kept, say 1,000, so we don’t run out of memory before implementing more complex memory controls.
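One simple way to enforce such a limit is to drop the oldest value whenever a series grows past the cap. The sketch below assumes plain float64 values and a helper named appendBounded, both chosen only for illustration; it is one possible approach, not necessarily the one the gateway ends up using.

```go
package main

import "fmt"

// maxPoints is the cap suggested in the text; a real gateway would
// probably make it configurable.
const maxPoints = 1000

// appendBounded appends v to series and, if the series then holds more
// than limit values, drops the oldest ones so that only the newest
// limit values remain.
func appendBounded(series []float64, v float64, limit int) []float64 {
	series = append(series, v)
	if len(series) > limit {
		// Shift the newest values to the front, reusing the backing array.
		n := copy(series, series[len(series)-limit:])
		series = series[:n]
	}
	return series
}

func main() {
	var series []float64
	for i := 0; i < 1005; i++ {
		series = appendBounded(series, float64(i), maxPoints)
	}
	// The five oldest values (0 through 4) have been dropped.
	fmt.Println(len(series), series[0], series[len(series)-1]) // prints "1000 5 1004"
}
```

Shifting the slice costs a copy on every insert once the cap is reached. That is fine for 1,000 points; a ring buffer would avoid the copy if the limit grows much larger.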

The Msg Data Structure

Data is reformatted into the following Msg struct by the dataCB() callback for every new datapoint. The structure is defined as

type Msg struct {
    Station string      // ID of the station that published the data
    Sensor  string      // ID of the sensor on that station
    Time    int64       // Unix timestamp (seconds) of when the data arrived
    Data    interface{} // the value as delivered
}

The empty interface allows for an arbitrary value type. For example Data can be an integer, floating point or a string formatted as JSON.
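Since a payload arrives as raw bytes, something eventually has to decide what type it holds. A minimal sketch of that step, assuming payloads are either JSON or plain text, might look like this. The helper decodeValue is hypothetical, and the sample payloads are invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeValue turns a raw MQTT payload into a Go value. Payloads that
// are valid JSON (numbers, quoted strings, objects, arrays, booleans)
// are decoded; anything else is kept as a plain string.
func decodeValue(payload []byte) interface{} {
	var v interface{}
	if err := json.Unmarshal(payload, &v); err == nil {
		return v
	}
	return string(payload)
}

func main() {
	for _, payload := range []string{"98.6", `{"tempf": 98.6}`, "wet"} {
		v := decodeValue([]byte(payload))
		fmt.Printf("%-16s -> %T\n", payload, v)
	}
}
```

Note that encoding/json decodes every JSON number into a float64 when the target is an empty interface, so integers and floating point values end up with the same Go type.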

The Msg structure is fine for handling the immediate incoming data and passing it along to a consumer, but it is not efficient for storing in memory for a quick API response. For this reason we need to define a structure more appropriate for indexed retrieval, described below.

Internal Data Model

Data will be cached in RAM in the following format: Datapoint tuples {timestamp, value} form a series hanging from a Sensor, which in turn is one of the sensors associated with a Collection Station.


station1
    sensor1
        [ts1, ts2, ts3, ...]
station2
    sensor2
        [ts1, ts2, ts3, ...]

With this model we can easily adjust the number of timestamps allowed per sensor to limit the amount of memory that will be consumed.
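In Go this model maps naturally onto nested maps. The sketch below is one possible shape for it, not the gateway’s final implementation: the type names Cache and Datapoint, the float64 value type and the method names are assumptions, and the per-sensor limit is left out to keep the example short.

```go
package main

import "fmt"

// Datapoint is one {timestamp, value} tuple.
type Datapoint struct {
	Timestamp int64
	Value     float64
}

// Cache maps a station ID to its sensors, and each sensor ID to its
// time series: station -> sensor -> []Datapoint.
type Cache map[string]map[string][]Datapoint

// Add appends a datapoint to the series for the given station and
// sensor, creating the station entry on first use.
func (c Cache) Add(station, sensor string, dp Datapoint) {
	sensors, ok := c[station]
	if !ok {
		sensors = make(map[string][]Datapoint)
		c[station] = sensors
	}
	sensors[sensor] = append(sensors[sensor], dp)
}

// Series returns the cached datapoints for one sensor, or nil if the
// station or sensor is unknown.
func (c Cache) Series(station, sensor string) []Datapoint {
	return c[station][sensor]
}

func main() {
	cache := Cache{}
	cache.Add("station1", "sensor1", Datapoint{Timestamp: 1, Value: 70.5})
	cache.Add("station1", "sensor1", Datapoint{Timestamp: 2, Value: 71.0})
	cache.Add("station2", "sensor2", Datapoint{Timestamp: 1, Value: 40.0})

	fmt.Println(len(cache.Series("station1", "sensor1")), "points for station1/sensor1")
	fmt.Println(len(cache.Series("station9", "sensor1")), "points for an unknown station")
}
```

A lookup is two map accesses, which is what makes this layout a good fit for answering API requests. Trimming a series to a maximum length would go inside Add.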

Testing the Hub

We now have enough code that we can build and test our fledgling IoT Gateway. First we will of course write the obligatory unit tests we all should be writing as part of our Software Development Process (SDP) with Test Driven Development (TDD).

Go and Unit Tests

We won’t go into any detail on writing Go unit tests here, as there are plenty of good resources on the net, including the best place to start, which is the Go testing package documentation itself.

Unit tests can be considered white box tests, meaning that the test code has direct access to the program’s internal data structures and functions.

System Tests with MQTT

System tests, however, are considered black box tests and operate completely outside the Gateway by accessing the Gateway’s external public API.

Mocking Collection Stations

Easy testing is one of the beautiful things about working with protocols like MQTT and HTTP: they are inherently mockable. We’ll use the popular mosquitto_pub MQTT publishing tool to mock the Collection Stations (CS) that have not yet been developed as of this writing.

To demonstrate a quick test of the hub we will add a -verbose flag to print data as it is received. The data is then cached in RAM and made ready for the REST API coming in the next article (milestone).

To mock a Collector publishing temperature data all we have to do is run the following command:

% mosquitto_pub -t ss/data/10.11.1.11/tempf -m 98.6

Our IoT-Gateway will pick up the fake data value 98.6 from sensor tempf extracted from the topic ss/data/10.11.1.11/tempf.

The upper screenshot shows logs from the IoT Hub starting up and then receiving its first data point from MQTT. The lower screenshot shows the invocation of the mosquitto_pub command.

High Level Sensor Station

Victory!

In the above screenshot mosquitto_pub published the temperature in Fahrenheit to the topic ss/data/10.11.1.11/tempf, where the CS station ID is 10.11.1.11. Likewise, the SensorID is the string tempf. The value passed is 98.6 degrees Fahrenheit.

We can see the Hub receiving the data and parsing the StationID and SensorID from the topic string. The data is parsed, formatted and temporarily saved in RAM.

HTTP REST Server Next …

The gateway now receives periodic data from one or more stations, each with one or more sensors. The data is reformatted and stored as a time-series in RAM.

Now it is time to build our REST API to get the data out of the IoT Gateway.

In the next article we are going to import Go’s built-in net/http package to set up our HTTP server, which will in turn handle our REST Endpoints. This same package will later allow us to serve up the IoT Gateway web app.

Next: Adding the REST API