Quarkus WebSockets Next教程

在本文中,我们简要介绍了 Quarkus中使用websockets-next扩展的 WebSockets。我们了解了如何使用它编写服务器和客户端组件。在这里,我们只讨论了这个库的基础知识,但它能够处理许多更高级的场景。

在本文中,我们将研究Quarkus 框架的quarkus-websockets-next扩展。此扩展是一个新的实验性扩展,用于在我们的应用程序中支持WebSockets。

Quarkus WebSockets Next 是一个新的扩展,旨在取代旧的 Quarkus WebSockets 扩展。与旧扩展相比,它将更易于使用且更高效。

然而,与 Quarkus 不同,它不支持Jakarta WebSockets API,而是提供了一个简化且更现代的 API 来处理 WebSockets。它使用自己的带注释的类和方法,在功能上提供了更大的灵活性,同时还提供了 JSON 支持等内置功能。

与此同时,Quarkus WebSockets Next 仍然建立在标准 Quarkus 核心之上。这意味着我们可以获得预期的所有性能和可扩展性,同时还可以受益于 Quarkus 为我们提供的开发体验。

依赖项
如果我们要开始一个全新的项目,我们可以使用 Maven 创建已安装websockets-next扩展的结构:

$ mvn io.quarkus.platform:quarkus-maven-plugin:3.16.4:create \
    -DprojectGroupId=com.baeldung.quarkus \
    -DprojectArtifactId=quarkus-websockets-next \
    -Dextensions='websockets-next'

请注意,由于该扩展仍处于实验阶段,因此我们需要使用io.quarkus.platform:quarkus-maven-plugin 。

或者,如果我们正在使用现有项目,我们可以简单地将适当的依赖项添加到我们的pom.xml文件中:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-websockets-next</artifactId>
</dependency>

服务器端点
一旦我们的应用程序准备好并且安装了websockets-next扩展,我们就可以开始使用 WebSockets 了。

通过创建用@WebSocket注释的新类,在 Quarkus 中创建服务器端点:

@WebSocket(path = "/echo")
public class EchoWebsocket {
   
// WebSocket code here.
}

这将创建一个监听所提供路径的端点。与 Quarkus 一样,我们可以根据需要在其中使用路径参数以及固定路径。

消息回调
为了使我们的 WebSocket 端点有用,我们需要能够处理消息。

WebSocket 连接支持两种类型的消息 - 文本和二进制。我们可以在服务器端点使用带有@OnTextMessage或@OnBinaryMessage注释的方法处理这些消息:

@OnTextMessage
public String onMessage(String message) {
    return message;
}

在这种情况下,我们将消息负载作为方法参数接收,并将方法的返回值发送到客户端。因此,此示例就是我们所需的回显服务器 - 我们将每条收到的消息原封不动地发回。

如果有必要,我们还可以接收二进制有效负载而不是文本有效负载。这是通过使用@OnBinaryMessage注释来实现的:

@OnBinaryMessage
public Buffer onMessage(Buffer message) {
    return message;
}

在这种情况下,我们接受并返回一个io.vertx.core.buffer.Buffer实例。它将包含收到的消息的原始字节。

2. 方法参数和返回
除了我们的处理程序接收传入消息的原始有效负载之外,Quarkus 还能够将许多其他内容传递到我们的回调消息中。

所有消息处理程序都必须有一个参数,表示传入消息的有效负载。此参数的确切类型决定了我们如何访问它。

正如我们之前看到的,如果我们使用Buffer或byte[],我们将获得传入消息的精确字节。如果我们使用String,这些字节将首先被解码为字符串。

不过,我们也可以使用更丰富的对象。如果我们使用JsonObject或JsonArray,则传入消息将被视为 JSON 并根据需要进行解码。或者,如果我们使用 Quarkus 不支持的任何其他类型,Quarkus 将尝试将传入消息反序列化为该类型:

@OnTextMessage
public Message onTextMessage(Message message) {
    return message;
}
record Message(String message) {}

我们也可以使用所有这些相同的类型作为返回值,在这种情况下,Quarkus 将在将消息发送回客户端时完全按照预期对消息进行序列化。此外,消息处理程序可以有一个void返回,以指示不会将任何内容作为响应发送回去。

除此之外,还有一些其他的方法参数我们可以接受。

如我们所见,消息有效负载将提供一个未注释的String参数。但是,我们也可以使用@PathParam注释的String参数来接收传入 URL 中此参数的值:

@WebSocket(path = "/chat/:user")
public class ChatWebsocket {
    @OnTextMessage(broadcast = true)
    public String onTextMessage(String message, @PathParam(
"user") String user) {
        return user +
": " + message;
    }
}

我们还可以接受WebSocketConnection类型的参数,它将代表客户端和服务器之间的精确连接:

@OnTextMessage
public Map<String, String> onTextMessage(String message, WebSocketConnection connection) {
    return Map.of(
        "message", message,
       
"connection", connection.toString()
    );
}

使用它可以让我们访问客户端和服务器之间的网络连接的详细信息 - 包括连接的唯一 ID、建立连接的时间以及用于连接的 URL 的路径参数。

我们还可以通过向其发送消息甚至强制关闭它来使用它来更直接地与连接进行交互:

@OnTextMessage
public void onTextMessage(String message, WebSocketConnection connection) {
    if ("close".equals(message)) {
        connection.sendTextAndAwait(
"Goodbye");
        connection.closeAndAwait();
    }
}

3. OnOpen和OnClose回调
除了用于接收消息的处理程序之外,我们还可以注册当首次打开新连接时的处理程序 - 使用@OnOpen,以及当关闭连接时的处理程序 - 使用@OnClose:

