国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

Flume-NG內置計數器(監控)源碼級分析

2019-11-06 06:44:07
字體:
來源:轉載
供稿:網友
Flume的內置監控怎么整?這個問題有很多人問。目前了解到的信息是可以使用Cloudera Manager、Ganglia有圖形的監控工具,以及從瀏覽器獲取json串,或者自定義向其他監控系統匯報信息。那監控的信息是什么呢?就是各個組件的統計信息,比如成功接收的Event數量、成功發送的Event數量,處理的Transaction的數量等等。而且不同的組件有不同的Countor來做統計,目前直到1.5版本仍然只對三大組件:source、sink、channel進行統計分別是SourceCounter、SinkCounter、ChannelCounter,這三個計數器的統計項是固定的,就是你不能自己設置自己的統計項;另外還有ChannelPRocessorCounter和SinkProcessorCounter,這兩項目前沒有設置統計項,所以是目前還是“擺設”。另外有些同學可能也發現了,有些內置的組件使用CounterGroup這個來統計信息,這個可以自己隨意設置統計項,但是遺憾的是目前(1.5版本)這個可以自定義的計數器的信息還無法用在監控上,因為這只是一個單獨的類,并沒有繼承MonitoredCounterGroup這個抽象類。有些內置組件使用的是CounterGroup,所以監控時會沒有數據,不同的版本使用此CounterGroup的組件可能不同。下面我們重點介紹:SourceCounter、SinkCounter、ChannelCounter。  Flume-NG的所有統計信息、監控及相關的類都在org.apache.flume.instrumentation.http、org.apache.flume.instrumentation、org.apache.flume.instrumentation.util三個包下。  上面提到了MonitoredCounterGroup,這個類是用來跟蹤內部的統計指標的,注冊組件的MBean并跟蹤和更新統計值。需要監控的組件都要繼承這個類,這個類可以跟蹤flume內部的所有組件,但是目前只實現了3個。其中比較重要的方法有以下幾個:  (1)、構造方法MonitoredCounterGroup(Type type, String name, String... attrs),這個方法主要是設置組件的類型、名稱;然后將所有的attrs(這是設定的各個統計項)加入Map<String, AtomicLong> counterMap,值設定為0;然后初始化計數器的開始時間和結束時間,都設為0.  (2)、start()方法,會先注冊計數器,然后對所有統計項的統計值設為0;將開始時間設置為當前時間  (3)、register()方法,如果這個計數器還未注冊,將這個計數器的MBean進行注冊,就可以進行跟蹤了  (4)、stop()方法,會設置結束時間為當前時間;輸出各個統計項的信息。我們 Ctrl+C 結束進程時,最后顯示的統計信息就是來自這里。  其它方法都是獲取counterMap的中信息或者更新值等,比較簡單。  接下來我們看看,三個組件中各種統計項及其含義吧:  一、SourceCounter,繼承了MonitoredCounterGroup。主要統計項如下:  (1)"src.events.received",表示source接受的event個數;  (2)"src.events.accepted",表示source處理成功的event個數,和上面的區別就是上面雖然接受了可能沒處理成功;  (3)"src.append.received",表示調用append次數,在avrosource和thriftsource中調用;  (4)"src.append.accepted",表示append處理成功次數;  (5)"src.append-batch.received",表示appendBatch被調用的次數,在avrosource和thriftsource中調用;  (6)"src.append-batch.accepted",表示appendBatch處理成功次數;  (7)"src.open-connection.count",用在avrosource中表示打開連接的數量;  一般source調用都集中在前倆。  二、SinkCounter,繼承了MonitoredCounterGroup  (1)"sink.connection.creation.count",這個調用的地方頗多,都表示“鏈接”創建的數量,比如與HBase建立鏈接,與avrosource建立鏈接以及文件的打開等;  (2)"sink.connection.closed.count",對應于上面的stop操作、destroyConnection、close文件操作等。  (3)"sink.connection.failed.count",表示上面所表示“鏈接”時異常、失敗的次數;  (4)"sink.batch.empty",表示這個批次處理的event數量為0的情況;  (5)"sink.batch.underflow",表示這個批次處理的event的數量介于0和配置的batchSize之間;  (6)"sink.batch.complete",表示這個批次處理的event數量等于設定的batchSize;  (7)"sink.event.drain.attempt",準備處理的event的個數;  (8)"sink.event.drain.sucess",這個表示處理成功的event數量,與上面不同的是上面的是還未處理的。  三、ChannelCounter,繼承了MonitoredCounterGroup  (1)"channel.current.size",這個表示這個channel的當前容量;  (2)"channel.event.put.attempt",一般指的是在channel的事務當中,source的put操作中記錄嘗試發送event的個數;  (3)"channel.event.take.attempt",一般指的是在channel的事務中,sink的take操作記錄嘗試拿event的個數;  (4)"channel.event.put.success",一般指的是在channel的事務中,put成功的event的數量;  (5)"channel.event.take.success",一般指的是channel事務中,take成功的event的數量;  (6)"channel.capacity",指的是channel的容量,在channel的start方法中設置。  上面這些統計項都是固定的,我們可以根據需要增加相應項的值,可以在監控中查看組件的變化情況,從而掌握flume進程的運行情況。比如可以查看channel的容量從而了解到source和sink的相對處理速度,還有可以看source或者sink每個批次處理成功與失敗的次數,了解組件的運行狀況等等。  當然有些同學可能在自定義自己的組件時,想統計一些自己的統計項,這些統計項在上面三大組件中是沒有,怎么辦?自己定制啊,上面說了必須要繼承MonitoredCounterGroup這個抽象類,設定自己的統計項,然后將統計項設置成數組調用MonitoredCounterGroup的構造函數;然后在自定義的計數器中增加更新數值的方法。最后在自定義的組件中構造自定義的計數器,并啟用它的start方法,剩下的就是在該更新統計項數值的地方更新就可以了。  還有一個重要的內容就是監控的實現!沒錯,內置的有兩種HTTP方式(就是json串)和Ganglia,后者需要安裝Ganglia,前者非常簡單,只需要在Flume的啟動命令中加上:-Dflume.monitoring.type=http -Dflume.monitoring.port=XXXX  ,最后的XXXX是你需要設置的端口!然后你就可以在瀏覽器上通過訪問這個Flume所在節點的ip:XXXX/metrics,不斷刷新就可以看到最新的組件統計信息。關于Ganglia的請讀者自行組建Ganglia集群并參考用戶指南來操作。  如果我想自己實現一個server向其他系統匯報信息,咋整?目前有至少兩個方法:  一、就是上面的HTTP啊,你可以不斷去獲取json串,自己解析出來各個統計指標,然后剩下的就是你想怎么整就怎么整吧。  二、就是自己實現一個類似HTTP的server,必須實現org.apache.flume.instrumentation.MonitorService接口,這個接口只有倆方法:start和stop。這個接口繼承自Configurable接口所以擁有可以讀取配置文件的configure(configure(Context context))方法,來獲取一些配置信息。  以HTTP為例(對應的類是org.apache.flume.instrumentation.http.HTTPMetriCSServer),它的start方法啟動了一個jetty作為web server,提供WEB服務。并實現了AbstractHandler的一個處理數據的類HTTPMetricsHandler,這個類的handle(String target, HttpServletRequest request, HttpServletResponse response,int dispatch)方法來設置一些WEB頁面的格式以及通過JMXPollUtil.getAllMBeans()獲取所有組件注冊的MBean構成的Map<String, Map<String, String>> metricsMap,遍歷這個metricsMap將這個metricsMap轉換成json輸出到web頁面。stop方法就是一些清理工作,這里是關閉jetty server。很簡單吧,所以我們完全可以實現一個server,在start方法中啟動一個線程每隔一秒或者自己定遍歷這個metricsMap,寫入MySQL、HBase或者別的地方,你隨便。。。  你可以在定義的組件中調用自己的計數器,然后將計數器、監控類、自定義組件(source、sink、channel)打包放到lib下,在啟動命令后加-Dflume.monitoring.type=AAAAA -Dflume.monitoring.node=BBBB,就可以了。注意,Dflume.monitoring.type這個好似必須要設置的,就是你自己的監控類(這里是AAAAA),后面的可有可無都是一些參數,你可以自定義參數名,比如可以設置數據庫服務器IP、端口等。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 日喀则市| 荣成市| 江阴市| 古浪县| 驻马店市| 福海县| 固原市| 紫阳县| 丹阳市| 南川市| 伊宁县| 利辛县| 平潭县| 翁牛特旗| 平昌县| 简阳市| 仙居县| 浙江省| 西城区| 玉林市| 如皋市| 从江县| 裕民县| 乐亭县| 乌拉特前旗| 佛坪县| 辛集市| 青浦区| 海安县| 大名县| 南充市| 得荣县| 理塘县| 罗源县| 舟曲县| 黑河市| 永登县| 吐鲁番市| 新疆| 紫阳县| 玛沁县|