zookeeper

  面向服务的设计SOA已被证明是一个针对各种分布式系统的巨大的成功的解决方案。如果使用得当,它有一个很大的好处。但是,随着服务数量的增加,它变得更加难以明白什么服务部署在哪里。而且因为我们正在建设的可靠和高度可用的系统,这样就有一个问题:目前有多少个服务实例?

  Apache ZooKeeper是 一个高度可靠的分布式服务协调者,通过ZooKeeper能够自动发现可用服务并执行一个REST 调用。

首先下载ZooKeeper,解压,拷贝目录conf/zoo_sample.cfg 到 conf/zoo.cfg,运行下面命令:

Windows: bin/zkServer.cmd  
Linux: bin/zkServer
Zookeeper现在运行在2181端口了。

下面开发一个简单的SOA服务,遵循 JAX-RS 2.0的无状态服务,查询返回人员集合。它能根据系统服务在多个服务器上运行,基础框架依赖 Apache CXF 和 Spring Framework 

package com.example.rs;

import java.util.Arrays;
import java.util.Collection;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import com.example.model.Person;

@Path( PeopleRestService.PEOPLE_PATH )
public class PeopleRestService {
public static final String PEOPLE_PATH = "/people";

@PostConstruct
public void init() throws Exception {
}

@Produces( { MediaType.APPLICATION_JSON } )
@GET
public Collection< Person > getPeople( @QueryParam( "page") @DefaultValue( "1" ) final int page ) {
return Arrays.asList(
new Person( "Tom", "Bombadil" ),
new Person( "Jim", "Tommyknockers" )
);
}
}

下面用Spring的AppConfig 是来在Jetty容器中创建JAX-RS 2.0 服务:

package com.example.config;

import java.util.Arrays;

import javax.ws.rs.ext.RuntimeDelegate;

import org.apache.cxf.bus.spring.SpringBus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

import com.example.rs.JaxRsApiApplication;
import com.example.rs.PeopleRestService;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

@Configuration
public class AppConfig {
public static final String SERVER_PORT = "server.port";
public static final String SERVER_HOST = "server.host";
public static final String CONTEXT_PATH = "rest";

@Bean( destroyMethod = "shutdown" )
public SpringBus cxf() {
return new SpringBus();
}

@Bean @DependsOn( "cxf" )
public Server jaxRsServer() {
JAXRSServerFactoryBean factory = RuntimeDelegate.getInstance().createEndpoint( jaxRsApiApplication(), JAXRSServerFactoryBean.class );
factory.setServiceBeans( Arrays.< Object >asList( peopleRestService() ) );
factory.setAddress( factory.getAddress() );
factory.setProviders( Arrays.< Object >asList( jsonProvider() ) );
return factory.create();
}

@Bean
public JaxRsApiApplication jaxRsApiApplication() {
return new JaxRsApiApplication();
}

@Bean
public PeopleRestService peopleRestService() {
return new PeopleRestService();
}

@Bean
public JacksonJsonProvider jsonProvider() {
return new JacksonJsonProvider();
}
}

下面启动内嵌的多个Jetty服务,服务的端口是由外界输入:

package com.example;

import org.apache.cxf.transport.servlet.CXFServlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;

import com.example.config.AppConfig;

public class ServerStarter {
public static void main( final String[] args ) throws Exception {
if( args.length != 1 ) {
System.out.println( "Please provide port number" );
return;
}

final int port = Integer.valueOf( args[ 0 ] );
final Server server = new Server( port );

System.setProperty( AppConfig.SERVER_PORT, Integer.toString( port ) );
System.setProperty( AppConfig.SERVER_HOST, "localhost" );

// Register and map the dispatcher servlet
final ServletHolder servletHolder = new ServletHolder( new CXFServlet() );
final ServletContextHandler context = new ServletContextHandler();
context.setContextPath( "/" );
context.addServlet( servletHolder, "/" + AppConfig.CONTEXT_PATH + "/*" );
context.addEventListener( new ContextLoaderListener() );

context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );

server.setHandler( context );
server.start();
server.join();
}
}

下面重点是演示 Apache ZooKeeper 如何实现服务发现定位到这个服务呢?

只要PeopleRestService 被部署时,它会将自己注册到ZooKeeper中,包括它的 URL和版本。客户端只要查询ZooKeeper,获得所有服务的列表,然后调用它们。当然,服务自己和客户端都必须知道Zookeeper在哪里运行。

这里我们是在本地机运行,这样在AppConfig 中加入localhost

private static final String ZK_HOST = "localhost";

每个服务一旦连上ZooKeeper,就保持一种持久连接,Zookeeper必须知道那些服务还活着,为了连接到ZooKeeper,我们必须创建一个CuratorFramework 实例:

@Bean( initMethod = "start", destroyMethod = "close" )
public CuratorFramework curator() {
return CuratorFrameworkFactory.newClient( ZK_HOST, new ExponentialBackoffRetry( 1000, 3 ) );
}

其中ZK_HOST是我们定义为Localhost的。

下面我们创建ServiceDiscovery 用来将服务信息提供给ZooKeeper发布。

