生產者
//生產者import java.util.concurrent.BlockingQueue;public class PRoducer implements Runnable { private BlockingQueue<FilesRes> queue; private FilesRes produce; public Producer(BlockingQueue<FilesRes> queue, FilesRes produce) { this.queue = queue; if (null != produce) this.produce = produce; else this.produce = null; } @Override public void run() { try { queue.put(produce); //當隊列里滿的話,會阻塞 } catch (InterruptedException e) { System.out.println(e.getMessage()); } } }消費者
//消費者import java.io.File;import java.io.IOException;import java.util.concurrent.BlockingQueue;import java.util.concurrent.atomic.AtomicInteger;import org.apache.commons.io.FileUtils;public class Consumer implements Runnable { private BlockingQueue<FilesRes> queue; private static AtomicInteger atomic = new AtomicInteger(0);//初始化計數器 public Consumer(BlockingQueue<FilesRes> queue) { this.queue = queue; } @Override public void run() { try { FilesRes filesRes = queue.take(); //當隊列里是空的話,會阻塞 String nameFiles = "E:/Source/CopyPhotossss/" ; File file = new File(filesRes.getUrl()); File ofile = new File(nameFiles+filesRes.getFileName()); FileUtils.copyFile(file, ofile, true); //使用的是commons-io工具包 } catch (InterruptedException e) { System.out.println(e.getMessage()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }JavaBean:里面只是保存了一下文件的路徑,在這個測試里面沒有多少實際意義
/**
 * Plain bean describing one file to copy: its name, its path, and an ID
 * card value.
 *
 * <p>The no-argument constructor and the setters let an instance be
 * created empty and populated afterwards (as the ring-buffer demo does);
 * the three-argument constructor populates it in one step.
 */
public class FilesRes {

    private String fileName;
    private String url;
    private String idCard;

    /** Creates an empty instance; all fields start as {@code null}. */
    public FilesRes() {
        super();
    }

    /**
     * @param fileName file name
     * @param url      path of the file
     * @param idCard   ID card value
     */
    public FilesRes(String fileName, String url, String idCard) {
        super();
        this.fileName = fileName;
        this.url = url;
        this.idCard = idCard;
    }

    /** @return the file name */
    public String getFileName() {
        return fileName;
    }

    /** @param fileName the file name */
    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    /** @return the path of the file */
    public String getUrl() {
        return url;
    }

    /** @param url the path of the file */
    public void setUrl(String url) {
        this.url = url;
    }

    /** @return the ID card value */
    public String getIdCard() {
        return idCard;
    }

