在微服务架构中实施分布式事务锁的几个方案比较 - Prasanth Gullapalli


众所周知,锁通常用于监视和控制多个线程同时访问共享资源。它们基本上保护并发应用程序中的数据完整性和原子性,即,一次只能有一个线程可以获取共享资源上的锁,否则将无法访问该锁。但是在分布式环境中的锁定不仅仅是在多线程应用程序中的互斥锁。由于必须立即跨集群或网络中的任何节点出现故障的所有节点获取锁定,因此情况变得更加复杂。
这是我们看看这样案例:该应用以用户首选的格式获取数据,并将其转换为可上传至政府门户的标准化格式(如PDF)。该应用程序有两种不同的微服务可以执行以下操作:Transformer和Rules Engine。我们已经使用Cassandra进行持久化,并使用Kafka作为消息队列。另外,请注意,一旦接受用户请求,则立即返回。PDF生成后,将异步通知用户。这是通过以下步骤实现的:

  • 用户请求被放入消息队列。
  • 一旦Transformer服务接收到用户请求,它将把用户上传的文件转换为Rule Engine可以理解的格式。
  • 现在,数据通过规则引擎获取,规则引擎更新数据点
  • 最后,将数据转换为PDF并通知用户。

首先,让我们尝试理解为什么在分布式环境中根本需要获取锁。以下是我们使用分布式锁的用例:

  1. 效率:这是为了确保同一昂贵的计算不会多次发生。例如:假设用户已上传文件进行处理。由于请求数量增加或当前文件太大而无法处理,因此系统上的负担很重,因此可能需要一段时间才能生成PDF。现在,如果用户变得焦躁不安,等待通知,他可以再次上传文件进行处理(此后不必要地增加了系统的负担)。通过在处理文件之前对文件的校验和进行锁定可以避免这种情况。
  2. 正确性:这是为了避免应用程序中的数据损坏。当使用锁时,系统中的两个并发/并行进程不会弄乱基础数据。如果两个进程同时对基础数据集进行操作而没有获取锁定,则很有可能损坏数据。例如:假设我们已经从用户那里获得了销售交易和订单项数据。交易级别的税额是根据交易级别已经征收的税额与行级别存在的任何其他税额之和计算得出的。现在,如果在两个不同的节点中并行执行同一事务的规则,则很有可能该行项目的税额增加两次。如果我们锁定事务级别,则可以避免这种情况。

请注意,锁通常不是一个好主意。阻塞操作通过限制系统的计算能力,增加了对那里基础资源的争用。此外,由于以下原因,尝试锁定分布式环境会更加困难和危险:
  • 当获取它的节点在没有释放的情况下崩溃时,该锁会发生什么情况?
  • 我们如何处理网络分区的情况?
  • 这些将把共识的其他方面带入画面。我们将在一段时间内讨论分布式共识的想法。

因此,出于上述所有原因,如果存在任何其他解决方案,我们应尽量避免使用这些锁定。这是可以在应用程序中使用的两种可能的方法:
  1. 乐观锁定:  在这种情况下,资源实际上并未锁定。在提交事务之前,我们检查资源是否由其他人更新。如果数据是陈旧的,则事务将回滚,并向用户抛出错误指示该错误。与此相反,悲观锁定是指您采用排他锁定时,其他任何人都无法修改资源。例如:数据库中的选择更新锁,Java锁。Hibernate提供了乐观锁定的支持。您可以在此处了解更多信息。
  2. Kafka中分区的用法:如前所述,在处理用户请求之前,我们始终将其保留在Kafka中。因为可用性是应用程序的核心体系结构原理之一,所以它是通过这种方式完成的。我们不希望当某些高峰使用期间负载增加多倍时应用程序崩溃。Kafka将针对某个主题发布的消息存储在内部的多个分区中。而且,它可以确保始终将给定分区中的消息以与发布时相同的顺序提供给使用者。利用此信息,我们将不想并行处理的所有请求发布到了同一分区(因此使用了锁)。这可以通过在将消息发布到Kafka时指定分区键来完成。具有相同密钥的消息将发布到同一分区。现在,随着消息从分区中顺序接收。

