国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Java > 正文

Java多線程之異步Future機制的原理和實現

2019-11-26 13:55:57
字體:
來源:轉載
供稿:網友

項目中經常有些任務需要異步(提交到線程池中)去執行,而主線程往往需要知道異步執行產生的結果,這時我們要怎么做呢?用runnable是無法實現的,我們需要用callable看下面的代碼:

 import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class AddTask implements Callable<Integer> { private int a,b;  public AddTask(int a, int b) { this.a = a; this.b = b; }  @Override public Integer call throws Exception { Integer result = a + b; return result; }  public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor; //JDK目前為止返回的都是FutureTask的實例 Future<Integer> future = executor.submit(new AddTask(1, 2)); Integer result = future.get;// 只有當future的狀態是已完成時(future.isDone = true),get方法才會返回 }} 

雖然可以實現獲取異步執行結果的需求,但是我們發現這個Future其實很不好用,因為它沒有提供通知的機制,也就是說我們不知道future什么時候完成(如果我們需要輪詢isDone()來判斷的話感覺就沒有用這個的必要了)??聪耲ava.util.concurrent.future.Future 的接口方法:

 public interface Future<V> {  boolean cancel(boolean mayInterruptIfRunning);  boolean isCancelled;  boolean isDone;  V get throws InterruptedException, ExecutionException;  V get(long timeout, TimeUnit unit)    throws InterruptedException, ExecutionException, TimeoutException;} 

由此可見JDK的Future機制其實并不好用,如果能給這個future加個監聽器,讓它在完成時通知監聽器的話就比較好用了,就像下面這個IFuture:

 package future;import java.util.concurrent.CancellationException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;/** * The result of an asynchronous operation. *  * @author lixiaohui * @param <V> 執行結果的類型參數 */public interface IFuture<V> extends Future<V> {  boolean isSuccess; // 是否成功  V getNow; //立即返回結果(不管Future是否處于完成狀態) Throwable cause; //若執行失敗時的原因    boolean isCancellable; //是否可以取消 IFuture<V> await throws InterruptedException; //等待future的完成 boolean await(long timeoutMillis) throws InterruptedException; // 超時等待future的完成 boolean await(long timeout, TimeUnit timeunit) throws InterruptedException;    IFuture<V> awaitUninterruptibly; //等待future的完成,不響應中斷    boolean awaitUninterruptibly(long timeoutMillis);//超時等待future的完成,不響應中斷 boolean awaitUninterruptibly(long timeout, TimeUnit timeunit); IFuture<V> addListener(IFutureListener<V> l); //當future完成時,會通知這些加進來的監聽器 IFuture<V> removeListener(IFutureListener<V> l); } 

接下來就一起來實現這個IFuture,在這之前要說明下Object.wait,Object.notifyAll方法,因為整個Future實現的原���的核心就是這兩個方法.看看JDK里面的解釋:

 public class Object {  /**   * Causes the current thread to wait until another thread invokes the   * {@link java.lang.Object#notify} method or the   * {@link java.lang.Object#notifyAll} method for this object.   * In other words, this method behaves exactly as if it simply   * performs the call {@code wait(0)}.   * 調用該方法后,當前線程會釋放對象監視器鎖,并讓出CPU使用權。直到別的線程調用notify/notifyAll   */  public final void wait throws InterruptedException {    wait(0);  }  /**   * Wakes up all threads that are waiting on this object's monitor. A   * thread waits on an object's monitor by calling one of the   * {@code wait} methods.   * <p>   * The awakened threads will not be able to proceed until the current   * thread relinquishes the lock on this object. The awakened threads   * will compete in the usual manner with any other threads that might   * be actively competing to synchronize on this object; for example,   * the awakened threads enjoy no reliable privilege or disadvantage in   * being the next thread to lock this object.   */  public final native void notifyAll;} 

知道這個后,我們要自己實現Future也就有了思路,當線程調用了IFuture.await等一系列的方法時,如果Future還未完成,那么就調用future.wait 方法使線程進入WAITING狀態。而當別的線程設置Future為完成狀態(注意這里的完成狀態包括正常結束和異常結束)時,就需要調用future.notifyAll方法來喚醒之前因為調用過wait方法而處于WAITING狀態的那些線程。完整的實現如下(代碼應該沒有很難理解的地方,我是參考netty的Future機制的。有興趣的可以去看看netty的源碼):

 package future;import java.util.Collection;import java.util.concurrent.CancellationException;import java.util.concurrent.CopyOnWriteArrayList;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;/** * <pre> * 正常結束時, 若執行的結果不為null, 則result為執行結果; 若執行結果為null, 則result = {@link AbstractFuture#SUCCESS_SIGNAL} * 異常結束時, result為 {@link CauseHolder} 的實例;若是被取消而導致的異常結束, 則result為 {@link CancellationException} 的實例, 否則為其它異常的實例 * 以下情況會使異步操作由未完成狀態轉至已完成狀態, 也就是在以下情況發生時調用notifyAll方法: * <ul> * <li>異步操作被取消時(cancel方法)</li> * <li>異步操作正常結束時(setSuccess方法)</li> * <li>異步操作異常結束時(setFailure方法)</li> * </ul> * </pre> *  * @author lixiaohui * * @param <V> * 異步執行結果的類型 */public class AbstractFuture<V> implements IFuture<V> { protected volatile Object result; // 需要保證其可見性    /**     * 監聽器集     */ protected Collection<IFutureListener<V>> listeners = new CopyOnWriteArrayList<IFutureListener<V>>; /** * 當任務正常執行結果為null時, 即客戶端調用{@link AbstractFuture#setSuccess(null)}時,  * result引用該對象 */ private static final SuccessSignal SUCCESS_SIGNAL = new SuccessSignal; @Override public boolean cancel(boolean mayInterruptIfRunning) { if (isDone) { // 已完成了不能取消  return false; } synchronized (this) {  if (isDone) { // double check  return false;  }  result = new CauseHolder(new CancellationException);  notifyAll; // isDone = true, 通知等待在該對象的wait的線程 } notifyListeners; // 通知監聽器該異步操作已完成 return true; }  @Override public boolean isCancellable { return result == null; }  @Override public boolean isCancelled { return result != null && result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; } @Override public boolean isDone { return result != null; } @Override public V get throws InterruptedException, ExecutionException { await; // 等待執行結果 Throwable cause = cause; if (cause == null) { // 沒有發生異常,異步操作正常結束  return getNow; } if (cause instanceof CancellationException) { // 異步操作被取消了  throw (CancellationException) cause; } throw new ExecutionException(cause); // 其他異常 } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) {// 超時等待執行結果  Throwable cause = cause;  if (cause == null) {// 沒有發生異常,異步操作正常結束  return getNow;  }  if (cause instanceof CancellationException) {// 異步操作被取消了  throw (CancellationException) cause;  }  throw new ExecutionException(cause);// 其他異常 } // 時間到了異步操作還沒有結束, 拋出超時異常 throw new TimeoutException; } @Override public boolean isSuccess { return result == null ? false : !(result instanceof CauseHolder); } @SuppressWarnings("unchecked") @Override public V getNow { return (V) (result == SUCCESS_SIGNAL ? null : result); } @Override public Throwable cause { if (result != null && result instanceof CauseHolder) {  return ((CauseHolder) result).cause; } return null; } @Override public IFuture<V> addListener(IFutureListener<V> listener) { if (listener == null) {  throw new NullPointerException("listener"); } if (isDone) { // 若已完成直接通知該監聽器  notifyListener(listener);  return this; } synchronized (this) {  if (!isDone) {  listeners.add(listener);  return this;  } } notifyListener(listener); return this; } @Override public IFuture<V> removeListener(IFutureListener<V> listener) { if (listener == null) {  throw new NullPointerException("listener"); } if (!isDone) {  listeners.remove(listener); } return this; } @Override public IFuture<V> await throws InterruptedException { return await0(true); }  private IFuture<V> await0(boolean interruptable) throws InterruptedException { if (!isDone) { // 若已完成就直接返回了  // 若允許終端且被中斷了則拋出中斷異常  if (interruptable && Thread.interrupted) {  throw new InterruptedException("thread " + Thread.currentThread.getName + " has been interrupted.");  }  boolean interrupted = false;  synchronized (this) {  while (!isDone) {   try {   wait; // 釋放鎖進入waiting狀態,等待其它線程調用本對象的notify/notifyAll方法   } catch (InterruptedException e) {   if (interruptable) {    throw e;   } else {    interrupted = true;   }   }  }  }  if (interrupted) {  // 為什么這里要設中斷標志位?因為從wait方法返回后, 中斷標志是被clear了的,   // 這里重新設置以便讓其它代碼知道這里被中斷了。  Thread.currentThread.interrupt;  } } return this; }  @Override public boolean await(long timeoutMillis) throws InterruptedException { return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true); }  @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return await0(unit.toNanos(timeout), true); } private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException { if (isDone) {  return true; } if (timeoutNanos <= 0) {  return isDone; } if (interruptable && Thread.interrupted) {  throw new InterruptedException(toString); } long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime; long waitTime = timeoutNanos; boolean interrupted = false; try {  synchronized (this) {  if (isDone) {   return true;  }  if (waitTime <= 0) {   return isDone;  }  for (;;) {   try {   wait(waitTime / 1000000, (int) (waitTime % 1000000));   } catch (InterruptedException e) {   if (interruptable) {    throw e;   } else {    interrupted = true;   }   }   if (isDone) {   return true;   } else {   waitTime = timeoutNanos - (System.nanoTime - startTime);   if (waitTime <= 0) {    return isDone;   }   }  }  } } finally {  if (interrupted) {  Thread.currentThread.interrupt;  } } } @Override public IFuture<V> awaitUninterruptibly { try {  return await0(false); } catch (InterruptedException e) { // 這里若拋異常了就無法處理了  throw new java.lang.InternalError; } }  @Override public boolean awaitUninterruptibly(long timeoutMillis) { try {  return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false); } catch (InterruptedException e) {  throw new java.lang.InternalError; } } @Override public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { try {  return await0(unit.toNanos(timeout), false); } catch (InterruptedException e) {  throw new java.lang.InternalError; } } protected IFuture<V> setFailure(Throwable cause) { if (setFailure0(cause)) {  notifyListeners;  return this; } throw new IllegalStateException("complete already: " + this); } private boolean setFailure0(Throwable cause) { if (isDone) {  return false; } synchronized (this) {  if (isDone) {  return false;  }  result = new CauseHolder(cause);  notifyAll; } return true; } protected IFuture<V> setSuccess(Object result) { if (setSuccess0(result)) { // 設置成功后通知監聽器  notifyListeners;  return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(Object result) { if (isDone) {  return false; } synchronized (this) {  if (isDone) {  return false;  }  if (result == null) { // 異步操作正常執行完畢的結果是null  this.result = SUCCESS_SIGNAL;  } else {  this.result = result;  }  notifyAll; } return true; } private void notifyListeners { for (IFutureListener<V> l : listeners) {  notifyListener(l); } } private void notifyListener(IFutureListener<V> l) { try {  l.operationCompleted(this); } catch (Exception e) {  e.printStackTrace; } } private static class SuccessSignal { } private static final class CauseHolder { final Throwable cause; CauseHolder(Throwable cause) {  this.cause = cause; } }} 

那么要怎么使用這個呢,有了上面的骨架實現,我們就可以定制各種各樣的異步結果了。下面模擬一下一個延時的任務:

 package future.test;import future.IFuture;import future.IFutureListener;/** * 延時加法 * @author lixiaohui * */public class DelayAdder {  public static void main(String[] args) { new DelayAdder.add(3 * 1000, 1, 2).addListener(new IFutureListener<Integer> {    @Override  public void operationCompleted(IFuture<Integer> future) throws Exception {  System.out.println(future.getNow);  }   }); } /** * 延遲加 * @param delay 延時時長 milliseconds * @param a 加數 * @param b 加數 * @return 異步結果 */ public DelayAdditionFuture add(long delay, int a, int b) { DelayAdditionFuture future = new DelayAdditionFuture;  new Thread(new DelayAdditionTask(delay, a, b, future)).start; return future; }  private class DelayAdditionTask implements Runnable { private long delay;  private int a, b;  private DelayAdditionFuture future;  public DelayAdditionTask(long delay, int a, int b, DelayAdditionFuture future) {  super;  this.delay = delay;  this.a = a;  this.b = b;  this.future = future; } @Override public void run {  try {  Thread.sleep(delay);  Integer i = a + b;  // TODO 這里設置future為完成狀態(正常執行完畢)  future.setSuccess(i);  } catch (InterruptedException e) {  // TODO 這里設置future為完成狀態(異常執行完畢)  future.setFailure(e.getCause);  } }  }} package future.test;import future.AbstractFuture;import future.IFuture;//只是把兩個方法對外暴露public class DelayAdditionFuture extends AbstractFuture<Integer> {  @Override public IFuture<Integer> setSuccess(Object result) { return super.setSuccess(result); }  @Override public IFuture<Integer> setFailure(Throwable cause) { return super.setFailure(cause); } } 

可以看到客戶端不用主動去詢問future是否完成,而是future完成時自動回調operationcompleted方法,客戶端只需在回調里實現邏輯即可。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 珠海市| 房山区| 同江市| 宁波市| 平果县| 沭阳县| 奉新县| 甘孜县| 什邡市| 高邑县| 延边| 宣威市| 宜兴市| 邓州市| 开封市| 潮州市| 河源市| 兴山县| 临颍县| 尤溪县| 普陀区| 阿瓦提县| 宁强县| 马鞍山市| 乾安县| 玛沁县| 乐安县| 濮阳县| 浪卡子县| 霍林郭勒市| 收藏| 依安县| 中牟县| 繁昌县| 中山市| 巴楚县| 滁州市| 百色市| 谢通门县| 专栏| 分宜县|