gRPC服务器中添加全局异常拦截器

在本教程中,我们将研究拦截器在gRPC服务器应用程序中处理全局异常的作用。

拦截器可以在请求到达 RPC 方法之前验证或操作请求。因此,它们在处理常见问题时非常有用,例如日志记录、安全性、缓存、审计、身份验证和授权以及应用程序的更多问题。

应用程序还可以使用拦截器作为全局异常处理程。

拦截器作为全局异常处理程序 拦截器主要可以帮助处理两种类型的异常:

  1. 处理从无法处理它们的方法中转义的未知运行时异常
  2. 处理从任何其他下游拦截器逃逸的异常
拦截器可以帮助创建一个框架来集中处理异常。这样,应用程序就可以拥有一致的标准和强大的方法来处理异常。

他们可以通过多种方式处理异常:

  • 出于审计或报告目的记录或保留异常
  • 创建支持票证
  • 在将错误响应发送回客户端之前修改或丰富错误响应

全局异常处理程序的高级设计 拦截器可以将传入请求转发到目标 RPC 服务。但是,当目标 RPC 方法抛出异常时,它可以捕获该异常,然后进行适当的处​​理。

我们假设有一个订单处理微服务。我们将在拦截器的帮助下开发一个全局异常处理程序,以捕获从微服务中的 RPC 方法中逃逸的异常。此外,拦截器捕获从任何下游拦截器逃逸的异常。然后,它调用票务服务以在票务系统中提出票证。最后,响应被发送回客户端。

首先,我们将开始在protobuf文件order_processing.proto中定义订单处理服务的基类:

syntax = "proto3";
package orderprocessing;
option java_multiple_files = true;
option java_package = "com.baeldung.grpc.orderprocessing";
message OrderRequest {
  string product = 1;
  int32 quantity = 2;
  float price = 3;
}
message OrderResponse {
  string response = 1;
  string orderID = 2;
  string error = 3;
}
service OrderProcessor {
  rpc createOrder(OrderRequest) returns (OrderResponse){}
}
order_processing.proto文件使用远程方法createOrder()和两个DTO OrderRequest和OrderResponse定义OrderProcessor。

稍后,我们可以使用order_processing.proto文件生成用于实现OrderProcessorImpl和GlobalExeptionInterceptor的支持 Java 源代码。

Maven插件生成类OrderRequest、OrderResponse和OrderProcessorGrpc。

我们将在实现部分讨论每个类。

我们将实现一个可以处理各种异常的拦截器。异常可能是由于某些失败的逻辑而显式引发的,也可能是由于某些不可预见的错误而导致的异常。

1.实施全局异常处理程序 gRPC 应用程序中的拦截器必须实现ServerInterceptor接口的InterceptCall()方法:

public class GlobalExceptionInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata headers,
        ServerCallHandler<ReqT, RespT> next) {
        ServerCall.Listener<ReqT> delegate = null;
        try {
            delegate = next.startCall(serverCall, headers);
        } catch(Exception ex) {
            return handleInterceptorException(ex, serverCall);
        }
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {
            @Override
            public void onHalfClose() {
                try {
                    super.onHalfClose();
                } catch (Exception ex) {
                    handleEndpointException(ex, serverCall);
                }
            }
        };
    }
    private static <ReqT, RespT> void handleEndpointException(Exception ex, ServerCall<ReqT, RespT> serverCall) {
        String ticket = new TicketService().createTicket(ex.getMessage());
        serverCall.close(Status.INTERNAL
            .withCause(ex)
            .withDescription(ex.getMessage() + ", Ticket raised:" + ticket), new Metadata());
    }
    private <ReqT, RespT> ServerCall.Listener<ReqT> handleInterceptorException(Throwable t, ServerCall<ReqT, RespT> serverCall) {
        String ticket = new TicketService().createTicket(t.getMessage());
        serverCall.close(Status.INTERNAL
            .withCause(t)
            .withDescription("An exception occurred in a subsequent interceptor:" + ", Ticket raised:" + ticket), new Metadata());
        return new ServerCall.Listener<ReqT>() {
            // no-op
        };
    }
}
InterceptCall()方法接受三个输入参数:
  • ServerCall:帮助接收响应消息
  • Metadata元数据:保存传入请求的元数据
  • ServerCallHandler:帮助将传入的服务器调用分派到拦截器链中的下一个处理器
该方法有两个try - catch块。第一个处理从任何后续下游拦截器抛出的未捕获的异常。在 catch 块中,我们调用方法handleInterceptorException(),该方法为异常创建一个票证。最后返回一个ServerCall.Listener对象,这是一个回调方法。

类似地,第二个try – catch块处理从 RPC 端点抛出的未捕获的异常。 InterceptCall ()方法返回ServerCall.Listener,充当传入 RPC 消息的回调。具体来说,它返回ForwardingServerCallListener的实例。SimpleForwardingServerCallListener是ServerCall.Listener的子类。

