SOA专题
Apache Camel建立基于消息的应用
该源码展示:
- 基于Maven开始使用Apache Camel
- 使用CamelRunner提升路由。
- 使用camel建立的基于消息应用
基于Maven开始使用Apache Camel
这个 camel-demo可以作为你的项目模板,你只需要重命名的Java包,并重新命名POM的组和artifactID以符合您的需要。
该项目打开如下:
camel-demo
+- bin
+- config
+- data
+- src
+- pom.xml
+- README.txt
Maven的配置:
<?xml version='1.0' encoding='UTF-8'?>
<project xmlns='http://maven.apache.org/POM/4.0.0' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
xsi:schemaLocation='http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd'>
<modelVersion>4.0.0</modelVersion>
<groupId>deng.cameldemo</groupId>
<artifactId>camel-demo</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j.version>1.6.6</slf4j.version>
<camel.version>2.10.1</camel.version>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptorRefs>
<descriptorRef>project</descriptorRef>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<!-- Unit testing lib -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit-dep</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.2.1</version>
<scope>test</scope>
</dependency>
<!-- Logging lib -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- Apache Commons lib -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.0.1</version>
</dependency>
<!-- Apache Camel -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-groovy</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jackson</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-mina</artifactId>
<version>${camel.version}</version>
</dependency>
</dependencies>
</project>
此的pom.xml的声明了一个基于Java的应用程序,它会产生jar包。它需要最少JDK6或更高版本。除了典型的JUnit和hamcrest的单元测试,还添加了SLF4J进行记录。加入的Apache的commons-lang/io的项目。
maven-assembly-plugin只有用于这个Demo演示目的,您可以更改或删除以便符合您自己的项目需要。
对于Camel依赖,除了camel-core,还有:
- camel-spring – 将Camel的路由作为XML配置. 见案例的 camel-demo/config目录.
- camel-jackson – 以JSON格式产生消息。
- camel-mina – 通过TCP Socket跨网络发送数据。
- camel-groovy – [可选] 增加动态脚本到路由中,适合调试和POC.
进入该项目,运行 mvn compile 检查是否有错误。
使用CamelRunner提升路由
下面使用路由来表达业务逻辑,以src/main/java/deng/cameldemo/HelloRoute.java为案例:
package deng.cameldemo;
import org.apache.camel.builder.RouteBuilder;
public class HelloRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from('timer://helloTimer?period=3000').
to('log:' + getClass().getName());
}
}
为了检查其如何运行,需要一个CamelContext ,编制类CamelRunner :
package deng.cameldemo;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
/**
* A main program to start Camel and run as a server using RouteBuilder class names or
* Spring config files.
*
* <p>Usage:
*
* java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute
*
* or
*
* java -Dspring=true deng.cameldemo.CamelRunner /path/to/camel-spring.xml
*
* @author Zemian Deng
*/
public class CamelRunner {
public static void main(String[] args) throws Exception {
CamelRunner runner = new CamelRunner();
runner.run(args);
}
private static Logger logger = LoggerFactory.getLogger(CamelRunner.class);
public void run(String[] args) throws Exception {
if (Boolean.parseBoolean(System.getProperty('spring', 'false')))
runWithSpringConfig(args);
else
runWithCamelRoutes(args);
// Wait for user to hit CRTL+C to stop the service
synchronized(this) {
this.wait();
}
}
private void runWithSpringConfig(String[] args) {
final ConfigurableApplicationContext springContext = new FileSystemXmlApplicationContext(args);
// Register proper shutdown.
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
springContext.close();
logger.info('Spring stopped.');
} catch (Exception e) {
logger.error('Failed to stop Spring.', e);
}
}
});
// Start spring
logger.info('Spring started.');
}
private void runWithCamelRoutes(String[] args) throws Exception {
final CamelContext camelContext = new DefaultCamelContext();
// Register proper shutdown.
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
camelContext.stop();
logger.info('Camel stopped for {}', camelContext);
} catch (Exception e) {
logger.error('Failed to stop Camel.', e);
}
}
});
// Added RouteBuilder from args
for (String className : args) {
Class<?> cls = Class.forName(className);
if (RouteBuilder.class.isAssignableFrom(cls)) {
Object obj = cls.newInstance();
RouteBuilder routeBuilder = (RouteBuilder)obj;
camelContext.addRoutes(routeBuilder);
} else {
throw new RuntimeException('Unable to add Camel RouteBuilder ' + className);
}
}
// Start camel
camelContext.start();
logger.info('Camel started for {}', camelContext);
}
}
其中两个方法,一个是通过Spring的配置运行路由,一个是通过代码。
使用run-java这个SH批命令直接运行:
$ mvn package
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute
你会看到程序加载HelloRoute到 DefaultCamelContext 并且开始为一个服务器,HelloRoute产生一个3秒计时的消息,发送到日志,打印到你的屏幕,使用 CTRL+C 中断。
下面配合Spring配置运行这个CamelRunner。
使用Spring的XML配置可以灵活指定路由。运行:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/hellocamel-spring.xml
config/hellocamel-spring.xml 相当于HelloRoute 代码,通过配置完成:
<beans xmlns='http://www.springframework.org/schema/beans'
xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
xsi:schemaLocation='
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>
<camelContext id='helloCamel' xmlns='http://camel.apache.org/schema/spring'>
<route>
<from uri='timer://jdkTimer?period=3000'/>
<to uri='log://deng.cameldemo.HelloCamel'/>
</route>
</camelContext>
</beans>
使用camel建立的基于消息应用
为了路由监听camel-mina 提供的TCP端口,需要一个路由:
package deng.cameldemo;
import org.apache.camel.builder.RouteBuilder;
public class TcpMsgRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
String port = System.getProperty('port', '12345');
from('mina:tcp://localhost:' + port + '?sync=false').
to('log:' + getClass().getName());
}
}
一切准备好了,运行:
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.TcpMsgRoute -Dport=12345
输出:
15:21:41 main INFO org.apache.camel.impl.DefaultCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: camel-1) is starting
15:21:41 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:21:42 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:21:42 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@2ffad8fe
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=true]
15:21:42 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1426 | Total 1 routes, of which 1 is started.
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.505 seconds
15:21:42 main INFO deng.cameldemo.CamelRunner:93 | Camel started for CamelContext(camel-1)
服务器在12345端口等待用户客户端。
客户端代码,写一个TCP客户端:
package deng.cameldemo.client;
import java.io.FileReader;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TcpMsgSender {
public static void main(String[] args) throws Exception {
TcpMsgSender runner = new TcpMsgSender();
runner.run(args);
}
private static Logger logger = LoggerFactory.getLogger(TcpMsgSender.class);
public void run(String[] args) throws Exception {
String fileName = args.length > 0 ? args[0] : 'data/msg.txt';
String[] hostPort = (args.length > 1 ? args[1] : 'localhost:12345').split(':');
String host = hostPort[0];
String port = hostPort.length > 1 ? hostPort[1] : '12345';
logger.info('Sending tcp message {} to host={}, port={}', new Object[]{ fileName, host, port});
String text = IOUtils.toString(new FileReader(fileName));
logger.debug('File size={}', text.length());
CamelContext camelContext = new DefaultCamelContext();
ProducerTemplate producer = camelContext.createProducerTemplate();
producer.sendBody('mina:tcp://' + host + ':' + port + '?sync=false', text);
logger.info('Message sent.');
}
}
这个TcpMsgSender 将发送文本到服务器端点。运行:
$ bin/run-java deng.cameldemo.client.TcpMsgSender data/test-msg.json localhost:12345
输出:
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:24 | Sending tcp message data/test-msg.json to host=localhost, port=12345
15:22:35 main DEBUG deng.cameldemo.client.TcpMsgSender:27 | File size=47
15:22:35 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:22:35 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:32 | Message sent.
服务器端我们还是可以用Spring配置替代TcpMsgRoute :
<beans xmlns='http://www.springframework.org/schema/beans'
xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
xsi:schemaLocation='
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>
<camelContext id='tcpMsgServer' xmlns='http://camel.apache.org/schema/spring'>
<route>
<from uri='mina:tcp://localhost:12345?sync=false'/>
<to uri='log://deng.cameldemo.TcpMsgServer'/>
</route>
</camelContext>
</beans>
保存为 config/tcpmsgserver-spring.xml,运行:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-spring.xml
下面我们将产生JSON格式的文本。
输出你接受到文本,文本格式在data/test-msg.json:
{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }
使用myMsgProcessor用来进行JSON和Java对象之间转换,创建 config/tcpmsgserver-json-spring.xml :
<beans xmlns='http://www.springframework.org/schema/beans'
xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
xsi:schemaLocation='
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>
<camelContext id='tcpMsgServer' xmlns='http://camel.apache.org/schema/spring'>
<route>
<from uri='mina:tcp://localhost:12345?sync=false'/>
<to uri='log://deng.cameldemo.TcpMsgServer'/>
<unmarshal>
<json library='Jackson'/>
</unmarshal>
<to uri='bean:myMsgProcessor?method=process'/>
</route>
</camelContext>
<bean id='myMsgProcessor' class='deng.cameldemo.MyMsgProcessor'>
</bean>
</beans>
'myMsgProcessor'代码如下:
package deng.cameldemo;
import org.apache.camel.builder.RouteBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class MyMsgProcessor {
private static Logger logger = LoggerFactory.getLogger(MyMsgProcessor.class);
public void process(Map<String, String> data) {
logger.info('We should slice and dice the data: ' + data);
}
}
再次运行:
$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-json-spring.xml
最后几行输出:
17:05:28 main INFO deng.cameldemo.CamelRunner:61 | Spring started.
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.TcpMsgServer:96 | Exchange[ExchangePattern:InOnly, BodyType:String, Body:{ 'firstName' : 'Zemian', 'lastName' : 'Deng' }]
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.MyMsgProcessor:11 | We should slice and dice the data: {lastName=Deng, firstName=Zemian}
Camel会自动转换的数据格式,客户端只发送JSON格式的纯文本,当服务器接收到它,它使用Jackson库包将其转换成一个Java的Map对象。然后通过map象导入到我们的处理器Bean中。
同样的道理,当你将业务逻辑写成一个或多个处理器bean,这是一个好主意,因为这将限制你的POJO逻辑到尽可能小的单位。当你做到这一点,那么你就可以最大限度地提高处理器的可重用性。如果做成一个混合的更大的POJO,很多业务逻辑混合,它也将很难测试。一旦你养成良好习惯,你就可以使用Camel骆驼以一种更有效的方式解决很多领域问题。