分布式锁定
在某些情况下,我们更喜欢采用分布式锁定,它不属于上述情况。当我们谈论分布式锁时,就会出现分布式共识。共识可以定义为使集群中的所有节点根据其投票就某个特定值达成共识的过程。所有节点都必须同意相同的值,并且该值必须是至少一个节点提交的值。现在,当说某个特定节点获取集群中的分布式锁时,集群中的其余节点必须同意该锁已被其使用。有多种共识算法,例如Paxos,Raft,ZAB,Pacifica等。我在博客末尾提供了一些链接,供那些对此感兴趣的人解释这些算法。
  • 对称/无领导者:在这里,所有参与共识的服务器都具有相同的角色。因此,在这种情况下,客户端可以连接到任何服务器。例如:Paxos
  • 基于非对称/基于领导者:在任何给定时间,一台服务器都充当参与共识的服务器的领导者。其余服务器接受领导者的决定。在这里,客户只能与领导者交流。示例:Raft,ZAB

几十年来,Paxos已成为共识的代名词。但是现在,如上所述有共识的不同实现。Raft实际上克服了传统Paxos的一些缺点。对于上述每种算法,都有不同的实现方式。对于Ex:Cassandra为轻量级交易实现了Paxos。卡夫卡内部使用Pacifica,而Zookeeper和Hazelcast分别使用ZAB和Raft。这是我们的应用程序中分布式锁的通用接口:
package common.concurrent.lock;
 
import java.util.concurrent.TimeUnit;
 
/**
 * Provides interface for the distributed lock implementations based on Zookeeper and Hazelcast.
 * @author pgullapalli
 */

public interface DistributedLock {
   
/**
     * Acquires the lock. If the lock is not available, the current thread until the lock has been acquired.
     * The distributed lock acquired by a thread has to be released by same thread only.
     **/

    void lock();
 
   
/**
     * This is a non-blocking version of lock() method; it attempts to acquire the lock immediately, return true if locking succeeds.
     * The distributed lock acquired by a thread has to be released by same thread only.
     **/

    boolean tryLock();
 
   
/**
     * Acquires the lock. Blocks until the lock is available or timeout is expired.
     * The distributed lock acquired by a thread has to be released by same thread only.
     **/

    boolean tryLock(long timeout, TimeUnit unit);
 
   
/**
     * Checks if current thread has already acquire the lock.
     * @return
     */

    boolean isLocked();
 
   
/**
     * Releases the lock. This method has to be called by same thread as which has acquired the lock.
     */

    void release();
}
 
public interface DistributedLocker {
 
   
/**
     * This method only fetches the lock object but does not explicitly lock. Lock has to be acquired and released.
     * specifically
     * @param key Fetch the lock object based on the key provided.
     * @return Implementation of DistributedLock object
     */

    DistributedLock getLock(String key);
 
}

 
对于我们的应用程序,以下是我们为实现分布式锁而探索的选项:
a)Zookeeper的InterProcessSemaphoreMutex:由Netflix开源的Curator,它是建立在Zookeeper之上的高级API,提供了许多配方,并处理了管理连接以及对基础ZooKeeper集合进行重试操作的复杂性。InterProcessSemaphoreMutex是Curator Framework的配方,是可重入的互斥量,可在JVM之间使用。它使用Zookeeper来保持锁。跨JVM使用相同锁定路径的所有进程都将达到进程间关键部分。此外,该互斥锁是“公平的” –每个用户将按照请求的顺序获得互斥锁(从Zookeeper的角度来看)。

package common.concurrent.lock.impl;
 
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
 
import java.util.concurrent.TimeUnit;
 
public class ZKBasedDistributedLocker implements DistributedLocker {
    private final CuratorFramework curatorClient;
    private final String basePath;
 
    public ZKBasedDistributedLocker(){
        curatorClient = CuratorFrameworkFactory.newClient("localhost:2181",
                new ExponentialBackoffRetry(1000, 3));
        basePath = new StringBuilder(
"/config/sample-app/distributed-locks/").toString();
    }
 
    @Override
    public DistributedLock getLock(String key) {
        String lock = new StringBuilder(basePath).append(key).toString();
        return new ZKLock(new InterProcessSemaphoreMutex(curatorClient, lock));
    }
 
    private class ZKLock implements DistributedLock {
        private final InterProcessLock lock;
 
        public ZKLock(InterProcessLock lock){
            this.lock = lock;
        }
 
