Saturday, May 26, 2012

Using Quartz, Camel and Spring for Distributed Service Orchestration

UPDATE/WARNING: The ProducerTemplate should be used as a singleton and, in any case, it must be stopped to clean up its resources. If you manage a singleton ProducerTemplate yourself, call its stop() method in ServletContextListener#contextDestroyed(). Injecting the singleton is easy, as you might have guessed: define it inside the camelContext node as a "template" node and provide an "id" for injection. Our team has confirmed, though, that when it is loaded by Spring the ProducerTemplate is stopped when the bean is disposed, so there is no need to use the listener. This is because a declaration of the form camelContext/template[@id] in the XML is served by CamelProducerTemplateFactoryBean, which implements DisposableBean. Please read http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html for more information.

Quartz is a powerful and popular Java scheduler API. It supports simple timers as well as more complicated schedules à la Unix cron.

Camel is a Java Open Source API that implements Enterprise Integration Patterns (EIP).

Spring needs no introduction. Using Spring, teams can concentrate on delivering business features without knowing much about low-level API implementations.

Distributed Computing is how you manage to scale horizontally across different servers. It is an important cost factor that should not be underestimated when designing a solution.

Services are how you encapsulate functionality that responds to events like user interaction, system triggers, external API usage and more.

Orchestration is how you define the order in which your services are called, the retry policies, the message adaptation and, in general, how you use Enterprise Integration Patterns to externalize service usability. Keeping orchestration separate from your Services helps you build loosely coupled Services, and that has important consequences: your Services do not need to know about each other; being independent, they can be reused with minimum effort without breaking previous functionality; and, last but not least, separation of concerns can be easily enforced. Did I mention I believe separation of concerns is the most important concept behind a good architecture?
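To make the idea concrete, here is a minimal plain Java sketch, not Camel code, where the services are independent functions and only the orchestrator knows the calling order and the retry policy. The class and method names (OrchestrationSketch, orchestrate, callWithRetry) are hypothetical and exist only for this illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical illustration: the services ignore each other; the orchestrator owns order and retries. */
public class OrchestrationSketch {

    /** Calls each service in order, feeding the output of one service into the next. */
    public static String orchestrate(String input, List<UnaryOperator<String>> services, int maxRetries) {
        String message = input;
        for (UnaryOperator<String> service : services) {
            message = callWithRetry(service, message, maxRetries);
        }
        return message;
    }

    /** Retries a failing service up to maxRetries times, then rethrows the last failure. */
    private static String callWithRetry(UnaryOperator<String> service, String message, int maxRetries) {
        int attempts = Math.max(1, maxRetries + 1);
        RuntimeException last = null;
        for (int attempt = 0; attempt < attempts; attempt++) {
            try {
                return service.apply(message);
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> services = new ArrayList<>();
        services.add(s -> s.trim());        // first service
        services.add(s -> s.toUpperCase()); // second service, unaware of the first
        System.out.println(orchestrate("  hello ", services, 2)); // prints "HELLO"
    }
}
```

Changing the order of the services or the number of retries only touches the orchestrator, which is the property the Camel routes in this post rely on.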

Camel allows you to define routes in either XML or a Java DSL. I prefer the Java DSL, and this is definitely the main factor that made me choose Camel over the other possible solutions I was exposed to. Being able to debug with breakpoints is important for agility, especially in small teams.

In Camel you use endpoints, which consume or produce messages. The endpoints are connected by channels, and common tasks are already solved by the many available components. If you need more logic, you can always build your own component or use generic implementations based on beans and processors. You can start with the endpoints living in a single JVM, or you can distribute them using JMS, AMQP or other messaging alternatives to provide asynchronous behavior if your project demands it.
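As an analogy only, the asynchronous hand-off inside a single JVM can be pictured with a BlockingQueue, which is also what Camel's seda: endpoints use to exchange messages. The sketch below is plain Java and not Camel code; the class InMemoryChannelSketch and its method are hypothetical names for this illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of an in-JVM asynchronous channel between a producer and a consumer. */
public class InMemoryChannelSketch {

    /**
     * Sends the body through an in-memory queue to a consumer running in another thread
     * and waits up to timeoutMs for the consumer's result (null if none arrives in time).
     */
    public static String sendAndAwait(String body, long timeoutMs) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        BlockingQueue<String> results = new LinkedBlockingQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                String message = channel.take();      // blocks until a message arrives
                results.put("consumed:" + message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        channel.add(body); // the producer returns immediately; processing happens in the consumer thread
        try {
            return results.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndAwait("Activation", 2000)); // prints "consumed:Activation"
    }
}
```

Replacing the in-memory queue with a JMS or AMQP destination is what moves the same producer and consumer onto different machines.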

In this post I present a proof of concept that uses Quartz 1.8.6 and Camel 2.9.2 (Camel 2.9.2 is incompatible with the latest Quartz 2.1.5) to provide Distributed Plain Old HTTP Service Orchestration. The Services can, however, also be POJOs, managed or unmanaged by Spring: Camel is able to interact with any Java and even non-Java code you currently have. So even though I talk about Plain Old GET/POST HTTP Services, the same applies to other kinds of Services. Camel is a routing engine: it will read messages (Exchange) from any Producer and will be able to produce messages for any Consumer.

I use XML for the general Spring configuration and the Java DSL to define the Camel routes. The code can be tested from JUnit and of course directly from a Servlet container like Tomcat or an application server like JBoss. I use the Quartz scheduler to make sure that only one instance in a Servlet cluster (I have tested this on two Tomcat machines) starts a given job at a time; Camel to define the routes while aggregating information or using components like FTP or plain old HTTP Services responding with JSON; and Spring Controllers that can activate a Camel route. I also show how to define a retry policy, and I do all that with some comments in the code.
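The retry policy used later in this post asks for up to 5 redeliveries, a 1000 ms initial delay, a back-off multiplier of 2 and a 10000 ms maximum delay. As a rough illustration of that arithmetic only, here is a plain Java sketch. The class BackoffSketch and its method are hypothetical names, and Camel's own calculation may differ in its details:

```java
import java.util.Arrays;

/** Hypothetical illustration of a capped exponential back-off schedule (not Camel's implementation). */
public class BackoffSketch {

    /** Returns the nominal delay, in milliseconds, to wait before each redelivery attempt. */
    public static long[] delays(int maxRedeliveries, long initialDelayMs, double multiplier, long maxDelayMs) {
        long[] result = new long[maxRedeliveries];
        double delay = initialDelayMs;
        for (int i = 0; i < maxRedeliveries; i++) {
            // Never wait longer than the configured maximum
            result[i] = Math.min(Math.round(delay), maxDelayMs);
            delay *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [1000, 2000, 4000, 8000, 10000]
        System.out.println(Arrays.toString(delays(5, 1000, 2.0, 10000)));
    }
}
```

With those values a message that keeps failing would be given up after roughly 25 seconds of accumulated waiting.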

Dependencies

        <!-- Camel -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-http</artifactId>
            <version>${camel.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>geronimo-servlet_2.5_spec</artifactId>
                    <groupId>org.apache.geronimo.specs</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-ftp</artifactId>
          <version>${camel.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-mail</artifactId>
          <version>${camel.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.camel</groupId>
          <artifactId>camel-jackson</artifactId>
          <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-quartz</artifactId>
            <version>${camel.version}</version>
            <exclusions>
            <!-- If camel would support quartz 2 we would exclude it here
                <exclusion>
                    <artifactId>quartz</artifactId>
                    <groupId>org.quartz-scheduler</groupId>
                </exclusion>
            -->
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-stream</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring</artifactId>
            <version>${camel.version}</version>
            <exclusions>
             <exclusion>
              <artifactId>spring-context</artifactId>
              <groupId>org.springframework</groupId>
             </exclusion>
             <exclusion>
              <artifactId>spring-aop</artifactId>
              <groupId>org.springframework</groupId>
             </exclusion>
             <exclusion>
              <artifactId>spring-tx</artifactId>
              <groupId>org.springframework</groupId>
             </exclusion>
             <exclusion>
              <artifactId>camel-core</artifactId>
              <groupId>org.apache.camel</groupId>
             </exclusion>
            </exclusions>
        </dependency>
        <!-- Camel already includes the quartz version it can work with
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.1.5</version>
        </dependency>
        -->
        <!-- Camel Test -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-test</artifactId>
            <version>${camel.version}</version>
            <scope>test</scope>
        </dependency>

Quartz

At the core of this proof of concept is Quartz, which we have to set up to provide persistent jobs. We use a MySQL database to host the needed tables (the default for Camel 2.9.2 is Quartz 1.8.6):
cat /Users/nestor/Downloads/quartz-1.8.6/docs/dbTables/tables_mysql_innodb.sql | mysql -u root -proot nestorurquiza
In order to schedule jobs you need some settings in web.xml to include the Listener that will start the Quartz Scheduler when the application starts and shut it down when the application is undeployed:
    
    <!-- Quartz Listener -->
    <!-- Uncomment the below to start Quartz in this instance -->
    <listener>
        <listener-class>com.nestorurquiza.utils.web.QuartzServletContextListener</listener-class>
    </listener>
Here is the Listener code:
package com.nestorurquiza.utils.web;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QuartzServletContextListener implements
        ServletContextListener {
    Logger log = LoggerFactory.getLogger(QuartzServletContextListener.class.getName());

    @Override
    public void contextInitialized(ServletContextEvent event) {
        
        Scheduler scheduler = (StdScheduler) ApplicationServletContextListener.getBean("schedulerFactoryBean");
        try {
            scheduler.start();
        } catch (SchedulerException e) {
            throw new RuntimeException("Quartz failure:", e);
        }
        log.debug("Quartz Scheduler started");
    }

    @Override
    public void contextDestroyed(ServletContextEvent event) {
        Scheduler scheduler = (StdScheduler) ApplicationServletContextListener.getBean("schedulerFactoryBean");
        try {
            scheduler.shutdown();
        } catch (SchedulerException e) {
            log.error("Quartz failure:", e);
        }
        log.info("Quartz Scheduler destroyed");
    }
}
You will need an interface to manage the triggers for your scheduled jobs. For that you will use the Quartz API.

Here is an example of how to list all triggers per job in the system. It is just a Controller using BHUB that returns the information in JSON:
package com.nestorurquiza.web;

import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.camel.CamelContext;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

/**
 * A sample Controller which can be used to list, delete or schedule cron triggers which execute a Camel trigger job. 
 * Used to schedule when to trigger a particular Camel route. As we use Persistent quartz jobs your usual cluster of servlet containers like Tomcat can be used to distribute the workload performed from Camel.
 * 
 * Samples:
 * http://bhubdev.nestorurquiza.com/quartz/info?ert=json
 * http://bhubdev.nestorurquiza.com/quartz/addOrUpdateCamelCronTrigger?triggerName=timer1&triggerGroup=group1&jobName=job1&jobGroup=DEFAULT&cronExpression=00%2000%2012%20*%20*%20%3F%20*&endPoint=seda:queue.sample&ert=json
 * http://bhubdev.nestorurquiza.com/quartz/deleteCronTrigger?ert=json&triggerName=timer1&triggerGroup=group1
 * http://bhubdev.nestorurquiza.com/quartz/deleteJob?jobName=job1&jobGroup=DEFAULT&ert=json
 * 
 * @author nestor
 *
 */
@Controller
public class QuartzController extends RootController {
    @Autowired
    CamelContext camelContext;
    
    @Autowired
    SchedulerFactoryBean schedulerFactoryBean;
    
    @RequestMapping("/quartz/info")
    public ModelAndView info(HttpServletRequest request,
            HttpServletResponse response) throws SchedulerException {
        //Initialize the context (mandatory)
        ControllerContext ctx = new ControllerContext(request, response);
        init(ctx);

        
        ctx.setRequestAttribute("triggersInJob", retrieveTriggersInJobs());
        return getModelAndView(ctx, "justJsonNoJspNeeded", null);
    }
    
    @RequestMapping("/quartz/deleteJob")
    public ModelAndView deleteJob(HttpServletRequest request,
            HttpServletResponse response,
            @RequestParam(value = "jobName", required = true) String jobName,
            @RequestParam(value = "jobGroup", required = true) String jobGroup) throws SchedulerException, ParseException {
        //Initialize the context (mandatory)
        ControllerContext ctx = new ControllerContext(request, response);
        init(ctx);

        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        scheduler.deleteJob(jobName, jobGroup);
        
        ctx.setRequestAttribute("triggersInJob", retrieveTriggersInJobs());
        
        return getModelAndView(ctx, "justJsonNoJspNeeded", null);
    }
    
    
    @RequestMapping("/quartz/deleteCronTrigger")
    public ModelAndView deleteCronTrigger(HttpServletRequest request,
            HttpServletResponse response,
            @RequestParam(value = "triggerName", required = true) String triggerName,
            @RequestParam(value = "triggerGroup", required = true) String triggerGroup) throws SchedulerException, ParseException {
        //Initialize the context (mandatory)
        ControllerContext ctx = new ControllerContext(request, response);
        init(ctx);

        //QuartzComponent quartz = camelContext.getComponent("quartz", QuartzComponent.class);
        //Scheduler scheduler = quartz.getScheduler();
        
        //SchedulerFactory schedFact = new org.quartz.impl.StdSchedulerFactory();
        //Scheduler scheduler = schedFact.getScheduler();
        
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        
        
        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerName, triggerGroup);
        if( trigger != null ) {
            scheduler.unscheduleJob(triggerName, triggerGroup);
        } 
        
        ctx.setRequestAttribute("triggersInJob", retrieveTriggersInJobs());

        
        return getModelAndView(ctx, "justJsonNoJspNeeded", null);
    }
    
    @RequestMapping("/quartz/addOrUpdateCamelCronTrigger")
    public ModelAndView addCronTrigger(HttpServletRequest request,
            HttpServletResponse response,
            @RequestParam(value = "triggerName", required = true) String triggerName,
            @RequestParam(value = "triggerGroup", required = true) String triggerGroup,
            @RequestParam(value = "jobName", required = true) String jobName,
            @RequestParam(value = "jobGroup", required = true) String jobGroup,
            @RequestParam(value = "cronExpression", required = true) String cronExpression,
            @RequestParam(value = "endPoint", required = false) String endPoint
            ) throws SchedulerException, ParseException {
        //Initialize the context (mandatory)
        ControllerContext ctx = new ControllerContext(request, response);
        init(ctx);

        //QuartzComponent quartz = camelContext.getComponent("quartz", QuartzComponent.class);
        //Scheduler scheduler = quartz.getScheduler();
        
        //SchedulerFactory schedFact = new org.quartz.impl.StdSchedulerFactory();
        //Scheduler scheduler = schedFact.getScheduler();
        
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        
        
        CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerName, triggerGroup);
        JobDetail jobDetail = null;
        if( trigger == null ) {
            //A brand new trigger needs to know which endPoint to activate
            if(endPoint == null) {
                throw new SchedulerException("Parameter endPoint is mandatory when creating a brand new trigger.");
            }
            trigger = new CronTrigger(triggerName, triggerGroup, jobName, jobGroup, cronExpression);
        }
        trigger.setJobName(jobName);
        trigger.setJobGroup(jobGroup);
        trigger.setCronExpression(cronExpression);
        
        jobDetail = scheduler.getJobDetail(jobName, jobGroup);
        boolean reschedule = false;
        if(jobDetail != null) {
            jobDetail.setJobClass(com.nestorurquiza.orchestration.camel.quartz.CamelTriggerJob.class);
            reschedule = true;
        } else {
            jobDetail = new JobDetail(jobName, jobGroup, com.nestorurquiza.orchestration.camel.quartz.CamelTriggerJob.class);
        }
        
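        //Keep the endPoint already stored in the job when the request does not provide a new one
        if(endPoint != null) {
            jobDetail.getJobDataMap().put("endPoint", endPoint);
        }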
        if(reschedule) {
            scheduler.addJob(jobDetail, true);
            scheduler.rescheduleJob(triggerName, triggerGroup, trigger); 
        } else {
            scheduler.scheduleJob(jobDetail, trigger);
        }
        
        ctx.setRequestAttribute("triggersInJob", retrieveTriggersInJobs());

        
        return getModelAndView(ctx, "justJsonNoJspNeeded", null);
    }
    
    private Map retrieveTriggersInJobs() throws SchedulerException {
        return retrieveTriggersInJob(null, null);
    }
    
    private Map retrieveTriggersInJob(String jobName, String jobGroup) throws SchedulerException {
        String[] jobGroups;
        Map triggersInJobs = new HashMap();
        
        int i;
        
        //QuartzComponent quartz = camelContext.getComponent("quartz", QuartzComponent.class);
        //Scheduler scheduler = quartz.getScheduler();
        
        //SchedulerFactory schedFact = new org.quartz.impl.StdSchedulerFactory();
        //Scheduler scheduler = schedFact.getScheduler();
        
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        
        jobGroups = scheduler.getJobGroupNames();
        for (i = 0; i < jobGroups.length; i++) {
           String jg = jobGroups[i];
           if(jobGroup == null || jobGroup != null && jg.equals(jobGroup)) {
               String[] jobNames = scheduler.getJobNames(jg);
               if(jobNames != null) {
                   for( String jn : jobNames ) {
                       //Add the job to the Map even if there are not triggers defined
                       if(triggersInJobs.get(jn) == null) {
                           triggersInJobs.put(jn, null);
                       }
                       if(jobName == null || jobName != null && jn.equals(jobName)) {
                           Trigger[] jobTriggers = scheduler.getTriggersOfJob(jn, jg);
                           triggersInJobs.put(jn, jobTriggers);
                       }
                   }
               }
           }
        }

        return triggersInJobs;
    }
}
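The sample URLs in the Javadoc above pass the cron expression as a query parameter, so it has to be URL-encoded. Here is a small sketch of one way to build that value with the JDK only. The class CronParamSketch and its method are hypothetical helpers, not part of the application:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper to encode a Quartz cron expression as a URL query parameter value. */
public class CronParamSketch {

    /** URL-encodes the value and uses %20 instead of + for spaces, as in the sample URLs. */
    public static String encodeQueryValue(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        // Every day at 12:00:00 (seconds, minutes, hours, day of month, month, day of week, year)
        // prints 00%2000%2012%20*%20*%20%3F%20*
        System.out.println(encodeQueryValue("00 00 12 * * ? *"));
    }
}
```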
As you can see from the Controller, I always schedule jobs using the same job class, which is nothing but a Camel invoker:
package com.nestorurquiza.orchestration.camel.quartz;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.StatefulJob;

import com.nestorurquiza.utils.web.ApplicationServletContextListener;

/**
 * A Quartz job to invoke a Camel endPoint
 * @author nestor
 *
 */

public class CamelTriggerJob implements StatefulJob {
    
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        CamelContext camelContext = (CamelContext) ApplicationServletContextListener.getBean("camelContext");
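        //A ProducerTemplate created on demand must be stopped to release its resources (see the UPDATE note at the top)
        ProducerTemplate template = camelContext.createProducerTemplate();
        try {
            JobDataMap dataMap = context.getJobDetail().getJobDataMap();
            String endPoint = dataMap.getString("endPoint");
            template.sendBody(endPoint, "Activation from " + this.getClass().getName());
        } finally {
            try {
                template.stop();
            } catch (Exception e) {
                //Ignored: a failure while releasing the template must not hide the outcome of the send
            }
        }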
    }
    
}

Spring

Here are the relevant bits from the Spring Web application Context:
    ...
    ... http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    ...
    <context:component-scan base-package="com.nestorurquiza.orchestration.camel.route" />
    <context:component-scan base-package="com.nestorurquiza.orchestration.camel.filter" />
    ...
        <bean id="schedulerFactoryBean"
        class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="dataSource">
            <ref bean="nestorurquizaDataSource" />
        </property>
        <property name="autoStartup">
            <value>false</value>
        </property>
        <property name="applicationContextSchedulerContextKey">
            <value>applicationContext</value>
        </property>
        <property name="waitForJobsToCompleteOnShutdown">
            <value>true</value>
        </property>
        <property name="quartzProperties">
            <props>
                <prop key="org.quartz.scheduler.instanceName">nestorurquizaScheduler</prop>
                <prop key="org.quartz.scheduler.instanceId">AUTO</prop>
                <!-- ThreadPool -->
                <prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
                <prop key="org.quartz.threadPool.threadCount">5</prop>
                <prop key="org.quartz.threadPool.threadPriority">5</prop>
                <!-- Job store -->
                <prop key="org.quartz.jobStore.misfireThreshold">60000</prop>
                <prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreTX</prop>
                <prop key="org.quartz.jobStore.driverDelegateClass">org.quartz.impl.jdbcjobstore.StdJDBCDelegate</prop>
                <prop key="org.quartz.jobStore.useProperties">false</prop>
                <prop key="org.quartz.jobStore.tablePrefix">QRTZ_</prop>
                <prop key="org.quartz.jobStore.isClustered">true</prop>
                <prop key="org.quartz.jobStore.clusterCheckinInterval">20000</prop>
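                <!-- Row lock for MySQL. The UPDLOCK hint is SQL Server syntax; MySQL would read it as a table alias and take no lock -->
                <prop key="org.quartz.jobStore.selectWithLockSQL">
                    SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE
                </prop>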
                <!-- Plugins -->
                <prop key="org.quartz.plugin.shutdownhook.class">
                    org.quartz.plugins.management.ShutdownHookPlugin
                </prop>
                <prop key="org.quartz.plugin.shutdownhook.cleanShutdown">true</prop>
                <prop key="org.quartz.plugin.triggHistory.class">
                    org.quartz.plugins.history.LoggingTriggerHistoryPlugin
                </prop>
                <prop
                    key="org.quartz.plugin.triggHistory.triggerFiredMessage">
                    Trigger {1}.{0} fired job {6}.{5} at: {4, date, HH:mm:ss MM/dd/yyyy}
                </prop>
                <prop
                    key="org.quartz.plugin.triggHistory.triggerCompleteMessage">
                    Trigger {1}.{0} completed firing job {6}.{5} at {4, date, HH:mm:ss
                    MM/dd/yyyy} with resulting trigger instruction code:
                    {9}
                </prop>
            </props>
        </property>
    </bean>
    
    <!-- Uncomment the below only if you use Quartz from Camel -->
    <!--  
    <bean id="quartz" class="org.apache.camel.component.quartz.QuartzComponent">
        <property name="scheduler">
            <ref bean="schedulerFactoryBean" />
        </property>
    </bean>
    -->
    
    <camelContext xmlns="http://camel.apache.org/schema/spring" id="camelContext">
        <packageScan>
            <package>com.nestorurquiza.orchestration.camel.route</package>
        </packageScan>
    </camelContext>
    ...
Here is the application context test file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-2.5.xsd
           http://camel.apache.org/schema/spring 
           http://camel.apache.org/schema/spring/camel-spring.xsd
           ">
               
    <context:component-scan base-package="com.nestorurquiza.orchestration.camel.route" />
    <context:component-scan base-package="com.nestorurquiza.orchestration.camel.filter" />
     
    <camel:camelContext id="camel">
        <camel:package>com.nestorurquiza.orchestration.camel.route</camel:package>
    </camel:camelContext>
  
</beans>
Here is a Spring Controller that can be used to hit any Camel endPoint consumer:
package com.nestorurquiza.web;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

/**
 * A sample Controller showing how to send a simple message to any endPoint in the system
 * 
 * Example:
 * http://bhubdev.nestorurquiza.com/camel/publish?ert=json&endPoint=seda:queue.sample
 * 
 * @author nestor
 *
 */
@Controller
public class SampleCamelController extends RootController {

    @Autowired
    CamelContext camelContext;
    
    @RequestMapping("/camel/publish")
    public ModelAndView publish(HttpServletRequest request,
            HttpServletResponse response,
            @RequestParam(value = "endPoint", required = true) String endPoint) {
        //Initialize the context (mandatory)
        ControllerContext ctx = new ControllerContext(request, response);
        init(ctx);

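        //A ProducerTemplate created on demand must be stopped to release its resources (see the UPDATE note at the top)
        ProducerTemplate template = camelContext.createProducerTemplate();
        try {
            template.sendBody(endPoint, "Activation from " + this.getClass().getName());
        } finally {
            try {
                template.stop();
            } catch (Exception e) {
                //Ignored: a failure while releasing the template must not hide the outcome of the send
            }
        }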
        
        return getModelAndView(ctx, "justJsonNoJspNeeded", null);
    }
}
Here is how to test routes. We are testing here our first User Story. Read the Camel section for more:
package com.nestorurquiza.orchestration.camel.route;

import static org.junit.Assert.assertTrue;

import java.io.File;

import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

/**
 * A simple JUnit test capable of registering spring Singletons needed for our SampleCamelRouteBuilder
 * @author nestor
 *
 */
@ContextConfiguration(locations = {"classpath:/camelSpringContext.xml"})
public class SampleSpringCamelRouteBuilderTest extends AbstractJUnit4SpringContextTests {
    @Autowired
    private ApplicationContext applicationContext;
    
    @Produce(uri = "direct:start")
    protected ProducerTemplate template;
    
    @Configuration
    public static class ContextConfig {
        @Bean
        public RouteBuilder route() {
            return new SampleCamelRouteBuilder();
        }
    }
    
    @Test
    public void testRoute1() throws Exception {
        template.sendBody("seda:queue.sample", "Activation from JUnit");
        Thread.sleep(20000);
        File ulock = new File("/tmp/unlock");
        assertTrue("File not moved", ulock.exists());
    }

}
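The fixed Thread.sleep(20000) in the test above makes every run take 20 seconds, even when the file shows up much earlier. A possible alternative is to poll for the file until a deadline. This is a sketch using only the JDK; the class FileAwaitSketch and its method are hypothetical names:

```java
import java.io.File;

/** Hypothetical test helper: waits for a file to appear instead of sleeping for a fixed time. */
public class FileAwaitSketch {

    /**
     * Polls every pollMs milliseconds until the file exists or timeoutMs has elapsed.
     * Returns true as soon as the file exists, false on timeout or interruption.
     */
    public static boolean awaitFile(File file, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1000000L;
        while (true) {
            if (file.exists()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test it could be used as assertTrue("File not moved", FileAwaitSketch.awaitFile(new File("/tmp/unlock"), 20000, 250)), which keeps the same upper bound but returns as soon as the route has done its work.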

Camel

You might be tempted to use the Quartz component directly from Camel, but at the time of this writing, and for the versions I have used for this proof of concept, your options to reschedule your jobs will be limited. In any case, here is a sample route file which schedules a cron trigger. You need to uncomment the relevant portion of the route, of course. As it stands, the route just writes a log entry whenever a message is sent to direct:start:
package com.nestorurquiza.orchestration.camel.route;

import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SampleSchedulerCamelRouteBuilder extends RouteBuilder {
    private static final String LOG_NAME = SampleSchedulerCamelRouteBuilder.class.getName();
    private static final Logger log = LoggerFactory.getLogger(LOG_NAME);
    
    @Override
    public void configure() {
        log.debug("Configuring SampleSchedulerCamelRouteBuilder routes");
        
        //It is better to use quartz api outside of camel at the moment http://camel.465427.n5.nabble.com/Configure-Camel-to-consume-externally-configured-quartz-jobs-td5712732.html#a5713361
        //from("quartz://group1/timer1?job.name=job1&stateful=true&trigger.repeatInterval=5000&trigger.repeatCount=0")
        //from("quartz://group1/timer2?job.name=job1&stateful=true&cron=00 50 14 * * ? *")
        
        from("direct:start")
        .log(LoggingLevel.DEBUG, LOG_NAME, "SampleSchedulerCamelRouteBuilder route ${id} triggered");
    }
}
Let us go through the following routes, defined to cover some basics of orchestration. Note that we consume an HTTP Service which accepts a Unix command and sends back JSON; it is implemented with a Node.js server that runs shell commands. This JSON Services approach, BTW, is a great way to reuse Talend jobs.
package com.nestorurquiza.orchestration.camel.route;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;

import com.nestorurquiza.orchestration.camel.ShellBeanService;
import com.nestorurquiza.orchestration.camel.ShellProcessor;
import com.nestorurquiza.orchestration.camel.ShellResponse;

public class SampleCamelRouteBuilder extends RouteBuilder {
    private static final String LOG_NAME = SampleCamelRouteBuilder.class.getName();
    private static final Logger log = LoggerFactory.getLogger(LOG_NAME);
    public static final int POLLING_DELAY_MSEC = 60 * 1000;
    
    @Autowired
    protected ApplicationContext applicationContext;
    
    @Override
    public void configure() {
        log.debug("Configuring SampleCamelRouteBuilder routes");
        //*** User Story# 1: Given the route seda:queue.sample is triggered; 
        //    When an error occurs 
        //    Then retry up to 5 times using exponential increments (2) for the delay for a maximum of 10 seconds
        //    When the route starts
        //    Then request the JSON GET HTTP Service, parse the response and throw an Exception if there are any errors
        //***
        
        //Solution 1.1: Using a direct route with noErrorHandler() encapsulating all the logic and calling it from the route that will perform the retries works as expected
        //The disadvantage is that in a chained Services process (Service Orchestration) several routes will be needed just to accomplish retries per each service that fails
        from("seda:queue.sample")
        .errorHandler(defaultErrorHandler()
                .log(log)
                .maximumRedeliveries(5)
                .backOffMultiplier(2)
                .useExponentialBackOff()
                .redeliveryDelay(1000)
                .maximumRedeliveryDelay(10000)
                .retryAttemptedLogLevel(LoggingLevel.WARN))
        .to("direct:direct.sample");
        //An intermediate entry point is needed        
        from("direct:direct.sample")
        .errorHandler(noErrorHandler()) 
        .log(LoggingLevel.DEBUG, LOG_NAME, "Processing with http, jackson and processor components in route ${id}")
        .to("http://localhost:8088/?cmd=ls%20/tmp/unlock")
        .unmarshal().json(JsonLibrary.Jackson, ShellResponse.class)
        .process(new ShellProcessor())
        .log(LoggingLevel.DEBUG, LOG_NAME, "Process successfully unlocked")
        .end();

        
        //Solution 1.2: A second solution is to build a bean that invokes each HTTP service, analyzes the response and throws an Exception if there is an error
        from("seda:queue.sample2")
        .log(LoggingLevel.DEBUG, LOG_NAME, "Processing with bean component in route ${id}")
        .setHeader("url", constant("http://localhost:8088/?cmd=ls%20/tmp/unlock")).bean(ShellBeanService.class)
        .log(LoggingLevel.DEBUG, LOG_NAME, "Process successfully unlocked")
        //That way we need only one route:
        //.setHeader("url", constant("http://localhost:8088/?cmd=runSecondService")).bean(ShellBeanService.class)
        //.setHeader("url", constant("http://localhost:8088/?cmd=runThirdService")).bean(ShellBeanService.class)
        //.log(LoggingLevel.DEBUG, LOG_NAME, "All Services in the route have executed successfully");
        .end();
         
        //*** User Story# 2: Given a recurrence of POLLING_DELAY_MSEC and a remote SFTP server directory to query for files:
        //    When polling time is reached 
        //    Then send an email with newer file names in the remoteSFTP directory
        //***
        
        //Solution 2: A sample polling implementation (uncomment to see it working): checks for files newer than POLLING_DELAY_MSEC and sends an email when it finds them, aggregating all of them in just one message
        //            Note that this solution lacks a distributed computing approach. Quartz generating the polling and a bean in regular routes looks like a stronger solution if more than one JVM is involved.
        /*
        from("sftp://bhubint.nestorurquiza.com//home/admin?username=admin&password=pass&fastExistsCheck=true&delay=" + POLLING_DELAY_MSEC + "&filter=#fileAttributesFilter")
        .aggregate(new FileAttributesAggregationStrategy()).constant("all").completionTimeout(5000L)
        .to("log:" + LOG_NAME + "?level=DEBUG&showAll=true")
        .to("smtp://krms.sample.com?Subject=New Files&From=nestor@nestorurquiza.com&To=nestor@nestorurquiza.com")
        .end();
        */
       
    }
    
    private static class FileAttributesAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            String newCamelFileNameOnly = newExchange.getIn().getHeader("CamelFileNameOnly", String.class);
            if (oldExchange == null) {
                newExchange.getIn().setBody(newCamelFileNameOnly);
                return newExchange;
            }
            String oldListOfFileNames = oldExchange.getIn().getBody(String.class);
            String newListOfFileNames = oldListOfFileNames + ", " + newCamelFileNameOnly;
            log.debug(newListOfFileNames);
            newExchange.getIn().setBody(newListOfFileNames);
            return newExchange;
        }
    }
    
}
The first user story has two solutions. To try either one, invoke the route (from the CamelController or by scheduling a Quartz job) while the file "/tmp/unlock" does not exist, let it fail a couple of times, and then create the file. If the file appears at any point while retries are still pending, the route finishes successfully.
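
To make the retry settings of Solution 1.1 concrete, here is a minimal plain-Java sketch of the delay schedule they describe. It does not call Camel. It assumes that the first redelivery waits redeliveryDelay and that each later delay is the previous one multiplied by backOffMultiplier, capped at maximumRedeliveryDelay. The class and method names (BackoffSchedule, delays) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {

    // Returns the wait (in milliseconds) before each redelivery attempt.
    // Assumption: delay starts at initialDelayMs and grows by "multiplier"
    // per attempt, never exceeding maxDelayMs.
    static List<Long> delays(int maxRedeliveries, long initialDelayMs,
                             double multiplier, long maxDelayMs) {
        List<Long> result = new ArrayList<>();
        long delay = Math.min(initialDelayMs, maxDelayMs);
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            result.add(delay);
            // Grow the delay for the next attempt, then apply the cap
            delay = Math.min(Math.round(delay * multiplier), maxDelayMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as the seda:queue.sample error handler
        System.out.println(delays(5, 1000L, 2.0, 10000L));
    }
}
```

With the values from the route (5 redeliveries, 1000 ms, multiplier 2, cap 10000 ms) the sketch yields waits of 1000, 2000, 4000, 8000 and 10000 ms, so under these assumptions you have roughly 25 seconds, plus the time of each HTTP call, to create /tmp/unlock.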

The second user story is an example of polling. The FTP component is great, like many other Camel components, but it has no persisted way of handling jobs, so you are on your own if you deploy the routes to more than one server: the consumers will compete with each other. A custom bean that does the checking with the JSch library, in a route triggered by a persisted Quartz job, is a better solution, at least until Camel supports polling in a distributed way.
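
As a rough sketch of what such a custom bean would have to compute, the plain-Java class below combines the two pieces of logic used by Solution 2: keeping only files modified within the last polling interval, and joining their names into a single message body. The SFTP listing itself is left out; the map of file names to last-modified timestamps stands in for it. The names NewFilesDigest and digest are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NewFilesDigest {

    // Keeps files whose last-modified time falls inside the last polling
    // interval (same comparison as FileAttributesFilter) and joins their
    // names with ", " (same separator as FileAttributesAggregationStrategy).
    static String digest(Map<String, Long> lastModifiedByName, long nowMs, long pollingDelayMs) {
        List<String> fresh = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastModifiedByName.entrySet()) {
            if (entry.getValue() + pollingDelayMs > nowMs) {
                fresh.add(entry.getKey());
            }
        }
        return String.join(", ", fresh);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Map<String, Long> listing = new LinkedHashMap<>(); // stand-in for a remote directory listing
        listing.put("a.csv", now - 10_000L);
        listing.put("old.csv", now - 120_000L);
        System.out.println(digest(listing, now, 60_000L));
    }
}
```

Because the method takes the current time as a parameter, a Quartz-triggered job could pass its scheduled fire time, which keeps the result independent of which server happens to run the job.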

There are several classes needed to support the two examples above. Here are all of them:
package com.nestorurquiza.orchestration.camel;

import org.apache.camel.Header;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.nestorurquiza.utils.Utils;

public class ShellBeanService {
    private static final String LOG_NAME = ShellBeanService.class.getName();
    private static final Logger log = LoggerFactory.getLogger(LOG_NAME);
    
    public void process(@Header(value = "url") String url) throws Exception {
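        HttpClient httpclient = new HttpClient();
        GetMethod method = new GetMethod(url);
        byte[] responseStream;
        try {
            int responseCode = httpclient.executeMethod(method);
            log.debug(url + " responseCode: " + responseCode);
            responseStream = method.getResponseBody();
        } finally {
            // Always release the connection, even if the request fails
            method.releaseConnection();
        }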

        ObjectMapper mapper = new ObjectMapper();
        ShellResponse shellResponse = (ShellResponse) mapper.readValue(responseStream, ShellResponse.class);
        
        if(shellResponse == null) {
            throw new Exception("No response from remote Shell Server");
        }
        String stderr = shellResponse.getStderr();
        if(!Utils.isEmpty(stderr)) {
            throw new Exception(stderr);
        }
        
    }
}

package com.nestorurquiza.orchestration.camel;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import com.nestorurquiza.utils.Utils;

public class ShellProcessor implements Processor {
    
    public void process(Exchange exchange) throws Exception {
        ShellResponse shellResponse = exchange.getIn().getBody(ShellResponse.class);
        if(shellResponse == null) {
            throw new Exception("No response from remote Shell Server");
        }
        String stderr = shellResponse.getStderr();
        if(!Utils.isEmpty(stderr)) {
            throw new Exception(stderr);
        }
        
    }
}


package com.nestorurquiza.orchestration.camel;

public class ShellResponse {
    private String stdout;
    private String stderr;
    private String cmd;
    public String getStdout() {
        return stdout;
    }
    public void setStdout(String stdout) {
        this.stdout = stdout;
    }
    public String getStderr() {
        return stderr;
    }
    public void setStderr(String stderr) {
        this.stderr = stderr;
    }
    public String getCmd() {
        return cmd;
    }
    public void setCmd(String cmd) {
        this.cmd = cmd;
    }
    
}


package com.nestorurquiza.orchestration.camel.filter;

import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileFilter;
import org.springframework.stereotype.Component;

import com.nestorurquiza.orchestration.camel.route.SampleCamelRouteBuilder;

@Component("fileAttributesFilter")
public class FileAttributesFilter implements GenericFileFilter {

    public boolean accept(GenericFile file) {
        return file.getLastModified() + SampleCamelRouteBuilder.POLLING_DELAY_MSEC > System.currentTimeMillis();
    }
}

If you deploy the app on two servers pointing to the same database, you will notice that Quartz activates the route on only one of them. Look at QuartzController for examples of how to use it.

The endpoint consumer can also be triggered from SampleCamelController, which uses a ProducerTemplate. Take a look at the class above for an example.

Camel Route Diagrams

Diagrams can be generated with a Maven plugin. First check out (or update) the project:
$ git clone https://github.com/rmannibucau/diagram-generator-parent.git
$ cd diagram-generator-parent/
Then build it
$ mvn clean install
Include a section similar to the one below in your pom file and you should get your diagrams in the Maven javadoc directory. You should exclude the doc-files folder from your repository commits (after all, it is dynamically generated).
...
<build>
...
  <plugins>
...
            <plugin>
                <groupId>fr.rmannibucau</groupId>
                <artifactId>diagram-generator-maven-plugin</artifactId>
                <version>0.0.1-SNAPSHOT</version>
                <executions>
                    <execution>
                        <id>nestorurquiza-camel-routes</id>
                        <phase>site</phase>
                        <goals>
                            <goal>diagram</goal>
                        </goals>
                        <configuration>
                            <!-- or a qualified RouteBuilder name/a qualified package if you use java routes -->
                            <input>com.nestorurquiza.orchestration.camel.route</input>
                            <!-- default = false, true to show a window containing the diagram -->
                            <view>false</view>
                            <!-- default = 640  -->
                            <width>480</width>
                            <!-- default = 480 -->
                            <height>640</height>
                            <!-- default = target/diagram -->
                            <output>${basedir}/src/main/javadoc/com/nestorurquiza/orchestration/camel/route/doc-files</output>
                            <!-- default = camel -->
                            <type>camel</type>
                            <!-- default = xml, other values = { java  }-->
                            <fileType>java</fileType>
                            <!-- default = png, you can set jpg ... -->
                            <format>png</format>
                            <!-- true allows to resize icons, false force to keep their original size; default: true -->
                            <adjust>true</adjust>
                            <additionalClasspathElements>
                                <additionalClasspathElement>${basedir}/target/classes</additionalClasspathElement>
                            </additionalClasspathElements> 
                        </configuration>
                    </execution>
                </executions>
                
                <dependencies>
                    <dependency>
                        <!-- to use camel generator -->
                        <groupId>fr.rmannibucau</groupId>
                        <artifactId>camel-loader</artifactId>
                        <version>0.0.1-SNAPSHOT</version>
                    </dependency>
                    <!-- route dependencies if needed -->
                </dependencies>
            </plugin>
...
You can find several discussions about the correct use of this plugin on Nabble, but here is what I have done so far: I created a link in our documentation wiki pointing to the URL of our javadocs, including the path to the generated images, for example file://localhost/Users/nestor/eclipse-workspace-test/nestorurquiza-app/target/site/apidocs/com/nestorurquiza/orchestration/camel/route/doc-files/. Note that the doc-files directory is mandatory; otherwise the images will not be added to the final documentation site. You can add any subdirectory below it if you like.

Here is the output for our sample route:

Consuming Plain Old HTTP Services

Just in case you are new to my posts: I have posted before about how to reuse Talend jobs through the use of a nodejs server. The client invokes a shell command from a GET request to nodejs and a JSON response is then processed.

In User Story #1 above we test the retry policy by issuing a command like "ls /tmp/unlock", which returns an error in the JSON response because the file does not exist. After some retries we manually create the file with "touch /tmp/unlock" and then see how Camel continues without retrying any more.

Camel needs to invoke the Node.js HTTP service at http://localhost:8088/?cmd=ls%20/tmp/unlock and parse the error, if any, to decide whether to retry:
{"stdout":"","stderr":"ls: /tmp/unlock: No such file or directory\n","cmd":"ls /tmp/unlock"}

Learning Camel

  1. Check out the source code and search it, especially the JUnit tests, for uses of the different components
  2. Search the mailing list, which is available from Nabble

Notes on Quartz and scheduling

Quartz ships with a utility class, org.quartz.TriggerUtils, that provides information about the stored triggers. This is handy for understanding what is scheduled in the system. JWatch is a promising project that works with Quartz 2.x to provide access to Quartz scheduling.

Google uses an implementation of RFC 2445 (the iCalendar standard) for its Calendar application, most likely based on the open source project google-rfc-2445. Quartz still does not support the standard. We should vote for its addition: a standard would make it possible to use existing front-end projects to report on Quartz jobs and integrate them with calendar widgets.

As we have seen, we built some basic functionality with a Spring Controller to schedule and list cron triggers. However, its output is still a little cryptic, as you can see below. A translation to plain spoken language is something you will need to do on your own or, better, share with us:
Object
  autoLogoutTimeout:900000
  contextPath:""
  debug:false
  errors:Object
  info:Object
  isAdvancedSearchAvailable:false
  is_authenticated:true
  messages:Object
  module:"CLIENT"
  pageTitle:"LOCAL"
  sid:"4F0B046EA65A4E3295EDC94E53E14BFE.nestorurquiza-app1"
  triggersInJob:Object
    job1:Array[1]
      0:Object
        calendarName:null
        cronExpression:"00 50 14 * * ? *"
        description:null
        endTime:null
        expressionSummary:"seconds: 0 minutes: 50 hours: 14 daysOfMonth: * months: * daysOfWeek: ? lastdayOfWeek: false nearestWeekday: false NthDayOfWeek: 0 lastdayOfMonth: false years: * "
        finalFireTime:null
        fireInstanceId:null
        fullJobName:"DEFAULT.job1"
        fullName:"group1.timer1"
        group:"group1"
        jobDataMap:Object
        jobGroup:"DEFAULT"
        jobName:"job1"
        key:Object
          first:"timer1"
          group:"group1"
          name:"timer1"
          second:"group1"
        misfireInstruction:0
        name:"timer1"
        nextFireTime:1337626200000
        previousFireTime:null
        priority:5
        startTime:1337540312000
        timeZone:"America/New_York"
        triggerListenerNames:Array[0]
        volatile:false
  warnings:Object
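
As a starting point for that plain-language translation, here is a small sketch in plain Java (no Quartz classes involved). It only handles the simplest case, a fixed second, minute and hour that fires every day, and returns anything else untranslated. It also formats an epoch-millisecond fire time in the trigger's time zone. The class and method names (TriggerSummary, describeCron, formatFireTime) are hypothetical, and field values are not range-checked.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

class TriggerSummary {

    // Quartz cron fields: seconds minutes hours day-of-month month day-of-week [year]
    static String describeCron(String cron) {
        String[] f = cron.trim().split("\\s+");
        if (f.length < 6 || f.length > 7) {
            return cron;
        }
        boolean fixedTime = isNumber(f[0]) && isNumber(f[1]) && isNumber(f[2]);
        boolean everyDay = isAny(f[3]) && isAny(f[4]) && isAny(f[5])
                && (f.length == 6 || isAny(f[6]));
        if (!fixedTime || !everyDay) {
            return cron; // anything more complex is returned as-is
        }
        return String.format(Locale.ROOT, "every day at %02d:%02d:%02d",
                Integer.parseInt(f[2]), Integer.parseInt(f[1]), Integer.parseInt(f[0]));
    }

    private static boolean isNumber(String field) {
        return field.matches("\\d{1,2}");
    }

    private static boolean isAny(String field) {
        return field.equals("*") || field.equals("?");
    }

    // Renders an epoch-millisecond timestamp in the given time zone
    static String formatFireTime(long epochMillis, String zoneId) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of(zoneId))
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss xxx", Locale.ROOT));
    }

    public static void main(String[] args) {
        // Values taken from the trigger listing above
        System.out.println(describeCron("00 50 14 * * ? *"));
        System.out.println(formatFireTime(1337626200000L, "America/New_York"));
    }
}
```

For the trigger listed above, this reads as "every day at 14:50:00" with the next fire time at 2012-05-21 14:50:00 -04:00, which matches the cronExpression and timeZone fields.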

4 comments:

robert said...

hello, this is a bit late, but i am new to the party of camel and quartz. this is a very helpful and educative post. if you are monitoring this, i am wondering if the full source is available anywhere?

Nestor Urquiza said...

@Robert, no full source code as unfortunately I worked on this proof of concept in a proprietary implementation of a Business Hub but based on Spring for sure I see no big problems on trying to apply the steps above in a spring sample project.

Supriya said...

Hi,
I have currently implemented quartz scheduler using spring's SchedulerFactoryBean.
I want to switch to camel quartz integration with same scheduler implementation. Is it possible to pass my custom scheduler to camel component without disturbing existing implementation.

Nestor Urquiza said...

@Supriya that is a good question for the Camel mailing list. You will need to be more specific about the code you are currently using (code snippets) but you should post to the mailing list for sure. Best, - Nestor
