CyclicBarrier和CountDownLatch一樣,都是關(guān)于線程的計(jì)數(shù)器。其實(shí)原理都是一樣的只是,CyclicBarrier與CountDownLatch 最大區(qū)別在 CyclicBarrier 在運(yùn)行錯(cuò)誤可以重新set數(shù)值,重新跑線程,而CountDownLatch 只能減一 不能重新設(shè)置。
CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。示例代碼如代碼清單8-3所示。
public class CyclicBarrierTest {staticCyclicBarrier c = new CyclicBarrier(2);public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {try {c.await();} catch (Exception e) {}System.out.PRintln(1);}}).start();try {c.await();} catch (Exception e) {}System.out.println(2);}}
因?yàn)橹骶€程和子線程的調(diào)度是由CPU決定的,兩個(gè)線程都有可能先執(zhí)行,所以會(huì)產(chǎn)生兩種輸出1,2 或者2,1
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會(huì)永遠(yuǎn)等待,因?yàn)闆](méi)有第三個(gè)線程執(zhí)行await方法,即沒(méi)有第三個(gè)線程到達(dá)屏障,所以之前到達(dá)屏障的兩個(gè)線程都不會(huì)繼續(xù)執(zhí)行。
CyclicBarrier還提供一個(gè)更高級(jí)的構(gòu)造函數(shù)CyclicBarrier(int parties,Runnable barrier-Action),用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景,如代碼清單8-4所示。
public class CyclicBarrierTest2 {static CyclicBarrier c = new CyclicBarrier(2, new A());public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {try {c.await();} catch (Exception e) {}System.out.println(1);}}).start();try {c.await();} catch (Exception e) {}System.out.println(2);}static class A implements Runnable {@Overridepublic void run() {System.out.println(3);}}}
CyclicBarrier的應(yīng)用場(chǎng)景
CyclicBarrier可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的場(chǎng)景。例如,用一個(gè)Excel保存了用戶所有銀行流水,每個(gè)Sheet保存一個(gè)賬戶近一年的每筆銀行流水,現(xiàn)在需要統(tǒng)計(jì)用戶的日均銀行流水,先用多線程處理每個(gè)sheet里的銀行流水,都執(zhí)行完之后,得到每個(gè)sheet的日均銀行流水,最后,再用barrierAction用這些線程的計(jì)算結(jié)果,計(jì)算出整個(gè)Excel的日均銀行流水,如代碼清單8-5所示。
public class BankWaterService implements Runnable {/*** 創(chuàng)建4個(gè)屏障,處理完之后執(zhí)行當(dāng)前類的run方法*/private CyclicBarrier c = new CyclicBarrier(4, this);/*** 假設(shè)只有4個(gè)sheet,所以只啟動(dòng)4個(gè)線程*/private Executor executor = Executors.newFixedThreadPool(4);/*** 保存每個(gè)sheet計(jì)算出的銀流結(jié)果*/private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();private void count() {for (int i = 0; i < 4; i++) {executor.execute(new Runnable() {@Overridepublic void run() {// 計(jì)算當(dāng)前sheet的銀流數(shù)據(jù),計(jì)算代碼省略sheetBankWaterCount.put(Thread.currentThread().getName(), 1);// 銀流計(jì)算完成,插入一個(gè)屏障try {c.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});}}@Overridepublic void run() {int result = 0;// 匯總每個(gè)sheet計(jì)算出的結(jié)果for (Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {result += sheet.getValue();}// 將結(jié)果輸出sheetBankWaterCount.put("result", result);System.out.println(result);}public static void main(String[] args) {BankWaterService bankWaterCount = new BankWaterService();bankWaterCount.count();}}
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注