XForms Push with MQTT over Websockets

The idea…

I’ve been looking recently at a project that creates pdfs via a ConTeXt typesetter. The front-end is XForms, feeding the backend queue of jobs that get done with a delay of perhaps a few minutes. The problem is how do I tell the web app, that a particular job has finished, because the user isn’t waiting; they’re on to another document.

What I need is some sort of Push Notification that I can integrate into the XForms world.

Of course within XForms it’s possible to do a sort of polling with native <xf:dispatch delay=”number”/> calling a <xf:submit/> to see if anything interesting has happened, but it really doesn’t scale. At that point you need a ‘push’ technology that will notify you when something you are interested happens instead. Handily in HTML5, there is scope for that in the use of WebSockets. These allow you to create a two-way link back to your server along which you can send/receive pretty much anything. Now there is nothing to say you can’t write your own system but  there are plenty of fine libraries out there that will do the heavy lifting for you.

In my case Pub/Sub was the way to go as a behaviour, and MQTT specifically as a protocol because it’s one of the best (if you don’t believe me ask FaceBook). Pub/Sub in brief is a message pattern (2lemetry have a great Nutshell). Publishers send data (of any sort) to one or more topics, e.g. house/temperature/lounge to a Broker (that’s the server bit). Subscribers, without any knowledge of who is doing the publishing, subscribe to the broker to receive data from one or more topics, either explicitly i.e. house/temperature/lounge or by wildcard (+ = 1 level, #= n level), i.e house/# or house/+/temperature.

The Broker mediates between the publishers and subscribers and makes sure messages get delivered. In my use-case, for instance, for a user Fred, working on a Case 00123, you might assume a topic tree something like fred/docs/00123.  When fred logged onto the web app, he’d subscribe to  fred/# and start receiving anything on that branch and down. The reason not to subscribe to say fred/docs/# is that it leaves the door open to using the same tech for something like system messages targeted at him via fred/messages or some way to lock a form via fred/locks/00123 or track form changes via fred/forms/00123 etc etc. Note: One of the nice benefits to doing it the MQTT way is that there’s nothing to stop another system subscribing to say +/docs/# to watch for problems or gather stats!

However, before any of those bells and whistles, perhaps I need to solve the basic problem of getting it to work at all via a small example. All code and runnable examples are as usual on GitHub.

Demo v1: Connect/Disconnect.

Untitled

click to run example.

First thing is a MQTT broker. The nice people at DC Square (@DCsquare)  have a public broker running at mqtt-dashboard.com we can play on based on HiveMQ which I’ll use in the example. Please be nice to their broker and use responsibly. Want to set up your own broker? The Mosquitto Open Source broker comes highly recommended. Second we need a Javascript library to talk to the broker. The Eclipse Paho project has one that is pretty good. To get it you’ll have to clone their repository like so:

git clone http://git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.javascript.git

Lastly some code…. I’m going for the simplest xsltforms XForm with my MQTT connection details for the DC Square broker on it’s web socket port, and an initial topic to subscribe to (unused just yet) in the <xf:instance/>:

<xf:instance id="connection" xmlns=""> 
<data> 
<host>broker.mqttdashboard.com</host> 
<port>8000</port> 
<clientId></clientId> 
<initialtopic>xforms/chat</initialtopic> 
<status>disconnected</status>
 </data> 
</xf:instance>

Note the clientId is empty. I’m using a bit of xpath in the xforms-ready event to set this to a random value. This makes sure I don’t get kicked off the broker by someone connecting with the same id. (I’ll come back to this later). I’ve two trigger/buttons to connect and disconnect, which each call Javascript using the <xf:load><xf:resource/> method. Here’s the connect call as an example:

<xf:trigger id="mqttconnect">
  <xf:label>Connect</xf:label>
  <xf:load ev:event="DOMActivate">
    <xf:resource value="concat('javascript:mqttstart(&quot;',host,'&quot;,&quot;',
    port,'&quot;,&quot;',clientId,'&quot;)')" />
  </xf:load>
</xf:trigger>

Having to put in quotes around the params makes it a bit messy and it’s can be a pain. On bigger forms I tend to move this sort of JS stuff up into an <xf:event/> at the top of the form and call it, rather than spread it out through the code. Now I just need some JS to call. The main mqttstart() function sets up a Messaging Client with our parameters. The MQTT library does pretty much everything via callbacks, so most of the code is references to other functions.

function mqttstart(host,port,clientId) {
client = new Messaging.Client(host, Number(port), clientId);
client.onConnectionLost = onConnectionLost;
var options = {
timeout: 30,
onSuccess:onConnect,
onFailure:onFailure 
};
 client.connect(options);
};

For this version they’re pretty simple and I’m not interacting with the XForm the other way at all, i.e.

function onConnect() {
alert('Connected');
};
function onFailure() {
alert('Failed');
};
function onConnectionLost(responseObject) {
alert(client.clientId + " disconnected: " + responseObject.errorCode);
};

Only onFailure() is at all fancy and that’s just to get an error code. I’ll call this Version1 – here it is.

Version 2 : Feeding events back into the XForm.

Untitled

click to run example

Now I know the library works, I need to integrate the results from JS, back to my XForm. This is done by calling an <xf:event/> (note this call format is xsltforms specific) . I’ve got a generic JS function for this that takes the name of the event and a payload for the properties. The code is below, along with my new onConnect() function to call it.

Important! The JS side needs to know what the <xf:model/> id is for the call. I’ve hardcoded it here. Make sure it’s set on your XForm as well or it won’t work.

On the XForms side, I’ve now got the <xf:event/> that sets the connection status in my instance(). To add a visual cue, I’ve added a <xf:bind> to set the instance to readonly once connected  and reset if we disconnect:

function call_xform_event(xfevent,xfpayload) {
  var model=document.getElementById("modelid")
  XsltForms_xmlevents.dispatch(model,xfevent, null, null, null, null,xfpayload);
}
function onConnect() {
call_xform_event("mqtt_set_status",{status:  "connected"});
};
<xf:bind nodeset="instance('connection')" readonly="status = 'connected'" />
<xf:action ev:event="mqtt_set_status">
<xf:setvalue ref="status" value="event('status')" />
<xf:reset model="modelid" if="event('status') = 'disconnected'" />
</xf:action>

This is a much cleaner system than calling JS. I can set any number of properties on the JS side as a JSON string and they’ll appear as event properties on in the XForm.

Lastly, I need to add a subscription. In this case I’ve chosen xforms/chat.  To subscribe I have to wait to be connected and of course I could have done that at the JS level. However, this is about XForms so I’ve added a <xf:dispatch> to my ‘mqtt-set-status’ event to call another event to do it for me if I’m connected.

 <xf:dispatch name="mqtt_sub_first" targetid="modelid" if="event('status') = 'connected'" />

Next, I need to add support for messages arriving from this subscription. This is done by adding another callback into mqttstart() for onMessageArrived and then fleshing it out so that the message and it’s topic can be sent into my XForm:

function onMessageArrived(message) {
  dispatch_xforms("mqtt_message_arrived",{
    message: message.payloadString,
    topic: message.destinationName
    }); 
};

On the Xform side, I now need the event code. I’m going to drop all the messages into a new top-down “messagelog” instance. So it doesn’t escape off the screen, I’ve limited the log to 15 rows before I prune the bottom row. This simply uses a <xf:repeat> to output onto the screen.

<xf:action ev:event="mqtt_message">
  <xf:setvalue ref="message" value="event('message')" />
  <xf:insert nodeset="instance('messagelog')/event" at="1" position="before" />
  <xf:setvalue ref="instance('messagelog')/event[1]" value="concat(now(),' : ',event('topic'),' : ',event('message'))" />
  <xf:delete nodeset="instance('messagelog')/event[last()]" if="count(instance('messagelog')/event) &gt; 15" />
</xf:action>

You can try out v2 here. Once you connect, you’ll see, hopefully, the classic “Hello”. This is what is called a Retained Message and is sent to anyone subscribing as a default.  If you now go to the mqtt-dashboard.com site and use their Publish option, you should be able to publish anything you like to the xforms/chat topic and see it appear on the XForm – neat!

Version 3 : Publishing.

Untitled

Click to run example

We’re almost there. The last thing is how to publish a message to the broker. That’s actually the easiest bit of all. All I need is another instance to hold the topic and the message, so we can put it in the XForm, along with a <xf:trigger/> to call a JS function with the two values to send it. However, I can make it a little bit more interesting by talking about adding QoS and CleanSession.

QoS, or Quality of Service, is a flag that gets sent with a message or a subscription that sets how hard the broker and client work to handle your message.

At QoS 0, the default, the client deletes the message as soon as it’s sent to the broker. The broker in turn only sends it on to clients that are connected – it’s fire and forget.

At QoS  1, the client keeps the message stored until it knows the broker got it. In turn the broker keeps a copy until not only those connected get it, but those not connected currently but subscribed as well – you’ll get the message at least once, but might get a duplicate if the  broker/client aren’t sure.

At QoS 2 the same happens at QoS 1, with added actions by the broker/client to make sure you only get the message once.

So how does the broker keep track? It uses the clientId as the key to handle messages and subscriptions for a given client. When the client connects, there is a flag that tells the broker to either carry on or reset all messages and subscriptions for this client – that’s cleanSession and it’s true by default.

It’s worth knowing about since for a practical system you’d want cleanSession to be false and have a fixed clientId. Also, you’d probably end up with your messages being transferred at at least QoS 1. As they’re just flags it’s easy to add to the XForm for experimentation as version 3

Wrap Up.

It’s been fairly easy to integrate XForms with MQTT as it is with most JS libraries. Now I’ve a working example I should be able to generate my notification system fairly simply. Pub/Sub is an great technology and opens the way for some interesting form/form and form/server interactions.

Resources.

  • MQTT.org All things MQTT and where to go for libraries and brokers.
  • Eclipse Paho. M2M and IoT protocols. Home to the reference MQTT clients.
  • XSLTForms. Great client-side XForms engine.

 

Advertisements

Christmas with Processing

Over the holidays I got involved with the Cheerlights project by ioBridge Labs. The basic premise is that if you tweeted a colour to @cheerlights, it would appear on a series of feeds they were hosting, enabling all sorts of synchronised physical and virtual christmas light gizmo’s to be constructed. Coming latish to the game, I spent a happy afternoon on Christmas Eve knocking up a version in Processing. [all code here on GitHub.]

Background image.

Sketch 1, which  just consisted of a simple triangular tree, and one bauble showed that drawing an interesting tree was going to be a real pain, not to mention rather slow. So I went in search of a tree image I could use as a background onto which I could simply place the baubles and snow.  Luck led to the fine image above on the internet which was ideal. Now I could put it in a simple sketch.

Note: I needed a way to get the baubles positions. To avoid calculating it, I temporarily added the mousePressed() method and printed out the mouseX/Y co-ordinates to the debug screen while clicking on the right parts of the tree which I then cut and pasted back into the sketch.

Baubles.

I created a class of Bauble as an ArrayList of Points. Points is an inner class to handle the display of a particular bauble. There’s not much to Bauble; an add() method to add a Point and a display() method to run through the array and show the Points. They in turn have a show() method with a little bit of extra code to add a basic ‘twinkle’:

void show() {
int w = 10;
int h = 10;
if (frameCount % (int) random(4,10) == 0) {
    x = (int) random(4,10);
    y = (int) random(4,10);
}
ellipse(x,y,w,h); 
}

What this does is use the frame count to sure all the points don’t alter at the same time and each time one does it gets a random size. The effect is ok; but not quite like the real thing.

Snow.

SnowFlakes is again a class, with an inner called Flake. Flake creates a ellipse() of a random size and position on the screen. To make it look realistic each flake drifts slightly across the screen. SnowFlakes creates a (300) Flake array and then the fall() method runs through the array calling each Flake’s update() method. Again nothing too interesting apart from a little bit of code to add a slight drift depending on size of flake and slightly different speeds for each fall :

void update() {
 ellipse(xPosition, yPosition, flakeSize, flakeSize);
 //make them drift at slightly different speeds depending on size 
 if(direction == 0) {
 xPosition += flakeSize * 0.1;
 } else {
 xPosition -= flakeSize * 0.1;
 }

 // falls between 0.7 and 100%
 yPosition += (flakeSize * fall) + direction; 

 //reset if off the screen
 if(xPosition > width + flakeSize || xPosition < -flakeSize || yPosition > height + flakeSize) {
 xPosition = random(0, width);
 yPosition = -flakeSize;
 } 
 }

Untitled

At that point it was going pretty well. I’d some baubles and some mostly-realistic snow. The next step was to hook it up to the @cheerlights system. Initially, I was going to use the API text file to get the latest colour, until I realised from Twitter that the excellent @andysc had an MQTT version up as well.

MQTT+CheerLights

Now I’d done a previous blog post almost exactly a year ago about hooking up Processing and MQTT. Sill not updated to the latest library from IA92, but pretty sure it would still work, I was able to simply hook up that library to my new code to get the latest colour. The MQTTLib pde does the heavy lifting with a callback to send the new value to the Bauble class which gained a change() method. A bit of copy-paste to drop in the MessageHandler and add the right paths for the Cheer Lights MQTT broker/topic later I’d a working prototype which looks like so (support classes excluded).

//MQTT
import com.ibm.mqtt.MqttSimpleCallback;
private MQTTLib m;
private String MQTT_BROKER ="tcp://test.mosquitto.org:1883";
private String CLIENT_ID = "Tingenek23";
private int[] QOS = {0};
private String[] TOPICS = { "cheerlights"};
private boolean CLEAN_START = true;
private boolean RETAINED = false;
private short KEEP_ALIVE = 30;
PImage bg;
Baubles baubles;
SnowFlakes snowflakes;

void setup() {
size(640,480);
 bg = loadImage("snowtree.jpg");
 m = new MQTTLib(MQTT_BROKER, new MessageHandler());
 m.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
 m.subscribe(TOPICS, QOS);
 frameRate(30);
 noStroke();
 smooth(); 
 baubles = new Baubles();
 snowflakes = new SnowFlakes(300);
}
void draw() {
 background(bg);
 baubles.show();
 snowflakes.fall();
}

void mousePressed() {
 baubles.add(mouseX,mouseY);
 println(mouseX+","+mouseY);
}

private class MessageHandler implements MqttSimpleCallback {
public void connectionLost() throws Exception {
 System.out.println( "Connection has been lost." );
 //do something here
 m.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
 m.subscribe(TOPICS, QOS);
 }
public void publishArrived( String topicName, byte[] payload, int QoS, boolean retained ){
 String s = new String(payload);
 //Display the string
 println("New colour is " + s);
 baubles.change(s);
 }

We’re not done yet though. This works fine as a Java applet, but the latest Processing also supports an Android mode. Now you have to set this up, but once done, simply connecting up a phone, changing to Android mode and re-running, and I’ve got a version compiled onto my phone as well. [Remember to change ClientID before you run on your phone to avoid conflict with your desktop version]

Enjoy and have a great 2014!

Apache Camel for home monitoring.

It’s pretty interesting monitoring stuff at home and over time I’ve put together scripts to handle everything reliably so I can concentrate on the data. What it isn’t is pretty: there are several languages and libraries and some cron jobs to make it work, plus some stuff off the internet (I don’t fully understand). So, as part of a project with Apache ActiveMQ at work I was interested to find the sub-project Apache Camel: it’s a sort of integration engine, a switchboard between technologies that uses a ruleset to say what goes where.

Camel is intriguing because it opens the door to something, perhaps better, that can be more configured than built, but without being a dead-end if I need something particular. Now Camel is really designed as a framework using Java beans to specify the routing and manipulate the message contents. But, once built, with the appropriate connectivity, a Camel application can use just XML to describe the rules. So, in theory, I can take a Camel example, add the connectivity I want and then configure the system as needed. That last bit’s important to me; I don’t want to swap my scripts for a load of Java. The trade-off is that I may have to be more inventive to get it to do what I want via configuration.

The new world

The best place to start is with a test scenario:

  1. Get temperatures from the 1-wire system (exposed through OWFS as folders and files).
  2. Publish them onto a MQTT broker. I’ve blogged about doing this one already using Tcl, though I had to write the mqtt client code myself.
  3. Send them to MySQL for further analysis.

First steps is to get a copy of Camel and you’ll need Maven if you haven’t already got it installed.

  • Go to the examples and find the ‘camel-example-console’.
  • Open a terminal there – we’ll need it in a moment.
  • At the console type mvn compile to set it up and then mvm exec:java to start it using the local Jetty.

It’s really simple: you can type some text in and get it back in uppercase – not too exciting but lets us know it works. We’re just using it as a base.

For the test, we’ll need to extend the abilities of the example with things we want, like SQL and MQTT. So, open pom.xml and add the following dependencies:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-mqtt</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-script</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-jdbc</artifactId>
    </dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-sql</artifactId>
    </dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.9</version>
</dependency>

Call mvn compile again to recompile the example. From now on we’re just using configuration, so look in the directory target/classes/META-INF/spring for the file camel-context.xml.  This is where the magic happens.

At the moment the only route there is this one :

