国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Java > 正文

Java 高并發八:NIO和AIO詳解

2019-11-26 13:50:38
字體:
來源:轉載
供稿:網友

IO感覺上和多線程并沒有多大關系,但是NIO改變了線程在應用層面使用的方式,也解決了一些實際的困難。而AIO是異步IO和前面的系列也有點關系。在此,為了學習和記錄,也寫一篇文章來介紹NIO和AIO。

1. 什么是NIO

NIO是New I/O的簡稱,與舊式的基于流的I/O方法相對,從名字看,它表示新的一套Java I/O標 準。它是在Java 1.4中被納入到JDK中的,并具有以下特性:

  1. NIO是基于塊(Block)的,它以塊為基本單位處理數據 (硬盤上存儲的單位也是按Block來存儲,這樣性能上比基于流的方式要好一些)
  2. 為所有的原始類型提供(Buffer)緩存支持
  3. 增加通道(Channel)對象,作為新的原始 I/O 抽象
  4. 支持鎖(我們在平時使用時經常能看到會出現一些.lock的文件,這說明有線程正在使用這把鎖,當線程釋放鎖時,會把這個文件刪除掉,這樣其他線程才能繼續拿到這把鎖)和內存映射文件的文件訪問接口
  5. 提供了基于Selector的異步網絡I/O


所有的從通道中的讀寫操作,都要經過Buffer,而通道就是io的抽象,通道的另一端就是操縱的文件。

2. Buffer

Java中Buffer的實現。基本的數據類型都有它對應的Buffer

Buffer的簡單使用例子:

package test; import java.io.File;import java.io.FileInputStream;import java.nio.ByteBuffer;import java.nio.channels.FileChannel; public class Test { public static void main(String[] args) throws Exception {  FileInputStream fin = new FileInputStream(new File(    "d://temp_buffer.tmp"));  FileChannel fc = fin.getChannel();  ByteBuffer byteBuffer = ByteBuffer.allocate(1024);  fc.read(byteBuffer);  fc.close();  byteBuffer.flip();//讀寫轉換 }}

總結下使用的步驟是:

1. 得到Channel

2. 申請Buffer

3. 建立Channel和Buffer的讀/寫關系

4. 關閉

下面的例子是使用NIO來復制文件:

public static void nioCopyFile(String resource, String destination)   throws IOException {  FileInputStream fis = new FileInputStream(resource);  FileOutputStream fos = new FileOutputStream(destination);  FileChannel readChannel = fis.getChannel(); // 讀文件通道  FileChannel writeChannel = fos.getChannel(); // 寫文件通道  ByteBuffer buffer = ByteBuffer.allocate(1024); // 讀入數據緩存  while (true) {   buffer.clear();   int len = readChannel.read(buffer); // 讀入數據   if (len == -1) {    break; // 讀取完畢   }   buffer.flip();   writeChannel.write(buffer); // 寫入文件  }  readChannel.close();  writeChannel.close(); }

Buffer中有3個重要的參數:位置(position)、容量(capactiy)和上限(limit)

這里要區別下容量和上限,比如一個Buffer有10KB,那么10KB就是容量,我將5KB的文件讀到Buffer中,那么上限就是5KB。

下面舉個例子來理解下這3個重要的參數:

public static void main(String[] args) throws Exception {  ByteBuffer b = ByteBuffer.allocate(15); // 15個字節大小的緩沖區  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()    + " position=" + b.position());  for (int i = 0; i < 10; i++) {   // 存入10個字節數據   b.put((byte) i);  }  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()    + " position=" + b.position());  b.flip(); // 重置position  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()    + " position=" + b.position());  for (int i = 0; i < 5; i++) {   System.out.print(b.get());  }  System.out.println();  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()    + " position=" + b.position());  b.flip();  System.out.println("limit=" + b.limit() + " capacity=" + b.capacity()    + " position=" + b.position());  }

整個過程如圖:

此時position從0到10,capactiy和limit不變。

該操作會重置position,通常,將buffer從寫模式轉換為讀 模式時需要執行此方法 flip()操作不僅重置了當前的position為0,還將limit設置到當前position的位置 。

limit的意義在于,來確定哪些數據是有意義的,換句話說,從position到limit之間的數據才是有意義的數據,因為是上次操作的數據。所以flip操作往往是讀寫轉換的意思。

意義同上。

而Buffer中大多數的方法都是去改變這3個參數來達到某些功能的:

public final Buffer rewind()

將position置零,并清除標志位(mark)

public final Buffer clear()

將position置零,同時將limit設置為capacity的大小,并清除了標志mark

public final Buffer flip()

先將limit設置到position所在位置,然后將position置零,并清除標志位mark,通常在讀寫轉換時使用

文件映射到內存

public static void main(String[] args) throws Exception {  RandomAccessFile raf = new RandomAccessFile("C://mapfile.txt", "rw");  FileChannel fc = raf.getChannel();  // 將文件映射到內存中  MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0,    raf.length());  while (mbb.hasRemaining()) {   System.out.print((char) mbb.get());  }  mbb.put(0, (byte) 98); // 修改文件  raf.close(); }

對MappedByteBuffer的修改就相當于修改文件本身,這樣操作的速度是很快的。

3. Channel

多線程網絡服務器的一般結構:

簡單的多線程服務器:

public static void main(String[] args) throws Exception {  ServerSocket echoServer = null;  Socket clientSocket = null;  try {   echoServer = new ServerSocket(8000);  } catch (IOException e) {   System.out.println(e);  }  while (true) {   try {    clientSocket = echoServer.accept();    System.out.println(clientSocket.getRemoteSocketAddress()      + " connect!");    tp.execute(new HandleMsg(clientSocket));   } catch (IOException e) {    System.out.println(e);   }  } }

功能就是服務器端讀到什么數據,就向客戶端回寫什么數據。

這里的tp是一個線程池,HandleMsg是處理消息的類。

static class HandleMsg implements Runnable{    省略部分信息        public void run(){       try {        is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));      os = new PrintWriter(clientSocket.getOutputStream(), true);      // 從InputStream當中讀取客戶端所發送的數據         String inputLine = null;          long b=System. currentTimeMillis ();          while ((inputLine = is.readLine()) != null)     {         os.println(inputLine);          }          long e=System. currentTimeMillis ();          System. out.println ("spend:"+(e - b)+" ms ");       } catch (IOException e) {         e.printStackTrace();       }finally   {     關閉資源    }    }   }

客戶端:

public static void main(String[] args) throws Exception {  Socket client = null;  PrintWriter writer = null;  BufferedReader reader = null;  try {   client = new Socket();   client.connect(new InetSocketAddress("localhost", 8000));   writer = new PrintWriter(client.getOutputStream(), true);   writer.println("Hello!");   writer.flush();   reader = new BufferedReader(new InputStreamReader(     client.getInputStream()));   System.out.println("from server: " + reader.readLine());  } catch (Exception e) {  } finally {   // 省略資源關閉  } }

以上的網絡編程是很基本的,使用這種方式,會有一些問題:

為每一個客戶端使用一個線程,如果客戶端出現延時等異常,線程可能會被占用很長時間。因為數據的準備和讀取都在這個線程中。此時,如果客戶端數量眾多,可能會消耗大量的系統資源。

解決方案:

使用非阻塞的NIO (讀取數據不等待,數據準備好了再工作)

為了體現NIO使用的高效。

這里先模擬一個低效的客戶端來模擬因網絡而延時的情況:

private static ExecutorService tp= Executors.newCachedThreadPool();   private static final int sleep_time=1000*1000*1000;   public static class EchoClient implements Runnable{    public void run(){       try {         client = new Socket();         client.connect(new InetSocketAddress("localhost", 8000));      writer = new PrintWriter(client.getOutputStream(), true);      writer.print("H");         LockSupport.parkNanos(sleep_time);       writer.print("e");        LockSupport.parkNanos(sleep_time);       writer.print("l");       LockSupport.parkNanos(sleep_time);      writer.print("l");       LockSupport.parkNanos(sleep_time);      writer.print("o");       LockSupport.parkNanos(sleep_time);      writer.print("!");        LockSupport.parkNanos(sleep_time);      writer.println();       writer.flush();     }catch(Exception e)    {    }   }  }

服務器端輸出:

spend:6000ms
spend:6000ms
spend:6000ms
spend:6001ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6002ms
spend:6003ms
spend:6003ms

因為

while ((inputLine = is.readLine()) != null)

是阻塞的,所以時間都花在等待中。

如果用NIO來處理這個問題會怎么做呢?

NIO有一個很大的特點就是:把數據準備好了再通知我

而Channel有點類似于流,一個Channel可以和文件或者網絡Socket對應 。

selector是一個選擇器,它可以選擇某一個Channel,然后做些事情。

一個線程可以對應一個selector,而一個selector可以輪詢多個Channel,而每個Channel對應了一個Socket。

與上面一個線程對應一個Socket相比,使用NIO后,一個線程可以輪詢多個Socket。

當selector調用select()時,會查看是否有客戶端準備好了數據。當沒有數據被準備好時,select()會阻塞。平時都說NIO是非阻塞的,但是如果沒有數據被準備好還是會有阻塞現象。

當有數據被準備好時,調用完select()后,會返回一個SelectionKey,SelectionKey表示在某個selector上的某個Channel的數據已經被準備好了。

只有在數據準備好時,這個Channel才會被選擇。

這樣NIO實現了一個線程來監控多個客戶端。

而剛剛模擬的網絡延遲的客戶端將不會影響NIO下的線程,因為某個Socket網絡延遲時,數據還未被準備好,selector是不會選擇它的,而會選擇其他準備好的客戶端。

selectNow()與select()的區別在于,selectNow()是不阻塞的,當沒有客戶端準備好數據時,selectNow()不會阻塞,將返回0,有客戶端準備好數據時,selectNow()返回準備好的客戶端的個數。

主要代碼:

package test; import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.Socket;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.channels.spi.AbstractSelector;import java.nio.channels.spi.SelectorProvider;import java.util.HashMap;import java.util.Iterator;import java.util.LinkedList;import java.util.Map;import java.util.Set;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class MultiThreadNIOEchoServer { public static Map<Socket, Long> geym_time_stat = new HashMap<Socket, Long>();  class EchoClient {  private LinkedList<ByteBuffer> outq;   EchoClient() {   outq = new LinkedList<ByteBuffer>();  }   public LinkedList<ByteBuffer> getOutputQueue() {   return outq;  }   public void enqueue(ByteBuffer bb) {   outq.addFirst(bb);  } }  class HandleMsg implements Runnable {  SelectionKey sk;  ByteBuffer bb;   public HandleMsg(SelectionKey sk, ByteBuffer bb) {   super();   this.sk = sk;   this.bb = bb;  }   @Override  public void run() {   // TODO Auto-generated method stub   EchoClient echoClient = (EchoClient) sk.attachment();   echoClient.enqueue(bb);   sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);   selector.wakeup();  }  }  private Selector selector; private ExecutorService tp = Executors.newCachedThreadPool();  private void startServer() throws Exception {  selector = SelectorProvider.provider().openSelector();  ServerSocketChannel ssc = ServerSocketChannel.open();  ssc.configureBlocking(false);  InetSocketAddress isa = new InetSocketAddress(8000);  ssc.socket().bind(isa);  // 注冊感興趣的事件,此處對accpet事件感興趣  SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);  for (;;) {   selector.select();   Set readyKeys = selector.selectedKeys();   Iterator i = readyKeys.iterator();   long e = 0;   while (i.hasNext()) {    SelectionKey sk = (SelectionKey) i.next();    i.remove();    if (sk.isAcceptable()) {     doAccept(sk);    } else if (sk.isValid() && sk.isReadable()) {     if (!geym_time_stat.containsKey(((SocketChannel) sk       .channel()).socket())) {      geym_time_stat.put(        ((SocketChannel) sk.channel()).socket(),        System.currentTimeMillis());     }     doRead(sk);    } else if (sk.isValid() && sk.isWritable()) {     doWrite(sk);     e = System.currentTimeMillis();     long b = geym_time_stat.remove(((SocketChannel) sk       .channel()).socket());     System.out.println("spend:" + (e - b) + "ms");    }   }  } }  private void doWrite(SelectionKey sk) {  // TODO Auto-generated method stub  SocketChannel channel = (SocketChannel) sk.channel();  EchoClient echoClient = (EchoClient) sk.attachment();  LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();  ByteBuffer bb = outq.getLast();  try {   int len = channel.write(bb);   if (len == -1) {    disconnect(sk);    return;   }   if (bb.remaining() == 0) {    outq.removeLast();   }  } catch (Exception e) {   // TODO: handle exception   disconnect(sk);  }  if (outq.size() == 0) {   sk.interestOps(SelectionKey.OP_READ);  } }  private void doRead(SelectionKey sk) {  // TODO Auto-generated method stub  SocketChannel channel = (SocketChannel) sk.channel();  ByteBuffer bb = ByteBuffer.allocate(8192);  int len;  try {   len = channel.read(bb);   if (len < 0) {    disconnect(sk);    return;   }  } catch (Exception e) {   // TODO: handle exception   disconnect(sk);   return;  }  bb.flip();  tp.execute(new HandleMsg(sk, bb)); }  private void disconnect(SelectionKey sk) {  // TODO Auto-generated method stub  //省略略干關閉操作 }  private void doAccept(SelectionKey sk) {  // TODO Auto-generated method stub  ServerSocketChannel server = (ServerSocketChannel) sk.channel();  SocketChannel clientChannel;  try {   clientChannel = server.accept();   clientChannel.configureBlocking(false);   SelectionKey clientKey = clientChannel.register(selector,     SelectionKey.OP_READ);   EchoClient echoClinet = new EchoClient();   clientKey.attach(echoClinet);   InetAddress clientAddress = clientChannel.socket().getInetAddress();   System.out.println("Accepted connection from "     + clientAddress.getHostAddress());  } catch (Exception e) {   // TODO: handle exception  } }  public static void main(String[] args) {  // TODO Auto-generated method stub  MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();  try {   echoServer.startServer();  } catch (Exception e) {   // TODO: handle exception  }  } }

代碼僅作參考,主要的特點是,對不同事件的感興趣來做不同的事。

當用之前模擬的那個延遲的客戶端時,這次的時間消耗就在2ms到11ms之間了。性能提升是很明顯的。

總結:

1. NIO會將數據準備好后,再交由應用進行處理,數據的讀取/寫入過程依然在應用線程中完成,只是將等待的時間剝離到單獨的線程中去。

2. 節省數據準備時間(因為Selector可以復用)

5. AIO

AIO的特點:

1. 讀完了再通知我

2. 不會加快IO,只是在讀完后進行通知

3. 使用回調函數,進行業務處理

AIO的相關代碼:

AsynchronousServerSocketChannel

server = AsynchronousServerSocketChannel.open().bind( new InetSocketAddress (PORT));
使用server上的accept方法

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);
CompletionHandler為回調接口,當有客戶端accept之后,就做handler中的事情。

示例代碼:

server.accept(null,    new CompletionHandler<AsynchronousSocketChannel, Object>() {     final ByteBuffer buffer = ByteBuffer.allocate(1024);      public void completed(AsynchronousSocketChannel result,       Object attachment) {      System.out.println(Thread.currentThread().getName());      Future<Integer> writeResult = null;      try {       buffer.clear();       result.read(buffer).get(100, TimeUnit.SECONDS);       buffer.flip();       writeResult = result.write(buffer);      } catch (InterruptedException | ExecutionException e) {       e.printStackTrace();      } catch (TimeoutException e) {       e.printStackTrace();      } finally {       try {        server.accept(null, this);        writeResult.get();        result.close();       } catch (Exception e) {        System.out.println(e.toString());       }      }     }      @Override     public void failed(Throwable exc, Object attachment) {      System.out.println("failed: " + exc);     }    });

這里使用了Future來實現即時返回,關于Future請參考上一篇

在理解了NIO的基礎上,看AIO,區別在于AIO是等讀寫過程完成后再去調用回調函數。

NIO是同步非阻塞的

AIO是異步非阻塞的

由于NIO的讀寫過程依然在應用線程里完成,所以對于那些讀寫過程時間長的,NIO就不太適合。

而AIO的讀寫過程完成后才被通知,所以AIO能夠勝任那些重量級,讀寫過程長的任務。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 安徽省| 峡江县| 德惠市| 昭通市| 伊通| 泽库县| 石柱| 龙川县| 衢州市| 绵阳市| 邳州市| 扎囊县| 新兴县| 和龙市| 邻水| 东方市| 岑巩县| 大渡口区| 合水县| 西畴县| 香港| 平潭县| 平塘县| 大邑县| 武邑县| 天全县| 专栏| 吴川市| 潼关县| 宝坻区| 沙田区| 聂拉木县| 林口县| 逊克县| 平舆县| 郎溪县| 连江县| 镇原县| 武川县| 上栗县| 博白县|