使用Spring Integration和Hazelcast进行集群领导者选举

19-01-10 banq
                   

最近在检查Spring Integration区域时,我注意到与Hazelcastdatagrid 的非常好的集成。在以下位置查看:

https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-hazelcast

如果您使用Hazelcast,那么您可以从Hazelcast支持的各种分布式数据结构中提供Spring Integration通道基础架构,如:

  • com.hazelcast.core.IMap,
  • com.hazelcast.core.MultiMap,
  • com.hazelcast.core.IList,
  • com.hazelcast.core.ISet,
  • com.hazelcast.core.IQueue,
  • com.hazelcast.core.ITopic,
  • com.hazelcast.core.ReplicatedMap

真正令我印象深刻的是Spring Integration团队通过Hazelcast实施集群领导者选举。让我们来看看并测试它。但首先要做的事情是:

演示任务:我们有两个Spring Boot微服务,每10秒产生一个随机值放入分布式IMap。现在只允许一个微服务在某一时刻消费使用IMap的数据。为了使它变得有点辣,微服务应该在消息传递给其他节点之后放弃其领导。

通过Spring Integration Cluster Leadership解决方案:

首先,您需要将Spring Boot MicroService添加到领导游戏中:

@Bean
    public Candidate nodeService1Candidate() {
        final NodeCandidate candidate = new NodeCandidate("service1", HazelcastConfiguration.ROLE_JOB_MAP);
        return candidate;
    }


    @Bean
    public LeaderInitiator initiator() {
        final LeaderInitiator leaderInitiator = new LeaderInitiator(hazelcastConfiguration.hazelcastInstance(), nodeService1Candidate());
        return leaderInitiator;
    }

但这不是全部。我们的目标是在领导权被授予后从IMap开始数据消费,另一方面在领导被撤销后停止数据消费。

为此,我们需要监听org.springframework.integration.leader.DefaultCandidate子类NodeCandidate中的onGranted和onRevoked事件。第一个构造函数参数是节点id,第二个是角色名称。阅读Spring Integration角色,但我不会使用它们。我将手动启动IMap更改生成者。

为了了解数据更改,SI Hazelcast集成提供了HazelcastEventDrivenMessageProducer,它可以监听分布式IMap更改并将适当的数据更改事件委派给Spring Integration通道基础结构。

@Configuration
public class HazelcastConfiguration {
    .
    .
    @Bean
    public IMap<String, String> getDistributedMapForJobInput() {
        return hazelcastInstance().getMap(INPUT_JOB_MAP);
    }

    @Bean
    public MessageChannel inputJobChannel() {
        return new DirectChannel();
    }

    @Bean
    public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
        final HazelcastEventDrivenMessageProducer producer =
                new HazelcastEventDrivenMessageProducer(
                        getDistributedMapForJobInput()
                );
        producer.setOutputChannel(inputJobChannel());
        producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
        producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
        producer.setAutoStartup(false);

        return producer;
    }
}

setAutostartup(false)的注意事项。我们希望让这位生产者在获得领导时能够开始启动:

/**
 * Created by tomask79 on 24.08.17.
 */
public class NodeCandidate extends DefaultCandidate {

    @Autowired
    private HazelcastConfiguration hazelcastConfiguration;

    public NodeCandidate(String nodeId, String role) {
        super(nodeId, role);
    }

    @Override
    public void onGranted(Context ctx) {
        super.onGranted(ctx);
        System.out.println("Leader granted to: "+ctx.toString());
        hazelcastConfiguration.hazelcastEventDrivenMessageProducer().start();
    }

    @Override
    public void onRevoked(Context ctx) {
        super.onRevoked(ctx);
        System.out.println("Leader revoked to: "+ctx.toString());
        hazelcastConfiguration.hazelcastEventDrivenMessageProducer().stop();
    }
}

最后一项任务是消费来自分布式IMap的消息并放弃领导,以便其他节点可以接受工作并享受一些乐趣。因此,让我们声明ServiceActivator监听来自jobInputChannel DirectChannel 的数据:

@Bean 
    @ServiceActivator(inputChannel =“inputJobChannel”)
    public MessageHandler logger(){ 
        return new LogAndGiveInitiatorHandler(); 
    }        

将消息记录到标准输出:

/**
 * Created by tomask79 on 24.08.17.
 */
public class LogAndGiveInitiatorHandler implements MessageHandler{