  <route>
      <from uri="stream:in?promptMessage=Enter something: "/>
      <transform>
        <simple>${body.toUpperCase()}</simple>
      </transform>
      <to uri="stream:out"/>
  </route>

It’s pretty clear: stream uri is the console, and we get an input message from it and output it back. In the middle we transform the content to uppercase using the “simple” language. Of course, <from> could be a lot of things: ftp, files, uri, rest, etc and <to> could be those same things or say twitter or sql or web sockets. Also, since Camel is a switchboard, we can route the same  <from> to a series of <to>s in parallel, or series and if we want to, we can transform the message itself.

Getting File data in

Back at our test (but excited I hope by the possibilities), step 1 is to get regular readings from the “files” that are in OWFS: welcome to the File component.

The File component polls directories for files and sucks them up into the route. The component is really designed to consume files not re-read the same one, so we have to tell it a few specific things about the OWFS directory via the uri:

  • noop=true. Don’t delete the file.
  • readLock=none Don’t try to lock the file.
  • idempotent=false Keep re-reading the same file (default is just once).
  • delay=10000 Only read the directory every 10s.
  • antInclude=*/temperature Use Ant syntax to just pick out temperature files.
  • recursive=true Dive into sub directory.

This gives us the following <from> :

<from uri="file:/Users/mark/work/test?noop=true&readLock=none&idempotent=false&delay=10000&antInclude=*/temperature&recursive=true"/>

Ok, so we now have readings trooping into our route. However, it’s worth noting that the message is currently a file handle object wrapped in an envelope that contains various headers and properties. That means that for both an MQTT and SQL destinations, we’ll need to get hold of the actual contents. It also means we can use the headers to send meta-data along with the file.

Headers and message bodies

At the moment all we have are a stream of readings, but we also have the CamelFilePath header telling us where in OWFS they came i.e. lounge/temperature, office/temperature etc. If we get the first folder from the path, we can use it to create a topic on the fly in the <out> route later e.g. temperatures/lounge and as a key in SQL. Luckily, Camel lets us inject a script language to do the lifting:

<setHeader headerName="topic">
<javaScript>
    request.getHeader('CamelFileName').split('/')[0].toLowerCase();
</javaScript>     
</setHeader>

Here’s another one we’ll need later using the in-built <simple> (it really is) language:

<setHeader headerName="UTFDateTime">
<simple>
    ${date:now:yyyy-MM-dd hh:mm:ss}
</simple>
</setHeader>

Lastly, we need to make sure that the message body is the current reading as a string:

<transform>
<simple>${bodyAs(String)}</simple>
</transform>

So, we’ve the message body set up and the two ‘variables’ we’ll need later. Time to do some routing!

Sending to MQTT

First stop is the MQTT broker I’ve got running. I won’t go into the ins-and-outs of brokers; they’re very useful, you’ll just have to believe me.

There is a MQTT Component, so it should be as simple as this to talk to the local broker:

<to uri="mqtt:test:?publishTopic=temperatures.${header.topic}"/>

Sadly, that’s not the case. Camel doesn’t let you do this sort of thing in XML. All is not lost however. We can use the Recipient List from the Patterns routing recipes. Recipient List lets you send to a given list of outputs but more importantly, lets you use scripting to build them. It’s a bit of a bodge to make a list of one thing but it works.

Since it’s a bit more complicated now, we’ll split out the MQTT stuff into a sub-route (via the direct: uri) that we can call from the main one. Anyone who’s spent any time writing Ant scripts will recognise this game right away…

<route id="R1">
    <from uri="direct:mqtt"/>
        <recipientList ignoreInvalidEndpoints="true" >
            <javaScript>'mqtt:test?publishTopicName=temperatures.' + request.getHeader('topic');
            </javaScript>                               
        </recipientList>
</route>

Sending to MySQL

We can use exactly the same idiom to send the data into SQL. We’ll use the SQL Component in the same way, but first we need a bit of setup to go in above the <routes> to tell Camel about the jdbc connection to MySQL (this apparently is standard Spring):

<bean id="myDS" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://localhost:3306/1wire" />
    <property name="username" value="user" />
    <property name="password" value="password" />
</bean>

There are fancier ways to do this with pooling etc, but this will do for us. All that’s left is to make up an INSERT statement to drop readings into the 1wire database, where we’ve got these three fields in the readings table:

