并发场景下数据写入功能的实现

Posted by Gevin on Category: 开发 Tags: 并发

enter image description here

1. 准备工作

1.1 理论基础

在并发场景下,实现数据的正确写入,主要需理解“锁”相关的原理和技术。

并发时写数据,需要考虑要不要上锁,根本原因是,数据存在共享且数据会发生变化,即多线程会同时读写同一数据。 若数据不存在共享,即不同的线程读写不同的数据,不需要上锁; 若数据共享,所有线程对数据只读不写,也不需要上锁 若数据共享,有的线程读数据,有的线程写数据,则需要上锁。

当多个线程同时访问同一数据,并且至少有一个线程会写这个数据,这种情况被称之为“数据竞争”。

并发场景下,锁的作用,是并发改为串行,以保证数据的一致性(更具体而言,通过上锁,解决了并发执行程序时的原子性、可见性和有序性问题,有兴趣的同学可以近一步深入相关理论,本文以实战为主,不在展开)。

故,在并发场景下读写数据,首先要分析是否存在“数据竞争”问题,若存在,需要“上锁”。数据竞争如果发生在本地应用中,则用本地锁;如果发生在分布式服务间中,则使用分布式锁;本地锁和分布式锁在原理上相同,不用分别讨论。

锁相关技术,有“悲观锁”和“乐观锁”两种。

(1)悲观锁

我们通常说的锁,如无特殊说明,就是指“悲观锁”。它是通过一些技术手段,实现线程或服务间的互斥和同步,其使用时,有显示(或隐式)的锁持有/释放操作。 由于上锁本身要损耗性能,上锁后并发处理变成串行,故上锁是比较影响系统性能的操作;且锁的应用不当,会潜在死锁/活锁风险;故悲观锁的使用要慎重。

(2)乐观锁

乐观锁通常又叫“无锁技术”,它不是通过“上锁”把并发改串行的方式保证数据一致性,而是通过CAS(Compare And Swap)方式来实现,由于CAS通常很快,该过程也不用“上锁”,性能损耗少。不过,通过CAS并发写数据时,通常伴有“自旋”,即当出现多个写并发时,只有一个能写入成功,其他要自旋后再次写入,直至写入成功或因超时/超过重试次数失败。 自旋会带来性能开销,频繁自旋的性能开销会超过上锁。故乐观锁通常用在并发不太激烈的场景中,且在该场景下性能比悲观锁要好,而在高并发场景下,建议使用悲观锁。

1.2 业务场景及分析

本文主题是介绍并发写数据的几种方案,为此,我们先确立几个常用的业务场景,并做简单分析。

写数据,我们讨论最常见的把数据写入数据库的场景,主要包括新建数据和修改数据两种具体场景,两个场景不完全一致,分别讨论。

(1) 往数据库写新数据

往数据库里写信数据时,如果数据直接相互独立,即不存在“数据竞争”,则按照1.1节的理论,此时不需要考虑锁的问题。

对于存在“数据竞争”的场景,我们考虑写入流水码的场景:假设创建数据有个编码字段,形如“CON_0001”,其后半段的“0001”是流水码,需要根据当前最大的流水码+1 来计算待创建数据的流水码。这里存在数据范围的竞争,并发创建数据时,如果不做并发控制,会创建多个编码相同的数据。

(2)更新数据库记录

并发更新数据库记录时,如果可以确保各并发请求要更新的数据各不相同,则不存在“数据竞争”,不需要上锁;而并发更新同一数据记录时,如果不做并发控制,可能出现一个写请求覆盖另一个写请求的情况,导致最终结果错误。

这里我们考虑“访问量+1”的场景,即设计一个访问量表,每次请求给访问量+1,如当前访问量为5,若5个并发请求同时为该记录+1,正确的结果为10,但如果不加并发控制,结果通常会<10。

2. 并发插入流水码的实现方案

2.1 业务逻辑分析

