Creating MarkLogic content with Apache Camel

MarkLogic with JMS?

A few weeks back we started looking at MarkLogic  at work as a possible replacement for our mix of Cocoon and eXistDB systems. One of the side issues that’s been raised is how we would get messages to/from our ActiveMQ EIP system. Now, MarkLogic doesn’t have a JMS connector, although to be fair, it seems to have a pretty good system for slurping up content from URL’s, files etc. However, there is a Java API, which gave me the idea of using my old friend Apache Camel.

If I could get Camel to talk to MarkLogic then, not only could I talk to any sort of queue, I could also pull content into MarkLogic from the huge range of other things Camel will talk to, and I would get all the EIP magic thrown in for good measure.

The Camel Routes.

The easiest way to test this was of course to set up a simple Camel project to slurp up some data. The most expedient  producer I could think of was to get an RSS feed from somewhere; JIRA being the most obvious, as it would reliably produce something new at reasonably short intervals. This would need transforming into XML and pushing into Marklogic via a queue. The MarkLogic side would be handled by their Java API mounted as a Bean, in my case written in Groovy so I could work with it as a script. So much for the basic plan. Here’s the start route as Camel sees it :

<route id="jira">
    <from uri="rss:https://issues.apache.org/jira/sr/jira.issueviews:searchrequest-xml
    /temp/SearchRequest.xml?jqlQuery=&amp;tempMax=100&amp;delay=10s"/>
    <marshal><rss /></marshal>
    <setHeaderheaderName="ml_doc">
        <simple>/twitter/${exchangeId}</simple>
    </setHeader>
   <to uri="seda:mlfeed"/>
</route>

The Camel RSS module calls a basic Jira RSS feed, in this case, polling every 10 seconds. I’ve used the module defaults, so each entry is separated out of the feed and passed down the route one at a time. At this point the message body is a Java SyndFeed object, not XML, so it has to be ‘marshalled‘. Now the message body is an XML string ready for upload, but before I can send it I need to make a URI for MarkLogic to use. Each run of the route or ‘exchange’ has a unique id, so I’ve used that via the inbuilt <simple/> language. Alternatively, I could have also parsed something out of the content, like the Jira id or made something up like the current date-time. Finally, the message is dropped into a queue via the SEDA module.
Note, this in-memory queue type isn’t persistent, like JMS or ActiveMQ, but it’s built into camel-core, so was just handy.

There is another route to pull messages from the queue and into MarkLogic.

<route id="marklogic">
    <from uri="seda:mlfeed"/>
    <to uri="bean:marklogic?method=upload_xml"/>
    <!-- <to uri="file:outbox"/> -->
</route>

This route takes messages off the queue and passes them to a Bean written using Camel’s Groovy support. Lastly there’s an optional entry to put the message into a file in an /outbox folder. This is handy if you can’t get the MarkLogic bit working and want to look at the input: comment out the bean instead and just drop the data into files.

The Groovy Code.

The Groovy Bean is mounted in the configuration file, along with some parameters needed to connect to MarkLogic.
Note. To get this working, you’ll need to supply your own parameters, and have a MarkLogic REST server listening, as REST is the basis of their API. You can get instructions here.

<lang:groovy id="marklogic" script-source="classpath:groovy/marklogic.groovy">
<lang:property name="host" value="YOURHOST" />
<lang:property name="port" value="YOURPORT" />
<lang:property name="user" value="YOURNAME" />
<lang:property name="password" value="YOURPASSWORD" />
</lang:groovy>

Once the Bean is running, you simple call it’s methods in the route. You get as input the entire Exchange, so you have access to everything, as well as the ability to alter it as you like. In this case, I’ve simply written the data out and not altered the massage at all. In real life it would probably be more complex. The salient bit of Groovy code (the Get’s for the parameters are not shown) is shown below. This is the MarkLogic basic example with a couple of mods to a) Get the header that has the URI in, and b) Get the body of the input Message as an InputStream:

public void upload_xml(Exchange exchange) {
    // Get the doc url from Camel
    String docId = exchange.getIn().getHeader("ml_doc");
    if (docId == null) docId = "/twitter/" + exchange.getExchangeId();
    // create the client
    DatabaseClient client = DatabaseClientFactory.newClient(host, port,
 user, password,
 Authentication.DIGEST);

    // try to make use of the client connection
    try {
        XMLDocumentManager XMLDocMgr = client.newXMLDocumentManager();
        //Get an InputStream from the Message Body
        InputStreamHandle handle = new InputStreamHandle(exchange.getIn().getBody(InputStream.class));

        //Write out the XML Doc
        XMLDocMgr.write(docId,handle);
        } catch (Exception e) {
            System.out.println("Exception : " + e.toString() );
        } finally {
            // release the client
            client.release();
        }
}