  • reading  – FLOAT.
  • topic – VARCHAR(32).
  • tdate – DATETIME.

Again we’ll use the subroutine approach to factor out this destination and to keep the xml in manageable chunks:

<route id="R2">
       <from uri="direct:mysql"/>
             <recipientList ignoreInvalidEndpoints="true" >
                <javaScript>
                  "sql:insert into readings (reading,topic,tdate) values(" + request.body + ",'" + request.getHeader('topic') + "','"+ request.getHeader('UTFDateTime') + "')?dataSourceRef=myDS";  
                </javaScript>                                
            </recipientList>  
   </route>

That’s it: call the named datasource with the statement.  Now it’s time to put it all together.

Pipelines and Multicast

One gotcha from Camel is that multiple endpoints are chained together by default. So if you just list <to a> <to b> <to c> then the return from a is sent to b etc. This can lead to some very strange results, depending on your component! What we need instead is a <multicast>. This sends a copy of the message to each recipient. Knowing that, we can construct the whole route like so:

<!-- Poll for temperatures and route to mqtt and sql -->      
<route id="GetTemperatures">
<from uri="file:/Users/mark/work/test?noop=true&amp;readLock=none&amp;idempotent=false&amp;delay=10000&amp;antInclude=*/temperature&amp;recursive=true" />
<setHeader headerName="topic">
  <javaScript>
      request.getHeaders().get('CamelFileName').split('/')[0].toLowerCase();
  </javaScript>     
</setHeader>
<setHeader headerName="UTFDateTime">
    <simple>${date:now:yyyy-MM-dd hh:mm:ss}</simple>
</setHeader>
<transform>
    <simple>${bodyAs(String)}</simple>
</transform>
   <multicast stopOnException="true"> 
          <to uri="direct:mqtt"/>
          <to uri="direct:mysql"/>
   </multicast>        
</route>
<!-- Push the message to MQTT using topic in header -->
  <route id="Sub_1">
            <from uri="direct:mqtt"/>
            <recipientList ignoreInvalidEndpoints="false" >
                <javaScript>
                    'mqtt:test?publishTopicName=temperatures.' + request.getHeader('topic');
                 </javaScript>                               
            </recipientList>
   </route>

<!-- Push the message to SQL using topic and date in header --> 
   <route id="Sub_2">
       <from uri="direct:mysql"/>
             <recipientList ignoreInvalidEndpoints="false" >
                <javaScript>
                  "sql:insert into readings (reading,topic,tdate) values(" + request.body + ",'" + request.getHeader('topic') + "','"+ request.getHeader('UTFDateTime') + "')?dataSourceRef=myDS";  
                </javaScript>                                
            </recipientList>  
   </route>

If you’ve got MQTT and MySQL, then you can add this route to the config file and restart the example to see it in action. If you don’t you can use the <log> or <stream> endpoints instead to just print out the results to the console.

Thoughts and ideas

So, 35’ish lines of Camel config gets us a polling file reader and outputs to MySQL and MQTT – that’s not bad. Also we now have a single place to do logging and a fairly simple idiom that can be expanded if other components are needed. Moreover, if as the base, we’d used something like the servlet-tomcat example to built a web app, we’d have the basis of a neat little switchboard with a web interface thrown in. Everything is rosy? not quite:

  • It’s evident that the Camel docs lean heavily towards the java, beans, maven side rather than the config side. It can be difficult to get information. For example it was a few hours of internet wandering before I found out about Recipient List.
  • The lack of variables or replacing properties in uri’s makes routing less straightforward than it should be. Luckily the user groups are pretty friendly.

Having said that it works, and if a Twitter output was needed or a websocket feed than it’s not much harder to add them.

Ok, so is this easier than using a scripting language I know well? Obviously, not at first but as someone who does a lot of XML anyway it’s just as viable. Now, if I didn’t know how to write scripts, then this idea becomes pretty useful.

I could see perhaps a pre-compiled version of Camel webapp with common routes set up being a really easy start point for the home measurement community (uncomment to us X etc) and perhaps more approachable than say ‘download ruby and then do…’.

Maybe what the Internet of Things needs is a good switchboard.

MQTT with Processing

For me, being more engineer and less artist, Processing is a handy tool for data visualisations.: easy to use, but with the full power of java under the hood if it’s needed.

Matchboxes
by Schoenerwissen.

(courtesy of Processing.org)

However, if you want to sketch marvellous stuff like Matchboxes above, you need to feed in information from an external source; twitter, arduino, databases, web, hardware etc and there are many great libraries to make it easier. However, for each of these data streams, you need to program an interface into the sketch; it’s a point solution. It’s easy to get to a place where those libraries, threads, urls and api keys just start making life complicated. Maybe it would be better if we acted a bit more ‘Internet of Things‘ ?

Perhaps our sketch could talk to an MQTT broker instead through a single interface? All the widgets and datastreams could publish their data and the sketch could subscribe to the ones it needs. If a new data source is wanted, get it to publish itself as well and then it’s not only available to the sketch, but to everything else that might want the data as well. Of course, it works the other way as well, if the sketch can publish it’s data then it’s available to another sketch or an arduino etc.

Lets start with a really simple, slightly rubbish, sketch that bounces ‘Hello World’ about:

private float x = 10; // x location of square
private float y = 0; // y location of square
private float speed = 0; // speed of square
private static final float gravity = 0.1f;
public String current;

public void setup() {
 size(300, 300);
 background(0);
 newmessage("Hello World");
 }
public void draw() {
 background(255);
 fill(0);
text(current, x, y);
 y = y + speed;
 speed = speed + gravity;
 x = frameCount % width;
if (y > height) {
 if (speed < 0.2)
 speed = 0;
 else
 speed = speed * -0.95f; 
 }
}

public void newmessage(String s) {
	current = s;
	speed = 0;
	y = 0;
	fill( random(255), random(255), random(255), random(255)); 
	}

The job is to replace the text with data from an MQTT broker by getting MQTT to call newmessage(). Handily, the MQTT system comes with a java client, a jar we can add and a nice simple interface we can use. If you’re following along using the Processing IDE (source code for sketches is here but you’ll need wmqtt.jar as below):

  1. Download the IA92 code from IBM, unzip it and find wmqtt.jar.
  2. Find your Processing/libraries folder (Cmd-K) and create wmqtt/library
  3. Put wmqtt.jar into it.
  4. Create a new tab in you sketch call MQTTLib and drop in the following:
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttSimpleCallback;
import com.ibm.mqtt.MqttException;
 public class MQTTLib {
 private MqttSimpleCallback callback;
 private MqttClient client = null;

 MQTTLib(String broker, MqttSimpleCallback p) {
 callback = p;
 try {
 client = (MqttClient) MqttClient.createMqttClient(broker, null);
 //class to call on disconnect or data received
 client.registerSimpleHandler(callback);
 } catch (MqttException e) {
 System.out.println( e.getMessage() );
 }
 }

