在本教程中,我们将看到如何在Netty中实现HTTP / 2服务器和客户端。
Netty是基于NIO的客户端-服务器框架,它使Java开发人员能够在网络层上进行操作。使用此框架,开发人员可以构建自己的任何已知协议甚至自定义协议的实现。
服务器端
Netty支持通过TLS进行HTTP / 2的APN协商。因此,我们需要创建服务器的第一件事是SslContext:
SelfSignedCertificate ssc = new SelfSignedCertificate(); SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) .sslProvider(SslProvider.JDK) .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) .applicationProtocolConfig( new ApplicationProtocolConfig(Protocol.ALPN, SelectorFailureBehavior.NO_ADVERTISE, SelectedListenerFailureBehavior.ACCEPT, ApplicationProtocolNames.HTTP_2)) .build();
|
在这里,我们使用JDK SSL提供程序为服务器创建了一个上下文,添加了两个密码,并为HTTP / 2配置了应用层协议协商。这意味着我们的服务器将仅支持HTTP / 2及其基础协议标识符h2。
接下来,我们需要一个ChannelInitializer用于我们的多路复用子通道,以便建立一个Netty管道。
我们将在此通道中使用较早的sslContext来启动管道,然后引导服务器:
public final class Http2Server { static final int PORT = 8443; public static void main(String[] args) throws Exception { SslContext sslCtx = // create sslContext as described above EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.option(ChannelOption.SO_BACKLOG, 1024); b.group(group) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { if (sslCtx != null) { ch.pipeline() .addLast(sslCtx.newHandler(ch.alloc()), Http2Util.getServerAPNHandler()); } } }); Channel ch = b.bind(PORT).sync().channel(); logger.info("HTTP/2 Server is listening on https://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
|
作为此通道初始化的一部分,我们在实用程序方法Http2Util中定义的实用程序方法getServerAPNHandler()中向管道添加了APN处理程序:
public static ApplicationProtocolNegotiationHandler getServerAPNHandler() { ApplicationProtocolNegotiationHandler serverAPNHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) { @Override protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { ctx.pipeline().addLast( Http2FrameCodecBuilder.forServer().build(), new Http2ServerResponseHandler()); return; } throw new IllegalStateException("Protocol: " + protocol + " not supported"); } }; return serverAPNHandler; }
|
我们的自定义处理程序扩展了Netty的ChannelDuplexHandler,并充当服务器的入站和出站处理程序。它准备要发送给客户端的响应。
在io.netty.buffer.ByteBuf中定义一个静态Hello World响应 -首选的对象,该对象在Netty中读写字节:
static final ByteBuf RESPONSE_BYTES = Unpooled.unreleasableBuffer( Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8));
|
该缓冲区将在处理程序的channelRead方法中设置为DATA帧,并写入ChannelHandlerContext中:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof Http2HeadersFrame) { Http2HeadersFrame msgHeader = (Http2HeadersFrame) msg; if (msgHeader.isEndStream()) { ByteBuf content = ctx.alloc().buffer(); content.writeBytes(RESPONSE_BYTES.duplicate()); Http2Headers headers = new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText()); ctx.write(new DefaultHttp2HeadersFrame(headers).stream(msgHeader.stream())); ctx.write(new DefaultHttp2DataFrame(content, true).stream(msgHeader.stream())); } } else { super.channelRead(ctx, msg); } }
|
我们的服务器已准备好发布Hello World。
为了进行快速测试,请启动服务器并使用–http2选项触发curl命令:
curl -k -v --http2 https://127.0.0.1:8443
|
客户端
接下来,让我们看一下客户端。当然,其目的是发送请求,然后处理从服务器获得的响应。
我们的客户端代码将包括几个处理程序,一个初始化器类(用于在管道中对其进行设置)以及最后一个JUnit测试,以引导客户端并将所有内容整合在一起。
让我们再次看看如何设置客户端的SslContext。我们将其编写为设置客户端JUnit的一部分:
@Before public void setup() throws Exception { SslContext sslCtx = SslContextBuilder.forClient() .sslProvider(SslProvider.JDK) .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE) .trustManager(InsecureTrustManagerFactory.INSTANCE) .applicationProtocolConfig( new ApplicationProtocolConfig(Protocol.ALPN, SelectorFailureBehavior.NO_ADVERTISE, SelectedListenerFailureBehavior.ACCEPT, ApplicationProtocolNames.HTTP_2)) .build(); }
|
如我们所见,它与服务器的S slContext非常相似,只是我们在这里没有提供任何SelfSignedCertificate。另一个区别是,我们添加了一个InsecureTrustManagerFactory来信任任何证书而无需任何验证。
重要的是,此信任管理器仅用于演示目的,不应在生产中使用。要改为使用可信证书,Netty的SslContextBuilder提供了许多替代方案。
现在,让我们看一下处理程序。
首先,我们需要一个称为Http2SettingsHandler的处理程序来处理HTTP / 2的设置。它扩展了Netty的SimpleChannelInboundHandler:
public class Http2SettingsHandler extends SimpleChannelInboundHandler<Http2Settings> { private final ChannelPromise promise; // constructor @Override protected void channelRead0(ChannelHandlerContext ctx, Http2Settings msg) throws Exception { promise.setSuccess(); ctx.pipeline().remove(this); } }
|
该类只是初始化ChannelPromise并将其标记为成功。
它还有一个实用程序方法awaitSettings,我们的客户端将使用该方法来等待初始握手完成:
public void awaitSettings(long timeout, TimeUnit unit) throws Exception { if (!promise.awaitUninterruptibly(timeout, unit)) { throw new IllegalStateException("Timed out waiting for settings"); } }
|
如果在规定的超时时间内没有发生通道读取,则抛出IllegalStateException。
其次,我们需要一个处理程序来处理从服务器获得的响应,我们将其命名为Http2ClientResponseHandler:
public class Http2ClientResponseHandler extends SimpleChannelInboundHandler { private final Map<Integer, MapValues> streamidMap; // constructor }
|
此类还扩展了SimpleChannelInboundHandler,并声明了MapValues的streamidMap,它是我们Http2ClientResponseHandler的内部类:
public static class MapValues { ChannelFuture writeFuture; ChannelPromise promise; // constructor and getters }
|
我们添加了此类,以便能够为给定的Integer键存储两个值。
处理程序还具有一个实用方法put,当然可以将值放入streamidMap中:public MapValues put(int streamId, ChannelFuture writeFuture, ChannelPromise promise) { return streamidMap.put(streamId, new MapValues(writeFuture, promise)); }
|
接下来,让我们看看在管道中读取通道时此处理程序的作用。
@Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { Integer streamId = msg.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text()); if (streamId == null) { logger.error("HttpResponseHandler unexpected message received: " + msg); return; } MapValues value = streamidMap.get(streamId); if (value == null) { logger.error("Message received for unknown stream id " + streamId); } else { ByteBuf content = msg.content(); if (content.isReadable()) { int contentLength = content.readableBytes(); byte[] arr = new byte[contentLength]; content.readBytes(arr); logger.info(new String(arr, 0, contentLength, CharsetUtil.UTF_8)); } value.getPromise().setSuccess(); } }
|
在方法结束时,我们将ChannelPromise标记为成功以指示正确完成。
作为我们描述的第一个处理程序,此类还包含一个供客户端使用的实用程序方法。该方法使我们的事件循环等到ChannelPromise成功。或者换句话说,它等待直到响应处理完成:
public String awaitResponses(long timeout, TimeUnit unit) { Iterator<Entry<Integer, MapValues>> itr = streamidMap.entrySet().iterator(); String response = null; while (itr.hasNext()) { Entry<Integer, MapValues> entry = itr.next(); ChannelFuture writeFuture = entry.getValue().getWriteFuture(); if (!writeFuture.awaitUninterruptibly(timeout, unit)) { throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey()); } if (!writeFuture.isSuccess()) { throw new RuntimeException(writeFuture.cause()); } ChannelPromise promise = entry.getValue().getPromise(); if (!promise.awaitUninterruptibly(timeout, unit)) { throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey()); } if (!promise.isSuccess()) { throw new RuntimeException(promise.cause()); } logger.info("---Stream id: " + entry.getKey() + " received---"); response = entry.getValue().getResponse(); itr.remove(); } return response; }
|
Http2ClientInitializer正如我们在服务器中看到的那样,ChannelInitializer的目的是建立管道:
public class Http2ClientInitializer extends ChannelInitializer { private final SslContext sslCtx; private final int maxContentLength; private Http2SettingsHandler settingsHandler; private Http2ClientResponseHandler responseHandler; private String host; private int port; // constructor @Override public void initChannel(SocketChannel ch) throws Exception { settingsHandler = new Http2SettingsHandler(ch.newPromise()); responseHandler = new Http2ClientResponseHandler(); if (sslCtx != null) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port)); pipeline.addLast(Http2Util.getClientAPNHandler(maxContentLength, settingsHandler, responseHandler)); } } // getters }
|
在这种情况下,我们将使用新的SslHandler启动管道,以在握手过程开始时添加TLS SNI扩展。
然后,由ApplicationProtocolNegotiationHandler负责在管道中排列连接处理程序和我们的自定义处理程序:public static ApplicationProtocolNegotiationHandler getClientAPNHandler( int maxContentLength, Http2SettingsHandler settingsHandler, Http2ClientResponseHandler responseHandler) { final Http2FrameLogger logger = new Http2FrameLogger(INFO, Http2ClientInitializer.class); final Http2Connection connection = new DefaultHttp2Connection(false); HttpToHttp2ConnectionHandler connectionHandler = new HttpToHttp2ConnectionHandlerBuilder().frameListener( new DelegatingDecompressorFrameListener(connection, new InboundHttp2ToHttpAdapterBuilder(connection) .maxContentLength(maxContentLength) .propagateSettings(true) .build())) .frameLogger(logger) .connection(connection) .build(); ApplicationProtocolNegotiationHandler clientAPNHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) { @Override protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { ChannelPipeline p = ctx.pipeline(); p.addLast(connectionHandler); p.addLast(settingsHandler, responseHandler); return; } ctx.close(); throw new IllegalStateException("Protocol: " + protocol + " not supported"); } }; return clientAPNHandler; }
|
客户端启动:我们需要添加更多功能来处理发送请求和接收响应。如前所述,我们将其编写为JUnit测试:@Test public void whenRequestSent_thenHelloWorldReceived() throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx, Integer.MAX_VALUE, HOST, PORT); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.remoteAddress(HOST, PORT); b.handler(initializer); channel = b.connect().syncUninterruptibly().channel(); logger.info("Connected to [" + HOST + ':' + PORT + ']'); Http2SettingsHandler http2SettingsHandler = initializer.getSettingsHandler(); http2SettingsHandler.awaitSettings(60, TimeUnit.SECONDS); logger.info("Sending request(s)..."); FullHttpRequest request = Http2Util.createGetRequest(HOST, PORT); Http2ClientResponseHandler responseHandler = initializer.getResponseHandler(); int streamId = 3; responseHandler.put(streamId, channel.write(request), channel.newPromise()); channel.flush(); String response = responseHandler.awaitResponses(60, TimeUnit.SECONDS); assertEquals("Hello World", response); logger.info("Finished HTTP/2 request(s)"); } finally { workerGroup.shutdownGracefully(); } }
|
值得注意的是,虽然类似服务器引导程序,下面采取的额外步骤:
- 首先,我们使用Http2SettingsHandler的awaitSettings方法等待初始握手。
- 其次,我们将请求创建为FullHttpRequest
- 第三,将streamId放入Http2ClientResponseHandler的streamIdMap中,并调用其awaitResponses方法
- 最后,我们验证了在回应中确实获得了Hello World
简而言之,这就是发生的情况–客户端发送了HEADERS帧,发生了初始SSL握手,服务器发送了HEADERS和DATA帧中的响应。我们希望Netty API在将来能够处理HTTP / 2框架方面有更多改进,因为它仍在开发中。
与往常一样,源代码可以在GitHub上获得。