由于xxx平臺上自己的博客已經很久沒更新了,一直以來都是用的印象筆記來做工作中知識的積累存根,不知不覺印象筆記里已經有了四、五百遍文章。為了從新開始能與廣大攻城獅共同提高技術能力與水平,隨決心另起爐灶在新的博客與大家分享
經過一段時間項目的沉淀之后,對實際應用中的多線程開發及隊列使用產生了深厚的興趣,也將<<java并發編程實戰>>仔細的閱讀了兩三遍,也看了很多并發編程的實踐項目,也有了深刻的理解與在實踐中合理應用隊列、多線程開發的應用場景
1、真實應用場景描述:
由于一段時間以來要針對公司整個電商平臺包括官網、移動端所有的交易數據進行統計,統計指標包括:pv、uv、實付金額、轉化率、毛利率等等,按照各種不同的維度來統計計算出當前交易系統的各個指標的數據,但要求該項目是獨立的,沒有任務其它資源的協助及接品提供。經過一番xxxx思考討論之后。業務上決定用以下解決方案:
A: 用一個定時服務每隔10秒去別的系統數據庫抓取上一次查詢時間以來新確認的訂單(這種訂單表示已經支付完在或者客戶已經審核確認了),然后將這些訂單的唯一編號放入redis隊列。
B: 由于用到了隊列,根據經驗自然而然的想到了 啟動單獨的線程去redis隊列中不斷獲取要統計處理的訂單編號,然后將獲取到的訂單編號放入線程池中進行訂單的統計任務處理。
開發實現:
FetchConfirmOrdersFromErpJob.java
1 /** 2 * 1、從redis中獲取上次查詢的時間戳 3 * 2、將當前時間戳放入到redis中,以便 下次按這個時間查詢 4 * 3、去erp訂單表查詢confirm_time>=上次查詢的時間的訂單,放入隊列中 5 */ 6 @Scheduled(cron = "0/30 * * * * ?") 7 public void start(){ 8 logger.info("FetchConfirmOrdersFromErpJob start................."+ new Date()); 9 StopWatch watch=new StopWatch();10 watch.start();11 //上次查詢的時間12 String PReQueryTimeStr=this.readRedisService.get(Constans.CACHE_PREQUERYORDERTIME);13 14 Date now=new Date();15 if(StringUtils.isBlank(preQueryTimeStr)){16 preQueryTimeStr=DateFormatUtils.format(DateUtils.addHours(now, -1), Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS);//第一次查詢之前一個小時的訂單17 // preQueryTimeStr="2015-05-07 10:00:00";//本地測試的時候使用18 }19 //設置當前時間為上次查詢的時間20 this.writeRedisService.set(Constans.CACHE_PREQUERYORDERTIME, DateFormatUtils.format(now, Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS));21 22 List<Map<String, Object>> confirmOrderIds = this.erpOrderService.selectOrderIdbyConfirmtime(preQueryTimeStr);23 if(confirmOrderIds==null){24 logger.info("query confirmOrderIds is null,without order data need dealth..........");25 return;26 }27 for (Map<String, Object> map : confirmOrderIds) { //將訂單編號放入隊列中28 this.writeRedisService.lpush(Constans.CACHE_ORDERIDS, map.get("channel_orderid").toString());29 logger.info("=======lpush orderid:"+map.get("channel_orderid").toString());30 }31 32 watch.stop();33 logger.info("FetchConfirmOrdersFromErpJob end................."+ new Date()+" total cost time:"+watch.getTime()+" dealth data count:"+confirmOrderIds.size());34 }OrderCalculate.java 隊列獲取訂單線程
1 public class OrderCalculate { 2 3 private static final Log logger = LogFactory.getLog(OrderCalculate.class); 4 5 @Autowired 6 private static WriteRedisService writeRedisService; 7 8 private static ExecutorService threadPool=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4 9 ,new TjThreadFactory("CalculateAmount"));10 static{11 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {12 @Override13 public void run() {14 QueuePop.stop();15 threadPool.shutdown();16 }17 }));18 }19 20 public void init(){21 if(writeRedisService==null){22 writeRedisService=SpringContext.getBean(WriteRedisService.class);23 }24 new Thread(new QueuePop(),"OrderIdQueuePop").start();//由于是用redis做的隊列,所以只要使用一個線程從隊列里拿就ok25 }26 27 static class QueuePop implements Runnable{28 29 volatile static boolean stop=false;30 31 @Override32 public void run() {33 while(!stop){34 //不斷循環從隊列里取出訂單id35 String orderId=null;36 try {37 orderId = writeRedisService.rpop(Constans.CACHE_ORDERIDS);38 if(orderId!=null){39 logger.info("pop orderId:"+orderId); //將獲取的訂單編號交給訂單統計任務處理線程處理40 threadPool.submit(new CalculateAmount(Integer.parseInt(orderId),new Date()));41 }42 } catch (Exception e1) {43 logger.error("",e1);44 }45 //根據上線后的業務反饋來確定是否改成wait/notify策略來及時處理確認的訂單46 try {47 Thread.sleep(10);48 } catch (InterruptedException e) {49 logger.error("",e);50 // Thread.currentThread().interrupt();51 //stop=true;//線程被打算繼續執行,不應該被關閉,保證該線程永遠不會死掉52 }53 }54 }55 56 public static void stop(){57 stop=true;58 }59 60 }61 62 }CalculateAmoiunt.java 訂單任務處理
1 public class CalculateAmount implements Runnable { 2 private static final Log logger = LogFactory.getLog(CalculateAmount.class); 3 private int orderId; 4 private Date now;//確認時間 這個時間有一定的延遲,基本可以忽略,如果沒什么用 5 private OrderService orderServices; 6 private OrdHaveProductService ordHaveProductService; 7 private OrdPayByCashbackService ordPayByCashbackService; 8 private OrdPayByCouponService ordPayByCouponService; 9 private OrdPayByGiftCardService ordPayByGiftCardService; 10 private StatistiCSService statisticsService; 11 private WriteRedisService writeRedisService; 12 private ReadRedisService readRedisService; 13 private ErpOrderGoodsService erpOrderGoodsService; 14 private ErpOrderService erpOrderService; 15 16 17 public CalculateAmount(int orderId,Date now) { 18 super(); 19 this.orderId = orderId; 20 this.now=now; 21 orderServices=SpringContext.getBean(OrderService.class); 22 ordHaveProductService=SpringContext.getBean(OrdHaveProductService.class); 23 ordPayByCashbackService=SpringContext.getBean(OrdPayByCashbackService.class); 24 ordPayByCouponService=SpringContext.getBean(OrdPayByCouponService.class); 25 ordPayByGiftCardService=SpringContext.getBean(OrdPayByGiftCardService.class); 26 statisticsService=SpringContext.getBean(StatisticsService.class); 27 writeRedisService=SpringContext.getBean(WriteRedisService.class); 28 readRedisService=SpringContext.getBean(ReadRedisService.class); 29 erpOrderGoodsService=SpringContext.getBean(ErpOrderGoodsService.class); 30 erpOrderService=SpringContext.getBean(ErpOrderService.class); 31 } 32 33 @Override 34 public void run() { 35 logger.info("CalculateAmount task run start........orderId:"+orderId); 36 StopWatch watch=new StopWatch(); 37 watch.start(); 38 /** 39 * 取出訂單相關的所有數據同步到統計的庫中 40 */ 41 //TODO 考慮要不要將下面所有操作放到一個事務里面 42 List<Map<String, Object>> orders = this.orderServices.selectOrderById(orderId); 43 if(orders!=null&&orders.size()>0){ 44 Map<String, Object> order = orders.get(0); 45 46 String orderSN=U.nvl(order.get("OrderSN"));//訂單編號 47 Integer userId=U.nvlInt(order.get("usr_UserID"),null);//用戶d 48 Integer status=U.nvlInt(order.get("Status"),null);//狀態 49 Date createTime=now;//(Date)order.get("CreateTime");//創建時間 50 Date modifyTime=now;//(Date)order.get("ModifyTime");// 更新時間 51 BigDecimal discountPrice=U.nvlDecimal(order.get("DiscountPrice"),null);//優惠總額 滿減金額 52 BigDecimal payPrice=U.nvlDecimal(order.get("PayPrice"), null);//實付金額 53 BigDecimal totalPrice=U.nvlDecimal(order.get("TotalPrice"), null);//總金額 54 55 //從erp里查詢出訂單的確認時間 56 int dbConfirmTime=0; 57 try { 58 dbConfirmTime = this.erpOrderService.selectConfirmTimeByOrderId(orderId); 59 } catch (Exception e2) { 60 logger.error("",e2); 61 } 62 Date ct=new Date(dbConfirmTime*1000L); 63 64 int[] dates=U.getYearMonthDayHour(ct);// 65 if(modifyTime!=null){ 66 dates=U.getYearMonthDayHour(modifyTime);// 67 } 68 int year=dates[0];//年 69 int month=dates[1];//月 70 int day=dates[2];//日 71 int hour=dates[3];//小時 72 73 String ordersId=orderId+"";//生成訂單id 74 75 //查詢訂單的來源和搜索引擎關鍵字 76 String source=""; 77 String seKeyWords=""; 78 List<OrdersData> orderDataList=this.statisticsService.selectOrdersDataByOrdersId(orderSN); 79 if(orderDataList!=null&&!orderDataList.isEmpty()){ 80 OrdersData ordersData = orderDataList.get(0); 81 source=ordersData.getSource(); 82 seKeyWords=ordersData.getSeKeyWords(); 83 } 84 85 //TODO 將訂單入庫 86 ArrayList<RelOrders> relOrdersList = Lists.newArrayList(); 87 RelOrders relOrders=new RelOrders(orderSN,userId+"",Byte.valueOf(status+""),source,seKeyWords,IsCal.未計算.getFlag(),(byte)U.getSimpleYearByYear(year),(byte)month,(byte)day,(byte)hour,ct,createTime,modifyTime); 88 relOrdersList.add(relOrders); 89 90 try { 91 relOrders.setConfirmTime(ct); 92 //查詢RelOrders是否存在 93 RelOrders dbOrders=this.statisticsService.selectByPrimaryKey(orderSN); 94 if(dbOrders!=null){ 95 //更新 96 dbOrders.setStatus(Byte.valueOf(status+"")); 97 dbOrders.setConfirmTime(ct); 98 dbOrders.setModifyTime(modifyTime); 99 this.statisticsService.updateByPrimaryKeySelective(dbOrders);100 return;101 }else{102 Integer relResult=this.statisticsService.insertRelOrdersBatch(relOrdersList);103 }104 } catch (Exception e) {105 logger.error("insertRelOrdersBatch error",e);106 }107 /**108 * 查這個訂單的返現、優惠券、禮品卡 的金額109 */110 List<Map<String, Object>> cashs = this.ordPayByCashbackService.selectDecutionPriceByOrderId(orderId);111 List<Map<String, Object>> coupons = this.ordPayByCouponService.selectDecutionPriceByOrderId(orderId);112 113 BigDecimal cashAmount=U.getValueByKey(cashs, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//返現金額114 BigDecimal couponAmont=U.getValueByKey(coupons, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//紅包金額115 /**116 * 查詢出這個訂單的所有商品117 */118 List<Map<String, Object>> products=null;119 Map<String,Object> productToKeyWordMap=Maps.newHashMap();120 try {121 products = this.ordHaveProductService.selectByOrderId(orderId);122 List<OrdersItemData> ordersItemDataList=this.statisticsService.selectOrdersItemDataByOrdersId(orderSN);123 if(ordersItemDataList!=null){124 for (OrdersItemData ordersItemData : ordersItemDataList) {125 productToKeyWordMap.put(ordersItemData.getItemId(), ordersItemData.getKeyWords());126 }127 }128 } catch (Exception e1) {129 logger.error("",e1);130 }131 if(products!=null){132 ArrayList<RelOrdersItem> relOrdersItemList = Lists.newArrayList();133 for (Map<String, Object> product : products) {134 Integer productId=U.nvlInt(product.get("pro_ProductID"), null);//商品Id135 Integer buyNo=U.nvlInt(product.get("BuyNo"), 0);//購買數量136 String SN=U.nvl(product.get("SN"),"");137 BigDecimal buyPrice=U.nvlDecimal(product.get("BuyPrice"), BigDecimal.ZERO);//購買價格138 BigDecimal buyTotalPrice=U.nvlDecimal(product.get("BuyTotalPrice"), null);//購買總價格139 BigDecimal productPayPrice=U.nvlDecimal(product.get("PayPrice"), null);//單品實付金額140 141 BigDecimal cost=null;//商品成本 TODO 調別人的接口142 BigDecimal realtimeAmount=null;//實付金額143 144 BigDecimal pdCashAmount=BigDecimal.ZERO;//每個商品的返現145 BigDecimal pdcouponAmont=BigDecimal.ZERO;//每個商品的優惠券146 147 //商品價格所占訂單比例148 if(buyTotalPrice!=null&&totalPrice!=null&&totalPrice.doubleValue()!=0){149 pdCashAmount=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(cashAmount).setScale(2,BigDecimal.ROUND_HALF_UP);150 pdcouponAmont=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(couponAmont).setScale(2,BigDecimal.ROUND_HALF_UP);151 discountPrice=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(discountPrice).setScale(2,BigDecimal.ROUND_HALF_UP);152 }153 154 realtimeAmount=buyTotalPrice.subtract((pdCashAmount.add(pdcouponAmont).add(discountPrice))).setScale(2,BigDecimal.ROUND_HALF_UP);155 156 RelOrdersItem item=new RelOrdersItem(U.randomUUID(),orderSN,productId,SN,buyNo,realtimeAmount,U.nvl(productToKeyWordMap.get(productId)));157 158 relOrdersItemList.add(item);159 160 //如果確認時間屬于同一天的話,將商品實付金額放入到redis排行榜中161 if((status==1||status==5||status==6||status==7||status==11)&&DateUtils.isSameDay(new Date(), ct)){162 //如果訂單的狀態是這幾種,剛將該商品加入到實付金額的排行 榜中163 dates=U.getYearMonthDayHour(ct);//164 int days=dates[2];165 //某一個商品某一天的實付金額166 BigDecimal itemRelAmount=BigDecimal.ZERO;167 //從redis里取出這個商品的實付金額,然后累加168 String itemRelAmountStr=readRedisService.get(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);169 if(StringUtils.isNotBlank(itemRelAmountStr)){170 itemRelAmount=new BigDecimal(itemRelAmountStr);171 }172 realtimeAmount=itemRelAmount.add(realtimeAmount);173 writeRedisService.set(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days, realtimeAmount.toPlainString());174 writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);175 writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTSS_KEY+days, realtimeAmount.doubleValue(), productId+"");176 //確認的銷量177 Long itemCount= writeRedisService.incrBy(Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days,buyNo);178 writeRedisService.zadd(Constans.CACHE_ITEMSALES_SS_KEY_PRDFIX+days, itemCount, productId+"");179 180 String itemType="";181 Map<String, String> pMap = this.readRedisService.hmget(Constans.CACHE_PRODUCT_KEY+productId);182 itemType=pMap.get("categoryId");183 if(StringUtils.isNotBlank(itemType)){184 if(ProductCategory.isGuanBai(itemType)){185 //如果是白酒 官白的訪客數排行 186 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTWHITESS_KEY+days, realtimeAmount.doubleValue(), productId+"");//187 //確認的銷量排行188 this.writeRedisService.zadd(Constans.CACHE_ITEMSALESWHITE_SS_KEY_PRDFIX+days, itemCount, productId+"");//189 }else if(ProductCategory.isGuanHong(itemType)){190 //官紅的訪客數排行 191 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTREDSS_KEY+days, realtimeAmount.doubleValue(), productId+"");//192 //確認的銷量排行193 this.writeRedisService.zadd(Constans.CACHE_ITEMSALESRED_SS_KEY_PRDFIX+days, itemCount, productId+"");//194 }195 }196 197 //某一個商品的銷量加入刪除列表198 writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);199 }200 }201 try {202 //TODO 將訂單商品明細入庫203 this.statisticsService.insertRelOrdersItemBatch(relOrdersItemList);204 //再將訂單的狀態改為已計算205 this.statisticsService.updateIsCal(orderSN,IsCal.已計算.getFlag());//將是否計算改成已計算206 //該訂單的所有商品的成本同步到現在的庫中。207 this.calOrderProductCostSync(orderId,orderSN,products);208 } catch (Exception e) {209 logger.error("insertRelOrdersItemBatch or updateIsCal error",e);210 }211 }212 }213 watch.stop();214 logger.info("CalculateAmount task run end........total cost time:"+watch.getTime()+" orderId:"+orderId);215 }216 217 private void calOrderProductCostSync(int orderId,String orderSN,List<Map<String, Object>> products){218 List<Map<String, Object>> ordersList = this.erpOrderGoodsService.selectProductCostByOrderSN(orderSN);219 if(ordersList==null||ordersList.isEmpty()){220 logger.error("according orderId to query some data from erp return is null.........");221 return;222 }223 Map<String, String> itemIdToItemSnMap = U.convertToMapByList(products, "pro_ProductID", "SN");224 225 List<RelItemCosts> list=Lists.newArrayList();226 for (Map<String, Object> map : ordersList) {227 RelItemCosts itemCost=new RelItemCosts();228 if(map==null){229 continue;230 }231 Integer itemId=U.nvlInt(map.get("goods_id"),-99);232 BigDecimal costs=U.nvlDecimal(map.get("Dynamic_price"), BigDecimal.ZERO);233 itemCost.setId(U.randomUUID());234 itemCost.setOrdersId(orderId+"");235 itemCost.setOrdersNo(orderSN);236 itemCost.setItemId(itemId);237 itemCost.setItemNo(itemIdToItemSnMap.get(itemId+""));238 itemCost.setCosts(costs);239 itemCost.setCreateTime(new Date());240 itemCost.setModifyTime(new Date());241 list.add(itemCost);242 }243 244 this.statisticsService.insertRelItemCostsBatch(list);245 246 }247 248 }注意:
1、redis2.6版本使用lpush、rpop出列的時候會丟失數據。換成2.8及以上的版本運行正常。
2、由于應用會部署到多個結點,所以無法直接采用java的BlockingQueue阻塞隊列,幫采用redis提供的隊列支持。
3、如果要做到統計的絕對實時,最好采用大數據的實時計算的解決方案:kafka+storm 來實現
以上為隊列結合線程的實踐案例,供大家一起探討。
轉載請注明出處,請大家尊重作者的勞動成果。
新聞熱點
疑難解答