
Using Multicasting In Camel Routers

Description
1) When the list of destinations is static and known ahead of time, you can add a multicast
element to the route. It consumes messages from a source endpoint and sends a copy of each
message to every destination in the list (for example, the accounting and production departments).
2) With parallel processing enabled (see PARALLEL MULTICASTING), multicasting can also speed up
operations by sending orders to accounting and production at the same time.

Example :
The incoming orders are in the folder file:/venkatjavasource/camel/from.
First, the XML orders are submitted to the jms:xmlOrders JMS queue.
From there, they are multicast to the jms:accounting queue and the
jms:production queue.

A multicast sends a message to a number of specified recipients.

1) The code that performs the multicast is:

from("jms:xmlOrders").multicast().to("jms:accounting", "jms:production");

We can list several JMS destinations, separated by commas.
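
To make the idea concrete without a broker, here is a minimal plain-Java sketch of what a multicast does conceptually: every listed destination gets its own copy of the same message. The class MulticastSketch and its in-memory "queues" are hypothetical stand-ins for illustration only; this is not how Camel implements multicast().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a multicast; not Camel's implementation.
final class MulticastSketch {

    // Returns one in-memory "queue" per destination, in the order the destinations were listed.
    static Map<String, List<String>> multicast(String message, String... destinations) {
        Map<String, List<String>> queues = new LinkedHashMap<>();
        for (String destination : destinations) {
            // Every destination receives the same message.
            queues.computeIfAbsent(destination, d -> new ArrayList<>()).add(message);
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, List<String>> queues =
                multicast("Order_xml.xml", "jms:accounting", "jms:production");
        queues.forEach((name, messages) -> System.out.println(name + " <- " + messages));
    }
}
```

The destination names are only map keys here; in the real route they are endpoint URIs resolved by Camel.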

UsingMulticastingInCamelRouters.java

package com.venkatjavasource.camel;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

public class UsingMulticastingInCamelRouters {

	public static void main(String[] args) throws Exception {
		// Create CamelContext
		CamelContext context = new DefaultCamelContext();

		// Connect to embedded ActiveMQ JMS broker
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				"vm://localhost");
		context.addComponent("jms",
				JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

		// add our route to the CamelContext
		context.addRoutes(new RouteBuilder() {
			@Override
			public void configure() {

				// Content-based router
				from("file:/venkatjavasource/camel/from?noop=false").choice()
						.when(header("CamelFileName").endsWith(".xml"))
						.to("jms:xmlOrders")
						.when(header("CamelFileName").regex("^.*(csv|csl)$"))
						.to("jms:csvOrders").otherwise().to("jms:badOrders");

				// Multicast each XML order to both the accounting and production queues
				from("jms:xmlOrders").multicast().to("jms:accounting",
						"jms:production");

				// test that our route is working
				from("jms:accounting").process(new Processor() {
					public void process(Exchange exchange) throws Exception {
						System.out.println("Accounting received order: "
								+ exchange.getIn().getHeader("CamelFileName"));
					}
				});
				from("jms:production").process(new Processor() {
					public void process(Exchange exchange) throws Exception {
						System.out.println("Production received order: "
								+ exchange.getIn().getHeader("CamelFileName"));
					}
				});
			}
		});

		// start the route and let it do its work
		context.start();
		Thread.sleep(2000);

		// stop the CamelContext
		context.stop();
	}
}

Demo :
We have one order with file name Order_xml.xml in file:/venkatjavasource/camel/from

Select “Run As” -> “Java Application”. The Order_xml.xml order is submitted to both the accounting and production JMS queues.

Output :
Accounting received order: Order_xml.xml
Production received order: Order_xml.xml

The order is then removed from the source directory. Camel keeps a backup by moving the processed file into a .camel subdirectory of the source directory.

PARALLEL MULTICASTING :
1) By default, a multicast sends to its destinations one after another. Adding parallelProcessing() sets up the multicast to distribute messages to the destinations in parallel.

from("jms:xmlOrders").multicast().parallelProcessing()
.to("jms:accounting", "jms:production");

2) We can also supply our own thread pool instead of the default one, for example to choose its size:

ExecutorService executor = Executors.newFixedThreadPool(16);
from("jms:xmlOrders").multicast()
.parallelProcessing().executorService(executor)
.to("jms:accounting", "jms:production");

3) By default, the multicast will continue sending messages to the remaining destinations even if
delivery to one of them fails.
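
As a rough illustration of what a fixed thread pool contributes in step 2, the plain-Java sketch below submits one delivery task per destination to an ExecutorService and waits for all of them. The class ParallelMulticastSketch and the deliver() helper are hypothetical and only model the behaviour; Camel's own parallel multicast is more involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java model of a parallel multicast; not Camel's implementation.
final class ParallelMulticastSketch {

    // Hypothetical delivery step; a real route would put the message on a JMS queue.
    static String deliver(String destination, String message) {
        return destination + " received " + message;
    }

    static List<String> multicastInParallel(String message, List<String> destinations,
                                            ExecutorService executor) {
        // One task per destination, so the pool can run the deliveries concurrently.
        List<Callable<String>> tasks = new ArrayList<>();
        for (String destination : destinations) {
            tasks.add(() -> deliver(destination, message));
        }
        List<String> results = new ArrayList<>();
        try {
            // invokeAll waits for every task; futures come back in task order.
            for (Future<String> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while multicasting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Delivery failed", e.getCause());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(16);
        try {
            multicastInParallel("Order_xml.xml",
                    Arrays.asList("jms:accounting", "jms:production"), executor)
                    .forEach(System.out::println);
        } finally {
            // A pool created with Executors is not stopped automatically.
            executor.shutdown();
        }
    }
}
```

With only two destinations, a pool of 16 threads is far larger than needed; the size matters more when many messages are multicast at once.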

STOPPING THE MULTICAST ON EXCEPTION :
1) We can use the stopOnException feature of the multicast to stop sending messages to the remaining destinations as soon as delivery to one of them fails.

from("jms:xmlOrders")
.multicast().stopOnException()
.parallelProcessing().executorService(executor)
.to("jms:accounting", "jms:production");
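
The difference between the default behaviour and stopOnException can be sketched in plain Java as follows. This is a simplified, sequential model under stated assumptions: the class StopOnExceptionSketch, its multicast() method and the sender callback are hypothetical, and the sketch ignores parallel processing and Camel's error handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

// Simplified sequential model of stopOnException; not Camel's implementation.
final class StopOnExceptionSketch {

    // Returns the destinations that were attempted, in order.
    static List<String> multicast(String message, List<String> destinations,
                                  BiConsumer<String, String> sender,
                                  boolean stopOnException) {
        List<String> attempted = new ArrayList<>();
        for (String destination : destinations) {
            attempted.add(destination);
            try {
                sender.accept(destination, message);
            } catch (RuntimeException e) {
                if (stopOnException) {
                    break; // give up on the remaining destinations
                }
                // default behaviour: carry on with the next destination
            }
        }
        return attempted;
    }

    public static void main(String[] args) {
        List<String> destinations = Arrays.asList("jms:accounting", "jms:production");
        // Hypothetical sender whose delivery to accounting always fails.
        BiConsumer<String, String> sender = (destination, message) -> {
            if (destination.equals("jms:accounting")) {
                throw new IllegalStateException("accounting is unavailable");
            }
        };
        System.out.println(multicast("Order_xml.xml", destinations, sender, true));
        System.out.println(multicast("Order_xml.xml", destinations, sender, false));
    }
}
```

In this model, a failure at the first destination means production is never attempted when stopOnException is set, while the default setting still attempts it.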

UsingMulticastingInCamelRouters.java

package com.venkatjavasource.camel;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

public class UsingMulticastingInCamelRouters {

	public static void main(String[] args) throws Exception {
		// Create CamelContext
		CamelContext context = new DefaultCamelContext();

		// Connect to embedded ActiveMQ JMS broker
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				"vm://localhost");
		context.addComponent("jms",
				JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

		// add our route to the CamelContext
		context.addRoutes(new RouteBuilder() {
			@Override
			public void configure() {

				// Content-based router
				from("file:/venkatjavasource/camel/from?noop=false").choice()
						.when(header("CamelFileName").endsWith(".xml"))
						.to("jms:xmlOrders")
						.when(header("CamelFileName").regex("^.*(csv|csl)$"))
						.to("jms:csvOrders").otherwise().to("jms:badOrders");

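				// Thread pool used by the multicast for parallel delivery
				ExecutorService executor = Executors.newFixedThreadPool(14);

				// Multicast in parallel; stop sending if delivery to a destination fails
				from("jms:xmlOrders").multicast().stopOnException()
						.parallelProcessing().executorService(executor)
						.to("jms:accounting", "jms:production");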

				// test that our route is working
				from("jms:accounting").process(new Processor() {
					public void process(Exchange exchange) throws Exception {
						System.out.println("Accounting received order: "
								+ exchange.getIn().getHeader("CamelFileName"));
					}
				});
				from("jms:production").process(new Processor() {
					public void process(Exchange exchange) throws Exception {
						System.out.println("Production received order: "
								+ exchange.getIn().getHeader("CamelFileName"));
					}
				});
			}
		});

		// start the route and let it do its work
		context.start();
		Thread.sleep(2000);

		// stop the CamelContext
		context.stop();
	}
}

*** Venkat – Happy learning ****