Java中semaphore信号量使用场景

在Java的并发API中,信号量是另一种同步工具,它同时控制访问特定资源或代码段的线程数量。

它管理一组许可证;

  • 线程在继续之前必须获得许可。
  • 如果许可可用,则线程获取它并继续执行。
  • 如果没有,则线程将被阻塞,直到许可可用或被中断。

Java 中的 Semaphore 类提供了两个主要方法:acquire() 和release()。

们假设一个数据库只能同时承受十个连接。随着 JDK 21 中虚拟线程的出现,创建许多线程变得相对容易且便宜。对于这种特殊情况,这成为一把双刃剑,因为许多线程可以尝试同时连接该数据库,这可能会压垮数据库。

这是我们的 MockDatabase 模拟类:

public class MockDatabase {
    public MockConnection getConnection() {
        try {
            TimeUnit.MILLISECONDS.sleep(10); // assume it takes 10 ms
            return new MockConnection();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

为了避免数据库不堪重负,我们可以使用信号量来限制并发访问。下面是带有信号量的修改后的 MockDatabase 类:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class MockDatabase {
    private final Semaphore semaphore = new Semaphore(10);

    public MockConnection getConnection() {
        try {
            semaphore.acquire();
            TimeUnit.MILLISECONDS.sleep(10);
            return new MockConnection();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            semaphore.release();
        }
    }
}

为了确保信号量有效地将数据库连接限制为十个,我们可以创建一个 JUnit 测试。该测试将利用 AtomicInteger 来计算对 getConnection() 方法的并发调用的最大数量,并验证它不会超过 10。

import org.junit.jupiter.api.Test;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

class MockDatabaseTest {
    @Test
    public void testGetConnection_shouldLimitConcurrentInvocationsToTen() {
        var concurrentCalls = new AtomicInteger(0);
        var maxConcurrentCalls = new AtomicInteger(0);

        var instance = new MockDatabase() {
            @Override
            public MockConnection getConnection() {
                concurrentCalls.incrementAndGet();
                maxConcurrentCalls.set(Math.max(maxConcurrentCalls.get(), concurrentCalls.get()));
                concurrentCalls.decrementAndGet();
                return super.getConnection();
            }
        };

        try (var executorService = Executors.newVirtualThreadPerTaskExecutor()) {
            IntStream.range(0, 1000)
                    .forEach(index -> instance.getConnection());
        }

        assertThat(maxConcurrentCalls.get()).isLessThanOrEqualTo(10);
    }
}

总之,Java 的 Semaphore 类提供了一种健壮而直接的机制,用于控制对并发环境中有限资源的访问。