业务逻辑如下:

  1. 取出当前数据库中流水码最大的一条记录
  2. 从编码字段中解析出当前最大流水码
  3. 流水码+1,创建新纪录入库

代码实现即:

private Integer addEntity() {
        ConcurrentEntity entity = dataMapper.getLatestConcurrentEntity();
        int nextNumber = entity == null ? 1 : getNextNumberByCode(entity.getCode());
        String code = String.format("CON_%04d", nextNumber);
        return dataMapper.insertConcurrentEntity(new ConcurrentEntity(code));
    }

private int getNextNumberByCode(String code) {
        int index = code.lastIndexOf("_");
        String number = code.substring(index + 1);
        return Integer.parseInt(number) + 1;
    }

该业务在并发场景下,主要存在原子性隐患,即addEntity()中的代码,需要作为一个整体全部执行完,若多个线程交替执行执行逐行代码,某个线程读取到最新流水码后,该码被其他线程改了,本线程不可知,导致写入错误数据。

故本业务场景的并发中,主要避免原子性和可见性问题,最直接的方式,是通过上锁解决。

2.2 实现方案

2.2.1 方案1:在代码中上锁

private final Lock lock = new ReentrantLock(false);


    public Integer addEntityByLock() {
        synchronized (this) {
            return addEntity();
        }
    }

    public Integer addEntityByLock2() {
        lock.lock();
        try {
            return addEntity();
        } finally {
            lock.unlock();
        }
    }

在分布式系统中,lock可以用redisson或curator提供的分布式锁进行实例化。

2.2.2 方案2: 在数据库中上锁

锁除了可以在代码中用,也可以直接用到数据库上,select ... for update语句即可在数据库中上写锁,另由于业务执行的原子性问题,需要把addEntity()中的逻辑放到同一个事务中去。

@Transactional(isolation = Isolation.REPEATABLE_READ)
    public Integer addEntityByTransactionWithLock() {
        return addEntity();
    }

这里的事务隔离级别,选择RC或RR均可。

注,这个实现中,addEntity()中对ConcurrentEntity的查询,改成了加锁读的方式:

@Select("SELECT * FROM concurrent_entity\n" +
            "ORDER BY code DESC\n" +
            "LIMIT 1\n" +
            "FOR UPDATE;")
    ConcurrentEntity getLatestConcurrentEntityWithWriteLock();

2.2.3 性能对比

对于1000个并发请求,三种方法性能对比如下:

executeConcurrentAddByLock1: 1000 并发,花费时间 1179 ms
executeConcurrentAddByLock2: 1000 并发,花费时间 863 ms
executeConcurrentAddByTransactionWithLock: 1000 并发,花费时间 1284 ms

2.3 其他方案

本业务场景中,由于流水码的计算,存在数据竞争问题,所以并发时需要上锁,如果能避免数据竞争,就可以避免并发问题。针对本案例,可以把流水码获取的逻辑放到redis中去,redis本身是单线程的,避免了流水码的数据竞争,进而避免了上锁的开销,而redis本身又是高性能的,故这个方案理论上比上述方案的性能只高不低。

3. 并发更新访问量的实现方案

3.1 业务分析

并发更新数据库中的访问量时,存在的“数据竞争”问题,也是“原子性”隐患。如果更新本身是一个原子操作,则不存在并发问题;如果更新操作分两步,先读取当前数据,然后+1后重新写入,则该操作不是原子的,需要上锁。

3.2 实现方案

3.2.1 方案1: 原子更新

public Integer increaseVisitCountAtomically(int id) {
        return dataMapper.increaseConcurrentVisitAtomic(id);
    }

@Update("UPDATE concurrent_visit\n" +
            "SET visit = visit + 1, update_time = NOW()\n" +
            "WHERE id = #{id};")
    Integer increaseConcurrentVisitAtomic(int id);

3.2.2 方案2: 代码中上锁

