IT教程 ·

非壅闭同步算法实战(四)- 计数器准时持久化

如何高效开展测试用例评审?附用例评审检查清单及用例评审报告模板

问题背景及请求

  • 须要对批评举行点赞次数和被批评次数举行统计,或许更多维度
  • 请求高并发、高机能计数,许可极度状况丧失一些统计次数,比方宕机
  • 批评很多,不能为每个批评都一向保留其计数器,计数器须要有接纳机制

问题笼统及剖析

依据以上需求,为了轻易编码与测试,我们把需求转化为以下接口

/**
 * 计数器
 */
public interface Counter {
    /**
     * 掏出统计数据,用Saver去耐久化(仅定时器会挪用,无并发)
     * @param saver
     */
    void save(Saver saver);

    /**
     * 计数(有并发)
     * @param key 营业ID
     * @param like 点赞
     * @param comment 批评
     */
    void add(String key, int like, int comment);

    /**
     * 耐久化器,将数目耐久化到数据库等
     */
    @FunctionalInterface
    interface Saver{
        void save(String key, int like, int comment);
    }
}

简朴剖析可知,计数器比较简朴,用AtomicInteger便能保证原子性,但斟酌到计数器会被接纳,则大概会涌现如许的场景:某计数器已被接纳了,此时继承在该计数器上计数,便会形成数据丧失,因而要处理该并发问题

处理计划

计划一

运用原生锁来处理合作问题

/**
 * 直接对一切操纵上锁,来保证线程平安
 */
public class SynchronizedCounter implements Counter{
    private HashMap<String, Adder> map = new HashMap<>();

    @Override
    public synchronized void save(Saver saver) {
        map.forEach((key, value)->{//由于已加锁,所以能够平安地取数据
            saver.save(key, value.like, value.comment);
        });
        map = new HashMap<>();
    }

    @Override
    public synchronized void add(String key, int like, int comment) {
        //由于已加锁,所以能够平安地更新数据
        Adder adder = map.computeIfAbsent(key, x -> new Adder());
        adder.like += like;
        adder.comment += comment;
    }
    static class Adder{
        private int like;
        private int comment;
    }
}

计划点评:该计划让营业线程和定时保留线程合作一致把实例锁,让他们互斥地接见,处理了合作问题,但锁粒度太粗爆,机能低下

计划二

为了循规蹈矩,我们把“计数器须要有接纳机制”这条请求去掉,如许我们能够很轻易地应用上AtomicInteger这个类

/**
 * 不接纳计数器,问题变得简朴很多
 */
public class IncompleteCounter implements Counter {
    private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
    @Override
    public void save(Saver saver) {
        map.forEach((key, value)->{//应用了AtomicInteger的原子特征,能够线程平安地掏出一切计数,并置0(由于还会继承运用)
            saver.save(key, value.like.getAndSet(0), value.comment.getAndSet(0));
        });
        //由于不接纳,所以不必斟酌Adder被接纳抛弃后,仍被别的线程运用的状况(由于没有锁,所以这类状况是大概发作的)
    }

    @Override
    public void add(String key, int like, int comment) {
        Adder adder = map.computeIfAbsent(key, k -> new Adder());
        adder.like.addAndGet(like);//应用AtomicInteger的原子特征,保证了线程平安
        adder.comment.addAndGet(comment);
    }
    static class Adder{
        AtomicInteger like = new AtomicInteger();
        AtomicInteger comment = new AtomicInteger();
    }
}

计划点评:除了没处理接纳问题,简朴高效

计划三

由于挪用save的线程没有并发状况,壅塞也没紧要,经剖析可奇妙地运用读写锁,同时又不让add要领进入壅塞

/**
 * 奇妙地应用读写锁,及save要领可壅塞的特性,完成add操纵无壅塞
 */
public class ReadWriteLockCounter implements Counter {
    private volatile MapWithLock mapWithLock = new MapWithLock();

    @Override
    public void save(Saver saver) {
        MapWithLock preMapWithLock = mapWithLock;
        mapWithLock = new MapWithLock();
        //不会一向壅塞,由于mapWithLock已被替代,新的add挪用会拿到新的mapWithLock
        preMapWithLock.lock.writeLock().lock();
        preMapWithLock.map.forEach((key,value)->{
            //value已烧毁,故无需value.like.getAndSet(0)
            saver.save(key, value.like.get(), value.comment.get());
        });
        //不能开释该锁,不然add要领中,对被替代掉的MapWithLock.lock实行tryLock会胜利
        //或许,这是你第一次见到的不须要且不许可开释的锁:)
    }

