How to Set Up OAuth2 Security for Kafka


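With KIP-255 (a Kafka Improvement Proposal), which shipped in Kafka 2.0.0, we can now use the SASL (Simple Authentication and Security Layer) OAUTHBEARER mechanism to authenticate clients to brokers, as well as brokers to each other (inter-broker authentication).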

Prerequisites

  • Docker
  • Docker-compose
  • Git

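Implementing the Java classes that support the OAuth mechanism
Following the KIP-255 documentation, we need to implement two classes in order to authenticate our clients or brokers against an external OAuth2 server.
The first class implements AuthenticateCallbackHandler and serves any client or broker that needs to authenticate itself: it obtains a token from the OAuth2 server.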


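import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// OauthBearerTokenJwt and OauthHttpCalls are helper classes from the same project (see the GitHub repository).
public class OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {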
    private final Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.class);
    private Map<String, String> moduleOptions = null;
    private boolean configured = false;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        // This handler only supports the OAUTHBEARER mechanism.
        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
        // Exactly one JAAS entry is expected; its options are kept as the module options.
        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
            throw new IllegalArgumentException(
                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
                            jaasConfigEntries.size()));
        this.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
        configured = true;
    }

    public boolean isConfigured(){
        return this.configured;
    }

    @Override
    public void close() {
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            // Only token callbacks are supported; anything else is rejected.
            if (callback instanceof OAuthBearerTokenCallback)
                try {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private void handleCallback(OAuthBearerTokenCallback callback){
        if (callback.token() != null)
            throw new IllegalArgumentException("Callback had a token already");

        // Request a token from the OAuth2 server through the project's OauthHttpCalls helper.
        log.info("Try to acquire token!");
        OauthBearerTokenJwt token = OauthHttpCalls.login(null);
        log.info("Retrieved token..");
        if(token == null){
            throw new IllegalArgumentException("Null token returned from server");
        }
        // Hand the token back to Kafka's login module.
        callback.token(token);
    }

}
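
The article does not show what OauthHttpCalls.login sends to the OAuth2 server. As a rough illustration only, and assuming it uses the standard OAuth2 client credentials grant (RFC 6749, section 4.4) with HTTP Basic client authentication, the request pieces could be built as in the sketch below. The class name, method names, and the example scope are hypothetical and are not taken from the repository.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the article's repository.
final class ClientCredentialsRequestSketch {

    // Builds the value of the Authorization header for HTTP Basic client authentication.
    // Assumes the client id and secret are plain ASCII and need no extra escaping.
    static String basicAuthHeader(String clientId, String clientSecret) {
        String credentials = clientId + ":" + clientSecret;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    // Builds the form-encoded request body for the client_credentials grant.
    static String formBody(String scope) {
        try {
            return "grant_type=client_credentials&scope="
                    + URLEncoder.encode(scope, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

The header and body would then be sent with an HTTP POST to the token endpoint of the OAuth2 server, and the returned access token wrapped in the project's OauthBearerTokenJwt.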

The second class implements the same interface and lets Kafka validate the token it receives by using OAuth token introspection.


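import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerValidationResult;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// OauthBearerTokenJwt and OauthHttpCalls are helper classes from the same project (see the GitHub repository).
public class OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {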
    private final Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.class);
    private List<AppConfigurationEntry> jaasConfigEntries;
    private Map<String, String> moduleOptions = null;
    private boolean configured = false;
    private Time time = Time.SYSTEM;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        // This handler only supports the OAUTHBEARER mechanism.
        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
        // Exactly one JAAS entry is expected; its options are kept as the module options.
        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
            throw new IllegalArgumentException(
                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
                            jaasConfigEntries.size()));
        this.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
        configured = true;
    }

    public boolean isConfigured(){
        return this.configured;
    }

    @Override
    public void close() {
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            // Only validator callbacks are supported; anything else is rejected.
            if (callback instanceof OAuthBearerValidatorCallback)
                try {
                    OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
                    handleCallback(validationCallback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private void handleCallback(OAuthBearerValidatorCallback callback){
        String accessToken = callback.tokenValue();
        if (accessToken == null)
            throw new IllegalArgumentException("Callback missing required token value");

        // Ask the OAuth2 server to introspect the token through the project's OauthHttpCalls helper.
        log.info("Trying to introspect Token!");
        OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
        log.info("Token introspected");

        // Reject expired tokens: throwExceptionIfFailed() raises a KafkaException subtype,
        // which handle() rethrows as an IOException, so the token is never accepted.
        long now = time.milliseconds();
        if(now > token.expirationTime()){
            OAuthBearerValidationResult.newFailure("Expired Token, needs refresh!").throwExceptionIfFailed();
        }

        log.info("Validated! token..");
        callback.token(token);
    }

}
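
The expiry check above compares the current time in milliseconds with token.expirationTime(). The sketch below illustrates the same decision in isolation, assuming the introspection response follows RFC 7662, where "active" is a boolean and "exp" is expressed in seconds since the epoch. The class and method names are hypothetical, and whether the project's OauthBearerTokenJwt performs this seconds-to-milliseconds conversion is not stated in the article.

```java
// Hypothetical helper, not part of the article's repository.
final class TokenExpirySketch {

    // Converts an RFC 7662 "exp" value (seconds since the epoch) to milliseconds,
    // failing loudly on overflow instead of wrapping around.
    static long expSecondsToMillis(long expSeconds) {
        return Math.multiplyExact(expSeconds, 1000L);
    }

    // A token is acceptable when the server reports it as active and the current
    // time is not past its expiration (same comparison as the handler above).
    static boolean isAcceptable(boolean active, long expSeconds, long nowMillis) {
        return active && nowMillis <= expSecondsToMillis(expSeconds);
    }
}
```

Keeping the comparison in one small method makes the unit of each value explicit, which is an easy place for seconds and milliseconds to get mixed up.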

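The server.properties file
In this file we set the classes that will be used to log in to the OAuth2 server and to validate tokens against it. The complete server.properties file is available in the GitHub repository.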

############################# OAuth Classes #############################
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler
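
The two keys above follow Kafka's convention for per-listener, per-mechanism SASL settings: listener.name.<listener name>.<SASL mechanism>.<option>, with the listener name and mechanism in lowercase. The small sketch below only illustrates how such a key is composed; the class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical helper that composes a listener-scoped SASL configuration key.
final class ListenerConfigKeySketch {

    static String key(String listenerName, String saslMechanism, String option) {
        // Kafka expects the listener name and the mechanism in lowercase in the key prefix.
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT) + "."
                + saslMechanism.toLowerCase(Locale.ROOT) + "." + option;
    }
}
```

For example, the listener SASL_PLAINTEXT, the mechanism OAUTHBEARER, and the option sasl.login.callback.handler.class produce the first key shown above.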


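Starting the OAuth2 server and our Kafka broker
This project needs an OAuth2 server to issue and validate the tokens of clients and brokers. A simple, open source option is ORY Hydra, a certified OAuth2 server written in Go. For this example, we use a docker-compose file to set up the server and create 3 accounts: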

  • consumer-kafka: used by the consumer container
  • producer-kafka: used by the producer container
  • broker-kafka: used for inter-broker authentication

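To start our OAuth2 server and our Kafka broker, we need to clone the kafka-playground GitHub repository and run the docker-compose file in its root folder. Before running docker-compose, we need to set an environment variable named HOST_IP.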

HOST_IP=XXX.XXX.XXX.XXX docker-compose up


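Configuring our clients (producer/consumer/streams)
In the Git repository there is a folder named kafka-using-java, which contains a producer example that uses our .jar file. To run this example, set the HOST_IP environment variable to the IP address of the machine it is running on.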

HOST_IP=XXX.XXX.XXX.XXX docker-compose up
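
The article does not list the client-side settings, so the following is only a minimal sketch of the properties a Java client would typically need to use the login handler shown earlier. It assumes the broker listener uses SASL_PLAINTEXT (as the listener name in server.properties suggests); the class name, the method name, and the bootstrap address passed in are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper, not part of the article's repository.
final class OauthClientConfigSketch {

    static Properties clientProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: the broker exposes a SASL_PLAINTEXT listener.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        // The OAUTHBEARER login module shipped with Kafka; no options are passed here.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
        // The login callback handler implemented earlier in this article.
        props.setProperty("sasl.login.callback.handler.class",
                "br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler");
        return props;
    }
}
```

The resulting Properties object would be combined with the usual serializer or deserializer settings and passed to a KafkaProducer or KafkaConsumer constructor.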

Source code: GitHub