国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

redis隊列及多線程應用

2019-11-15 00:07:54
字體:
來源:轉載
供稿:網友
redis隊列及多線程應用

  由于xxx平臺上自己的博客已經很久沒更新了,一直以來都是用的印象筆記來做工作中知識的積累存根,不知不覺印象筆記里已經有了四、五百遍文章。為了從新開始能與廣大攻城獅共同提高技術能力與水平,隨決心另起爐灶在新的博客與大家分享

  經過一段時間項目的沉淀之后,對實際應用中的多線程開發及隊列使用產生了深厚的興趣,也將<<java并發編程實戰>>仔細的閱讀了兩三遍,也看了很多并發編程的實踐項目,也有了深刻的理解與在實踐中合理應用隊列、多線程開發的應用場景

  1、真實應用場景描述:

   由于一段時間以來要針對公司整個電商平臺包括官網、移動端所有的交易數據進行統計,統計指標包括:pv、uv、實付金額、轉化率、毛利率等等,按照各種不同的維度來統計計算出當前交易系統的各個指標的數據,但要求該項目是獨立的,沒有任務其它資源的協助及接品提供。經過一番xxxx思考討論之后。業務上決定用以下解決方案:

    A: 用一個定時服務每隔10秒去別的系統數據庫抓取上一次查詢時間以來新確認的訂單(這種訂單表示已經支付完在或者客戶已經審核確認了),然后將這些訂單的唯一編號放入redis隊列。

    B: 由于用到了隊列,根據經驗自然而然的想到了 啟動單獨的線程去redis隊列中不斷獲取要統計處理的訂單編號,然后將獲取到的訂單編號放入線程池中進行訂單的統計任務處理。

    開發實現:

    FetchConfirmOrdersFromErpJob.java

 1 /** 2      * 1、從redis中獲取上次查詢的時間戳 3      * 2、將當前時間戳放入到redis中,以便 下次按這個時間查詢 4      * 3、去erp訂單表查詢confirm_time>=上次查詢的時間的訂單,放入隊列中 5      */ 6     @Scheduled(cron = "0/30 * * * * ?") 7     public void start(){ 8         logger.info("FetchConfirmOrdersFromErpJob start................."+ new Date()); 9         StopWatch watch=new StopWatch();10         watch.start();11         //上次查詢的時間12         String PReQueryTimeStr=this.readRedisService.get(Constans.CACHE_PREQUERYORDERTIME);13         14         Date now=new Date();15         if(StringUtils.isBlank(preQueryTimeStr)){16             preQueryTimeStr=DateFormatUtils.format(DateUtils.addHours(now, -1), Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS);//第一次查詢之前一個小時的訂單17 //            preQueryTimeStr="2015-05-07 10:00:00";//本地測試的時候使用18         }19         //設置當前時間為上次查詢的時間20         this.writeRedisService.set(Constans.CACHE_PREQUERYORDERTIME, DateFormatUtils.format(now, Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS));21         22         List<Map<String, Object>> confirmOrderIds = this.erpOrderService.selectOrderIdbyConfirmtime(preQueryTimeStr);23         if(confirmOrderIds==null){24             logger.info("query confirmOrderIds is null,without order data need dealth..........");25             return;26         }27         for (Map<String, Object> map : confirmOrderIds) {         //將訂單編號放入隊列中28             this.writeRedisService.lpush(Constans.CACHE_ORDERIDS, map.get("channel_orderid").toString());29             logger.info("=======lpush orderid:"+map.get("channel_orderid").toString());30         }31         32         watch.stop();33         logger.info("FetchConfirmOrdersFromErpJob end................."+ new Date()+" total cost time:"+watch.getTime()+" dealth data count:"+confirmOrderIds.size());34     }

    OrderCalculate.java 隊列獲取訂單線程

 1 public class OrderCalculate { 2  3     private static final Log logger = LogFactory.getLog(OrderCalculate.class); 4      5     @Autowired 6     private static WriteRedisService writeRedisService; 7      8     private static ExecutorService threadPool=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4 9             ,new TjThreadFactory("CalculateAmount"));10     static{11         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {12             @Override13             public void run() {14                 QueuePop.stop();15                 threadPool.shutdown();16             }17         }));18     }19     20     public void init(){21         if(writeRedisService==null){22             writeRedisService=SpringContext.getBean(WriteRedisService.class);23         }24         new Thread(new QueuePop(),"OrderIdQueuePop").start();//由于是用redis做的隊列,所以只要使用一個線程從隊列里拿就ok25     }26     27     static class QueuePop implements Runnable{28 29         volatile static boolean stop=false;30         31         @Override32         public void run() {33             while(!stop){34                 //不斷循環從隊列里取出訂單id35                 String orderId=null;36                 try {37                     orderId = writeRedisService.rpop(Constans.CACHE_ORDERIDS);38                     if(orderId!=null){39                         logger.info("pop orderId:"+orderId);                //將獲取的訂單編號交給訂單統計任務處理線程處理40                         threadPool.submit(new CalculateAmount(Integer.parseInt(orderId),new Date()));41                     }42                 } catch (Exception e1) {43                     logger.error("",e1);44                 }45                 //根據上線后的業務反饋來確定是否改成wait/notify策略來及時處理確認的訂單46                 try {47                     Thread.sleep(10);48                 } catch (InterruptedException e) {49                     logger.error("",e);50 //                    Thread.currentThread().interrupt();51                     //stop=true;//線程被打算繼續執行,不應該被關閉,保證該線程永遠不會死掉52                 }53             }54         }55         56         public static void stop(){57             stop=true;58         }59         60     }61     62 }

      CalculateAmoiunt.java 訂單任務處理

  1 public class CalculateAmount implements Runnable {  2     private static final Log logger = LogFactory.getLog(CalculateAmount.class);  3     private int orderId;  4     private Date now;//確認時間  這個時間有一定的延遲,基本可以忽略,如果沒什么用  5     private OrderService orderServices;  6     private OrdHaveProductService ordHaveProductService;  7     private OrdPayByCashbackService ordPayByCashbackService;  8     private OrdPayByCouponService ordPayByCouponService;  9     private OrdPayByGiftCardService ordPayByGiftCardService; 10     private StatistiCSService statisticsService; 11     private WriteRedisService writeRedisService; 12     private ReadRedisService readRedisService; 13     private ErpOrderGoodsService erpOrderGoodsService; 14     private ErpOrderService erpOrderService; 15      16      17     public CalculateAmount(int orderId,Date now) { 18         super(); 19         this.orderId = orderId; 20         this.now=now; 21         orderServices=SpringContext.getBean(OrderService.class); 22         ordHaveProductService=SpringContext.getBean(OrdHaveProductService.class); 23         ordPayByCashbackService=SpringContext.getBean(OrdPayByCashbackService.class); 24         ordPayByCouponService=SpringContext.getBean(OrdPayByCouponService.class); 25         ordPayByGiftCardService=SpringContext.getBean(OrdPayByGiftCardService.class); 26         statisticsService=SpringContext.getBean(StatisticsService.class); 27         writeRedisService=SpringContext.getBean(WriteRedisService.class); 28         readRedisService=SpringContext.getBean(ReadRedisService.class); 29         erpOrderGoodsService=SpringContext.getBean(ErpOrderGoodsService.class); 30         erpOrderService=SpringContext.getBean(ErpOrderService.class); 31     } 32  33     @Override 34     public void run() { 35         logger.info("CalculateAmount task run start........orderId:"+orderId); 36         StopWatch watch=new StopWatch(); 37         watch.start(); 38         /** 39          * 取出訂單相關的所有數據同步到統計的庫中 40          */ 41         //TODO  考慮要不要將下面所有操作放到一個事務里面 42         List<Map<String, Object>> orders = this.orderServices.selectOrderById(orderId); 43         if(orders!=null&&orders.size()>0){ 44             Map<String, Object> order = orders.get(0); 45              46             String orderSN=U.nvl(order.get("OrderSN"));//訂單編號 47             Integer userId=U.nvlInt(order.get("usr_UserID"),null);//用戶d 48             Integer status=U.nvlInt(order.get("Status"),null);//狀態 49             Date createTime=now;//(Date)order.get("CreateTime");//創建時間 50             Date modifyTime=now;//(Date)order.get("ModifyTime");// 更新時間 51             BigDecimal discountPrice=U.nvlDecimal(order.get("DiscountPrice"),null);//優惠總額  滿減金額 52             BigDecimal payPrice=U.nvlDecimal(order.get("PayPrice"), null);//實付金額 53             BigDecimal totalPrice=U.nvlDecimal(order.get("TotalPrice"), null);//總金額 54              55             //從erp里查詢出訂單的確認時間 56             int dbConfirmTime=0; 57             try { 58                 dbConfirmTime = this.erpOrderService.selectConfirmTimeByOrderId(orderId); 59             } catch (Exception e2) { 60                 logger.error("",e2); 61             } 62             Date ct=new Date(dbConfirmTime*1000L); 63              64             int[] dates=U.getYearMonthDayHour(ct);// 65             if(modifyTime!=null){ 66                 dates=U.getYearMonthDayHour(modifyTime);// 67             } 68             int year=dates[0];//年 69             int month=dates[1];//月 70             int day=dates[2];//日 71             int hour=dates[3];//小時 72              73             String ordersId=orderId+"";//生成訂單id 74              75             //查詢訂單的來源和搜索引擎關鍵字 76             String source=""; 77             String seKeyWords=""; 78             List<OrdersData> orderDataList=this.statisticsService.selectOrdersDataByOrdersId(orderSN); 79             if(orderDataList!=null&&!orderDataList.isEmpty()){ 80                 OrdersData ordersData = orderDataList.get(0); 81                 source=ordersData.getSource(); 82                 seKeyWords=ordersData.getSeKeyWords(); 83             } 84              85             //TODO 將訂單入庫 86             ArrayList<RelOrders> relOrdersList = Lists.newArrayList(); 87             RelOrders relOrders=new RelOrders(orderSN,userId+"",Byte.valueOf(status+""),source,seKeyWords,IsCal.未計算.getFlag(),(byte)U.getSimpleYearByYear(year),(byte)month,(byte)day,(byte)hour,ct,createTime,modifyTime); 88             relOrdersList.add(relOrders); 89              90             try { 91                 relOrders.setConfirmTime(ct); 92                 //查詢RelOrders是否存在 93                 RelOrders dbOrders=this.statisticsService.selectByPrimaryKey(orderSN); 94                 if(dbOrders!=null){ 95                     //更新 96                     dbOrders.setStatus(Byte.valueOf(status+"")); 97                     dbOrders.setConfirmTime(ct); 98                     dbOrders.setModifyTime(modifyTime); 99                     this.statisticsService.updateByPrimaryKeySelective(dbOrders);100                     return;101                 }else{102                     Integer relResult=this.statisticsService.insertRelOrdersBatch(relOrdersList);103                 }104             } catch (Exception e) {105                 logger.error("insertRelOrdersBatch error",e);106             }107             /**108              * 查這個訂單的返現、優惠券、禮品卡  的金額109              */110             List<Map<String, Object>> cashs = this.ordPayByCashbackService.selectDecutionPriceByOrderId(orderId);111             List<Map<String, Object>> coupons = this.ordPayByCouponService.selectDecutionPriceByOrderId(orderId);112             113             BigDecimal cashAmount=U.getValueByKey(cashs, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//返現金額114             BigDecimal couponAmont=U.getValueByKey(coupons, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//紅包金額115             /**116              * 查詢出這個訂單的所有商品117              */118             List<Map<String, Object>> products=null;119             Map<String,Object> productToKeyWordMap=Maps.newHashMap();120             try {121                 products = this.ordHaveProductService.selectByOrderId(orderId);122                 List<OrdersItemData> ordersItemDataList=this.statisticsService.selectOrdersItemDataByOrdersId(orderSN);123                 if(ordersItemDataList!=null){124                     for (OrdersItemData ordersItemData : ordersItemDataList) {125                         productToKeyWordMap.put(ordersItemData.getItemId(), ordersItemData.getKeyWords());126                     }127                 }128             } catch (Exception e1) {129                 logger.error("",e1);130             }131             if(products!=null){132                 ArrayList<RelOrdersItem> relOrdersItemList = Lists.newArrayList();133                 for (Map<String, Object> product : products) {134                     Integer productId=U.nvlInt(product.get("pro_ProductID"), null);//商品Id135                     Integer buyNo=U.nvlInt(product.get("BuyNo"), 0);//購買數量136                     String SN=U.nvl(product.get("SN"),"");137                     BigDecimal buyPrice=U.nvlDecimal(product.get("BuyPrice"), BigDecimal.ZERO);//購買價格138                     BigDecimal buyTotalPrice=U.nvlDecimal(product.get("BuyTotalPrice"), null);//購買總價格139                     BigDecimal productPayPrice=U.nvlDecimal(product.get("PayPrice"), null);//單品實付金額140                     141                     BigDecimal cost=null;//商品成本  TODO 調別人的接口142                     BigDecimal realtimeAmount=null;//實付金額143                     144                     BigDecimal pdCashAmount=BigDecimal.ZERO;//每個商品的返現145                     BigDecimal pdcouponAmont=BigDecimal.ZERO;//每個商品的優惠券146                     147                     //商品價格所占訂單比例148                     if(buyTotalPrice!=null&&totalPrice!=null&&totalPrice.doubleValue()!=0){149                         pdCashAmount=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(cashAmount).setScale(2,BigDecimal.ROUND_HALF_UP);150                         pdcouponAmont=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(couponAmont).setScale(2,BigDecimal.ROUND_HALF_UP);151                         discountPrice=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(discountPrice).setScale(2,BigDecimal.ROUND_HALF_UP);152                     }153                     154                     realtimeAmount=buyTotalPrice.subtract((pdCashAmount.add(pdcouponAmont).add(discountPrice))).setScale(2,BigDecimal.ROUND_HALF_UP);155                     156                     RelOrdersItem item=new RelOrdersItem(U.randomUUID(),orderSN,productId,SN,buyNo,realtimeAmount,U.nvl(productToKeyWordMap.get(productId)));157                     158                     relOrdersItemList.add(item);159                     160                     //如果確認時間屬于同一天的話,將商品實付金額放入到redis排行榜中161                     if((status==1||status==5||status==6||status==7||status==11)&&DateUtils.isSameDay(new Date(), ct)){162                         //如果訂單的狀態是這幾種,剛將該商品加入到實付金額的排行 榜中163                         dates=U.getYearMonthDayHour(ct);//164                         int days=dates[2];165                         //某一個商品某一天的實付金額166                         BigDecimal itemRelAmount=BigDecimal.ZERO;167                         //從redis里取出這個商品的實付金額,然后累加168                         String itemRelAmountStr=readRedisService.get(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);169                         if(StringUtils.isNotBlank(itemRelAmountStr)){170                             itemRelAmount=new BigDecimal(itemRelAmountStr);171                         }172                         realtimeAmount=itemRelAmount.add(realtimeAmount);173                         writeRedisService.set(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days, realtimeAmount.toPlainString());174                         writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);175                         writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTSS_KEY+days, realtimeAmount.doubleValue(), productId+"");176                         //確認的銷量177                         Long itemCount= writeRedisService.incrBy(Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days,buyNo);178                         writeRedisService.zadd(Constans.CACHE_ITEMSALES_SS_KEY_PRDFIX+days, itemCount, productId+"");179                         180                         String itemType="";181                         Map<String, String> pMap = this.readRedisService.hmget(Constans.CACHE_PRODUCT_KEY+productId);182                         itemType=pMap.get("categoryId");183                         if(StringUtils.isNotBlank(itemType)){184                             if(ProductCategory.isGuanBai(itemType)){185                                 //如果是白酒  官白的訪客數排行 186                                 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTWHITESS_KEY+days,  realtimeAmount.doubleValue(), productId+"");//187                                 //確認的銷量排行188                                  this.writeRedisService.zadd(Constans.CACHE_ITEMSALESWHITE_SS_KEY_PRDFIX+days, itemCount, productId+"");//189                             }else if(ProductCategory.isGuanHong(itemType)){190                                 //官紅的訪客數排行 191                                 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTREDSS_KEY+days,  realtimeAmount.doubleValue(), productId+"");//192                                 //確認的銷量排行193                                  this.writeRedisService.zadd(Constans.CACHE_ITEMSALESRED_SS_KEY_PRDFIX+days, itemCount, productId+"");//194                             }195                         }196                         197                         //某一個商品的銷量加入刪除列表198                         writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);199                     }200                 }201                 try {202                     //TODO 將訂單商品明細入庫203                     this.statisticsService.insertRelOrdersItemBatch(relOrdersItemList);204                     //再將訂單的狀態改為已計算205                     this.statisticsService.updateIsCal(orderSN,IsCal.已計算.getFlag());//將是否計算改成已計算206                     //該訂單的所有商品的成本同步到現在的庫中。207                     this.calOrderProductCostSync(orderId,orderSN,products);208                 } catch (Exception e) {209                     logger.error("insertRelOrdersItemBatch or updateIsCal error",e);210                 }211             }212         }213         watch.stop();214         logger.info("CalculateAmount task run end........total cost time:"+watch.getTime()+"   orderId:"+orderId);215     }216   217     private void calOrderProductCostSync(int orderId,String orderSN,List<Map<String, Object>> products){218         List<Map<String, Object>> ordersList = this.erpOrderGoodsService.selectProductCostByOrderSN(orderSN);219         if(ordersList==null||ordersList.isEmpty()){220             logger.error("according orderId to query some data from erp return is null.........");221             return;222         }223         Map<String, String> itemIdToItemSnMap = U.convertToMapByList(products, "pro_ProductID", "SN");224         225         List<RelItemCosts> list=Lists.newArrayList();226         for (Map<String, Object> map : ordersList) {227             RelItemCosts itemCost=new RelItemCosts();228             if(map==null){229                 continue;230             }231             Integer itemId=U.nvlInt(map.get("goods_id"),-99);232             BigDecimal costs=U.nvlDecimal(map.get("Dynamic_price"), BigDecimal.ZERO);233             itemCost.setId(U.randomUUID());234             itemCost.setOrdersId(orderId+"");235             itemCost.setOrdersNo(orderSN);236             itemCost.setItemId(itemId);237             itemCost.setItemNo(itemIdToItemSnMap.get(itemId+""));238             itemCost.setCosts(costs);239             itemCost.setCreateTime(new Date());240             itemCost.setModifyTime(new Date());241             list.add(itemCost);242         }243         244         this.statisticsService.insertRelItemCostsBatch(list);245         246     }247     248 }

  注意:

    1、redis2.6版本使用lpush、rpop出列的時候會丟失數據。換成2.8及以上的版本運行正常。

    2、由于應用會部署到多個結點,所以無法直接采用java的BlockingQueue阻塞隊列,幫采用redis提供的隊列支持。

    3、如果要做到統計的絕對實時,最好采用大數據的實時計算的解決方案:kafka+storm 來實現

  以上為隊列結合線程的實踐案例,供大家一起探討。

  轉載請注明出處,請大家尊重作者的勞動成果。


上一篇:字符串轉換功能

下一篇:練習

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 青神县| 延吉市| 乌审旗| 上饶县| 蓬溪县| 安庆市| 吉水县| 遵义市| 仁化县| 泌阳县| 岫岩| 德昌县| 哈尔滨市| 吉首市| 伊宁市| 友谊县| 绵竹市| 霍林郭勒市| 静宁县| 孝义市| 丘北县| 监利县| 蒙城县| 明水县| 河西区| 弥渡县| 满洲里市| 临猗县| 邵东县| 准格尔旗| 遵化市| 龙游县| 龙口市| 仙桃市| 巴青县| 德庆县| 开鲁县| 四川省| 新宁县| 青川县| 威宁|