在本教程中,我们将研究拦截器在gRPC服务器应用程序中处理全局异常的作用。
拦截器可以在请求到达 RPC 方法之前验证或操作请求。因此,它们在处理常见问题时非常有用,例如日志记录、安全性、缓存、审计、身份验证和授权以及应用程序的更多问题。
应用程序还可以使用拦截器作为全局异常处理程。
拦截器作为全局异常处理程序 拦截器主要可以帮助处理两种类型的异常:
- 处理从无法处理它们的方法中转义的未知运行时异常
- 处理从任何其他下游拦截器逃逸的异常
他们可以通过多种方式处理异常:
- 出于审计或报告目的记录或保留异常
- 创建支持票证
- 在将错误响应发送回客户端之前修改或丰富错误响应
全局异常处理程序的高级设计 拦截器可以将传入请求转发到目标 RPC 服务。但是,当目标 RPC 方法抛出异常时,它可以捕获该异常,然后进行适当的处理。
我们假设有一个订单处理微服务。我们将在拦截器的帮助下开发一个全局异常处理程序,以捕获从微服务中的 RPC 方法中逃逸的异常。此外,拦截器捕获从任何下游拦截器逃逸的异常。然后,它调用票务服务以在票务系统中提出票证。最后,响应被发送回客户端。
首先,我们将开始在protobuf文件order_processing.proto中定义订单处理服务的基类:
syntax = "proto3"; package orderprocessing; option java_multiple_files = true; option java_package = "com.baeldung.grpc.orderprocessing"; message OrderRequest { string product = 1; int32 quantity = 2; float price = 3; } message OrderResponse { string response = 1; string orderID = 2; string error = 3; } service OrderProcessor { rpc createOrder(OrderRequest) returns (OrderResponse){} } |
稍后,我们可以使用order_processing.proto文件生成用于实现OrderProcessorImpl和GlobalExeptionInterceptor的支持 Java 源代码。
Maven插件生成类OrderRequest、OrderResponse和OrderProcessorGrpc。
我们将在实现部分讨论每个类。
我们将实现一个可以处理各种异常的拦截器。异常可能是由于某些失败的逻辑而显式引发的,也可能是由于某些不可预见的错误而导致的异常。
1.实施全局异常处理程序 gRPC 应用程序中的拦截器必须实现ServerInterceptor接口的InterceptCall()方法:
public class GlobalExceptionInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata headers, ServerCallHandler<ReqT, RespT> next) { ServerCall.Listener<ReqT> delegate = null; try { delegate = next.startCall(serverCall, headers); } catch(Exception ex) { return handleInterceptorException(ex, serverCall); } return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) { @Override public void onHalfClose() { try { super.onHalfClose(); } catch (Exception ex) { handleEndpointException(ex, serverCall); } } }; } private static <ReqT, RespT> void handleEndpointException(Exception ex, ServerCall<ReqT, RespT> serverCall) { String ticket = new TicketService().createTicket(ex.getMessage()); serverCall.close(Status.INTERNAL .withCause(ex) .withDescription(ex.getMessage() + ", Ticket raised:" + ticket), new Metadata()); } private <ReqT, RespT> ServerCall.Listener<ReqT> handleInterceptorException(Throwable t, ServerCall<ReqT, RespT> serverCall) { String ticket = new TicketService().createTicket(t.getMessage()); serverCall.close(Status.INTERNAL .withCause(t) .withDescription("An exception occurred in a subsequent interceptor:" + ", Ticket raised:" + ticket), new Metadata()); return new ServerCall.Listener<ReqT>() { // no-op }; } } |
- ServerCall:帮助接收响应消息
- Metadata元数据:保存传入请求的元数据
- ServerCallHandler:帮助将传入的服务器调用分派到拦截器链中的下一个处理器
类似地,第二个try – catch块处理从 RPC 端点抛出的未捕获的异常。 InterceptCall ()方法返回ServerCall.Listener,充当传入 RPC 消息的回调。具体来说,它返回ForwardingServerCallListener的实例。SimpleForwardingServerCallListener是ServerCall.Listener的子类。
为了处理下游方法抛出的异常,我们重写了ForwardingServerCallListener类中的onHalfClose()方法。SimpleForwardingServerCallListener。一旦客户端完成发送消息,它就会被调用。
在此方法中,super.onHalfClose()将请求转发到OrderProcessorImpl类中的RPC 端点createOrder()。如果端点中有未捕获的异常,我们会捕获该异常,然后调用handleEndpointException()来创建票证。最后,我们调用serverCall对象上的close()方法来关闭服务器调用并将响应发送回客户端。
2.注册全局异常处理程序 我们在启动期间创建io.grpc.Server对象时注册拦截器:
public class OrderProcessingServer { public static void main(String[] args) throws IOException, InterruptedException { Server server = ServerBuilder.forPort(8080) .addService(new OrderProcessorImpl()) .intercept(new LogInterceptor()) .intercept(new GlobalExceptionInterceptor()) .build(); server.start(); server.awaitTermination(); } } |
3.处理来自端点的未捕获异常 为了演示异常处理程序,我们首先看一下OrderProcessorImpl类:
public class OrderProcessorImpl extends OrderProcessorGrpc.OrderProcessorImplBase { @Override public void createOrder(OrderRequest request, StreamObserver<OrderResponse> responseObserver) { if (!validateOrder(request)) { throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription("Order Validation failed")); } else { OrderResponse orderResponse = processOrder(request); responseObserver.onNext(orderResponse); responseObserver.onCompleted(); } } private Boolean validateOrder(OrderRequest request) { int tax = 100/0; return false; } private OrderResponse processOrder(OrderRequest request) { return OrderResponse.newBuilder() .setOrderID("ORD-5566") .setResponse("Order placed successfully") .build(); } } |
现在,让我们运行该服务并看看它如何处理异常:
@Test void whenRuntimeExceptionInRPCEndpoint_thenHandleException() { OrderRequest orderRequest = OrderRequest.newBuilder() .setProduct("PRD-7788") .setQuantity(1) .setPrice(5000) .build(); try { OrderResponse response = orderProcessorBlockingStub.createOrder(orderRequest); } catch (StatusRuntimeException ex) { assertTrue(ex.getStatus() .getDescription() .contains("Ticket raised:TKT")); } } |
这对于流媒体案例也同样有效。
4.处理拦截器中未捕获的异常 假设在GlobalExceptionInterceptor 之后有第二个拦截器被调用。 LogInterceptor记录所有传入请求以用于审核目的。我们来看一下:
public class LogInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> next) { logMessage(serverCall); ServerCall.Listener<ReqT> delegate = next.startCall(serverCall, metadata); return delegate; } private <ReqT, RespT> void logMessage(ServerCall<ReqT, RespT> call) { int result = 100/0; } } |
让我们运行该服务并看看它如何处理LogInterceptor引发的异常:
@Test void whenRuntimeExceptionInLogInterceptor_thenHandleException() { OrderRequest orderRequest = OrderRequest.newBuilder() .setProduct("PRD-7788") .setQuantity(1) .setPrice(5000) .build(); try { OrderResponse response = orderProcessorBlockingStub.createOrder(orderRequest); } catch (StatusRuntimeException ex) { assertTrue(ex.getStatus() .getDescription() .contains("An exception occurred in a subsequent interceptor:, Ticket raised:TKT")); } logger.info("order processing over"); } |