MQTT with Processing

For me, being more engineer and less artist, Processing is a handy tool for data visualisations.: easy to use, but with the full power of java under the hood if it’s needed.

Matchboxes
by Schoenerwissen.

(courtesy of Processing.org)

However, if you want to sketch marvellous stuff like Matchboxes above, you need to feed in information from an external source; twitter, arduino, databases, web, hardware etc and there are many great libraries to make it easier. However, for each of these data streams, you need to program an interface into the sketch; it’s a point solution. It’s easy to get to a place where those libraries, threads, urls and api keys just start making life complicated. Maybe it would be better if we acted a bit more ‘Internet of Things‘ ?

Perhaps our sketch could talk to an MQTT broker instead through a single interface? All the widgets and datastreams could publish their data and the sketch could subscribe to the ones it needs. If a new data source is wanted, get it to publish itself as well and then it’s not only available to the sketch, but to everything else that might want the data as well. Of course, it works the other way as well, if the sketch can publish it’s data then it’s available to another sketch or an arduino etc.

Lets start with a really simple, slightly rubbish, sketch that bounces ‘Hello World’ about:

private float x = 10; // x location of square
private float y = 0; // y location of square
private float speed = 0; // speed of square
private static final float gravity = 0.1f;
public String current;

public void setup() {
 size(300, 300);
 background(0);
 newmessage("Hello World");
 }
public void draw() {
 background(255);
 fill(0);
text(current, x, y);
 y = y + speed;
 speed = speed + gravity;
 x = frameCount % width;
if (y > height) {
 if (speed < 0.2)
 speed = 0;
 else
 speed = speed * -0.95f; 
 }
}

public void newmessage(String s) {
	current = s;
	speed = 0;
	y = 0;
	fill( random(255), random(255), random(255), random(255)); 
	}

The job is to replace the text with data from an MQTT broker by getting MQTT to call newmessage(). Handily, the MQTT system comes with a java client, a jar we can add and a nice simple interface we can use. If you’re following along using the Processing IDE (source code for sketches is here but you’ll need wmqtt.jar as below):

  1. Download the IA92 code from IBM, unzip it and find wmqtt.jar.
  2. Find your Processing/libraries folder (Cmd-K) and create wmqtt/library
  3. Put wmqtt.jar into it.
  4. Create a new tab in you sketch call MQTTLib and drop in the following:
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttSimpleCallback;
import com.ibm.mqtt.MqttException;
 public class MQTTLib {
 private MqttSimpleCallback callback;
 private MqttClient client = null;

 MQTTLib(String broker, MqttSimpleCallback p) {
 callback = p;
 try {
 client = (MqttClient) MqttClient.createMqttClient(broker, null);
 //class to call on disconnect or data received
 client.registerSimpleHandler(callback);
 } catch (MqttException e) {
 System.out.println( e.getMessage() );
 }
 }

 public boolean connect(String client_id, boolean clean_start, short keep_alive) {

 try {
 //connect - clean_start=true drops all subscriptions, keep-alive is the heart-beat
 client.connect(client_id, clean_start, keep_alive);
 //subscribe to TOPIC
 return true;
 } catch (MqttException e) {
 System.out.println( e.getMessage() );
 return false;
 }
 }

 public boolean subscribe(String[] topics, int[] qos ) {
 try {
 //subscribe to TOPIC
 client.subscribe(topics, qos);
 return true;
 } catch (MqttException e) {
 System.out.println( e.getMessage() );
 return false;
 }
 }

}

This new class takes as a constructer, a URL string for a broker and the class that gets called when either the connection is dropped, or we have new data. We also have  connect and subscribe methods to, well connect and subscribe. All this code is pretty short on error handling, but is sufficient for prototyping. All we have to do now is alter our main script to get it all going.

The key here is providing the callback so our script is informed when new data is ready. For that we need a class that implements MqttSimpleCallback, so we add an inner class into our main sketch:

private class MessageHandler implements MqttSimpleCallback {
public void connectionLost() throws Exception {
 System.out.println( "Connection has been lost." );
 //do something here
 }
public void publishArrived( String topicName, byte[] payload, int QoS, boolean retained ){
 String s = new String(payload);
 //Display the string
 newmessage(s);
 } 

 }

This class has only two methods, one gets called if we lose the connection – it should reconnect etc in the real world. The second gets called when the broker has data for us from a subscription topic. In our code we simple set our display string to the payload.

Last step then, is to alter the sketch in setup to create the MQTTLib class, connect and subscribe to something, so now we have:

import com.ibm.mqtt.MqttSimpleCallback;

//MQTT Parameters
private MQTTLib m;
private String MQTT_BROKER ="tcp://test.mosquitto.org:1883";
private String CLIENT_ID = "TestProcessing";
private int[] QOS = {1};
private String[] TOPICS = { "$SYS/broker/clients/active"};
private boolean CLEAN_START = true;
private short KEEP_ALIVE = 30;
private float x = 10; // x location of square
private float y = 0; // y location of square
private float speed = 0; // speed of square
private static final float gravity = 0.1f;
public String current;
public void setup() {
 size(300, 300);
 background(0);
 // set up broker connection
 m = new MQTTLib(MQTT_BROKER, new MessageHandler());
 m.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
 m.subscribe(TOPICS, QOS);
 current = "Hello World";
 }
public void draw() {
 background(255);
 stroke(0);
text(current, x, y);
 y = y + speed;
 speed = speed + gravity;
 x = frameCount % width;
if (y > height) {
 if (speed < 0.2)
 speed = 0;
 else
 speed = speed * -0.95f;

 }
}

That’s it. Every time the broker gets a message published to the topic, it tells the client, running in it’s own thread, and it triggers the callback. That resets the display message.

There are several open MQTT brokers on the web we can talk to. There is a test mosquitto broker chosen above that we can use and the topic is the number of current connections. This topic is useful because it changes fairly rapidly. Cosm have another excellent one worth looking at. A couple of other things:

  • The broker needs a unique Client ID for each connection, so that probably needs changing, otherwise it throws the duplicate off.
  • Clean Start signals the broker to drop any topics we’ve subscribed to. Our connection is treated as totally new.
  • Keep Alive is the heartbeat signal time to the broker, e.g we ping it every N secs so it knows we’re still there.

To Do

Well, the code could be better, we can’t publish yet, and the Qos stuff isn’t explained. Also, the IA92 code is deprecated and we should be using Paho code instead. But, it’s enough to be getting on with and it should be fairly simple to retrofit an existing sketch to get it’s data from an MQTT broker.

Advertisements