@OnOpen
public void onOpen() {
    LOG.info("Connection opened");
}
@OnClose
public void onClose() {
    LOG.info(
"Connection closed");
}

这些回调处理程序不能接收任何消息负载,但可以接收如前所述的任何其他方法参数。

此外,@OnOpen处理程序可以有一个返回值,该返回值将被序列化并发送给客户端。这对于在连接时立即发送消息非常有用,而无需等待客户端先发送某些内容。执行此操作遵循与消息处理程序的返回值相同的规则:

@OnOpen
public String onOpen(WebSocketConnection connection) {
    return "Hello, " + connection.id();
}

4. 访问连接
我们已经看到,我们可以将当前连接注入到回调处理程序中。然而,这并不是我们访问连接详细信息的唯一方法。

Quarkus 允许我们使用@Inject将WebSocketConnection对象作为CDI 会话范围 bean注入。这可以注入到我们系统中的任何其他 bean 中,我们可以从那里访问当前连接:

@ApplicationScoped
public class CdiConnectionService {
    @Inject
    WebSocketConnection connection;
}

但是,这仅在从 WebSocket 处理程序上下文中调用时才有效。如果我们尝试从任何其他上下文(包括常规 HTTP 调用)访问它,则将抛出jakarta.enterprise.context.ContextNotActiveException。

我们还可以通过注入OpenConnections类型的对象来访问所有当前打开的 WebSocket 连接:

@Inject
OpenConnections connections;

然后我们不仅可以使用它来查询所有当前打开的连接,还可以向它们发送消息:

public void sendToAll(String message) {
    connections.forEach(connection -> connection.sendTextAndAwait(message));
}

与注入单个WebSocketConnection不同,它可以在任何上下文中正常工作。这允许我们在需要时从任何其他上下文访问 WebSocket 连接。

5. 错误处理
在某些情况下,当我们处理 WebSocket 回调时,可能会出错。Quarkus允许我们通过编写异常处理程序方法来处理这些方法抛出的任何异常。这些是使用@OnError注释的任何方法:

@OnError
public String onError(RuntimeException e) {
    return e.toString();
}

这些回调处理程序遵循与其他回调处理程序相同的规则,涉及它们可以接收的参数及其返回值。此外,它们必须有一个表示要处理的异常的参数。

我们可以根据需要编写任意数量的错误处理程序,只要它们都适用于不同的异常类即可。如果有重叠的异常处理程序(换句话说,如果一个异常处理程序是另一个异常处理程序的子类),则将调用最具体的异常处理程序:

@OnError
public String onIoException(IOException e) {
    // Handles IOException and all subclasses.
}
@OnError
public String onException(Exception e) {
   
// Handles Exception and all subclasses except for IOException.
}

客户端 API
Quarkus 除了允许我们编写服务器端点之外,还允许我们编写可以与其他服务器通信的 WebSocket 客户端。

1. 基本连接器
编写 WebSocket 客户端的最基本方法是使用 BasicWebSocketConnector 。这让我们可以打开连接并发送和接收原始消息。

首先,我们需要在代码中注入一个BasicWebSocketConnector :

@Inject
BasicWebSocketConnector connector;

然后我们可以使用它来连接到远程服务:

WebSocketClientConnection connection = connector
  .baseUri(serverUrl)
  .executionModel(BasicWebSocketConnector.ExecutionModel.NON_BLOCKING)
  .onTextMessage((c, m) -> {
      // Handle incoming messages.
  })
  .connectAndAwait();

作为此连接的一部分,我们注册了一个 lambda 来处理从服务器收到的任何传入消息。这是必要的,因为 WebSockets 具有异步、全双工的特性。我们不能仅仅将其视为具有请求和响应对的标准 HTTP 连接。

一旦我们打开了连接,我们就可以用它向服务器发送消息:

connection.sendTextAndAwait("Hello, World!");

我们既可以在回调处理程序内部执行此操作,也可以在其外部执行此操作。但是,请记住,连接不是线程安全的,因此我们需要确保永远不会同时通过多个线程对其进行写入。

除了onTextMessage回调之外,我们还可以注册其他生命周期事件的回调,包括onOpen() 和onClose()。

2. 富客户端 Bean
基本连接器足以应付简单的连接,但有时我们需要更灵活的功能。Quarkus还允许我们编写更丰富的客户端,就像我们编写服务器端点一样。

为此,我们需要编写一个用@WebSocketClient注释的新类:

@WebSocketClient(path = "/json")
class RichWebsocketClient {
   
// Client code here.
}

然后,在这个类中,我们以与服务器端点相同的方式编写注释方法,使用诸如@OnTextMessage,@OnOpen等注释:

@OnTextMessage
void onMessage(String message, WebSocketClientConnection connection) {
    // Process message here
}

这遵循与服务器端点关于方法参数和返回值的所有相同的规则,只是如果我们想要访问连接详细信息,我们使用WebSocketClientConnection而不是WebSocketConnection 。

一旦我们编写了客户端类,我们就可以通过注入的WebSocketConnector实例来使用它:

@Inject
WebSocketConnector<RichWebsocketClient> connector;

使用这个,我们以与以前类似的方式创建连接,只是我们不需要提供任何回调,因为我们的客户端实例将为我们处理所有这些:

WebSocketClientConnection connection = connector
  .baseUri(serverUrl)
  .connectAndAwait();

此时,我们有一个WebSocketClientConnection,可以像以前一样使用它。

如果我们需要访问我们的客户端实例,我们可以通过为适当的类注入一个Instance来实现:

@Inject
Instance<RichWebsocketClient> clients;

但是,我们需要记住,这个客户端实例只有在正确的上下文中才可用,而且因为它都是异步的,所以我们需要确保某些事件已经发生。