 public boolean connect(String client_id, boolean clean_start, short keep_alive) {

 try {
 //connect - clean_start=true drops all subscriptions, keep-alive is the heart-beat
 client.connect(client_id, clean_start, keep_alive);
 //subscribe to TOPIC
 return true;
 } catch (MqttException e) {
 System.out.println( e.getMessage() );
 return false;
 }
 }

 public boolean subscribe(String[] topics, int[] qos ) {
 try {
 //subscribe to TOPIC
 client.subscribe(topics, qos);
 return true;
 } catch (MqttException e) {
 System.out.println( e.getMessage() );
 return false;
 }
 }

}

This new class takes as a constructer, a URL string for a broker and the class that gets called when either the connection is dropped, or we have new data. We also have  connect and subscribe methods to, well connect and subscribe. All this code is pretty short on error handling, but is sufficient for prototyping. All we have to do now is alter our main script to get it all going.

The key here is providing the callback so our script is informed when new data is ready. For that we need a class that implements MqttSimpleCallback, so we add an inner class into our main sketch:

private class MessageHandler implements MqttSimpleCallback {
public void connectionLost() throws Exception {
 System.out.println( "Connection has been lost." );
 //do something here
 }
public void publishArrived( String topicName, byte[] payload, int QoS, boolean retained ){
 String s = new String(payload);
 //Display the string
 newmessage(s);
 } 

 }

This class has only two methods, one gets called if we lose the connection – it should reconnect etc in the real world. The second gets called when the broker has data for us from a subscription topic. In our code we simple set our display string to the payload.

Last step then, is to alter the sketch in setup to create the MQTTLib class, connect and subscribe to something, so now we have:

import com.ibm.mqtt.MqttSimpleCallback;

//MQTT Parameters
private MQTTLib m;
private String MQTT_BROKER ="tcp://test.mosquitto.org:1883";
private String CLIENT_ID = "TestProcessing";
private int[] QOS = {1};
private String[] TOPICS = { "$SYS/broker/clients/active"};
private boolean CLEAN_START = true;
private short KEEP_ALIVE = 30;
private float x = 10; // x location of square
private float y = 0; // y location of square
private float speed = 0; // speed of square
private static final float gravity = 0.1f;
public String current;
public void setup() {
 size(300, 300);
 background(0);
 // set up broker connection
 m = new MQTTLib(MQTT_BROKER, new MessageHandler());
 m.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
 m.subscribe(TOPICS, QOS);
 current = "Hello World";
 }
public void draw() {
 background(255);
 stroke(0);
text(current, x, y);
 y = y + speed;
 speed = speed + gravity;
 x = frameCount % width;
if (y > height) {
 if (speed < 0.2)
 speed = 0;
 else
 speed = speed * -0.95f;

 }
}

That’s it. Every time the broker gets a message published to the topic, it tells the client, running in it’s own thread, and it triggers the callback. That resets the display message.

There are several open MQTT brokers on the web we can talk to. There is a test mosquitto broker chosen above that we can use and the topic is the number of current connections. This topic is useful because it changes fairly rapidly. Cosm have another excellent one worth looking at. A couple of other things:

  • The broker needs a unique Client ID for each connection, so that probably needs changing, otherwise it throws the duplicate off.
  • Clean Start signals the broker to drop any topics we’ve subscribed to. Our connection is treated as totally new.
  • Keep Alive is the heartbeat signal time to the broker, e.g we ping it every N secs so it knows we’re still there.

To Do

Well, the code could be better, we can’t publish yet, and the Qos stuff isn’t explained. Also, the IA92 code is deprecated and we should be using Paho code instead. But, it’s enough to be getting on with and it should be fairly simple to retrofit an existing sketch to get it’s data from an MQTT broker.

1-wire with MQTT

Background

Over the last few weeks I’ve been experimenting with the Dallas 1-wire system for sensors around the house/garden. Although I originally thought of using Arduino/JeeNodes I couldn’t make the figures work. 1-wire may not be wireless, but it’s inexpensive, simple and robust. Basically, you need a run of 2/3-core wire with sensors on it where you need them: most people seem to use phone cable and RJ12 connectors. Each sensor has a unique address and for short’ish runs can all work on parasitic power from the bus – I got mine from HomeChip. They also do the bus controller that sits on one-end of the wire and has a usb connector so you can talk to the sensors on your server. By the way, it’s not all sensors, there’s A/D converters, switches etc and the bus is of course 2-way. See here for some idea of the things you can do.

Now this is great, and don’t get me wrong, of course you could run 1-wire sensors from an Arduino. What sold the more retro solution to me was the OWFS or 1-wire file system. This is a rather neat bit of software that turns your 1-wire bus sensor data into a file system. Neat! Now instead of an API etc, all the normal tools, scripts etc that work with files work with 1-wire and getting a temperature is as simple as reading the contents of