        @Override
        public void lock() {
            try {
                lock.acquire();
            } catch (Exception e) {
                throw new RuntimeException(
"Error while acquiring lock", e);
            }
        }
 
        @Override
        public boolean tryLock() {
            return tryLock(10, TimeUnit.MILLISECONDS);
        }
 
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return lock.acquire(timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException(
"Error while acquiring lock", e);
            }
        }
 
        @Override
        public boolean isLocked() {
            return lock.isAcquiredInThisProcess();
        }
 
        @Override
        public void release() {
            try {
                lock.release();
            } catch (Exception e) {
                throw new RuntimeException(
"Error while releasing lock", e);
            }
        }
    }
}

由于Zookeeper通常在许多分布式系统中使用,因此使用此选项不需要任何其他锁定框架。但是有一个观察到,随着锁数量的增加,性能会下降。这是由于所有锁实际上都在内部创建为znode。随着znode数量的增加,在列出/删除Zookeeper中的locks文件夹时,我们甚至开始遇到问题。因此,对于需要较少数量锁的情况,Zookeeper非常适合。由于应用程序的许多服务可能都依赖Zookeeper,因此Zookeeper的任何问题也可能影响它们。很少有这样的用例,例如微服务向服务发现注册自己,使用Kafka的服务,而Kafka则取决于Zookeeper进行领导者选举。
 
b)来自Cassandra的轻量级事务:在基于主控的分布式系统中很容易实现强一致性。但是,这也意味着如果主服务器宕机,则会影响系统的可用性。Cassandra是无主控系统,并且在可用性与一致性之间进行权衡。它属于CAP定理的AP类别,因此高度可用,并且默认情况下最终保持一致。最终一致表示某个值的写入后读取可能不会产生写入的最新值。但是我们可以通过将查询的一致性级别指定为QUORUM来实现Cassandra中的强一致性。仲裁意味着写入事务只有在将其写入大多数服务器后才能成功。我们可以在Cassandra中实现锁定,如下所示:
  1. 创建表lock_requests(resource_id文本,lock_status文本,created_on时间戳,主键(resource_id));
  2. 试图获取锁的线程将检查锁表中是否存在具有指定键的条目:select * from lock_requests其中resource_id ='ABC';
  3. 如果不存在锁,现在我们说在锁中插入一个条目后就获得了该锁:插入lock_requests(resource_id,lock_status,created_on)values('ABC','Locked',toTimestamp(now()))

但是请注意,如果我们将这些作为与应用程序分开的步骤来进行,则步骤2和3之间的线程之间总是存在竞争状态的可能性。但是,如果数据库本身可以在插入之前检查行是否存在,则可以避免竞争条件。这就是所谓的线性化一致性(即ACID术语中的串行隔离级别)。轻量级交易完全一样。因此,可以将上述步骤2和3结合起来:
insert into lock_requests(resource_id,lock_status,created_on) values('ABC', 'Locked', toTimestamp(now())) if not exists;

如果存在锁,则上述写入失败,因此无法获取锁。现在,下一个问题是,如果获取锁的服务未释放它,将会发生什么情况。服务器可能崩溃了,或者代码可能引发了异常。锁将永远不会被释放。对于这种情况,我们可以为该行定义生存时间(TTL)。这意味着锁定行将在规定的秒数后自动过期。这是我们通过为该行的每个记录定义TTL来实现的方法。

create table lock_requests(resource_id text,lock_status text, created_on timestamp, primary key(resource_id)) with gc_grace_seconds=86400 and default_time_to_live=600;

现在,锁定将在10分钟后自动失效。通过为所有列定义TTL,可以为每一行覆盖此设置。如果我们无法粗略估计一次计算(被锁包围)所花费的时间,则TTL可能无济于事。

package common.concurrent.lock.impl;
 
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
import org.apache.commons.lang3.time.StopWatch;
 
import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
 
public class CassandraDistributedLocker implements DistributedLocker {
    private final CqlSession session;
    private final PreparedStatement selectStatement, insertStatement, deleteStatement;
 