public Integer increaseVisitCountByLock(int id) {
        synchronized (this) {
            return increaseVisitCount(id);
        }
    }

 private Integer increaseVisitCount(int id) {
        ConcurrentVisit concurrentVisit = dataMapper.getConcurrentVisitObject(id);
        concurrentVisit.increaseVisit().updateUpdateTime();
        return dataMapper.updateConcurrentVisit(concurrentVisit);
    }

3.2.3 方案3: 数据库中上锁

    @Transactional()
    public Integer increaseVisitCountByTransaction(int id) {
        return increaseVisitCount(id, true);
    }

    private Integer increaseVisitCount(int id, boolean withLock) {
        ConcurrentVisit concurrentVisit = withLock ? dataMapper.getConcurrentVisitObjectWithLock(id)
                : dataMapper.getConcurrentVisitObject(id);
        return dataMapper.increaseConcurrentVisit(concurrentVisit.increaseVisit().updateUpdateTime());
    }

    @Select("SELECT * FROM concurrent_visit\n" +
            "WHERE id = #{id}\n" +
            "FOR UPDATE;")
    ConcurrentVisit getConcurrentVisitObjectWithLock(int id);

3.2.4 方案4: 使用乐观锁

使用乐观锁时,需要一个递增的版本字段(version),每次update 成功时,version都要 +1,version要作为更新前的compare字段,若当前读到的version与数据库中的version不一致,则更新失败。

public Integer increaseVisitCountOptimistically(int id) {
        ConcurrentVisit concurrentVisit = dataMapper.getConcurrentVisitObject(id);
        return dataMapper.increaseConcurrentVisitOptimistically(concurrentVisit.increaseVisit().updateUpdateTime());
    }

    @Update("UPDATE concurrent_visit\n" +
            "SET visit = #{visit}, update_time = #{updateTime}, version = #{version} + 1\n" +
            "WHERE id = #{id} and version = #{version};")
    Integer increaseConcurrentVisitOptimistically(ConcurrentVisit concurrentVisit);

使用乐观锁时,compare不一致,会更新失败,这时需要自旋重试,故上述代码可以优化为:

public Integer increaseVisitCountOptimisticallyWithRetry(int id) {
        int result = 0;
        int maxRetry = 5;
        long interval = 20L;

        for (int i = 0; i < maxRetry; i++) {
            result = increaseVisitCountOptimistically(id);
            if (result > 0) {
                break;
            }
            interval = interval + i * 50;
            helper.sleep(interval);

        }
        return result;
    }

3.2.5 性能比较

发起10000个并发更新操作,结果如下:


executeConcurrentAddAtomically: 10000 并发,花费时间 2112 ms
executeConcurrentAddByLock: 10000 并发,花费时间 5796 ms
executeConcurrentAddByTransaction: 10000 并发,花费时间 3902 ms
executeConcurrentAddOptimisticallyWithRetry: 10000 并发,花费时间 5998 ms

mysql> select * from concurrent_visit;
+----+-------------+-------+---------+---------------------+---------------------+
| id | resourceKey | visit | version | create_time         | update_time         |
+----+-------------+-------+---------+---------------------+---------------------+
|  1 | resource1   | 39925 |    9925 | 2022-03-31 11:42:54 | 2022-04-01 12:06:36 |
+----+-------------+-------+---------+---------------------+---------------------+

可以看到:

  1. 并发比较激烈时,乐观锁性能最差,而且有些请求,即使超过了最大重试次数,也没更新成功
  2. 把锁上在数据库中,性能比所在代码上还要好,原因是,数据库上的锁是经过充分的性能优化的,而且锁的颗粒度更小,而我们这个业务场景下,代码中锁的颗粒度已经很难再缩小了。锁的颗粒度,决定了并发程度,并发场景下,锁的颗粒度越小越好

What's More

本文相关代码,可以在我GitHub的concurrent-write-db 中查看

本文同步发表于我的微信公众号,欢迎关注。


注:转载本文,请与Gevin联系




如果您觉得Gevin的文章有价值,就请Gevin喝杯茶吧!

|

欢迎关注我的微信公众账号



Comments