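With KIP-255 (a Kafka Improvement Proposal) shipped in Kafka 2.0.0, we can now use SASL (Simple Authentication and Security Layer) OAUTHBEARER for client-to-broker and inter-broker authentication.
Prerequisites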
- Docker
- Docker-compose
- Git
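Implementing the Java classes that support the OAuth mechanism
Following the KIP-255 documentation, we need to implement two classes to authenticate our clients or brokers against an external OAuth2 server.
The first class implements AuthenticateCallbackHandler and serves the clients or brokers that need to authenticate, that is, the side that has to obtain a token.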
package br.com.jairsjunior.security.oauthbearer;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
    private final Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.class);
    private Map<String, String> moduleOptions = null;
    private boolean configured = false;

    @Override
    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        // This handler only supports the OAUTHBEARER mechanism.
        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
            throw new IllegalArgumentException(
                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
                            jaasConfigEntries.size()));
        // Keep a read-only view of the JAAS module options.
        @SuppressWarnings("unchecked")
        Map<String, String> options = (Map<String, String>) jaasConfigEntries.get(0).getOptions();
        this.moduleOptions = Collections.unmodifiableMap(options);
        configured = true;
    }

    public boolean isConfigured() {
        return this.configured;
    }

    @Override
    public void close() {
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback) {
                try {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    private void handleCallback(OAuthBearerTokenCallback callback) {
        if (callback.token() != null)
            throw new IllegalArgumentException("Callback had a token already");

        // Ask the OAuth2 server for a token and hand it to Kafka.
        log.info("Trying to acquire token");
        OauthBearerTokenJwt token = OauthHttpCalls.login(null);
        if (token == null) {
            throw new IllegalArgumentException("Null token returned from server");
        }
        log.info("Retrieved token");
        callback.token(token);
    }
}
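The article does not show what OauthHttpCalls.login does internally. As a rough illustration only, here is a minimal sketch of the kind of request such a helper could build, assuming the OAuth2 client-credentials grant with HTTP Basic client authentication. The class TokenRequestSketch, the endpoint URL, and the client secret are hypothetical and are not taken from the repository; the sketch only assembles the request with the JDK's java.net.http API (Java 11+) and does not send it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper for illustration; not the repository's OauthHttpCalls. */
final class TokenRequestSketch {

    /** Builds the HTTP Basic header value from the client credentials. */
    static String basicAuth(String clientId, String clientSecret) {
        // Simplification: assumes the id and secret contain no characters
        // that would need form-encoding before being joined with ':'.
        String pair = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder()
                .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    /** Form body for the client-credentials grant; the scope is optional. */
    static String formBody(String scope) {
        String body = "grant_type=client_credentials";
        if (scope != null && !scope.isEmpty()) {
            body += "&scope=" + URLEncoder.encode(scope, StandardCharsets.UTF_8);
        }
        return body;
    }

    /** Assembles the POST request; sending it is left to the caller. */
    static HttpRequest build(String tokenEndpoint, String clientId,
                             String clientSecret, String scope) {
        return HttpRequest.newBuilder(URI.create(tokenEndpoint))
                .header("Authorization", basicAuth(clientId, clientSecret))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(scope)))
                .build();
    }

    public static void main(String[] args) {
        // Endpoint and secret are placeholders chosen for this sketch.
        HttpRequest request = build("http://localhost:4444/oauth2/token",
                "producer-kafka", "hypothetical-secret", null);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

A real helper would then send the request, parse the JSON response, and wrap the access token in an object Kafka can consume, which is the role OauthBearerTokenJwt plays in the handler above.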
The second class implements the same interface and lets Kafka validate the token it receives by using OAuth token introspection.
package br.com.jairsjunior.security.oauthbearer;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {
    private final Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.class);
    private List<AppConfigurationEntry> jaasConfigEntries;
    private Map<String, String> moduleOptions = null;
    private boolean configured = false;
    private Time time = Time.SYSTEM;

    @Override
    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        // This handler only supports the OAUTHBEARER mechanism.
        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
            throw new IllegalArgumentException(
                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
                            jaasConfigEntries.size()));
        // Keep a read-only view of the JAAS module options.
        @SuppressWarnings("unchecked")
        Map<String, String> options = (Map<String, String>) jaasConfigEntries.get(0).getOptions();
        this.moduleOptions = Collections.unmodifiableMap(options);
        configured = true;
    }

    public boolean isConfigured() {
        return this.configured;
    }

    @Override
    public void close() {
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerValidatorCallback) {
                try {
                    handleCallback((OAuthBearerValidatorCallback) callback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    private void handleCallback(OAuthBearerValidatorCallback callback) {
        String accessToken = callback.tokenValue();
        if (accessToken == null)
            throw new IllegalArgumentException("Callback missing required token value");

        // Ask the OAuth2 server about the token the client sent.
        log.info("Trying to introspect token");
        OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
        log.info("Token introspected");

        // Reject expired tokens by reporting the failure on the callback,
        // so the result is not silently discarded.
        long now = time.milliseconds();
        if (now > token.expirationTime()) {
            log.warn("Expired token, needs refresh");
            callback.error("invalid_token", null, null);
            return;
        }

        log.info("Token validated");
        callback.token(token);
    }
}
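The expiry check above depends on what OauthHttpCalls.introspectBearer returns, which the article does not show. As a hedged illustration, the sketch below shows one way to interpret an introspection response in the shape described by RFC 7662, where "active" is a boolean and "exp" is given in seconds since the epoch, while Kafka works with milliseconds. The class IntrospectionCheckSketch is hypothetical, the response is assumed to be already parsed into a Map, and treating a missing "exp" as invalid is a choice made for this sketch rather than something the source prescribes.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical helper for illustration; not part of the repository. */
final class IntrospectionCheckSketch {

    /** Returns the expiry in epoch milliseconds if the response carries "exp". */
    static OptionalLong expiryMillis(Map<String, Object> response) {
        Object exp = response.get("exp");
        if (exp instanceof Number) {
            // "exp" is expressed in seconds; convert to milliseconds.
            return OptionalLong.of(Math.multiplyExact(((Number) exp).longValue(), 1000L));
        }
        return OptionalLong.empty();
    }

    /** A token is accepted only if it is active and not yet expired. */
    static boolean isUsable(Map<String, Object> response, long nowMillis) {
        if (!Boolean.TRUE.equals(response.get("active"))) {
            return false;
        }
        OptionalLong expiry = expiryMillis(response);
        // Assumption of this sketch: a missing "exp" is treated as invalid.
        return expiry.isPresent() && nowMillis < expiry.getAsLong();
    }

    public static void main(String[] args) {
        long nowSeconds = System.currentTimeMillis() / 1000L;
        Map<String, Object> response = Map.of("active", true, "exp", nowSeconds + 60);
        System.out.println("usable: " + isUsable(response, System.currentTimeMillis()));
    }
}
```

Mixing up seconds and milliseconds is an easy mistake in this kind of check, so it is worth confirming which unit expirationTime() returns before comparing it with time.milliseconds().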
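The server.properties file
In this file we set the classes that Kafka will use to log in to the OAuth2 server and to validate tokens against it. The complete server.properties file is available in the GitHub repository.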
############################# OAuth Classes #############################
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler
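The long property names follow Kafka's convention for per-listener, per-mechanism broker settings: the prefix listener.name., then the listener name in lowercase, then the SASL mechanism in lowercase, then the ordinary config name. The tiny sketch below only illustrates how such a key is composed; ListenerConfigKeySketch is a hypothetical helper written for this article and is not a Kafka API.

```java
import java.util.Locale;

/** Hypothetical helper that composes a listener- and mechanism-prefixed key. */
final class ListenerConfigKeySketch {

    static String key(String listenerName, String saslMechanism, String config) {
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT) + "."
                + saslMechanism.toLowerCase(Locale.ROOT) + "." + config;
    }

    public static void main(String[] args) {
        System.out.println(key("SASL_PLAINTEXT", "OAUTHBEARER",
                "sasl.server.callback.handler.class"));
    }
}
```

Seen this way, the two entries above configure the login and server callback handlers for the OAUTHBEARER mechanism on the listener named SASL_PLAINTEXT only.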
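Starting the OAuth2 server and our Kafka broker
This project needs an OAuth2 server to issue tokens to clients and brokers and to validate them. A simple, open-source option is ORY Hydra, a certified OAuth2 server written in Go. For this example, we use a docker-compose file that sets up the server and creates three accounts:
- consumer-kafka: used by the consumer container
- producer-kafka: used by the producer container
- broker-kafka: used for inter-broker authentication
To start the OAuth2 server and the Kafka broker, clone the kafka-playground GitHub repository and run the docker-compose file from its root folder. docker-compose needs an environment variable named HOST_IP, which can be set inline when running the command:
HOST_IP=XXX.XXX.XXX.XXX docker-compose up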
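Configuring our clients (producer/consumer/streams)
The Git repository contains a folder named kafka-using-java with a producer example that uses our .jar file. To run this example, set the HOST_IP environment variable to the IP address of the machine you are running it on.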
HOST_IP=XXX.XXX.XXX.XXX docker-compose up
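For readers wiring up their own client, the sketch below shows roughly which settings a Java producer or consumer needs in order to log in with OAUTHBEARER through the login callback handler from this article. It uses only java.util.Properties so it runs without Kafka on the classpath. The class name OauthClientPropsSketch and the port 9092 are assumptions made for illustration, and the exact values used by the kafka-using-java example may differ.

```java
import java.util.Properties;

/** Hypothetical sketch of client settings; values may differ from the repository. */
final class OauthClientPropsSketch {

    static Properties build(String hostIp, int port) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", hostIp + ":" + port);
        // The broker listener in this article is SASL over plaintext.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        // Kafka's OAUTHBEARER login module; no module options are set here.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;");
        // The login handler implemented earlier fetches the token.
        props.setProperty("sasl.login.callback.handler.class",
                "br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler");
        return props;
    }

    public static void main(String[] args) {
        // Falls back to localhost when HOST_IP is not set; 9092 is an assumed port.
        String hostIp = System.getenv().getOrDefault("HOST_IP", "127.0.0.1");
        build(hostIp, 9092).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would be passed to the client constructor together with the usual serializer or deserializer settings, and the .jar containing the handler has to be on the client's classpath.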
Source code: GitHub