Note.  I’ve connected and disconnected to the MarkLogic database each time. I’m sure this can’t be efficient in anything but a basic use case, but it will do for the present. There’s nothing to stop me creating an Init() method that could be called as the Bean starts to create a persistent connection if that’s better, but all the examples I could find seem to do it this way [If I’ve made any MarkLogic Java API gurus out there wince, I’m sorry. Happy to do it a better way].

Putting it all together.

If you’ve got a handy MarkLogic server, you can try this all out. I’ve put the code here on GitHub as a Maven project, and all you need to do is pull it and run “mvn compile exec:java”. Ten seconds or so after it starts, you should see something similar to this on the console:

[l-1) thread #1 – seda://mlfeed] DocumentManagerImpl INFO Writing content for /jira/ID-twiglet-53205-1398451322665-0-2
[l-1) thread #1 – seda://mlfeed] DatabaseClientImpl INFO Releasing connection

On the MarkLogic side, if you go to the Query Console you can use Explore to look at your database. You should see the files in the database – query them to your heart’s content.

  • I’m using MarkLogic 7 and Java API 2.0-2 with Camel 2.12.0.
  • If you want to change the routes, you’ll find them in src/resources/camel-context.xml.
  • The Groovy script is in resources/groovy/marklogic.groovy.
  • Remember, if you want to use modules outside of camel-core, you’ll need them in the pom.xml!

Bells and Whistles.

Now I’ve got the basic system working there are a couple of other things I could do. As the MarkLogic component reads from the end of a queue, I could for instance add another route that puts messages into the same queue from another source, for example Twitter (for which there’s a module) assuming I had appropriate twitter oauth ‘keys’, like so:

<route id="twitter-demo">
    <from uri="twitter://search?type=polling&amp;delay=60&amp;keywords=marklogic&amp;consumerKey={{twitter.consumerKey}}&amp;consumerSecret={{twitter.consumerSecret}}&amp;accessToken={{twitter.accessToken}}&amp;accessTokenSecret={{twitter.accessTokenSecret}}" />
    <setHeader headerName="ml_doc">
        <simple>/twitter/${body.id}</simple>
    </setHeader>
    <log message="${body.user.screenName} tweeted: ${body.text}" />
    <to uri="seda:mlfeed"/>
</route>

Of course, once you start doing that, you need someway to make sure you can throttle the speed that things get added to the queue to avoid overwhelming the consumer. Camel has several strategies for this, but my favourite is RoutePolicy. With this you can specify rules that allow the input routes to be shutdown and restarted as necessary to throttle the number of in-flight exchanges. You simple add the Bean like so with an a approprite configuration:

<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
 <property name="scope" value="Context"/>
<property name="maxInflightExchanges" value="20"/>
<property name="loggingLevel" value="WARN"/>
</bean>

and then add this policy to any route you wish to control, like so:

<route routePolicyRef="myPolicy">

Once there are more than 20 messages in-flight (‘context’ means all routes/queues) the inbound routes will be suspended. Once the activity drops below 70% (you can configure this) they’ll start up again – neat.

This only really skims the surface. Camel is a marvellous system and being able to use it to push content to MarkLogic is very handy (if I polish the code a bit).  Wiring routes up in Camel is so much easier, flexible and maintainable, than writing custom, one-off code.

Finally, of course, there’s nothing to say you couldn’t have a route that went away, got some key which was then sent to MarkLogic via Bean to retrieve some data instead and which that then got added to the body (Content Enrichment in EIP). That’ll have to be the subject of another day.

Resources.

  • MarkLogic Developer. http://developer.marklogic.com
  • Apache Camel. http://camel.apache.org
  • Enterprise Integration Patterns (nice hardback book) http://www.eaipatterns.com
Advertisements

2 thoughts on “Creating MarkLogic content with Apache Camel

  1. Cool to see the pieces come together so smoothly. Also, I can confirm your guess — ordinarily, you create the MarkLogic client once and reuse it for many requests. (The examples are simple and self-contained, so they don’t show the expected reuse.)

    • Eric, thanks for the fast reply – I’ll refactor 🙂
      Handily, Spring lets me call init/destroy methods in the bean config.

Comments are closed.