    @Override
    public void add(String key, int like, int comment) {
        MapWithLock mapWithLock;
        //假如经由过程tryLock猎取锁失利,则示意该mapWithLock已被烧毁了(由于只要烧毁了的MapWithLock才会加写锁),故从新猎取最新的mapWithLock
        while(!(mapWithLock = this.mapWithLock).lock.readLock().tryLock());
        try{
            Adder adder = mapWithLock.map.computeIfAbsent(key, k -> new Adder());
            adder.like.getAndAdd(like);
            adder.comment.getAndAdd(comment);
        }finally {
            mapWithLock.lock.readLock().unlock();
        }
    }

    static class Adder{
        private AtomicInteger like = new AtomicInteger();
        private AtomicInteger comment = new AtomicInteger();

    }
    static class MapWithLock{
        private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
        private ReadWriteLock lock = new ReentrantReadWriteLock();
    }
}

计划点评:减少了锁的粒度,同时add线程能够互相兼容,大幅提升了并发才能,save线程虽会壅塞,但连系其定时实行的特性,并不受影响,且纵然极度状况也不会一向壅塞

计划四

运用一个原子的state来替代LockCounter中的ReadWriteLock(由于只运用到了它的部份特征),完成wait-free,取得更高机能

/**
 * ReadWriteLockCounter的改进版,去掉ReadWriteLock,连系当前场景,完成一个wait-free的浅易读写锁<br/>
 */
public class CustomLockCounter implements Counter {
    private volatile MapWithState mapWithState = new MapWithState();

    @Override
    public void save(Saver saver) {
        MapWithState preMapWithState = mapWithState;
        mapWithState = new MapWithState();
        //compareAndSet失利则示意该MapWithState正在被运用,等其运用完,它不会一向失利,由于mapWithState已被替代
        while(!preMapWithState.state.compareAndSet(0,Integer.MIN_VALUE)){
            Thread.yield();
        }
        preMapWithState.map.forEach((key, value)->{
            //value已烧毁,故无需value.like.getAndSet(0)
            saver.save(key, value.like.get(), value.comment.get());
        });
    }

    @Override
    public void add(String key, int like, int comment) {
        MapWithState mapWithState;//add的并发,不大概将Integer.MIN_VALUE自增成正数(设置为Integer.MIN_VALUE时,该MapWithState已被烧毁了)
        while((mapWithState = this.mapWithState).state.getAndIncrement()<0);
        try{
            Adder adder = mapWithState.map.computeIfAbsent(key, k -> new Adder());
            adder.like.getAndAdd(like);
            adder.comment.getAndAdd(comment);
        }finally {
            mapWithState.state.getAndDecrement();
        }
    }

    static class Adder{
        private AtomicInteger like = new AtomicInteger();
        private AtomicInteger comment = new AtomicInteger();

    }
    static class MapWithState {
        private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
        private AtomicInteger state = new AtomicInteger();
    }
}

计划点评:保留了前一计划ReadWriteLockCounter的长处,同时连系场景的特性做了些优化,实质就是将CAS失利重试轮回替代成了一条fetch-and-add指令,假如不是由于save是低频实行,本计划多是最高效的了(临时疏忽ConcurrentHashMap等别的大概的优化空间)

计划五

先假定不会发作合作,然后检测合作状况,假如发作合作,则赔偿

/**
 * 乐观地假定不会发作合作,假如发作了,则尝试举行赔偿
 */
public class CompensationCounter implements Counter {
    private ConcurrentHashMap<String, Adder> map = new ConcurrentHashMap<>();
    @Override
    public void save(Saver saver) {
        for(Iterator<Map.Entry<String, Adder>> it = map.entrySet().iterator(); it.hasNext();){
            Map.Entry<String, Adder> entry = it.next();
            it.remove();
            entry.getValue().discarded = true;
            saver.save(entry.getKey(), entry.getValue().like.getAndSet(0), entry.getValue().comment.getAndSet(0));//需将计数器置0,此处存在合作
        }
    }

