Monday, May 11, 2015

Database events - Part 1 - Polling a table with Apache Camel

From time to time, we need to obtain events from a table in a database. If we are using Hibernate/JPA or another ORM, this is easy, because we can modify the DAO to publish an event. But if we need to integrate with another system, this becomes an issue. Apache Camel can help in this case: we can set it up to poll a table, based on its primary key, and notify us when an event happens.

Here's a sample that monitors a table for inserted records. It does so by keeping the last ID it has seen. On the next run we select any row with a higher ID, assuming the ID is numerical and sequential. The main idea is to use the Camel quartz2 component with a stateful job, store the last ID in the job's data map, and read it back on the next run.

This approach is simple, and it does not require write access to the database (i.e. to mark the processed records). And unlike methods based on http://camel.apache.org/idempotent-consumer.html, where we filter out duplicate records, this one keeps working when the table is large, because each poll only selects the rows added since the last run.
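import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;

public class DataBaseSyncRouteBuilder extends RouteBuilder {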

 @Override
 public void configure() throws Exception {

  from("quartz2://sync/myTimer?trigger.repeatInterval=5000&stateful=true")
    .routeId("myRoute")
    .choice()
    .when()
    .simple("${header.jobDetail.jobDataMap[last_id]} == null")
    .setBody(constant("select * from student order by id "))
    .otherwise()
    .setBody(
      simple("select * from student where id > ${header.jobDetail.jobDataMap[last_id]} order by id"))
    //
    .end()
    .to("jdbc:ds?useHeadersAsParameters=true")
    .choice()
    .when()
    .simple("${header.CamelJdbcRowCount} > 0")
    .process(new Processor() {

     @SuppressWarnings("unchecked")
     @Override
     public void process(Exchange exchange) throws Exception {

      Message msg = exchange.getIn();
      // The jdbc component returns each row as a column-name -> value map
      List<Map<String, Object>> data = msg.getBody(List.class);
      JobDetail jobDetail = (JobDetail) msg.getHeader("jobDetail");
      JobDataMap map = jobDetail.getJobDataMap();

      int currentId = 0;

      if (map.containsKey("last_id"))
       currentId = map.getInt("last_id");

      for (Map<String, Object> row : data) {
       int i = Integer.parseInt(row.get("id").toString());
       if (currentId < i)
        currentId = i;
      }
      jobDetail.getJobDataMap().put("last_id", currentId);
     }
    })
    .log("last Processed Id: ${header.jobDetail.jobDataMap[last_id]}")
    .to("activemq:myQueue");

  from("activemq:myQueue").marshal().json().to("file:db.output");

 }
}
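
To make the bookkeeping easier to follow outside of Camel, here is a minimal sketch of the same "last id" logic in plain Java. The class HighWaterMark and its methods nextQuery and advance are hypothetical names made up for this illustration; they are not part of Camel or Quartz, and the route above does not need them. The sketch assumes, like the route, that the id column is a numeric value that fits in an int.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not a Camel or Quartz class): isolates the "last id" bookkeeping. */
final class HighWaterMark {

    private HighWaterMark() {
    }

    /** Builds the query for the next poll; a null lastId means no row has been seen yet. */
    static String nextQuery(Integer lastId) {
        if (lastId == null) {
            return "select * from student order by id";
        }
        return "select * from student where id > " + lastId + " order by id";
    }

    /** Returns the largest id among the current mark and the rows of the latest poll. */
    static int advance(int currentId, List<Map<String, Object>> rows) {
        int max = currentId;
        for (Map<String, Object> row : rows) {
            // Same conversion as in the route: the driver may return Integer, Long, BigDecimal...
            int id = Integer.parseInt(row.get("id").toString());
            if (id > max) {
                max = id;
            }
        }
        return max;
    }
}
```

In the route, nextQuery corresponds to the first choice() block and advance to the Processor; the stateful quartz2 job is what carries the returned value from one run to the next.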
