今天在群里面看到一個很有意思的面試題:“編寫兩個線程,一個線程打印1~25,另一個線程打印字母A~Z,打印順序為12A34B56C……5152Z,要求使用線程間的通信。”這是一道非常好的面試題,非常能彰顯被面者關于多線程的功力,一下子就勾起了我的興趣。這里拋磚引玉,給出7種想到的解法。
1. 第一種解法,包含多種小的不同實現方式,但一個共同點就是靠一個共享變量來做控制;
a. 利用最基本的
synchronized、notify、wait:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960public class MethodOne { PRivate final ThreadToGo threadToGo = new ThreadToGo(); public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { try { for (int i = 0; i < arr.length; i=i+2) { synchronized (threadToGo) { while (threadToGo.value == 2) threadToGo.wait(); Helper.print(arr[i], arr[i + 1]); threadToGo.value = 2; threadToGo.notify(); } } } catch (InterruptedException e) { System.out.println("Oops..."); } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { try { for (int i = 0; i < arr.length; i++) { synchronized (threadToGo) { while (threadToGo.value == 1) threadToGo.wait(); Helper.print(arr[i]); threadToGo.value = 1; threadToGo.notify(); } } } catch (InterruptedException e) { System.out.println("Oops..."); } } }; } class ThreadToGo { int value = 1; } public static void main(String args[]) throws InterruptedException { MethodOne one = new MethodOne(); Helper.instance.run(one.newThreadOne()); Helper.instance.run(one.newThreadTwo()); Helper.instance.shutdown(); }}b. 利用
Lock和Condition:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364public class MethodTwo { private Lock lock = new ReentrantLock(true); private Condition condition = lock.newCondition(); private final ThreadToGo threadToGo = new ThreadToGo(); public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i=i+2) { try { lock.lock(); while(threadToGo.value == 2) condition.await(); Helper.print(arr[i], arr[i + 1]); threadToGo.value = 2; condition.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { try { lock.lock(); while(threadToGo.value == 1) condition.await(); Helper.print(arr[i]); threadToGo.value = 1; condition.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } }; } class ThreadToGo { int value = 1; } public static void main(String args[]) throws InterruptedException { MethodTwo two = new MethodTwo(); Helper.instance.run(two.newThreadOne()); Helper.instance.run(two.newThreadTwo()); Helper.instance.shutdown(); }}c. 利用
volatile:
volatile修飾的變量值直接存在main memory里面,子線程對該變量的讀寫直接寫入main memory,而不是像其它變量一樣在local thread里面產生一份copy。volatile能保證所修飾的變量對于多個線程可見性,即只要被修改,其它線程讀到的一定是最新的值。
1234567891011121314151617181920212223242526272829303132333435363738394041424344public class MethodThree { private volatile ThreadToGo threadToGo = new ThreadToGo(); class ThreadToGo { int value = 1; } public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i=i+2) { while(threadToGo.value==2){} Helper.print(arr[i], arr[i + 1]); threadToGo.value=2; } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { while(threadToGo.value==1){} Helper.print(arr[i]); threadToGo.value=1; } } }; } public static void main(String args[]) throws InterruptedException { MethodThree three = new MethodThree(); Helper.instance.run(three.newThreadOne()); Helper.instance.run(three.newThreadTwo()); Helper.instance.shutdown(); }}d. 利用
AtomicInteger:
12345678910111213141516171819202122232425262728293031323334353637383940public class MethodFive { private AtomicInteger threadToGo = new AtomicInteger(1); public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i=i+2) { while(threadToGo.get()==2){} Helper.print(arr[i], arr[i + 1]); threadToGo.set(2); } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { while(threadToGo.get()==1){} Helper.print(arr[i]); threadToGo.set(1); } } }; } public static void main(String args[]) throws InterruptedException { MethodFive five = new MethodFive(); Helper.instance.run(five.newThreadOne()); Helper.instance.run(five.newThreadTwo()); Helper.instance.shutdown(); }}2. 第二種解法,是利用
CyclicBarrierAPI;
CyclicBarrier可以實現讓一組線程在全部到達Barrier時(執行await()),再一起同時執行,并且所有線程釋放后,還能復用它,即為Cyclic。CyclicBarrier類提供兩個構造器:
12345public CyclicBarrier(int parties, Runnable barrierAction) {}public CyclicBarrier(int parties) {}這里是利用它到達
Barrier后去執行barrierAction。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465public class MethodFour{ private final CyclicBarrier barrier; private final List<String> list; public MethodFour() { list = Collections.synchronizedList(new ArrayList<String>()); barrier = new CyclicBarrier(2,newBarrierAction()); } public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0, j=0; i < arr.length; i=i+2,j++) { try { list.add(arr[i]); list.add(arr[i+1]); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { try { list.add(arr[i]); barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } }; } private Runnable newBarrierAction(){ return new Runnable() { @Override public void run() { Collections.sort(list); list.forEach(c->System.out.print(c)); list.clear(); } }; } public static void main(String args[]){ MethodFour four = new MethodFour(); Helper.instance.run(four.newThreadOne()); Helper.instance.run(four.newThreadTwo()); Helper.instance.shutdown(); }}這里多說一點,這個API其實還是利用
lock和condition,無非是多個線程去爭搶CyclicBarrier的instance的lock罷了,最終barrierAction執行時,是在搶到CyclicBarrierinstance的那個線程上執行的。3. 第三種解法,是利用
PipedInputStreamAPI;這里用流在兩個線程間通信,但是java中的Stream是單向的,所以在兩個線程中分別建了一個input和output。這顯然是一種很搓的方式,不過也算是一種通信方式吧……-_-T,執行的時候那種速度簡直。。。請不要BS我。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889public class MethodSix { private final PipedInputStream inputStream1; private final PipedOutputStream outputStream1; private final PipedInputStream inputStream2; private final PipedOutputStream outputStream2; private final byte[] MSG; public MethodSix() { inputStream1 = new PipedInputStream(); outputStream1 = new PipedOutputStream(); inputStream2 = new PipedInputStream(); outputStream2 = new PipedOutputStream(); MSG = "Go".getBytes(); try { inputStream1.connect(outputStream2); inputStream2.connect(outputStream1); } catch (IOException e) { e.printStackTrace(); } } public void shutdown() throws IOException { inputStream1.close(); inputStream2.close(); outputStream1.close(); outputStream2.close(); } public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; private PipedInputStream in = inputStream1; private PipedOutputStream out = outputStream1; public void run() { for (int i = 0; i < arr.length; i=i+2) { Helper.print(arr[i], arr[i + 1]); try { out.write(MSG); byte[] inArr = new byte[2]; in.read(inArr); while(true){ if("Go".equals(new String(inArr))) break; } } catch (IOException e) { e.printStackTrace(); } } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; private PipedInputStream in = inputStream2; private PipedOutputStream out = outputStream2; public void run() { for (int i = 0; i < arr.length; i++) { try { byte[] inArr = new byte[2]; in.read(inArr); while(true){ if("Go".equals(new String(inArr))) break; } Helper.print(arr[i]); out.write(MSG); } catch (IOException e) { e.printStackTrace(); } } } }; } public static void main(String args[]) throws IOException { MethodSix six = new MethodSix(); Helper.instance.run(six.newThreadOne()); Helper.instance.run(six.newThreadTwo()); Helper.instance.shutdown(); six.shutdown(); }4. 第四種解法,是利用
BlockingQueue;順便總結下
BlockingQueue的一些內容。BlockingQueue定義的常用方法如下:add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常。offer(Object):表示如果可能的話,將Object加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false。put(Object):把Object加到BlockingQueue里,如果BlockingQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里有空間再繼續。poll(time):獲取并刪除BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null。當不傳入time值時,立刻返回。peek():立刻獲取BlockingQueue里排在首位的對象,但不從隊列里刪除,如果隊列為空,則返回null。take():獲取并刪除BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的對象被加入為止。
BlockingQueue有四個具體的實現類:ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO順序排序的。PriorityBlockingQueue:類似于LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。這里我用了兩種玩法:
一種是共享一個queue,根據peek和poll的不同來實現;第二種是兩個queue,利用take()會自動阻塞來實現。
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485public class MethodSeven { private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); public Runnable newThreadOne() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i=i+2) { Helper.print(arr[i], arr[i + 1]); queue.offer("TwoToGo"); while(!"OneToGo".equals(queue.peek())){} queue.poll(); } } }; } public Runnable newThreadTwo() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { while(!"TwoToGo".equals(queue.peek())){} queue.poll(); Helper.print(arr[i]); queue.offer("OneToGo"); } } }; } private final LinkedBlockingQueue<String> queue1 = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<String> queue2 = new LinkedBlockingQueue<>(); public Runnable newThreadThree() { final String[] inputArr = Helper.buildNoArr(52); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i=i+2) { Helper.print(arr[i], arr[i + 1]); try { queue2.put("TwoToGo"); queue1.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }; } public Runnable newThreadFour() { final String[] inputArr = Helper.buildCharArr(26); return new Runnable() { private String[] arr = inputArr; public void run() { for (int i = 0; i < arr.length; i++) { try { queue2.take(); Helper.print(arr[i]); queue1.put("OneToGo"); } catch (InterruptedException e) { e.printStackTrace(); } } } }; } public static void main(String args[]) throws InterruptedException { MethodSeven seven = new MethodSeven(); Helper.instance.run(seven.newThreadOne()); Helper.instance.run(seven.newThreadTwo()); Thread.sleep(2000); System.out.println(""); Helper.instance.run(seven.newThreadThree()); Helper.instance.run(seven.newThreadFour()); Helper.instance.shutdown(); }本文所有代碼已上傳至GitHub:https://github.com/EdisonXu/POC/tree/master/concurrent-test
本文由 EdisonXu - 徐焱飛 創作,采用 CC BY 4.0 CN協議 進行許可。 可自由
新聞熱點
疑難解答