    public CassandraDistributedLocker(){
        session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress("127.0.0.1", 9042))
                .withKeyspace(
"sample").build();
        selectStatement = session.prepare(
               
"select * from lock_requests where resource_id=?");
        insertStatement = session.prepare(
               
"insert into lock_requests(resource_id,lock_status,created_on) values(?,?,?) if not exists");
        deleteStatement = session.prepare(
               
"delete from lock_requests where resource_id=? if exists");
    }
 
    @Override
    public DistributedLock getLock(String key) {
        return new CassandraLock(key);
    }
 
    private class CassandraLock implements DistributedLock{
        private final String key;
 
        public CassandraLock(String key) {
            this.key = key;
        }
 
        @Override
        public void lock() {
            insertLock();
        }
 
        private boolean insertLock() {
            BoundStatement boundStatement = insertStatement.bind()
                    .setString(0, key)
                    .setString(1,
"LOCKED")
                    .setInstant(2, Instant.now());
            ResultSet resultSet = session.execute(boundStatement);
            return resultSet.wasApplied();
// this is equivalent to row.getBool("applied")
        }
 
        @Override
        public boolean tryLock() {
            return tryLock(10, TimeUnit.MILLISECONDS);
        }
 
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                boolean locked = false;
                StopWatch stopWatch = StopWatch.createStarted();
                while(stopWatch.getTime(TimeUnit.SECONDS) < timeout) {
                    if(insertLock()) {
                        locked = true;
                        break;
                    }
                }
                return locked;
            } catch (Exception e) {
                throw new RuntimeException(
"Error while acquiring lock", e);
            }
        }
 
        @Override
        public boolean isLocked() {
            BoundStatement boundStatement = selectStatement.bind().setString(0, key);
            ResultSet resultSet = session.execute(boundStatement);
            Row row = resultSet.one();
            return row != null ?
"LOCKED".equals(row.getString("lock_status")) : false;
        }
 
        @Override
        public void release() {
            try {
                BoundStatement boundStatement = deleteStatement.bind().setString(0, key);
                session.execute(boundStatement);
            } catch (Exception e){
                throw new RuntimeException(
"Error while releasing lock", e);
            }
        }
    }
}

Cassandra内部使用Paxos的修改版本来实现轻量级交易。它进行了4次额外的往返行程以实现此线性化。如果您的应用程序很少需要将每个操作线性化的应用程序,那么这听起来像是很高的成本–也许太高了。但是对于大多数应用程序而言,只有极少数操作需要线性化,这是一个很好的工具,可以增强到目前为止我们提供的强大/最终的一致性。有关更多信息,请参考此链接
当然,仅当应用程序已使用Cassandra进行持久化时,此解决方案才可行。我们还看到轻型卡车在重负荷下会超时。因此,最好谨慎使用这些锁。这些锁的优点之一是,不存在必须由获得该锁的人释放该锁的约束。如果我们遇到这样的场景,其中一个微服务最初会获得一个锁,而另一个服务会在工作流异步完成后释放它,这可能会派上用场。
 
c)使用Hazelcast的分布式锁: Hazelcast IMDG提供了基本Java集合和同步器的分布式版本。Hazelcast API的优点在于,在实现Java API本身时,它们很容易理解。例如: com.hazelcast.map.IMap扩展了java.util.Map。因此,这里的学习曲线较少。分布式MAP实现具有一种锁定特定密钥的方法。如果该锁不可用,则当前线程将被阻塞,直到释放该锁为止。即使MAP上没有钥匙,我们也可以将其锁定。如果MAP中不存在该密钥,则尝试将锁定的密钥放入MAP中时,除锁所有者之外的任何线程都将被阻塞。
package common.concurrent.lock.impl;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
 
import java.util.concurrent.TimeUnit;
 
public class HzMapBasedDistributedLocker implements DistributedLocker {
    private IMap txLockMap;
 
    public HzMapBasedDistributedLocker(){
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        txLockMap = hazelcastInstance.getMap("txLockMap");
    }
 
    @Override
    public DistributedLock getLock(String lockKey) {
        return new HzMapBasedLock(lockKey);
    }
 
    private class HzMapBasedLock implements DistributedLock{
        private final String key;
 
        public HzMapBasedLock(String key) {
            this.key = key;
        }
 
        @Override
        public void lock() {
            txLockMap.lock(key);
        }
 
        @Override
        public boolean tryLock() {
            return txLockMap.tryLock(key);
        }
 
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return txLockMap.tryLock(key, timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException(
"Error while acquiring lock", e);
            }
        }
 
