IT教程 ·

【Java并发工具类】CountDownLatch和CyclicBarrier

Flink系统之Table API 和 SQL

媒介

下面引见谐和让多线程步调一致的两个东西类:CountDownLatchCyclicBarrier

CountDownLatch和CyclicBarrier的用处引见

CountDownLatch

// API
 void       await(); // 使当前线程在闭锁计数器到零之前一向守候,除非线程被中断。
 boolean    await(long timeout, TimeUnit unit); // 使当前线程在闭锁计数器至零之前一向守候,除非线程被中断或超出了指定的守候时刻。
 void       countDown(); // 递减闭锁计数器,假如计数抵达零,则开释一切守候的线程。
 long       getCount(); // 返回当前计数。
 String     toString(); // 返回标识此闭锁及其状况的字符串。

CountDownLatch是一个同步东西类,在完成一组正在其他线程中实行的操纵之前,它许可一个或多个线程一向守候。可以指定计数初始化CountDownLatch,当挪用countDown()要领后,在当前计数抵达零之前,await()要领会一向受壅塞。计数抵达零以后,一切被壅塞的线程都邑被开释,await()的一切后续挪用都邑马上返回。CountDownLatch的计数只能被运用一次,假如须要反复计数运用,则要斟酌运用CyclicBarrier

CountDownLatch的用处有许多。将计数为1初始化的CountDownLatch可用作一个简朴的开/关或进口:在经由过程挪用countDown()的线程翻开进口前,一切挪用await()的线程都一向在进口出守候。而用N初始化CountDownLatch可以使一个线程在N个线程完成某项操纵之前一向守候,或许使其在某项操纵完成N次之前一向守候。

COuntDownLatch的内存一致性语义:线程中挪用 countDown() 之前的操纵 Happens-Before紧跟在从另一个线程中对应 await() 胜利返回的操纵。

CyclicBarrier

// API
 int        await(); // 线程将一向守候直到一切介入者都在此 barrier 上挪用 await 要领
 int        await(long timeout, TimeUnit unit); // 线程将一向守候直到一切介入者都在此 barrier 上挪用 await 要领, 或许超出了指定的守候时刻。
 int        getNumberWaiting(); // 返回当前在屏蔽处守候的介入者数目。
 int        getParties(); // 返回请求启动此 barrier 的介入者数目。
 boolean    isBroken(); // 查询此屏蔽是不是处于破坏状况。
 void       reset(); // 将屏蔽重置为其初始状况。

CyclicBarrier是一个同步辅佐类,它许可一组线程互相守候,直到抵达某个大众屏蔽点(barrier也可被翻译为栅栏) (common barrier point)。 CyclicBarrier 适用于在触及一组牢固大小的线程的程序中,这些线程必需不时地互相守候的状况。即一切线程都必需抵达屏蔽位置后,下面的程序才继承实行,适于在迭代算法中运用。因为 barrier 在开释守候线程后可以计数器会被重置可继承运用,所以称它为轮回 的 barrier。

CyclicBarrier支撑一个可选的 Runnable敕令(也就是可以传入一个线程实行其他操纵),在一组线程中的末了一个线程抵达以后(但在开释一切线程之前),该敕令将只在每一个 barrier point 运转一次。这对一切介入线程继承运转之前更新它们的同享状况将异常有效。

CyclicBarrier的内存一致性语义:线程中挪用 await() 之前的操纵 Happens-Before 那些是屏蔽操纵的一部分的操纵,后者顺次 Happens-Before 紧跟在从另一个线程中对应 await() 胜利返回的操纵。

Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads.

在对账体系中运用CountDownLatch和CyclicBarrier

对账体系流程图以下:

【Java并发工具类】CountDownLatch和CyclicBarrier IT教程 第1张

如今对账体系的处置惩罚流程是:先查询定单,然后查询派送单,以后对照定单和派送单,将差别写入差别库。对账体系的代码笼统后以下:

while(存在未对账定单){
    // 查询未对账定单
    pos = getPOrders();
    // 查询派送单
    dos = getDOrders();
    // 实行对账操纵
    diff = check(pos, dos);
    // 差别写入差别库
    save(diff);
}

应用并行优化对账体系

如今的对账体系,因为定单量和派送单量庞大,所以查询未对账定单getPOrder()和查询派送单getDOrder()都相对照较慢。如今对账体系是单线程实行的,示意图以下(图来自参考[1]):

【Java并发工具类】CountDownLatch和CyclicBarrier IT教程 第2张

关于串行化的体系,优化机能起首想到的就是可否应用多线程并行处置惩罚
假如我们能将getPOrders()和getDOrders()这两个操纵并行处置惩罚,那末将会提拔效力许多。因为这两个操纵并没有先后顺序的依靠,所以,我们可以并行处置惩罚这两个耗时的操纵。
并行后的示意图以下(图来自参考[1]):

【Java并发工具类】CountDownLatch和CyclicBarrier IT教程 第3张

对照单线程的实行示意图,我们发如今一致时刻里,并行实行的吞吐量近乎单线程的2倍,优化效果照样相对显著的。

优化后的代码以下:

while(存在未对账定单){
    // 查询未对账定单
    Thread T1 = new Thread(()->{
        pos = getPOrders();
    });
    T1.start();

    // 查询派送单
    Thread T2 = new Thread(()->{
        dos = getDOrders();
    });
    T2.start();

    // 要守候线程T1和T2实行完才实行check()和save()这两个操纵
    // 经由过程挪用T1.join()和T2.join()来完成守候
    // 当T2和T2线程退出时,挪用T1.jion()和T2.join()的主线程就会从壅塞态被叫醒,从而实行check()和save()
    T1.join();
    T2.join();

    // 实行对账操纵
    diff = check(pos, dos);
    // 差别写入差别库
    save(diff);
}

运用CountDownLatch完成线程守候

上面的处理计划美中不足的处所在于:每一次while轮回都邑建立新的线程,而线程的建立是一个耗时操纵。所以,最好能使建立出来的线程可以轮回运用。一个自然而然的计划就是线程池。

// 建立 2 个线程的线程池
Executor executor =Executors.newFixedThreadPool(2);
while(存在未对账定单){
    // 查询未对账定单
    executor.execute(()-> {
        pos = getPOrders();
    });

    // 查询派送单
    executor.execute(()-> {
        dos = getDOrders();
    });

    /* ??怎样完成守候??*/

    // 实行对账操纵
    diff = check(pos, dos);
    // 差别写入差别库
    save(diff);
}   

因而我们就建立两个牢固大小为2的线程池,以后在while轮回里反复应用。
然则问题也出来了:主线程怎样得知getPOrders()和getDOrders()这两个操纵什么时刻执完?
前面主线程经由过程挪用线程T1和T2的join()要领来守候T1和T2退出,然则在线程池的计划里,线程基础就不会退出,所以,join()要领不可取。

这时刻我们就可以运用CountDownLatch东西类,将其初始计数值设置为2。当实行完pos = getPOrders();后,将计数器减一,实行完dos = getDOrders();后也将计数器减一。当计数器为0时,被壅塞的主线程就可以继承实行了。

// 建立 2 个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);

while(存在未对账定单){
    // 计数器初始化为 2
    CountDownLatch latch = new CountDownLatch(2);
    // 查询未对账定单
    executor.execute(()-> {
        pos = getPOrders();
        latch.countDown();    // 完成对计数器减1
    });

    // 查询派送单
    executor.execute(()-> {
        dos = getDOrders();
        latch.countDown();    // 完成对计数器减1
    });

    // 守候两个查询操纵完毕
    latch.await(); // 在await()返回之前,主线程会一向被壅塞

    // 实行对账操纵
    diff = check(pos, dos);
    // 差别写入差别库
    save(diff);
}

运用CyclicBarrier进一步优化对账体系

除了getPOrders()和getDOrders()这两个操纵可以并行,这两个查询操纵和check()save()这两个对账操纵之间也可以并行。