    @Autowired
    private JobServices jobServices;

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println(message.toString());
        System.out.println("Waiting for another node to take the work...!");
        jobServices.giveUp();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("........");
    }
}

并命令微服务放弃其领导地位:

/**
 * Created by tomask79 on 10.08.17.
 */
@Service
public class JobServices {    
    @Autowired
    private LeaderInitiator initiator;
    .
    .
    public void giveUp() {
        if (initiator.getContext().isLeader()) {
            System.out.println("Giving up on leadership: "+initiator.getContext().toString());
            initiator.getContext().yield();
        }
    }
}

就是这样!让我们测试整个包。

  • git clone https://bitbucket.org/tomask79/spring-leader-hazelcast.git
  • mvn clean all (in the directory with top pom.xml to build all three projects)

输出:

[INFO] Reactor Summary:
<p>[INFO] 
<p>[INFO] spring-cloud-cluster-demo .......................... SUCCESS [  0.412 s]
<p>[INFO] spring-microservice-hazelcast ...................... SUCCESS [  2.380 s]
<p>[INFO] spring-microservice-service1 ....................... SUCCESS [  3.685 s]
<p>[INFO] spring-microservice-service2 ....................... SUCCESS [  2.745 s]
<p>[INFO] ------------------------------------------------------------------------
<p>[INFO] BUILD SUCCESS
<p>[INFO] ------------------------------------------------------------------------
<p>[INFO] Total time: 10.047 s
<p>[INFO] Finished at: 2017-08-28T19:57:53+02:00
<p>[INFO] Final Memory: 40M/532M
<p>[INFO] ------------------------------------------------------------------------

现在打开两个终端并运行:

  • java -jar spring-microservice-service1 / target / service1-0.0.1-SNAPSHOT.war(在第一个终端)
  • java -jar spring-microservice-service2 / target / service2-0.0.1-SNAPSHOT.war(在第二个终端)

要验证两个微服务是否形成有效的Hazelcast集群,您应该看到类似的内容:

Members [2] {
    Member [192.168.1.112]:5702
    Member [192.168.1.112]:5701 this
}

在形成Hazelcast群集设置后,您应该看到以下输出

第一终端(获取领导权并放弃给服务2):

[st-leadership-0] com.example.hazelcast.NodeCandidate      : DefaultCandidate{role=leader, id=service1} has been granted leadership; context: HazelcastContext{role=leader, id=service1, isLeader=true}
Leader granted to: HazelcastContext{role=leader, id=service1, isLeader=true}
<p>[st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer
GenericMessage [payload=EntryEventMessagePayload [key=service18eff005d-6da8-4fb8-b747-f977ad8e1544, value=a61b5f9a-1b96-493d-b240-61ccb549ba17, oldValue=null], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5702, id=f9c5455b-b42d-3ab7-ec49-9bd33db9ec5f, hazelcast_eventType=ADDED, timestamp=1503945864993}]
Waiting for another node to take the work...!
Giving up on leadership: HazelcastContext{role=leader, id=service1, isLeader=true}

第二终端(获取领导权并放弃给服务1)

Leader granted to: HazelcastContext{role=leader, id=service2, isLeader=true}
2017-08-28 20:47:08.001  INFO 1357 --- [st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer
2017-08-28 20:47:08.019  INFO 1357 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8082 (http)
2017-08-28 20:47:08.029  INFO 1357 --- [           main] c.e.SpringMicroserviceServiceComponent   : Started SpringMicroserviceServiceComponent in 12.807 seconds (JVM running for 13.507)
........
GenericMessage [payload=EntryEventMessagePayload [key=service249cde108-5045-4b77-84f7-cdc9f524df04, value=c4fe775f-e44d-4f10-ac43-0fe7157c0e67, oldValue=null], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5701, id=df474178-1ff1-35e5-e1e1-d3f6f25d6d68, hazelcast_eventType=ADDED, timestamp=1503946037904}]
Waiting for another node to take the work...!
Giving up on leadership: HazelcastContext{role=leader, id=service2, isLeader=true}

总结

只是一些想法。如果在生产过程中流经系统的消息数量每天只有几千(我们在Embedit的生产系统中的速率),那么Hazelcast肯定是一种有过度杀伤力工作。建议始终使用JMS / AMPQ以循环方式将数据分发到您的节点。 但是当处理存储在内存中的大数据时。你不应该错过由Hazelcast支持的Spring Integration Election算法。