        @Override
        public boolean isLocked() {
            return txLockMap.isLocked(key);
        }
 
        @Override
        public void release() {
            try {
                txLockMap.unlock(key);
            } catch (Exception e){
                throw new RuntimeException(
"Error while releasing lock", e);
            }
        }
    }
}

请注意,Hazelcast IMDG实施也属于CAP系统的AP类别。但是,强一致性(即使在失败/异常情况下)也是需要分布式协调的所有任务的基本要求。因此,在某些情况下,基于Map实现的现有锁将失败。为了解决这些问题,Hazelcast后来提出了CPSubsystem实现。CP子系统在Raft共识之上获得了新的分布式锁实现。CPSubsystem与Hazelcast IMDG群集的AP数据结构并存。CPSubsystem在所有情况下均保持线性化,包括客户端和服务器故障,网络分区,并防止出现裂脑情况。实际上,Hazelcast声称它们是提供线性化和分布式锁实现的唯一且唯一的解决方案。
package common.concurrent.lock.impl;
 
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import common.concurrent.lock.DistributedLock;
import common.concurrent.lock.DistributedLocker;
 
import java.util.concurrent.TimeUnit;
 
public class HzLockBasedDistributedLocker implements DistributedLocker {
    private HazelcastInstance hazelcastInstance;
 
    public HzLockBasedDistributedLocker(int cpMemberCount){
        Config config = new Config();
        config.getCPSubsystemConfig().setCPMemberCount(3);
        config.getCPSubsystemConfig().setGroupSize(3);
        hazelcastInstance = Hazelcast.newHazelcastInstance(config);
    }
 
    @Override
    public DistributedLock getLock(String key) {
        return wrapHzLock(key);
    }
 
    private DistributedLock wrapHzLock(String key){
        return new HzLock(key);
    }
 
    private class HzLock implements DistributedLock {
        private final FencedLock lock;
 
        public HzLock(String key) {
            this.lock = hazelcastInstance.getCPSubsystem().getLock(key);
        }
 
        @Override
        public void lock() {
            lock.lock();
        }
 
        @Override
        public boolean tryLock() {
            return lock.tryLock();
        }
 
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) {
            try {
                return lock.tryLock(timeout, unit);
            } catch (Exception e) {
                throw new RuntimeException("Error while acquiring lock", e);
            }
        }
 
        @Override
        public boolean isLocked() {
            return lock.isLocked();
        }
 
        @Override
        public void release() {
            try {
                lock.unlock();
               
//((DistributedObject) lock).destroy();
            } catch (Exception e){
                throw new RuntimeException(
"Error while releasing lock", e);
            }
        }
    }
}

上面的代码看起来很干净和简单。但是问题在于,除非明确销毁这些锁,否则它们在Hazelcast中永远不会自行失效。如果未销毁且创建频率更高,那么一段时间后我们可能会遇到内存不足的异常。Hazelcast文档中的以下内容对此进行了澄清:
锁不会自动移除。如果锁不再使用,Hazelcast不会自动在锁中执行垃圾回收。这可能导致OutOfMemoryError。如果您动态创建锁,请确保它们已被销毁。
尽管此修复程序看起来很简单,即取消注释上面代码中的destroy行,但这里的问题是,一旦销毁了锁,除非重新启动,否则无法在同一CP组中重新创建。因此,如果您需要重新使用一旦释放的锁,那么我们将无法销毁它们。在这种情况下,最好使用基于地图的实现本身。根据特定的用例,可以使用两种实现之一。Hazelcast可能会在近期功能中解决该问题。请参阅ticket。如果您还在寻找票,您也可以对其进行优先投票。
 
还有其他类似Redis的框架,它提供了分布式锁的解决方案,我在这里没有对其进行解释。我已经在资源部分列出了它们。请通过他们。最后要记住的一点是,谨慎使用这些锁总是更好的选择。如果存在不需要锁的任何替代解决方案,则最好使用该解决方案。

其他资源

  1. 使用Paxos实施复制的日志
  2. Raft:用于复制日志的共识算法
  3. Zab vs Paxos
  4. Cassandra 2.0中的轻量级交易
  5. ZAB的体系结构-ZooKeeper原子广播协议
  6. 使用Redis的分布式锁
  7. 分布式锁已失效;分布式锁万岁!