【Java并发工具类】CountDownLatch和CyclicBarrier IT教程 第4张

两次查询操纵和对账操纵并行,对账操纵还依靠查询操纵的效果,有点像生产者-花费者的意义,两次查询操纵是生产者,对账操纵是花费者。那末,我们就须要一个行列,来保留生产者生产的数据,而花费者则从这个行列花费数据。

不过,针对对账体系,可以设想两个行列,而且这两个行列之间另有对应关联。定单查询操纵将定单查询效果插进去定单行列,派送单查询操纵将派送单插进去派送单行列,这两个行列的元素之间是有一一对应关联。如许的优点在于:对账操纵可以每次从定单行列出一个元素和从派送单行列出一个元素,然后对这两个元素实行对账操纵,如许数据肯定不会乱掉。

【Java并发工具类】CountDownLatch和CyclicBarrier IT教程 第5张

怎样使两个行列完成完整的并行?
两个查询操纵所需时刻并不相同,那末一个简朴的主意就是,一个线程T1实行定单的查询工程,一个线程T2实行派送单的查询事情,仅当线程T1和T2各自都生产完1条数据的时刻,关照线程T3实行对账操纵。

【Java并发工具类】CountDownLatch和CyclicBarrier IT教程 第6张

先查询完的一方须要在设置的屏蔽点守候另一方,直到两边都抵达屏蔽点,才入手下手继承下一步使命。

因而我们可以运用CyclicBarrier来完成这个功用。建立一个计数器初始值为2的CyclicBarrier,同时传入一个回调函数,当计数器减为0的时刻,便挪用这个函数。

Vector<P> pos; // 定单行列
Vector<D> dos; // 派送单行列
// 实行回调的线程池 
// 牢固线程数目为1是因为只要单线程取猎取两个行列中的数据才不会涌现数据婚配不一致问题
Executor executor = Executors.newFixedThreadPool(1); 
// 建立CyclicBarrier的计数器为2,传入一个线程别的实行对账操纵
// 当计数器为0时,会运转传入线程实行对账操纵
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
                                    executor.execute(()->check());
                             });
void check(){
    P p = pos.remove(0); // 从定单行列中猎取定单
    D d = dos.remove(0); // 从派送单行列中猎取派送单
    // 实行对账操纵
    diff = check(p, d);
    // 差别写入差别库
    save(diff);
}

void checkAll(){
    // 轮回查询定单库
    Thread T1 = new Thread(()->{
        while(存在未对账定单){
            pos.add(getPOrders()); // 查询定单库
            barrier.await(); // 将计数器减一并守候直到计数器为0
        }
    });
    T1.start();  
    // 轮回查询运单库
    Thread T2 = new Thread(()->{
        while(存在未对账定单){
            dos.add(getDOrders()); // 查询运单库
            barrier.await(); // 将计数器减一并守候直到计数器为0
        }
    });
    T2.start();
}

线程T1担任查询定单,当查出一条时,挪用barrier.await()来将计数器减1,同时守候计数器变成0;线程T2担任查询派送定单,当查出一条时,也挪用barrier.await()来将计数器减1,同时守候计数器变成0;当T1和T2都挪用barrier.await()时,计数器就会减到0,此时T1和T2就可以实行下一条语句了,同时会挪用barrier的回调函数来实行对账操纵。

CyclicBarrier的计数器有自动重置的功用,当减到0时,会自动重置你设置的初始值。因而,我们便可以反复运用CyclicBarrier。

小结

CountDownLatchCyclicBarrier是Java并发包供应的两个异常易用的线程同步东西类。它们的区分在于:CountDownLatch重要用来处理一个线程守候多个线程的场景(计数器一旦减到0,再有线程挪用await(),该线程会直接经由过程,计数器不会被重置);CyclicBarrier是一组线程之间的互相守候(计数器可以重用,减到0会重置为设置的初始值),还可以传入回调函数,当计数器为0时,实行回调函数。

 

golang中使用Shutdown特性对http服务进行优雅退出使用总结

参与评论