Creating MarkLogic content with Apache Camel

MarkLogic with JMS?

A few weeks back we started looking at MarkLogic at work as a possible replacement for our mix of Cocoon and eXistDB systems. One of the side issues that’s been raised is how we would get messages to and from our ActiveMQ EIP system. Now, MarkLogic doesn’t have a JMS connector, although to be fair, it seems to have a pretty good system for slurping up content from URLs, files etc. However, there is a Java API, which gave me the idea of using my old friend Apache Camel.

If I could get Camel to talk to MarkLogic then, not only could I talk to any sort of queue, I could also pull content into MarkLogic from the huge range of other things Camel will talk to, and I would get all the EIP magic thrown in for good measure.

The Camel Routes.

The easiest way to test this was of course to set up a simple Camel project to slurp up some data. The most expedient producer I could think of was an RSS feed from somewhere; JIRA being the most obvious, as it would reliably produce something new at reasonably short intervals. This would need transforming into XML and pushing into MarkLogic via a queue. The MarkLogic side would be handled by their Java API mounted as a Bean, in my case written in Groovy so I could work with it as a script. So much for the basic plan. Here’s the start route as Camel sees it:

<route id="jira">
    <from uri="rss:https://issues.apache.org/jira/sr/jira.issueviews:searchrequest-xml/temp/SearchRequest.xml?jqlQuery=&amp;tempMax=100&amp;delay=10s"/>
    <marshal><rss /></marshal>
    <setHeader headerName="ml_doc">
        <simple>/jira/${exchangeId}</simple>
    </setHeader>
    <to uri="seda:mlfeed"/>
</route>

The Camel RSS module calls a basic Jira RSS feed, in this case polling every 10 seconds. I’ve used the module defaults, so each entry is separated out of the feed and passed down the route one at a time. At this point the message body is a Java SyndFeed object, not XML, so it has to be ‘marshalled’. Now the message body is an XML string ready for upload, but before I can send it I need to make a URI for MarkLogic to use. Each run of the route, or ‘exchange’, has a unique id, so I’ve used that via the built-in <simple/> language. Alternatively, I could have parsed something out of the content, like the Jira id, or made something up, like the current date-time. Finally, the message is dropped into a queue via the SEDA module.
Note, this in-memory queue type isn’t persistent, unlike a JMS broker such as ActiveMQ, but it’s built into camel-core, so it was just handy.
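
The alternative ways of naming the document mentioned above can be sketched in plain Java (which a Groovy bean would also accept). This is only an illustration under my own assumptions: the class name DocUris is hypothetical, the prefix is passed in by the caller, and I’m assuming each feed item carries a standard RSS <link> ending in the issue key (for example .../browse/CAMEL-1234). If no link is found, it falls back to a timestamp.

```java
import java.io.StringReader;
import java.time.Instant;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

/** Hypothetical helper: builds a document URI from the content rather than the exchange id. */
final class DocUris {
    private DocUris() {}

    /** Uses the last path segment of the first item's link as the document name. */
    static String fromItemLink(String prefix, String rssXml) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            String link = xpath.evaluate("/rss/channel/item[1]/link",
                    new InputSource(new StringReader(rssXml))).trim();
            String key = link.substring(link.lastIndexOf('/') + 1);
            // No usable link: fall back to a made-up, time-based name
            return key.isEmpty() ? fromTimestamp(prefix, Instant.now()) : prefix + key;
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Body is not parseable RSS", e);
        }
    }

    /** Uses the epoch milliseconds of the given instant as the document name. */
    static String fromTimestamp(String prefix, Instant when) {
        return prefix + when.toEpochMilli();
    }
}
```

Something like DocUris.fromItemLink("/jira/", body) could then be called from a bean before the message is queued. The exchange id is still the simplest choice, as it needs no knowledge of the content.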

There is another route to pull messages from the queue and into MarkLogic.

<route id="marklogic">
    <from uri="seda:mlfeed"/>
    <to uri="bean:marklogic?method=upload_xml"/>
    <!-- <to uri="file:outbox"/> -->
</route>

This route takes messages off the queue and passes them to a Bean written using Camel’s Groovy support. Lastly there’s an optional entry to put the message into a file in an /outbox folder. This is handy if you can’t get the MarkLogic bit working and want to look at the input: comment out the bean instead and just drop the data into files.

The Groovy Code.

The Groovy Bean is mounted in the configuration file, along with some parameters needed to connect to MarkLogic.
Note. To get this working, you’ll need to supply your own parameters, and have a MarkLogic REST server listening, as REST is the basis of their API. You can get instructions here.

<lang:groovy id="marklogic" script-source="classpath:groovy/marklogic.groovy">
<lang:property name="host" value="YOURHOST" />
<lang:property name="port" value="YOURPORT" />
<lang:property name="user" value="YOURNAME" />
<lang:property name="password" value="YOURPASSWORD" />
</lang:groovy>

Once the Bean is running, you simply call its methods in the route. You get the entire Exchange as input, so you have access to everything, as well as the ability to alter it as you like. In this case, I’ve simply written the data out and not altered the message at all. In real life it would probably be more complex. The salient bit of Groovy code (the getters for the parameters are not shown) is below. This is the MarkLogic basic example with a couple of mods to a) get the header that has the URI in, and b) get the body of the input Message as an InputStream:

public void upload_xml(Exchange exchange) {
    // Get the doc URI from Camel, falling back to the exchange id
    String docId = exchange.getIn().getHeader("ml_doc");
    if (docId == null) docId = "/twitter/" + exchange.getExchangeId();
    // create the client
    DatabaseClient client = DatabaseClientFactory.newClient(host, port,
            user, password,
            Authentication.DIGEST);

    // try to make use of the client connection
    try {
        XMLDocumentManager XMLDocMgr = client.newXMLDocumentManager();
        // Get an InputStream from the Message Body
        InputStreamHandle handle = new InputStreamHandle(exchange.getIn().getBody(InputStream.class));

        // Write out the XML Doc
        XMLDocMgr.write(docId, handle);
    } catch (Exception e) {
        System.out.println("Exception : " + e.toString());
    } finally {
        // release the client
        client.release();
    }
}

Note. I’ve connected to and disconnected from the MarkLogic database on every call. I’m sure this can’t be efficient in anything but a basic use case, but it will do for the present. There’s nothing to stop me creating an Init() method, called as the Bean starts, to create a persistent connection if that’s better, but all the examples I could find seem to do it this way [If I’ve made any MarkLogic Java API gurus out there wince, I’m sorry. Happy to do it a better way].
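
As a rough idea of what the persistent-connection variant might look like, here is a minimal sketch in plain Java. It is not MarkLogic-specific: SharedClient is a hypothetical holder that creates a resource on first use and hands back the same instance afterwards. With the real API, the factory would presumably wrap the DatabaseClientFactory.newClient(...) call shown above and the releaser would call release(), but I haven’t tested that wiring.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical holder for one lazily created, shared client. */
final class SharedClient<T> {
    private final Supplier<T> factory;   // how to create the client
    private final Consumer<T> releaser;  // how to release it again
    private T client;

    SharedClient(Supplier<T> factory, Consumer<T> releaser) {
        this.factory = factory;
        this.releaser = releaser;
    }

    /** Creates the client on first use, then returns the same instance. */
    synchronized T get() {
        if (client == null) {
            client = factory.get();
        }
        return client;
    }

    /** Releases the client if one exists; a later get() creates a fresh one. */
    synchronized void release() {
        if (client != null) {
            releaser.accept(client);
            client = null;
        }
    }
}
```

The upload method would then call get() instead of creating a client each time, and release() would be called once when the Bean shuts down.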

Putting it all together.

If you’ve got a handy MarkLogic server, you can try this all out. I’ve put the code here on GitHub as a Maven project, and all you need to do is pull it and run “mvn compile exec:java”. Ten seconds or so after it starts, you should see something similar to this on the console:

[l-1) thread #1 – seda://mlfeed] DocumentManagerImpl INFO Writing content for /jira/ID-twiglet-53205-1398451322665-0-2
[l-1) thread #1 – seda://mlfeed] DatabaseClientImpl INFO Releasing connection

On the MarkLogic side, if you go to the Query Console you can use Explore to look at your database. You should see the files in the database – query them to your heart’s content.

  • I’m using MarkLogic 7 and Java API 2.0-2 with Camel 2.12.0.
  • If you want to change the routes, you’ll find them in src/resources/camel-context.xml.
  • The Groovy script is in resources/groovy/marklogic.groovy.
  • Remember, if you want to use modules outside of camel-core, you’ll need them in the pom.xml!

Bells and Whistles.

Now I’ve got the basic system working, there are a couple of other things I could do. As the MarkLogic component reads from the end of a queue, I could for instance add another route that puts messages into the same queue from another source, for example Twitter (for which there’s a module), assuming I had appropriate Twitter OAuth ‘keys’, like so:

<route id="twitter-demo">
    <from uri="twitter://search?type=polling&amp;delay=60&amp;keywords=marklogic&amp;consumerKey={{twitter.consumerKey}}&amp;consumerSecret={{twitter.consumerSecret}}&amp;accessToken={{twitter.accessToken}}&amp;accessTokenSecret={{twitter.accessTokenSecret}}" />
    <setHeader headerName="ml_doc">
        <simple>/twitter/${body.id}</simple>
    </setHeader>
    <log message="${body.user.screenName} tweeted: ${body.text}" />
    <to uri="seda:mlfeed"/>
</route>

Of course, once you start doing that, you need some way to throttle the speed at which things get added to the queue, to avoid overwhelming the consumer. Camel has several strategies for this, but my favourite is RoutePolicy. With this you can specify rules that allow the input routes to be shut down and restarted as necessary to throttle the number of in-flight exchanges. You simply add the Bean, like so, with an appropriate configuration:

<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
    <property name="scope" value="Context"/>
    <property name="maxInflightExchanges" value="20"/>
    <property name="loggingLevel" value="WARN"/>
</bean>

and then add this policy to any route you wish to control, like so:

<route routePolicyRef="myPolicy">

Once there are more than 20 messages in-flight (‘context’ means all routes/queues) the inbound routes will be suspended. Once the activity drops below 70% (you can configure this) they’ll start up again – neat.
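
To make the suspend/resume rule concrete, here is a small stand-alone model of it in Java. It is a sketch of the behaviour described above, not Camel’s implementation: the class InflightThrottle and its method names are hypothetical, and whether the resume boundary itself counts (14 for a maximum of 20 at 70%) is an assumption of this sketch.

```java
/** Simplified model of in-flight throttling with a lower resume threshold. */
final class InflightThrottle {
    private final int maxInflight;
    private final int resumeAt;   // e.g. 70% of maxInflight
    private int inflight;
    private boolean suspended;

    InflightThrottle(int maxInflight, int resumePercentOfMax) {
        this.maxInflight = maxInflight;
        this.resumeAt = maxInflight * resumePercentOfMax / 100;
    }

    /** Called when an exchange enters a route. */
    void exchangeStarted() {
        inflight++;
        update();
    }

    /** Called when an exchange completes. */
    void exchangeDone() {
        inflight--;
        update();
    }

    /** Suspend above the maximum; resume only once load has dropped to the lower threshold. */
    private void update() {
        if (!suspended && inflight > maxInflight) {
            suspended = true;
        } else if (suspended && inflight <= resumeAt) {
            suspended = false;
        }
    }

    boolean isSuspended() {
        return suspended;
    }
}
```

The gap between the two thresholds is the useful part: without it, the inbound routes would flap between suspended and running every time a single exchange completed.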

This only really skims the surface. Camel is a marvellous system and being able to use it to push content to MarkLogic is very handy (if I polish the code a bit). Wiring routes up in Camel is so much easier, and more flexible and maintainable, than writing custom, one-off code.

Finally, of course, there’s nothing to say you couldn’t have a route that went away and got some key, which was then sent to MarkLogic via a Bean to retrieve some data, which in turn got added to the body (Content Enrichment in EIP). That’ll have to be a subject for another day.

Resources.

  • MarkLogic Developer. http://developer.marklogic.com
  • Apache Camel. http://camel.apache.org
  • Enterprise Integration Patterns (nice hardback book) http://www.eaipatterns.com

Camel, Groovy and Beans

Last year I did an article on using Apache Camel as a switchboard for home monitoring, with a bit of a nod to IoT perhaps. One of the decisions I’d made was that, as far as possible, I’d use Spring XML to configure rather than compile my solution, as I was interested in whether I could use Camel as a general tool. [more on Artisan use and tools here] So far, it’s worked out pretty well, until I wanted to upload some files via HTTP POST.

The plan.

I’ve a Raspberry Pi with a camera module, to take stop-motion images. There’s not much room on the Pi, so it’s attached to the WiFi and uploads each photo after taking it, every minute or so. My Camel engine (2.12) is sitting on the server as a servlet inside a copy of Tomcat 7.

Now you might say that all I need is a bit of PHP or a servlet or similar to just dump the file. But if I did that, not only would I get a ‘point-solution’ just for this need, I’d also be reducing my choices as to what can be done with the data afterwards. What if I want to send every 100th one to Flickr, or SMS my phone if the Pi stops sending? If I can pull the image (and its metadata) into a Camel route, not only can I just save it, it’s ready for anything else I might think of doing with it later.

The technical problem is that the Camel Servlet component I’m using works fine for GET as you, well, get the parameters as Headers. If however you POST, as you must to upload a file, then you get one long stream of data in the message body with everything mashed together as “Multipart Form-data”, or RFC1867. What I need is a way to parse out the image file and headers myself, and there’s even a Java library to do it called Commons File Upload. In the normal scheme of things you would create a Java Bean which would be called in the pipeline to do the work for you. But that seems a little against the configure-only theme, so I need a way to write code without writing “code”, i.e. script it in.

Note: This Bean doesn’t need to save the image, just move it into the body in a format where I can use modules like File to save it or route it somewhere else later.

Going Groovy

In my previous article, I’d already used a bit of JavaScript in my route, just in-line with the XML. Another script language Camel supports is Groovy. If you’ve not come across Groovy before, it’s worth a look: the Java engine underneath, but with the rough edges taken off and a rather nicer syntax. Happily, it also understands straight Java as well as cool Groovy constructs like closures, so you can simply drop code in and it works. You can use Groovy in-line in predicates, filters, expressions etc in routes and everything will be lovely, but it is also supported by Spring (and if you want, you can read about Dynamic Language Beans here), so you can create a Bean with it, which seems just the ticket.
Dynamic, flexible and easily shared, drop-in Beans. That’s more like it.

I’m using the Camel Servlet-Tomcat example as the basis for my current engine. To use Groovy, you have to add the following to your pom.xml:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-script</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-groovy</artifactId>
</dependency>

Build the war file and deploy it onto Tomcat. Try the built-in example to make sure it’s all working. Next add the following route in camel-config.xml (in WEB-INF/classes):

<route>
    <from uri="servlet:///gmh" />
    <setBody>
        <groovy>
            def props = exchange.getIn().getHeaders()
            response = "&lt;doc&gt;"
            props.entrySet().each {
                response += "&lt;header key='" + it.getKey() + "'&gt;" +
                it.getValue() + "&lt;/header&gt;"
            }
            response += "&lt;/doc&gt;"
            response
        </groovy>
    </setBody>
    <setHeader headerName="Content-Type">
        <simple>text/xml</simple>
    </setHeader>
</route>

Now try http://localhost:8080/[YOUR SERVLET]/camel/gmh and you should get a nice XML list of headers and a warm feeling that Groovy is working.

Adding Beans

The problem with adding code in-line is that it not only becomes unwieldy pretty easily, you’re also limited in how you can put it together. This is where Beans come in. They not only allow you to hide, reuse and even share Groovy recipe code, but also work at the Exchange level, giving you far more options. Setting up the code snippet above as a Bean is pretty easy.
a) Create a folder in /classes called /groovy.
b) In that folder, create (or download) a file called gmh.groovy.
c) Into that file drop the following code:

import org.apache.camel.Exchange
class handler {
    public void gmh(Exchange exchange) {
        def props = exchange.getIn().getHeaders()
        def response = "<doc>"
        props.entrySet().each {
            response += "<header key='" + it.getKey() + "'>" + it.getValue() + "</header>"
        }
        response += "</doc>" 
        exchange.getIn().setBody(response)
    }
}

Notice that you now need to declare things that were previously hard-wired for in-line code, like the exchange, the response etc. Also, there is now a class wrapped around the code, which itself now lives in a method, and I’ve had to set the body explicitly. But you don’t need all that tedious XML escaping, and the whole Exchange is available to play with.
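
One thing the bean does not do is escape the header names and values themselves, so a value containing & or < would produce XML that doesn’t parse. A minimal way to guard against that, sketched in plain Java that could be dropped into a Groovy script, might look like this. HeaderXml and its methods are hypothetical names of mine, and a plain Map stands in for the Camel headers.

```java
import java.util.Map;

/** Hypothetical helper: renders a header map as XML with escaped names and values. */
final class HeaderXml {
    private HeaderXml() {}

    /** Escapes the five XML special characters; '&' must be replaced first. */
    static String escape(String s) {
        return s.replace("&", "&amp;")
                .replace("<", "&lt;")
                .replace(">", "&gt;")
                .replace("'", "&apos;")
                .replace("\"", "&quot;");
    }

    /** Builds the same <doc><header key='..'>..</header></doc> shape as the bean. */
    static String toXml(Map<String, Object> headers) {
        StringBuilder sb = new StringBuilder("<doc>");
        for (Map.Entry<String, Object> e : headers.entrySet()) {
            sb.append("<header key='").append(escape(e.getKey())).append("'>")
              .append(escape(String.valueOf(e.getValue())))
              .append("</header>");
        }
        return sb.append("</doc>").toString();
    }
}
```

In the bean, the body could then be set with something like exchange.getIn().setBody(HeaderXml.toXml(props)).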

To wire gmh.groovy up into the route you need to add the following above the camelContext entry:

<lang:groovy id="gmh" script-source="classpath:groovy/gmh.groovy"/>

Note you may need to declare the “lang” namespace at the top of the file before it will work in which case add the following:

xmlns:lang="http://www.springframework.org/schema/lang"

Lastly, the route can be altered to get rid of the in-line code and use the bean instead:

<route>
    <from uri="servlet:///gmh" />
    <to uri="bean:gmh?method=gmh"/>
    <setHeader headerName="Content-Type"><simple>text/xml</simple></setHeader>
</route>

Note it’s the bean id that gets used in the uri, not the class name; from the class, only the method name appears in the route.

Now if you restart the servlet and retry the URL, you should get the same answer as before. There’s of course a lot more you can do with this, there always is, but those are the basics and it works. So, where does that leave me with my POST problem?

Getting the POST

Suffice to say, it’s not much more difficult once Groovy is running; it’s just a bit more script. I used the Apache Commons File Upload libraries and a bit of code (60 lines). Basically, it reads the encoded data in the body and does one of two things. a) If it’s a header, it creates an Exchange property called groovy.[header]. b) If it’s a file, it creates a property with the file name and turns the stream into a byte array, which gets put in the body.
That gave me a new postMe.groovy script, which I wired into this route (here’s the camel-config as well):

<route id="POSTTest">
    <from uri="servlet:///posttest" />
    <to uri="bean:PostMe?method=getPost"/>
    <to uri="file:outbox"/>
</route>

If you set this up and call curl with a file to upload like so:

curl -i -F filedata=@image.jpg http://localhost:8080/[SERVLET]/camel/posttest

Then you should see your file in the /outbox folder. You can also add headers like -F stuff=foo and throw in the previous bean to show them in the list as groovy.stuff=foo.
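
To show what the script has to untangle, here is a deliberately simplified, JDK-only sketch of splitting a multipart body into form fields and a file. It is not the postMe.groovy script, which uses Commons File Upload and copes with far more cases. MultipartSketch is a hypothetical name, the boundary is assumed to have been taken from the request’s Content-Type header already, and storing the file name under groovy.[field name] is my own choice for the illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified multipart/form-data splitter, for illustration only. */
final class MultipartSketch {
    final Map<String, String> properties = new LinkedHashMap<>();
    byte[] fileBody = new byte[0];

    private static final Pattern NAME = Pattern.compile("[; ]name=\"([^\"]*)\"");
    private static final Pattern FILENAME = Pattern.compile("filename=\"([^\"]*)\"");

    static MultipartSketch parse(byte[] body, String boundary) {
        MultipartSketch result = new MultipartSketch();
        // ISO-8859-1 maps every byte to one char, so binary content survives the round trip
        String text = new String(body, StandardCharsets.ISO_8859_1);
        for (String part : text.split(Pattern.quote("--" + boundary))) {
            int split = part.indexOf("\r\n\r\n");   // blank line between part headers and content
            if (split < 0) {
                continue;                           // preamble or the closing "--" marker
            }
            String headers = part.substring(0, split);
            // Drop the CRLF that precedes the next boundary line
            int end = part.endsWith("\r\n") ? part.length() - 2 : part.length();
            String content = part.substring(split + 4, Math.max(end, split + 4));

            Matcher name = NAME.matcher(headers);
            if (!name.find()) {
                continue;                           // not a form-data part we understand
            }
            Matcher file = FILENAME.matcher(headers);
            if (file.find()) {
                // File part: remember the file name, keep the raw bytes as the body
                result.properties.put("groovy." + name.group(1), file.group(1));
                result.fileBody = content.getBytes(StandardCharsets.ISO_8859_1);
            } else {
                // Plain field: becomes a groovy.[name] property
                result.properties.put("groovy." + name.group(1), content);
            }
        }
        return result;
    }
}
```

For the curl call above with an extra -F stuff=foo, this would yield the properties groovy.stuff=foo and groovy.filedata=image.jpg, with the image bytes ready to be set as the message body. For anything beyond a demo, a tested library such as Commons File Upload is the safer route.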

Last thoughts

Groovy is, well, groovy and fixes my POST issue nicely. However, it has shown that running Camel without doing some coding may be optimistic. Having said that, once you have a Groovy-enabled .war deployed, it’s a great way to add code snippets into your route XML, and to create Beans that open up a much wider range of possibilities that can be shared. I can see it would be fairly easy to create a “toolkit” Camel with all these things in, perhaps with SQL, SEDA (for queues) etc and a few of these Beans as well as ‘recipes’.