    /** @param idCard the ID card value */
    public void setIdCard(String idCard) {
        this.idCard = idCard;
    }
}
// Test class:
import java.io.File;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;public class Tester { public static void main(String[] args) throws InterruptedException { long starTime=System.currentTimeMillis(); LinkedBlockingQueue<FilesRes> queue = new LinkedBlockingQueue<FilesRes>(10); //線程池 操作 ExecutorService service = Executors.newCachedThreadPool(); String sourceFile = "E:/Source/Photos"; File file = new File(sourceFile); File [] files = file.listFiles(); for (File filename : files) { FilesRes filesRes = new FilesRes(filename.getName(),filename.getAbsolutePath(),filename.getName()); service.submit(new Producer(queue, filesRes)); } //開始 for (int i = 0; i < files.length; i++) { service.submit(new Consumer(queue)); } service.shutdown(); while(true){//用于判斷是否已經導完 if(service.isTerminated()){ System.out.println("文件已經遷移成功!"); long endTime=System.currentTimeMillis(); long time=endTime-starTime; System.out.println(time); break; } Thread.sleep(200); } } }搞定,但是代碼最后判斷是否已經導入完畢的時候,使用了輪詢方式去遍歷不是很理想。查了寫資料,雖然也有其他方式,但是比較麻煩。在研究的過程中,我發現了一個非常好用的disruptor架構提供的生產者和消費者模式有這個功能,很不錯。 具體的使用可以看下面的連接,我就不再闡述了。
disruptor-3.3.2 源碼解析:
(序列) http://www.myexception.cn/open-source/2036769.html
(隊列) http://www.myexception.cn/open-source/2036766.html
(發布事件) http://www.myexception.cn/open-source/2036781.html
(處理事件) http://www.myexception.cn/open-source/2036779.html
(框架支持) http://www.myexception.cn/open-source/2036776.html
最后我把代碼貼一些,有興趣的人可以調一下
package disruptor;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;

/**
 * Disruptor-based variant of the file copy: the main thread publishes one
 * {@link FilesRes} event per file in the source directory and a
 * {@link WorkerPool} running {@link FilesWorkHandlers} consumes them.
 */
public class Demo3 {

    /**
     * @param args unused
     * @throws InterruptedException  declared by the original listing
     * @throws IllegalStateException if the source directory cannot be listed
     */
    public static void main(String[] args) throws InterruptedException {
        long starTime = System.currentTimeMillis();

        String sourceFile = "E:/Source/Photos";
        // Resolve the listing before any worker thread exists: listFiles()
        // returns null for an unreadable or non-directory path, and failing
        // after workerPool.start() would leave the pool threads running.
        File[] files = new File(sourceFile).listFiles();
        if (files == null) {
            throw new IllegalStateException("Cannot list source directory: " + sourceFile);
        }

        int BUFFER_SIZE = 1024 * 1024 * 2;
        RingBuffer<FilesRes> ringBuffer = RingBuffer.createSingleProducer(
                new EventFactory<FilesRes>() {
                    @Override
                    public FilesRes newInstance() {
                        return new FilesRes();
                    }
                },
                BUFFER_SIZE,
                new YieldingWaitStrategy());
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        ExecutorService executor = Executors.newFixedThreadPool(2);
        WorkHandler<FilesRes> workHandlers = new FilesWorkHandlers();
        // NOTE(review): confirm against the Disruptor documentation whether the
        // ring buffer should also register workerPool.getWorkerSequences() as
        // gating sequences when the pool is built from an existing ring buffer.
        WorkerPool<FilesRes> workerPool = new WorkerPool<FilesRes>(
                ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), workHandlers);
        workerPool.start(executor);

        // Publish one event per file. The author's note says this produced 8
        // items in their run and was done on the main thread for simplicity.
        for (File filename : files) {
            long seq = ringBuffer.next();
            try {
                FilesRes filesRes = ringBuffer.get(seq);
                filesRes.setFileName(filename.getName());
                filesRes.setUrl(filename.getAbsolutePath());
                filesRes.setIdCard(filename.getName());
            } finally {
                // Always publish a claimed sequence, even if filling it failed.
                ringBuffer.publish(seq);
                System.out.println(seq);
            }
        }

        // Per the author's note: shuts down once the work is done, and the
        // calling thread blocks here, so no separate "are all threads
        // finished" check is needed.
        workerPool.drainAndHalt();
        executor.shutdown();

        long endTime = System.currentTimeMillis();
        long time = endTime - starTime;
        System.out.println(time);
    }
}

// ---- second listing: FilesWorkHandlers ----
package disruptor;

import java.io.File;

import org.apache.commons.io.FileUtils;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * Handler that copies the file described by each {@link FilesRes} event into
 * a fixed target directory. Implements both Disruptor handler interfaces, but
 * only the {@link WorkHandler} method does any work.
 */
public class FilesWorkHandlers implements EventHandler<FilesRes>, WorkHandler<FilesRes> {

    /**
     * WorkHandler callback: copies the event's file to the target directory.
     *
     * @param filesRes event carrying the source path and file name
     * @throws Exception if the copy fails
     */
    @Override
    public void onEvent(FilesRes filesRes) throws Exception {
        String nameFiles = "E:/Source/CopyPhoto2/";
        File file = new File(filesRes.getUrl());
        File ofile = new File(nameFiles + filesRes.getFileName());
        FileUtils.copyFile(file, ofile, true);
    }

    /**
     * EventHandler callback: left empty in this listing (auto-generated stub).
     */
    @Override
    public void onEvent(FilesRes arg0, long arg1, boolean arg2) throws Exception {
        // TODO Auto-generated method stub
    }
}
// Author's note (translated): How about it - isn't the code more elegant and
// concise this way?
新聞熱點
疑難解答