    @Override
    public void add(String key, int like, int comment) {
        Adder adder = map.computeIfAbsent(key, k -> new Adder());
        adder.like.addAndGet(like);
        adder.comment.addAndGet(comment);
        if(adder.discarded){//假如数目加在了烧毁的Adder上面,则实行赔偿逻辑
            int likeTemp = adder.like.getAndSet(0);
            int commentTemp = adder.comment.getAndSet(0);
            //纵然今后又有线程在计数器上计数了也不妨
            if(likeTemp != 0 || commentTemp != 0){
                add(key, likeTemp, commentTemp);//赔偿
            }//也大概已被别的线程取走了,但并不影响营业正确性
        }
    }
    static class Adder{
        AtomicInteger like = new AtomicInteger();
        AtomicInteger comment = new AtomicInteger();
        volatile boolean discarded = false;//只要保留线程会将它改成true,故运用volatile便能保证线程平安
    }
}

计划点评:跟乐观锁的思绪相似,在合作猛烈的状况下,平常不会有最优机能,但此处由于save要领是低频实行的且本身无并发,add要领才有高并发,故失利赔偿实在很少真正被实行,这也是为何测试效果中本计划机能最优的缘由

机能测试

终究我们来测试一下各计划的机能,由于我们笼统出了一个一致的接口,故测试也较为轻易

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterTester {
    private static final int THREAD_SIZE = 6;//add要领的并发线程数
    private static final int ADD_SIZE = 5000000;//测试范围
    private static final int KEYS_SIZE = 128*1024;
    public static void main(String[] args) throws InterruptedException {
        Counter[] counters = new Counter[]{new SynchronizedCounter(), new IncompleteCounter(), new ReadWriteLockCounter(), new CustomLockCounter(), new CompensationCounter()};
        String[] keys = new String[KEYS_SIZE];
        Random random = new Random();
        for (int i = 0; i < keys.length; i++) {
            keys[i]=String.valueOf(random.nextInt(KEYS_SIZE*1024));
        }
        for (Counter counter : counters) {
            AtomicInteger totalLike = new AtomicInteger();
            AtomicInteger totalComment = new AtomicInteger();
            AtomicInteger savedTotalLike = new AtomicInteger();
            AtomicInteger savedTotalComment = new AtomicInteger();
            Counter.Saver saver = (key, like, comment) -> {
                savedTotalLike.addAndGet(like);//模仿被耐久化到数据库,纪录数目以便后续校验正确性
                savedTotalComment.addAndGet(comment);//同上
            };
            CountDownLatch latch = new CountDownLatch(THREAD_SIZE);
            long start = System.currentTimeMillis();
            for (int i = 0; i < THREAD_SIZE; i++) {
                new Thread(()->{
                    Random r = new Random();
                    int like, comment;
                    for (int j = 0; j < ADD_SIZE; j++) {
                        like = 2;
                        comment = 4;
                        counter.add(keys[r.nextInt(KEYS_SIZE)], like, comment);
                        totalLike.addAndGet(like);
                        totalComment.addAndGet(comment);
                    }
                    latch.countDown();
                }).start();
            }
            Thread saveThread = new Thread(()->{
                while(latch.getCount() != 0){
                    try {
                        Thread.sleep(100);//模仿100毫秒实行一次耐久化
                    } catch (InterruptedException e) {}
                    counter.save(saver);
                }
                counter.save(saver);

            });
            saveThread.start();
            latch.await();
            System.out.println(counter.getClass().getSimpleName() +" cost:t"+(System.currentTimeMillis() - start));
            saveThread.join();
            boolean error = savedTotalLike.get() != totalLike.get() || savedTotalComment.get() != totalComment.get();
            (error?System.err:System.out).println("saved:tlike="+savedTotalLike.get()+"tcomment="+savedTotalComment.get());
            (error?System.err:System.out).println("added:tlike="+totalLike.get()+"tcomment="+totalComment.get()+"n");
        }
    }
}

 

在jdk11(jdk8也基础一致)下的测试效果以下:

注:计划二的IncompleteCounter并未完成接纳,仅作对照

SynchronizedCounter cost:    12377
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

IncompleteCounter cost:    2560
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

ReadWriteLockCounter cost:    7902
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

CustomLockCounter cost:    3541
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

CompensationCounter cost:    2093
saved:    like=60000000    comment=120000000
added:    like=60000000    comment=120000000

 

小结

非壅塞同步算法平常不须要我们去设想,直接运用现有的东西便可,但假如真想经由过程它进一步去压榨机能,应仔细剖析各线程交叉实行的状况,同时连系营业场景来斟酌(或许在A场景不许可的状况,在B场景是许可的)

使用Python打造一款间谍程序

参与评论