Vert.x是一个非常高性能的库,用于实现低延迟服务。它的多反应堆模式使得在几毫秒内每秒处理许多请求成为可能。
使用实时出价,我们每秒收到数千个请求,我们必须在不到100毫秒的时间内回答。这就是我们选择Vert.x的原因。
在本文中,我将向您介绍我们从基于该库的4年运营生产服务中学到的经验教训。
基础
vert.x应用程序看起来像下面的代码片段。main方法创建一个vertx实例; 然后使用一些部署选项通过此实例部署Verticle。
public static void main(String[] args) { VertxOptions vertxOptions = new VertxOptions().setMetricsOptions(metricsOptions); Vertx vertx = Vertx.vertx(vertxOptions); DeploymentOptions deploymentOptions = new DeploymentOptions().setInstances(instances); vertx.deployVerticle(MainVerticle.class, deploymentOptions); }
|
端点
端点通过路由器公开,路由器将路由映射到处理程序,请求处理程序基本上是您的业务代码。
public class MainVerticle extends AbstractVerticle { @Override public void start(Vertx vertx) { Router router = Router.router(vertx); router.route().consumes("application/json"); router.route().produces("application/json"); router.route().handler(BodyHandler.create()); router.get("/ping").handler(pingHandler); vertx.createHttpServer().requestHandler(router).listen(8080); } }
|
请求处理程序
处理程序负责对系统中发生的事情做出反应。一旦异步操作结束(通过失败,成功或超时),它们将管理代码执行。
public class PingHandler implements Handler<RoutingContext> { public void handler(RoutingContext context) { context.response().setStatusCode(200).end("pong"); } }
|
失败处理程序
如果在请求处理期间发生故障,我们还可以将故障处理程序附加到Route以执行一段代码。
失败处理程序非常适合确保连接正确关闭,将度量标准记录为可帮助我们分析应用程序行为的错误类型,尤其是意外错误。
public class FailureHandler implements Handler<RoutingContext> { public void handle(RoutingContext context) { Throwable thrown = context.failure(); recordError(thrown); context.response().setStatusCode(500).end(); } private void recordError(Throwable throwable) { // Your logging/tracing/metrics framework here } }
|
自适应性
1. 配置
应用程序的某些部分是可配置的,因为我们希望按照12个因素原则将相同的二进制文件执行到多个环境中。
Vert.x生态系统提供了一个vertx-config模块,它是一个非常精心设计的模块,用于处理配置加载。它围绕ConfigStore概念进行组织,代表任何能够包含配置的东西(Redis,Consul,Files,Environment Variables)。
如下例所示,配置存储可以使用ConfigRetriever进行链接,最新值将覆盖第一个值。
private static ConfigRetrieverOptions getConfigRetrieverOptions() { JsonObject classpathFileConfiguration = new JsonObject().put("path", "default.properties"); ConfigStoreOptions classpathFile = new ConfigStoreOptions() .setType("file") .setFormat("properties") .setConfig(classpathFileConfiguration);
JsonObject envFileConfiguration = new JsonObject().put("path", "/etc/default/demo"); ConfigStoreOptions envFile = new ConfigStoreOptions() .setType("file") .setFormat("properties") .setConfig(envFileConfiguration) .setOptional(true);
JsonArray envVarKeys = new JsonArray(); for (ConfigurationKeys key : ConfigurationKeys.values()) { envVarKeys.add(key.name()); } JsonObject envVarConfiguration = new JsonObject().put("keys", envVarKeys); ConfigStoreOptions environment = new ConfigStoreOptions() .setType("env") .setConfig(envVarConfiguration) .setOptional(true);
return new ConfigRetrieverOptions() .addStore(classpathFile) // local values : exhaustive list with sane defaults .addStore(environment) // Container / PaaS friendly to override defaults .addStore(envFile) // external file, IaaS friendly to override defaults and config hot reloading .setScanPeriod(5000); }
|
使用外部配置系统(如Redis,Consul或您的云提供商,如Google Runtime Config)时,此设计特别好。您的系统将尝试从远程系统检索配置,如果失败则返回到环境变量,如果未定义,则将使用默认配置文件。请注意将远程存储设置为可选存储,以避免在发生远程系统故障时出现异常。
配置也是一种热重新加载,这使我们的应用程序能够在不停机的情况下调整其行为。ConfigurationRetriever会定期刷新配置(默认为5秒),因此您可以将其注入应用程序,然后调用它以检索其最新值。
我们的主要方法现在看起来像这样,请注意我们在部署Verticle之前等待第一次配置检索,并通过Event Bus传播配置更改。
public static void main(String[] args) { VertxOptions vertxOptions = new VertxOptions().setMetricsOptions(metricsOptions); Vertx vertx = Vertx.vertx(vertxOptions);
ConfigRetrieverOptions configRetrieverOptions = getConfigRetrieverOptions(); ConfigRetriever configRetriever = ConfigRetriever.create(vertx, configRetrieverOptions);
// getConfig is called for initial loading configRetriever.getConfig( ar -> { int instances = Runtime.getRuntime().availableProcessors(); DeploymentOptions deploymentOptions = new DeploymentOptions().setInstances(instances).setConfig(ar.result()); vertx.deployVerticle(MainVerticle.class, deploymentOptions); });
// listen is called each time configuration changes configRetriever.listen( change -> { JsonObject updatedConfiguration = change.getNewConfiguration(); vertx.eventBus().publish(EventBusChannels.CONFIGURATION_CHANGED.name(), updatedConfiguration); }); }
|
我们的Verticle现在需要通过订阅EventBus主题来决定如何对配置更改做出反应。
package co.teemo.blog.verticles;
public class MainVerticle extends AbstractVerticle { private PingHandler pingHandler; private FailureHandler failureHandler;
@Override public void start() { this.pingHandler = new PingHandler(); this.failureHandler = new FailureHandler(); configureRouteHandlers(config()); configureEventBus();
Router router = Router.router(vertx); router.route().consumes("application/json"); router.route().produces("application/json"); router.route().handler(BodyHandler.create());
router.get("/ping").handler(pingHandler).failureHandler(failureHandler); vertx.createHttpServer().requestHandler(router).listen(8080); }
private void configureEventBus() { vertx.eventBus().<JsonObject>consumer( EventBusChannels.CONFIGURATION_CHANGED.name(), message -> { logger.debug("Configuration has changed, verticle {} is updating...", deploymentID()); configureRouteHandlers(message.body()); logger.debug("Configuration has changed, verticle {} has been updated...", deploymentID()); }); }
private void configureRouteHandlers(JsonObject configuration) { String pingResponse = configuration.getString(ConfigurationKeys.PING_RESPONSE.name()); pingHandler.setMessage(pingResponse); } }
|
例如,我选择改变每个处理程序中的变量以调整端点响应。
public class PingHandler implements Handler<RoutingContext> { private String pingResponse;
public void handle(RoutingContext context) { context.response().setStatusCode(200).end(pingResponse); }
public void setPingResponse(String pingResponse) { this.pingResponse = pingResponse; } }
|
关于模块可扩展性,因为添加新的配置存储非常容易,我们需要扩展vertx-config以支持Google Runtime Config,实现它是一件轻而易举的事。您只需要实现两个类:ConfigStore定义如何从存储中检索配置,ConfigStoreFactory定义如何创建ConfigStore并为其注入配置,如凭据或过滤条件。
设计失败
我们的生产服务依赖于一些外部依赖,数据库,消息队列,键/值存储,远程API,......
由于我们不能依赖外部系统健康,因此当上游依赖性面临中断或意外延迟时,我们必须准备应用程序以适应自身。Vertx生态系统包含一个实现Circuit Breaker模式的模块,可以轻松应对。
对于这个例子,让我们定义一个新的处理程序,它将联系令人敬畏的PokeAPI列出口袋妖怪!由于这是一个外部依赖,我们需要使用Circuit Breaker包装调用。
public class PokemonHandler implements Handler<RoutingContext> {
private static final Logger logger = LoggerFactory.getLogger(PokemonHandler.class); private static final JsonArray FALLBACK = new JsonArray();
private final WebClient webClient; private final CircuitBreaker circuitBreaker;
private int pokeApiPort; private String pokeApiHost; private String pokeApiPath;
public PokemonHandler(Vertx vertx) { WebClientOptions options = new WebClientOptions().setKeepAlive(true).setSsl(true); this.webClient = WebClient.create(vertx, options);
CircuitBreakerOptions circuitBreakerOptions = new CircuitBreakerOptions() .setMaxFailures(3) .setTimeout(1000) .setFallbackOnFailure(true) .setResetTimeout(60000);
this.circuitBreaker = CircuitBreaker.create("pokeapi", vertx, circuitBreakerOptions); this.circuitBreaker.openHandler(v -> logger.info("{} circuit breaker is open", "pokeapi")); this.circuitBreaker.closeHandler(v -> logger.info("{} circuit breaker is closed", "pokeapi")); this.circuitBreaker.halfOpenHandler(v -> logger.info("{} circuit breaker is half open", "pokeapi")); }
@Override public void handle(RoutingContext context) {
Function<Throwable, JsonArray> fallback = future -> FALLBACK; Handler<Future<JsonArray>> processor = future -> { webClient.get(pokeApiPort, pokeApiHost, pokeApiPath).send(result -> { if (result.succeeded()) { future.complete(result.result().bodyAsJsonObject().getJsonArray("results")); } else { future.fail(result.cause()); } }); }; Handler<AsyncResult<JsonArray>> callback = result -> { if (result.succeeded()) { JsonArray pokemons = result.result(); context.response().setStatusCode(200).end(Json.encodePrettily(pokemons)); } else { Throwable cause = result.cause(); logger.error(cause.getMessage(), cause); context.response().setStatusCode(500).end(cause.getMessage()); } };
circuitBreaker.executeWithFallback(processor, fallback).setHandler(callback); }
public void setPokeApiUrl(String pokeApiHost, int pokeApiPort, String pokeApiPath) { this.pokeApiHost = pokeApiHost; this.pokeApiPort = pokeApiPort; this.pokeApiPath = pokeApiPath; } }
|
现在,我们可以使用流量控制模拟网络延迟,并在达到最大故障限制后查看断路器是否打开。然后我们的处理程序将立即回复后备值。
tc qdisc add dev eth0 root netem delay 2000ms
要模拟外部服务恢复,让我们删除延迟规则并观察断路器是否会再次关闭,并返回远程API响应。
tc qdisc add del eth0 root netem
观测
没有人不会对生产进行监视,因此我们必须定义如何在运行时观察我们的应用程序。
健康检查
观察我们心爱的软件最基本的方法是询问它如何定期运行,感谢vertx生态系统,有一个模块!
我们的策略是暴露两个端点,一个用于判断应用程序是否存活,另一个用于判断应用程序是否健康。它可能没有健康就活着,因为它需要加载一些初始数据。
我喜欢使用“健康”端点进行监控,以了解由于外部依赖性故障导致服务质量下降的时间,并使用活动端点进行警报,因为它需要外部操作来恢复(重新启动服务,替换实例, ...)
让我们将这些健康检查添加到PokemonHandler中。
public class PokemonHandler implements Handler<RoutingContext> {
// ... private final HealthChecks healthChecks;
public PokemonHandler(Vertx vertx) {
// ... this.healthChecks = HealthChecks.create(vertx); healthChecks.register("pokeApiHealthcheck", 1000, future -> { if (circuitBreaker.state().equals(CircuitBreakerState.CLOSED)) { future.complete(Status.OK()); } else { future.complete(Status.KO()); } }); } // ...
public HealthChecks getHealthchecks() { return healthChecks; } }
|
现在我们需要在路由器中公开相关的端点。
public class MainVerticle extends AbstractVerticle {
// ...
@Override public void start() { // ... router.get("/alive").handler(HealthCheckHandler.create(vertx)); router.get("/healthy").handler(HealthCheckHandler.createWithHealthChecks(healthChecks));
vertx.createHttpServer().requestHandler(router).listen(8080); } }
|
通过这种配置,我们的软件可以告诉我们它是否存活,在这种情况下,它是否已准备好在最佳条件下实现其目标。这是我在脑海里做的映射:
- 活着和健康:正常,一切都很好。
- 活着不健康:警告,应用程序在降级模式下工作。
- 不活着:CRITICAL,应用程序不起作用。
日志
一旦发出警报,我们首先要做的就是查看指标和错误日志。因此,我们需要以可利用的方式发布日志。
我们要:
- 使用易于解析的格式(如JSON)导出日志。
- 添加有关执行环境(主机,容器ID,平台,环境)的信息,以确定问题是否容易与进程隔离; 计算节点或应用程序的版本。
- 添加有关执行上下文(用户ID,会话ID,相关ID)的信息,以了解导致错误的序列。
格式:
我们需要配置logback以输出具有预期格式的日志。<configuration> <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder"> <layout class="ch.qos.logback.contrib.json.classic.JsonLayout"> <timestampFormat>yyyy-MM-dd'T'HH:mm:ss.SSSX</timestampFormat> <timestampFormatTimezoneId>Etc/UTC</timestampFormatTimezoneId> <jsonFormatter class="ch.qos.logback.contrib.jackson.JacksonJsonFormatter"> <prettyPrint>true</prettyPrint> </jsonFormatter> </layout> </encoder> </appender>
|
你需要知道的一个小技巧是vertx日志记录本身并不关心logback配置,我们必须通过将“vertx.logger-delegate-factory-class-name”系统属性设置为“io.vertx.core.logging.SLF4JLogDelegateFactory”来明确告诉我们要使用的内容。
System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory");
|
您也可以使用-Dvertx.logger-delegate-factory-class-name = io.vertx.core.logging.SLF4JLogDelegateFactory参数启动应用程序,在JVM级别执行此操作。
现在我们的应用程序输出标准JSON,让我们添加一些关于它运行的环境的上下文数据。为此,我们使用MDC来维护可以添加元数据的上下文。
package co.teemo.blog.verticles;
public class MainVerticle extends AbstractVerticle { private static final Logger logger = LoggerFactory.getLogger(MainVerticle.class);
private PingHandler pingHandler; private FailureHandler failureHandler; private PokemonHandler pokemonHandler; private GreetingHandler greetingHandler;
@Override public void start() { configureLogging(); // ... }
private static void configureLogging() { // It's OK to use MDC with static values MDC.put("application", "blog"); MDC.put("version", "1.0.0"); MDC.put("release", "canary"); try { MDC.put("hostname", InetAddress.getLocalHost().getHostName()); } catch (UnknownHostException e) { // Silent error, we can live without it } } }
|
请注意,MDC不能用于添加上下文元数据,如userId,requestId,sessionId,correlationId,...),因为它依赖于与vertx异步性质不兼容的线程本地值。请参阅此主题以进一步挖掘。
我们需要另一种解决方案来记录这些数据......作为一种解决方法,让我们将它们添加到日志消息本身,让我们的集中式日志平台解析它并将其转换为元数据。
private void logError(String userId, Throwable thrown) { String dynamicMetadata = ""; if(userId != null) { dynamicMetadata = String.format("userId=%s ", userId); } logger.error(dynamicMetadata + thrown.getMessage()); }
|
现在,触发验证错误会给我们一个充满上下文的消息,使日志成为分析导致错误的路径的强大工具。
{ "timestamp" : "2019-02-08T14:44:09.207Z", "level" : "ERROR", "thread" : "vert.x-eventloop-thread-5", "mdc" : { "hostname" : "mikael-XPS-13-9360", "application" : "blog", "release" : "canary", "version" : "1.0.0" }, "logger" : "co.teemo.blog.handlers.FailureHandler", "message" : "userId=toto Name must start with an uppercase char", "context" : "default" }
|
度量
Vertx与Dropwizard Metrics和Micrometer本地集成。假设我们想要查看每个端点处理的请求数。我不会演示如何向后端报告指标,但您基本上有两个选项:Dropwizard Reporters和Opencensus。
以下是使用标准Dropwizard Metric Reporter配置指标的方法。
package co.teemo.blog;
public class Application {
public static void main(String[] args) { // Initialize metric registry String registryName = "registry"; MetricRegistry registry = SharedMetricRegistries.getOrCreate(registryName); SharedMetricRegistries.setDefault(registryName);
Slf4jReporter reporter = Slf4jReporter.forRegistry(registry) .outputTo(LoggerFactory.getLogger(Application.class)) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .build(); reporter.start(1, TimeUnit.MINUTES);
// Initialize vertx with the metric registry DropwizardMetricsOptions metricsOptions = new DropwizardMetricsOptions() .setEnabled(true) .setMetricRegistry(registry); VertxOptions vertxOptions = new VertxOptions().setMetricsOptions(metricsOptions); Vertx vertx = Vertx.vertx(vertxOptions); // ... } }
|
我们可以及时观察服务水平指标的演变。它允许我们查看应用程序指标(请求/秒,HTTP响应代码,线程使用情况......)和业务指标(对我的应用程序有意义的那些,如新客户,停用客户,平均活动持续时间......)。
例如,如果我们想要计算应用程序错误,我们现在可以像往常一样使用Dropwizard。
package co.teemo.blog.handlers;
public class FailureHandler implements Handler<RoutingContext> {
private final Counter validationErrorsCounter;
public FailureHandler() { validationErrorsCounter = SharedMetricRegistries.getDefault().counter("validationErrors"); }
public void handle(RoutingContext context) { Throwable thrown = context.failure(); recordError(thrown); context.response().setStatusCode(500).end(thrown.getMessage()); }
private void recordError(String userId, Throwable thrown) { validationErrorsCounter.inc(); } }
|
跟踪
最后但同样重要的是,我们可能需要向应用程序添加跟踪,以了解当我们检测到意外行为并在我们的系统中遍历特定请求时会发生什么。
Vert.x目前尚未准备好处理此功能集,但它肯定朝着这个方向前进。请参阅RFC。
安全
这并不奇怪,我们需要在将其部署到生产环境之前保护对应用程序的访问。
1. 身份验证和授权
我们使用Auth0来处理这些目的。由于值不是公开的,我没有在Github存储库中添加本文的这一部分。相反,我给你链接到一些有用的资源:
请记住,您永远不应该信任网络,因此请在应用程序级别实施身份验证和授权。
用于将身份与每个操作相关联的身份验证,根据与执行操作的身份相关联的权限来限制操作的授权。例如,我们的健康端点可以公开访问,但应该使用最小特权原则限制操作。
使用Auth0,解码和验证令牌足以执行身份验证,检查范围是执行授权所必需的。
2.输入验证
应始终验证输入,以确保我们的应用程序永远不会处理可能导致系统损坏的恶意或错误数据。再一次,有一个模块!
让我们为我们的应用程序添加一个新的“hello world”端点,并说它只能使用以下参数调用:
- 一个名称:是需要字符串开头以大写字符与字母字符构成的路径参数。
- 一个授权所需要串标头
- 一个版本标头,是一个可选的INT
让我们将验证和问候处理程序添加到我们的路由器,而不要忘记实现自定义验证器来处理名称的业务验证。public class GreetingHandler implements Handler<RoutingContext> {
@Override public void handle(RoutingContext routingContext) { // Thanks to the validation handler, we are sure required parameters are present String name = routingContext.request().getParam("name"); String authorization = routingContext.request().getHeader("Authorization"); int version = Integer.valueOf(routingContext.request().getHeader("Version"));
String response = String.format("Hello %s, you are using version %d of this api and authenticated with %s", name, version, authorization); routingContext.response().setStatusCode(200).end(response); } }
public class MainVerticle extends AbstractVerticle { // ... private GreetingHandler greetingHandler;
@Override public void start() { //... this.greetingHandler = new GreetingHandler();
HTTPRequestValidationHandler greetingValidationHandler = HTTPRequestValidationHandler .create() .addHeaderParam("Authorization", ParameterType.GENERIC_STRING, true) .addHeaderParam("Version", ParameterType.INT, true) .addPathParamWithCustomTypeValidator("name", new NameValidator(), false);
router.get("/greetings/:name") .handler(greetingValidationHandler) .handler(greetingHandler) .failureHandler(failureHandler);
//... } }
public class NameValidator implements ParameterTypeValidator { @Override public RequestParameter isValid(String value) throws ValidationException { if(!isFirstCharUpperCase(value)) { throw new ValidationException("Name must start with an uppercase char"); } if(!isStringAlphabetical(value)) { throw new ValidationException("Name must be composed with alphabetical chars"); } return RequestParameter.create(value); }
private static boolean isFirstCharUpperCase(String value) { return Character.isUpperCase(value.charAt(0)); }
private static boolean isStringAlphabetical(String value) { return value.chars().allMatch(Character::isAlphabetic); } }
|
如果我们在没有任何必需参数的情况下调用端点,则抛出ValidationException,并且注册的FailureHandler将继续请求处理。
curl -i -H 'Authorization: toto' -H 'Version: 1' http://localhost:8080/greetings/fds > HTTP/1.1 400 Bad Request > content-length: 38 > Name must start with an uppercase char
|
结论
Vert.x生态系统在模块数量方面令人印象深刻,这些模块几乎涵盖了我们生产中所需的所有功能。该库设计得很好,并为每个概念提出了SPI,这使得它非常易于扩展。
例如,我们需要添加新的配置商店来加载来自Google运行时配置和Google计算元数据的配置,第一个实施草案花了我们不到一天的时间!
虽然跟踪是Observability的缺失部分,但它不是阻止在生产环境中发布我们的服务,因为我们能够通过运行状况检查,度量标准和日志来观察它。
您可以在此处找到源代码:https://github.com/migibert/vertx-in-production