为了处理下游方法抛出的异常,我们重写了ForwardingServerCallListener类中的onHalfClose()方法。SimpleForwardingServerCallListener。一旦客户端完成发送消息,它就会被调用。

在此方法中,super.onHalfClose()将请求转发到OrderProcessorImpl类中的RPC 端点createOrder()。如果端点中有未捕获的异常,我们会捕获该异常,然后调用handleEndpointException()来创建票证。最后,我们调用serverCall对象上的close()方法来关闭服务器调用并将响应发送回客户端。

2.注册全局异常处理程序 我们在启动期间创建io.grpc.Server对象时注册拦截器:

public class OrderProcessingServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(8080)
          .addService(new OrderProcessorImpl())
          .intercept(new LogInterceptor())
          .intercept(new GlobalExceptionInterceptor())
          .build();
        server.start();
        server.awaitTermination();
    }
}
我们将GlobalExceptionInterceptor对象传递给io.grpc.ServerBuilder类的intercept()方法。这可确保对OrderProcessorImpl服务的任何 RPC 调用都经过GlobalExceptionInterceptor。同样,我们调用addService()方法来注册OrderProcessorImpl服务。最后,我们调用Server对象上的start()方法来启动服务器应用程序。

3.处理来自端点的未捕获异常 为了演示异常处理程序,我们首先看一下OrderProcessorImpl类:

public class OrderProcessorImpl extends OrderProcessorGrpc.OrderProcessorImplBase {
    @Override
    public void createOrder(OrderRequest request, StreamObserver<OrderResponse> responseObserver) {
        if (!validateOrder(request)) {
             throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription("Order Validation failed"));
        } else {
            OrderResponse orderResponse = processOrder(request);
            responseObserver.onNext(orderResponse);
            responseObserver.onCompleted();
        }
    }
    private Boolean validateOrder(OrderRequest request) {
        int tax = 100/0;
        return false;
    }
    private OrderResponse processOrder(OrderRequest request) {
        return OrderResponse.newBuilder()
          .setOrderID("ORD-5566")
          .setResponse("Order placed successfully")
          .build();
    }
}
RPC 方法createOrder()首先验证订单,然后通过调用processOrder()方法对其进行处理。在validateOrder()方法中,我们故意通过将数字除以零来强制运行时异常。

现在,让我们运行该服务并看看它如何处理异常:

@Test
void whenRuntimeExceptionInRPCEndpoint_thenHandleException() {
    OrderRequest orderRequest = OrderRequest.newBuilder()
      .setProduct("PRD-7788")
      .setQuantity(1)
      .setPrice(5000)
      .build();
    try {
        OrderResponse response = orderProcessorBlockingStub.createOrder(orderRequest);
    } catch (StatusRuntimeException ex) {
        assertTrue(ex.getStatus()
          .getDescription()
          .contains("Ticket raised:TKT"));
    }
}
我们创建OrderRequest对象,然后将其传递给客户端存根中的createOrder()方法。正如预期的那样,服务抛出异常。当我们检查异常中的描述时,我们发现其中嵌入了票证信息。因此,它表明GlobalExceptionInterceptor完成了它的工作。

这对于流媒体案例也同样有效。

4.处理拦截器中未捕获的异常 假设在GlobalExceptionInterceptor 之后有第二个拦截器被调用。 LogInterceptor记录所有传入请求以用于审核目的。我们来看一下:

public class LogInterceptor implements ServerInterceptor {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
        ServerCallHandler<ReqT, RespT> next) {
        logMessage(serverCall);
        ServerCall.Listener<ReqT> delegate = next.startCall(serverCall, metadata);
        return delegate;
    }
    private <ReqT, RespT> void logMessage(ServerCall<ReqT, RespT> call) {
        int result = 100/0;
    }
}
在LogInterceptor中,interceptCall()方法在将请求转发到 RPC 端点之前调用logMessage()来记录消息。 logMessage ()方法故意执行除以零以引发运行时异常,以演示GlobalExceptionInterceptor的功能。

让我们运行该服务并看看它如何处理LogInterceptor引发的异常:

@Test
void whenRuntimeExceptionInLogInterceptor_thenHandleException() {
    OrderRequest orderRequest = OrderRequest.newBuilder()
        .setProduct("PRD-7788")
        .setQuantity(1)
        .setPrice(5000)
        .build();
    try {
        OrderResponse response = orderProcessorBlockingStub.createOrder(orderRequest);
    } catch (StatusRuntimeException ex) {
        assertTrue(ex.getStatus()
            .getDescription()
            .contains("An exception occurred in a subsequent interceptor:, Ticket raised:TKT"));
    }
    logger.info("order processing over");
}
首先,我们在客户端存根上调用createOrder()方法。这次,GlobalExceptionInterceptor在第一个try - catch块中捕获从LogInterceptor逃逸的异常。随后,客户端收到异常,并在描述中嵌入票证信息。