@Bean( initMethod = "start", destroyMethod = "close" )
public ServiceDiscovery< RestServiceDetails > discovery() {
JsonInstanceSerializer< RestServiceDetails > serializer =
new JsonInstanceSerializer< RestServiceDetails >( RestServiceDetails.class );

return ServiceDiscoveryBuilder.builder( RestServiceDetails.class )
.client( curator() )
.basePath( "services" )
.serializer( serializer )
.build();
}

在ZooKeepr内部所有数据保存在一个有层次名称空间,如同文件系统一样,服务有根路径,每个服务有自己运行的URL和端口,我们建立一个URI规范供给Curator调用:

package com.example.rs;

import javax.inject.Inject;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

import org.springframework.core.env.Environment;

import com.example.config.AppConfig;
import com.netflix.curator.x.discovery.UriSpec;

@ApplicationPath( JaxRsApiApplication.APPLICATION_PATH )
public class JaxRsApiApplication extends Application {
public static final String APPLICATION_PATH = "api";

@Inject Environment environment;

public UriSpec getUriSpec( final String servicePath ) {
return new UriSpec(
String.format( "{scheme}://%s:{port}/%s/%s%s",
environment.getProperty( AppConfig.SERVER_HOST ),
AppConfig.CONTEXT_PATH,
APPLICATION_PATH,
servicePath
) );
}
}

最后,将服务PeopleRestService 注册服务发现处,这是通过在PeopleRestService 的init方法中实现的:

@Inject private JaxRsApiApplication application;
@Inject private ServiceDiscovery< RestServiceDetails > discovery;
@Inject private Environment environment;

@PostConstruct
public void init() throws Exception {
final ServiceInstance< RestServiceDetails > instance =
ServiceInstance.< RestServiceDetails >builder()
.name( "people" )
.payload( new RestServiceDetails( "1.0" ) )
.port( environment.getProperty( AppConfig.SERVER_PORT, Integer.class ) )
.uriSpec( application.getUriSpec( PEOPLE_PATH ) )
.build();

discovery.registerService( instance );
}

在这个init方法中,我们用到了RestServiceDetails,这是一个有关服务版本的类,如下:

package com.example.config;

import org.codehaus.jackson.map.annotate.JsonRootName;

@JsonRootName( "serviceDetails" )
public class RestServiceDetails {
private String version;

public RestServiceDetails() {
}

public RestServiceDetails( final String version ) {
this.version = version;
}

public void setVersion( final String version ) {
this.version = version;
}

public String getVersion() {
return version;
}
}

好了,最后总结一下我们刚才的步骤:

1.创建一个namepeple的服务实例,完整名称是:/services/people
2.设置一个这个实例运行的端口
3.为这个REST服务端endpoint设置URI规范
4.附加了一个服务版本的类 (RestServiceDetails)

下面让服务以/services/people发布到ZOOKeeper:

下面启动两个服务在8080和8081端口如下:

mvn clean package
java -jar jax-rs-2.0-service\target\jax-rs-2.0-service-0.0.1-SNAPSHOT.one-jar.jar 8080
java -jar jax-rs-2.0-service\target\jax-rs-2.0-service-0.0.1-SNAPSHOT.one-jar.jar 8081

这在zooKeeper会是如下示意:

zookeeper

已经发现有两个服务在运行,现在我们可以通过ZooKeeper使用他们:

第一步同样,我们要创建CuratorFramework 和 ServiceDiscovery 的实例,见:

package com.example.client;

import java.util.Collection;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.example.config.RestServiceDetails;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceInstance;

public class ClientStarter {
public static void main( final String[] args ) throws Exception {
try( final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( ClientConfig.class ) ) {
@SuppressWarnings("unchecked")
final ServiceDiscovery< RestServiceDetails > discovery =
context.getBean( ServiceDiscovery.class );
final Client client = ClientBuilder.newClient();

final Collection< ServiceInstance< RestServiceDetails > > services =
discovery.queryForInstances( "people" );
for( final ServiceInstance< RestServiceDetails > service: services ) {
final String uri = service.buildUriSpec();

final Response response = client
.target( uri )
.request( MediaType.APPLICATION_JSON )
.get();

System.out.println( uri + ": " + response.readEntity( String.class ) );
System.out.println( "API version: " + service.getPayload().getVersion() );

 response.close();
}
}
}
}

下面把这个客户端运行起来:

mvn clean package  java -jar jax-rs-2.0-client\target\jax-rs-2.0-client-0.0.1-SNAPSHOT.one-jar.jar

得到输出结果是:

http://localhost:8081/rest/api/people: [{"email":null,"firstName":"Tom","lastName":"Bombadil"},{"email":null,"firstName":"Jim","lastName":"Tommyknockers"}]
API version: 1.0

http://localhost:8080/rest/api/people: [{"email":null,"firstName":"Tom","lastName":"Bombadil"},{"email":null,"firstName":"Jim","lastName":"Tommyknockers"}]
API version: 1.0

文中源码下载:GitHub

 

 

下页

分布式系统

SOA

更多伸缩性scalable讨论

 

 

 

猜你喜欢