
Building a Message-Based Application with Apache Camel

  Source code download: Building a Message-Based Application with Apache Camel

The source code demonstrates:

  • Getting started with Apache Camel using Maven
  • Starting routes with CamelRunner
  • Building a message-based application with Camel

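Getting started with Apache Camel using Maven

You can use this camel-demo project as a template for your own: just rename the Java package and change the groupId and artifactId in the POM to suit your needs.

The project layout looks like this: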

camel-demo
    +- bin
    +- config
    +- data
    +- src
    +- pom.xml
    +- README.txt

The Maven configuration (pom.xml):

<?xml version='1.0' encoding='UTF-8'?>
<project xmlns='http://maven.apache.org/POM/4.0.0' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
    xsi:schemaLocation='http://maven.apache.org/POM/4.0.0   http://maven.apache.org/maven-v4_0_0.xsd'>

    <modelVersion>4.0.0</modelVersion>
    <groupId>deng.cameldemo</groupId>
    <artifactId>camel-demo</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <slf4j.version>1.6.6</slf4j.version>
        <camel.version>2.10.1</camel.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>project</descriptorRef>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>

        <!-- Unit testing lib -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit-dep</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-library</artifactId>
            <version>1.2.1</version>
            <scope>test</scope>
        </dependency>

        <!-- Logging lib -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- Apache Commons lib -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.0.1</version>
        </dependency>

        <!-- Apache Camel -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-groovy</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jackson</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-mina</artifactId>
            <version>${camel.version}</version>
        </dependency>

    </dependencies>

</project>

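This pom.xml declares a Java application that is packaged as a jar. It requires JDK 6 or later. Besides the usual JUnit and Hamcrest libraries for unit testing, it adds SLF4J for logging and the Apache commons-lang and commons-io libraries.

The maven-assembly-plugin is included only for this demo; change or remove it to suit your own project.

Besides camel-core, the Camel dependencies are:

  1. camel-spring – lets you declare Camel routes as XML configuration. See the examples in the camel-demo/config directory.
  2. camel-jackson – handles messages in JSON format.
  3. camel-mina – sends data across the network over TCP sockets.
  4. camel-groovy – [optional] adds dynamic scripting to routes, which is handy for debugging and proofs of concept.

From the project directory, run mvn compile to check that everything builds without errors.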

 

Starting routes with CamelRunner

Routes express the business logic. Take src/main/java/deng/cameldemo/HelloRoute.java as an example:

package deng.cameldemo;

import org.apache.camel.builder.RouteBuilder;

public class HelloRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Fire a timer event every 3000 ms and log each resulting message.
        from("timer://helloTimer?period=3000").
            to("log:" + getClass().getName());
    }
}

To see the route run we need a CamelContext. The CamelRunner class below provides one:

package deng.cameldemo;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * A main program to start Camel and run as a server using RouteBuilder class names or
 * Spring config files.
 *
 * <p>Usage:
 *
 * java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute
 *
 * or
 *
 * java -Dspring=true deng.cameldemo.CamelRunner /path/to/camel-spring.xml
 *
 * @author Zemian Deng
 */
public class CamelRunner {
    public static void main(String[] args) throws Exception {
        CamelRunner runner = new CamelRunner();
        runner.run(args);
    }

    private static final Logger logger = LoggerFactory.getLogger(CamelRunner.class);
    public void run(String[] args) throws Exception {
        // -Dspring=true switches from RouteBuilder class names to Spring XML files.
        if (Boolean.parseBoolean(System.getProperty("spring", "false")))
            runWithSpringConfig(args);
        else
            runWithCamelRoutes(args);

        // Wait for user to hit CTRL+C to stop the service
        synchronized(this) {
            this.wait();
        }
    }
    }

    private void runWithSpringConfig(String[] args) {
        // Creating the context loads the XML files and starts Spring,
        // including any Camel contexts they define.
        final ConfigurableApplicationContext springContext = new FileSystemXmlApplicationContext(args);

        // Register proper shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    springContext.close();
                    logger.info("Spring stopped.");
                } catch (Exception e) {
                    logger.error("Failed to stop Spring.", e);
                }
            }
        });

        logger.info("Spring started.");
    }

    private void runWithCamelRoutes(String[] args) throws Exception {
        final CamelContext camelContext = new DefaultCamelContext();
        // Register proper shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    camelContext.stop();
                    logger.info("Camel stopped for {}", camelContext);
                } catch (Exception e) {
                    logger.error("Failed to stop Camel.", e);
                }
            }
        });

        // Add a RouteBuilder for each class name given in args
        for (String className : args) {
            Class<?> cls = Class.forName(className);
            if (RouteBuilder.class.isAssignableFrom(cls)) {
                Object obj = cls.newInstance();
                RouteBuilder routeBuilder = (RouteBuilder)obj;
                camelContext.addRoutes(routeBuilder);
            } else {
                throw new RuntimeException("Unable to add Camel RouteBuilder " + className);
            }
        }

        // Start camel
        camelContext.start();
        logger.info("Camel started for {}", camelContext);
    }
}

The class has two startup methods: one runs routes defined in a Spring configuration, the other runs routes defined in code.
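
The code-based path comes down to loading each class by name, checking its type, and instantiating it. The sketch below isolates that step using only the JDK. BuilderLoader is a hypothetical helper name and is not part of camel-demo, and the sketch assumes Java 7 or later for ReflectiveOperationException. In CamelRunner the required type would be RouteBuilder.class.

```java
import java.util.ArrayList;
import java.util.List;

/** Loads classes by name and instantiates those that match a required type. */
final class BuilderLoader {

    /**
     * Instantiates each named class through its no-argument constructor.
     * Throws IllegalArgumentException if a class cannot be found, cannot be
     * instantiated, or is not assignable to requiredType.
     */
    static <T> List<T> load(Class<T> requiredType, String... classNames) {
        List<T> instances = new ArrayList<T>();
        for (String className : classNames) {
            try {
                Class<?> cls = Class.forName(className);
                if (!requiredType.isAssignableFrom(cls)) {
                    throw new IllegalArgumentException(
                        className + " is not a " + requiredType.getName());
                }
                Object instance = cls.getDeclaredConstructor().newInstance();
                instances.add(requiredType.cast(instance));
            } catch (ReflectiveOperationException e) {
                // Covers a missing class, a missing constructor, and failed instantiation.
                throw new IllegalArgumentException("Unable to load " + className, e);
            }
        }
        return instances;
    }
}
```

Reporting every failure with the offending class name makes a mistyped command-line argument easier to spot than a bare ClassNotFoundException stack trace.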

Run it directly with the run-java shell script:

$ mvn package
$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.HelloRoute

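You will see the program load HelloRoute into a DefaultCamelContext and start running as a server. HelloRoute generates a timer message every 3 seconds and sends it to the logger, which prints it to your screen. Press CTRL+C to stop.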
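
CamelRunner keeps its main thread alive with this.wait() until CTRL+C triggers the shutdown hook. Object.wait() is documented to allow spurious wakeups, so one possible alternative is to block on a CountDownLatch. The sketch below is only an illustration of that idea: StopSignal is a hypothetical class name, not something the demo project contains.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Blocks a thread until another thread signals that the service should stop. */
final class StopSignal {
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Releases every thread blocked in awaitStop; safe to call more than once. */
    void requestStop() {
        latch.countDown();
    }

    /** Blocks until requestStop has been called. */
    void awaitStop() throws InterruptedException {
        latch.await();
    }

    /** Waits up to the given timeout; returns true if a stop was requested. */
    boolean awaitStop(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}
```

With this in place, run() could call awaitStop() instead of wait(), and a test or an admin command could end the wait by calling requestStop().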

Next, let's run CamelRunner with a Spring configuration.

A Spring XML configuration lets you specify routes flexibly. Run:

$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/hellocamel-spring.xml

config/hellocamel-spring.xml is the configuration equivalent of the HelloRoute code:

<beans xmlns='http://www.springframework.org/schema/beans'
    xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
    xsi:schemaLocation='
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>

    <camelContext id='helloCamel' xmlns='http://camel.apache.org/schema/spring'>
        <route>
            <from uri='timer://jdkTimer?period=3000'/>
            <to uri='log://deng.cameldemo.HelloCamel'/>
        </route>
    </camelContext>

</beans>

 

Building a message-based application with Camel

To listen on a TCP port provided by camel-mina, we need a route:

package deng.cameldemo;

import org.apache.camel.builder.RouteBuilder;

public class TcpMsgRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // The listening port comes from -Dport=..., defaulting to 12345.
        String port = System.getProperty("port", "12345");
        from("mina:tcp://localhost:" + port + "?sync=false").
            to("log:" + getClass().getName());
    }
}

With everything in place, run:

$ bin/run-java deng.cameldemo.CamelRunner deng.cameldemo.TcpMsgRoute -Dport=12345

Output:
15:21:41 main INFO org.apache.camel.impl.DefaultCamelContext:1391 | Apache Camel 2.10.1 (CamelContext: camel-1) is starting
15:21:41 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:21:42 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:21:42 main INFO org.apache.camel.component.mina.MinaConsumer:59 | Binding to server address: localhost/127.0.0.1:12345 using acceptor: org.apache.mina.transport.socket.nio.SocketAcceptor@2ffad8fe
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:2045 | Route: route1 started and consuming from: Endpoint[mina://tcp://localhost:12345?sync=true]
15:21:42 main INFO org.apache.camel.management.DefaultManagementLifecycleStrategy:859 | StatisticsLevel at All so enabling load performance statistics
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1426 | Total 1 routes, of which 1 is started.
15:21:42 main INFO org.apache.camel.impl.DefaultCamelContext:1427 | Apache Camel 2.10.1 (CamelContext: camel-1) started in 0.505 seconds
15:21:42 main INFO deng.cameldemo.CamelRunner:93 | Camel started for CamelContext(camel-1)

The server is now waiting for clients on port 12345.

For the client side, write a TCP client:

package deng.cameldemo.client;

import java.io.FileReader;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpMsgSender {
    public static void main(String[] args) throws Exception {
        TcpMsgSender runner = new TcpMsgSender();
        runner.run(args);
    }

    private static Logger logger = LoggerFactory.getLogger(TcpMsgSender.class);
    public void run(String[] args) throws Exception {
        // args[0]: file to send (optional); args[1]: host:port of the server (optional)
        String fileName = args.length > 0 ? args[0] : "data/msg.txt";
        String[] hostPort = (args.length > 1 ? args[1] : "localhost:12345").split(":");
        String host = hostPort[0];
        String port = hostPort.length > 1 ? hostPort[1] : "12345";
        logger.info("Sending tcp message {} to host={}, port={}", new Object[]{ fileName, host, port});

        String text = IOUtils.toString(new FileReader(fileName));
        logger.debug("File size={}", text.length());

        CamelContext camelContext = new DefaultCamelContext();
        ProducerTemplate producer = camelContext.createProducerTemplate();
        producer.sendBody("mina:tcp://" + host + ":" + port + "?sync=false", text);
        logger.info("Message sent.");
    }
}

TcpMsgSender sends the text to the server endpoint. Run:

$ bin/run-java deng.cameldemo.client.TcpMsgSender data/test-msg.json localhost:12345

Output:
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:24 | Sending tcp message data/test-msg.json to host=localhost, port=12345
15:22:35 main DEBUG deng.cameldemo.client.TcpMsgSender:27 | File size=47
15:22:35 main INFO org.apache.camel.impl.converter.DefaultTypeConverter:45 | Loaded 172 type converters
15:22:35 main INFO org.apache.camel.management.ManagementStrategyFactory:43 | JMX enabled.
15:22:35 main INFO deng.cameldemo.client.TcpMsgSender:32 | Message sent.
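
TcpMsgSender accepts an optional host:port argument and falls back to localhost:12345. As a rough sketch, that parsing could be pulled into a small value class that also validates the port. HostPort and toMinaUri are hypothetical names introduced here for illustration, and the sketch does not handle IPv6 literals.

```java
/** Parses a "host:port" argument, falling back to defaults for missing parts. */
final class HostPort {
    final String host;
    final int port;

    private HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    static HostPort parse(String arg, String defaultHost, int defaultPort) {
        if (arg == null || arg.trim().isEmpty()) {
            return new HostPort(defaultHost, defaultPort);
        }
        String[] parts = arg.trim().split(":", 2);
        String host = parts[0].isEmpty() ? defaultHost : parts[0];
        int port = defaultPort;
        if (parts.length > 1 && !parts[1].isEmpty()) {
            // Integer.parseInt throws NumberFormatException for non-numeric input.
            port = Integer.parseInt(parts[1]);
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + port);
            }
        }
        return new HostPort(host, port);
    }

    /** Builds the endpoint URI in the same form TcpMsgSender uses. */
    String toMinaUri() {
        return "mina:tcp://" + host + ":" + port + "?sync=false";
    }
}
```

For example, HostPort.parse("myhost", "localhost", 12345).toMinaUri() yields mina:tcp://myhost:12345?sync=false, which could be passed to producer.sendBody.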

 

On the server side, we can again replace TcpMsgRoute with a Spring configuration:

<beans xmlns='http://www.springframework.org/schema/beans'
    xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
    xsi:schemaLocation='
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>

    <camelContext id='tcpMsgServer' xmlns='http://camel.apache.org/schema/spring'>
        <route>
            <from uri='mina:tcp://localhost:12345?sync=false'/>
            <to uri='log://deng.cameldemo.TcpMsgServer'/>
        </route>
    </camelContext>

</beans>

Save it as config/tcpmsgserver-spring.xml and run:

$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-spring.xml

 

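Next, we will have the server process JSON-formatted text.

The text we send is in data/test-msg.json:

{ "firstName" : "Zemian", "lastName" : "Deng" }

The route below unmarshals the JSON into a Java object and passes it to a bean named myMsgProcessor. Create config/tcpmsgserver-json-spring.xml: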

<beans xmlns='http://www.springframework.org/schema/beans'
    xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
    xsi:schemaLocation='
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd'>

    <camelContext id='tcpMsgServer' xmlns='http://camel.apache.org/schema/spring'>
        <route>
            <from uri='mina:tcp://localhost:12345?sync=false'/>
            <to uri='log://deng.cameldemo.TcpMsgServer'/>
            <unmarshal>
                <json library='Jackson'/>
            </unmarshal>
            <to uri='bean:myMsgProcessor?method=process'/>
        </route>
    </camelContext>

    <bean id='myMsgProcessor' class='deng.cameldemo.MyMsgProcessor'>
    </bean>

</beans>

The MyMsgProcessor class looks like this:

package deng.cameldemo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;

public class MyMsgProcessor {
    private static Logger logger = LoggerFactory.getLogger(MyMsgProcessor.class);
    // Receives the Map that Jackson produced from the incoming JSON text.
    public void process(Map<String, String> data) {
        logger.info("We should slice and dice the data: " + data);
    }
}

Run it again:

$ bin/run-java deng.cameldemo.CamelRunner -Dspring=true config/tcpmsgserver-json-spring.xml

The last few lines of output:

17:05:28 main INFO deng.cameldemo.CamelRunner:61 | Spring started.
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.TcpMsgServer:96 | Exchange[ExchangePattern:InOnly, BodyType:String, Body:{ "firstName" : "Zemian", "lastName" : "Deng" }]
17:05:35 Camel (tcpMsgServer) thread #3 - MinaThreadPool INFO deng.cameldemo.MyMsgProcessor:11 | We should slice and dice the data: {lastName=Deng, firstName=Zemian}

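Camel converts the data format for you. The client sends only plain JSON text; when the server receives it, the Jackson library unmarshals it into a Java Map, and that Map is then passed to our processor bean.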

 

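By the same token, it is a good idea to write your business logic as one or more processor beans, because doing so keeps each POJO's logic as small a unit as possible. That maximizes the reusability of your processors. A larger POJO that mixes many pieces of business logic is also hard to test. Once this becomes a habit, Camel lets you solve many domain problems more effectively.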
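
To illustrate the testability point, here is a minimal sketch of a processor bean that depends on nothing but the JDK. FullNameProcessor is a hypothetical example and is not part of camel-demo; it assumes the incoming Map carries the firstName and lastName keys from data/test-msg.json.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** A small processor bean with a single responsibility and no Camel dependency. */
final class FullNameProcessor {

    /** Builds "firstName lastName" from the unmarshalled JSON map. */
    String process(Map<String, String> data) {
        String first = data.get("firstName");
        String last = data.get("lastName");
        if (first == null || last == null) {
            throw new IllegalArgumentException("firstName and lastName are required: " + data);
        }
        return first + " " + last;
    }

    public static void main(String[] args) {
        // Exercise the bean directly, with no route or server running.
        Map<String, String> data = new LinkedHashMap<String, String>();
        data.put("firstName", "Zemian");
        data.put("lastName", "Deng");
        System.out.println(new FullNameProcessor().process(data));
    }
}
```

Because the bean takes a plain Map and returns a plain String, a unit test can call process directly and never needs to start a CamelContext.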

 