 /mnt/1wire/28.E1A6CC030000/temperature

Publishing to MQTT

A bit of installing and a modicum of soldering got me a working system on my Viglen MPC with a couple of sensors up and running. The common place to put the resulting data was into RDDTool, however it struck me that the filesystem idea in OWFS is a good match for the topic system in MQTT. I could simply push the file structure into MQTT and then subscribe to the topics that interest me, again using tools I already have and various cool ideas that I’ve read about on mqtt.org . And, as it’s two way, there’s nothing to stop me later using MQTT to publish data myself back to those same sensors or switches. Nothing of course stops me doing something else as well. In fact I’ve a cron job that spits everything into MySQL every 10 mins as well so I can analyse it in R stats later.

For the MQTT side, the only issue I had was to write a small Tcl library so that I could publish the data. That’s only because I like to use Tcl, there are already  libraries for Java, Perl and numerous others on Mqtt.org. Once I had a simple lib that could publish data at Qos 0, I just needed a simple script to find all the temperture sensors and publish their paths and values:

#!/usr/bin/env tclsh
lappend auto_path [pwd]
package require mqtt 1.0
#Using tcllib for 'cat', you could use open instead
package require fileutil
#Mount for 1-wire
set base "/mnt/1wire"
set mqtt_conn ""

#Get paths in this dir that are files
proc get_tree {d} {
 foreach s [glob -directory $d -types f *] {get_file $s}
 }
#Get contents of path
proc get_file {d} {
 global mqtt_conn
 set reading [string trim [fileutil::cat $d]]
 puts "Publishing $reading to $d"
 mqtt publish $mqtt_conn $d $reading
}

#Get all thermometers i.e. 28.xxxxxxxxxxx
proc get_thermometers {} {
global base
puts "Getting directories"
foreach d [exec find $base -maxdepth 1 -name 28.* ] {
#Get tree for each dir
 puts "Looking for $d"
 get_tree $d
 }
}

set mqtt_conn [mqtt connect localhost 1883 1Wire]
mqtt publish $mqtt_conn $base "[clock format [clock seconds]]"
get_thermometers
mqtt disconnect $mqtt_conn

With the RSMB broker running on the little Viglen MPC, which is called byron, which also has OWFS on, I could now publish my OWFS data and a test with stdoutsub tool on the laptop showed that I could see all the topics and data if I subscribed to /mnt/# .

./stdoutsub /mnt/# --host byron --qos 0

Fixing Aliases

One problem in the initial data was sensors are by default named by id and 28.2823FBE00300008A isn’t very semantic. Luckily, OWFS has an alias system that lets you link ids to human-readable names with a simple file. Once setup, it changes the OWFS file system and the folders magically get renamed to /office, /outside etc. Handily, if you’ve already got scripts talking sensor ids instead, they’ll still work as well!

All the Tcl script needed to use aliases was an initial look in /settings/alias/list for the list of sensors, rather than finding everything starting 28.* This gives a replacement get_thermometer() function like so:

proc get_thermometer {} {
global base
foreach d [exec cat "$base/settings/alias/list"] {
  set alias_dir "$base/[lindex [split $d =] 1]"
  get_tree $alias_dir
  }
}

If you run the script again you’ll get the more readable listing like so:

Publishing 28E1A6CC03000060 to /mnt/1wire/office/address
Publishing office to /mnt/1wire/office/alias
Publishing 60 to /mnt/1wire/office/crc8
Publishing 28 to /mnt/1wire/office/family
Publishing 21.5 to /mnt/1wire/office/fasttemp
Publishing E1A6CC030000 to /mnt/1wire/office/id
Publishing FFFFFFFFFFFFFFFF to /mnt/1wire/office/locator
Publishing 0 to /mnt/1wire/office/power
Publishing 60000003CCA6E128 to /mnt/1wire/office/r_address
Publishing 000003CCA6E1 to /mnt/1wire/office/r_id
Publishing FFFFFFFFFFFFFFFF to /mnt/1wire/office/r_locator
Publishing 21.4375 to /mnt/1wire/office/temperature
Publishing 21.5 to /mnt/1wire/office/temperature10
Publishing 21.5 to /mnt/1wire/office/temperature11
Publishing 21.4375 to /mnt/1wire/office/temperature12
Publishing 21.5 to /mnt/1wire/office/temperature9
Publishing 75 to /mnt/1wire/office/temphigh
Publishing 70 to /mnt/1wire/office/templow
Publishing DS18B20 to /mnt/1wire/office/type

As you can see, even a small temperature sensor puts out all sorts of data as well as the temperature in various resolutions. One of the beauties of the MQTT side is this complexity can be  simplified to just what’s required. So, by subscribing to /mnt/1wire/+/fasttemp for instance you get:

/mnt/1wire/outside/fasttemp 1
/mnt/1wire/office/fasttemp 21.5

Left to do.

There’s a few things left to do:

  • I’m only sending in Qos 0. It really needs to be Qos 1.
  • I need to set the Retained flag so last temperature is held.
  • I’m not sure I need everything from a sensor.
  • Subscribing is only the beginning – publishing is the end goal so I can interact with the 1-wire system.

But, it’s a start.

Resources.

If you’re interested in 1-wire and you’ve got Debian/Ubuntu, then be grateful to Cyril Lavier (@davromaniak) who has a neatly packaged repository otherwise get the sources from OWFS below and break out the build tools.

The Tcl script and simple MQTT lib at all here.

All the MQTT stuff is over at MQTT.org

All the OWFS stuff is at OWFS.org

I’ve since found out via Twitter that Nicholas Humphrey (@njh) has a fine daemon that does this directly. You can get his code from GitHub here.

MQTT with Lotus Notes

Previously, I’ve used Apache Ant with MQTT Publish/Subscribe to keep track of overnight processes. Since then (2 years – amazing) The list of things talking to the sturdy RSMB broker has grown steadily. What it has lacked up ’til now was a means to get Lotus Notes messages from LotusScript agents.

One of the issues with Notes is that you can’t open network connections, so normal TCP/IP games are out. However, you can run java agents and they can load external libraries. The majority of agents are however written in Notes’ own scripting language,  LotusScript, and that’s the situation here. I didn’t want to rewrite a load of stuff and I wanted something that could be dropped into an existing agent.

What I didn’t know about was the star of this show:  LS2J the “LotusScript to Java Bridge”  which allow you to interact with methods and properties of a java class from LS as if they were native. It’s a clever bit of kit, with a few caveats and the following bits of code show how it works for a simple MQTT class.

If you want to download it, I’ve wrapped all this up in a single NSF for convenience.

Java Class

So, first off you need an empty database and a new Java Script Library entry. I’ve called mine MQTT_Notes. You’ll need to use ‘Edit Project’ to pull in the wmqtt.jar file from the IA92 Java Client into the database.

I’ve kept the java side very simple. There are three methods:  initialize, execute and disconnect to send a QoS 0 publication to a broker. You’ll also notice I’m not doing any try/catch error handling. This is because the errors are being sent up to the LS level above.

The code is listed below, any real java coders, please feel free to tut and shake your heads.

import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;

public class MQTT_Notes {
    private final static boolean CLEAN_START = true;
    private final static boolean RETAINED = false;
    private final static short KEEP_ALIVE = 30;
    private final static short QOS = 0;
    private String name="mqttnotes";
    private String url="tcp://localhost:1882";
    private String topic="mqttnotes";
    private String payload="hello from notes";
    private MqttClient client=null;

public void initialize(String strURL, String strName) throws MqttException {
    name = strName;
    url = strURL;
    client = new MqttClient(url);
    client.connect(name, CLEAN_START, KEEP_ALIVE );
}

public void execute(String strTopic, String strPayload) throws MqttException {
    topic = strTopic;
    payload = strPayload;
    if (client != null) {
        client.publish(topic,payload.getBytes(),QOS,RETAINED);
    }
}

public void disconnect() throws MqttException {
    if (client != null) {
        client.disconnect();
    }
}

}

LotusScript Class

Next up, you’ll need a new LotusScript Library. I’ve called mine Class_MQTT. This wraps up the calls into LS2J and our java class  into a handy drop-in LotusScript class. The key lines are in the Options section:

  1. UseLSX “*javacon” which loads the LS2J bridge code.
  2. Use “Java_MQTT” which loads our java code from above.

The code is a standard LS Class, called MQTTSimple, with the addition of some new types.

  1. JavaSession is the link to the Java machine. It lets us find the classes we want to use. In this case “MQTT_Notes”.
  2. JavaClass is the handle to the class.
  3. JavaObject is the handle to the running object, using the class.
  4. JavaError lets us get errors from the underlying java into the LS On Error model.

Notice that there is an explicit Delete method in the code which is called when the LS object is destroyed to make sure the MQTT connection is closed. The code is listed below:

'Class_MQTT:

Option Public
Option Explicit
Uselsx "*javacon"
Use "Java_MQTT"

Class MQTTSimple
Private MQTTObj As JavaObject
Private MQTTError As JavaError
Private js As JavaSession

Sub New(url As String, title As String)
    Set js = New JavaSession
    Dim MQTTClass As JavaClass

    On Error Goto ErrHandler
    'Get our java class
    Set MQTTClass = js.GetClass ("MQTT_Notes")
    'Instantiate it.
    Set MQTTObj = MQTTClass.CreateObject ()
    'Call our initialize method to get a MQTT connection
    'Takes connection url and connection handle
    Call MQTTObj.Initialize (url, title)
    Exit Sub

ErrHandler:
    Set MQTTError = js.getLastJavaError()
    Print MQTTError.ErrorMsg
    Call js.ClearJavaError()
    Resume Next
End Sub

Sub Delete
    'Called as the object is destroyed.
    'Makes sure MQTT connection is shut.
    On Error Goto ErrHandler
    Call MQTTObj.disconnect()
    Exit Sub

ErrHandler:
    Set MQTTError = js.getLastJavaError()
    Print MQTTError.ErrorMsg
    Call js.ClearJavaError()
    Resume Next
End Sub

Sub Exec(topic As String, payload As String)
    'Publish given payload to given topic
    Call MQTTObj.execute(topic,payload)
    Exit Sub

ErrHandler:
    Set MQTTError = js.getLastJavaError()
    Print MQTTError.ErrorMsg
    Call js.ClearJavaError()
    Resume Next
End Sub

End Class

This code gives us a new LS MQTTSimple class which takes a URL and connection name on creation.
It has one visible method, exec, which takes a topic and payload string.

Notice that the errorhandling in the code uses the standard LS error handler, but gets it’s error information from the underlying java.
It’s not quiet seamless though as you have to cancel the error yourself.

Test Script

Lastly, there is a test script to test it all out. To use the MQTT class, simply add it into the Options section. The little agent below opens the mail file of the user and publishes it’s size in Mb. Assuming you have a broker running and some sort of client subscribed, the following should work. I used the RSMB broker on my laptop on port 1882  and the IA92 client subscribed to the Notes topic.

Option Public
Option Explicit
Use "Class_MQTT"

Sub Initialize
    Dim mqtt As MQTTSimple
    Dim db As NotesDatabase
    'Start the connection: URL and Connection string
    Set mqtt = New MQTTSimple("tcp://localhost:1882","NotesAgent")
    'Open user's mail file
    Set db = New NotesDatabase( "", "" )
    Call db.OpenMail
    'Publish database size in Mb to MQTT in the Notes topic at Qos0
    Call mqtt.Exec("Notes", "Mail database " & Format$(db.Size/1000000,"000,###")) )
End Sub

This is of course a bare bones system, but I can see a lot of uses: Mail-in databases, overnight agents, maintenance tasks etc. The best thing is that MQTT allows me to tie in all these disparate systems easily.

MQTT Ant task

Building an Ant Task

With an interest in publish-subscribe with MQTT , it seems a natural choice to try and get  some other tools I use to talk to the microbroker we’ve got running. I chose Ant, partly because I use it for loads of things and because there is a handy java client library for MQTT to make things easier.

Continue reading