国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

flume監控spoolDir日志到HDFS整個流程小Demo

2019-11-09 13:30:34
字體:
來源:轉載
供稿:網友
1.編寫java代碼,隨機生成用戶ID號碼,區縣號碼,鄉鎮號碼(區縣和鄉鎮號碼用隨機的三位字母表示)和個人總收入格式樣例:779362a1-bf04-468a-91b6-a19d772f41fa####AFC####sfe####8091748。

2.用一個線程循環執行,用Thread.sleep(100)來控制線程執行一次停止100ms,防止cpu在死循環中過載,一秒生成10條數據,用log4j生成相應的日志到指定的目錄下面,其中日志每分鐘就生成一個格式為yyyy-MM-dd-HH-mm 例如:service.log.2016-10-13-11-32,最后在linux下用shell腳本啟動這個java程序。

3.編寫shell腳本,定時每分鐘從log4j生成的腳本copy當前時間前一分鐘產生的日志文件到被flume監控的文件夾內,注意copy過去應該在文件名后面加上.COMPLETED,copy完成后又把這個文件名的.COMPLETED去掉。例如:

#首先cp ./log4j/service.log.2016-10-13-11-37 ./monitor/service.log.2016-10-13-11-37.COMPLETED#然后mv ./monitor/service.log.2016-10-13-11-37.COMPLETED ./monitor/service.log.2016-10-13-11-37

主要是防止源日志文件太大copy的時候會花比較長的時間,到時候flume會拋異常,當然你還可以使用另外一種解決方案:直接move源日志文件到被flume監控的目錄中,不過這種方案沒有上面的方案優。

4.配置flume的conf文件

5.編輯crontab每分鐘執行這個腳本來拉取源日志文件。


環境: 1.使用的虛擬機為:vmware12 2.centOS6.5 3.hadoop2.2.0 單節點(主要測試用,所以直接用的單節點)4.Flume 1.6.0 (剛開始用的flume-ng-1.5.0-cdh5.4.5,結果配置中的一個方法在這個版本的flume包里找不到拋異常,就換了個版本搞定)這里寫圖片描述


java代碼如下: 其中需要配置log4j配置文件,以及添加log4j的依賴jar包

package com.lijie.test;import java.util.UUID;import org.apache.log4j.Logger;public class DataPRoduct { public static void main(String[] args) { Thread t1 = new Thread(new A()); t1.start(); }}class A extends Thread { private final Logger log = Logger.getLogger(A.class); public void run() { //無限循環 while (true) { //隨機產生一個用戶uuid UUID userId = UUID.randomUUID(); //產生一個隨機的用戶總資產 int num = (int) (Math.random() * 10000000) + 100000; //產生一個隨意的縣名 StringBuilder sb = new StringBuilder(); for (int i = 0; i < 3; i++) { char a = (char) (Math.random() * (90 - 65) + 65); sb.append(a); } String xian = sb.toString(); //產生一個隨機的鎮名 StringBuilder sb1 = new StringBuilder(); for (int i = 0; i < 3; i++) { char a = (char) (Math.random() * (122 - 97) + 97); sb1.append(a); } String zhen = sb1.toString(); //生成日志 log.info(userId + "####" + xian + "####" + zhen + "####" + num); //停0.1秒鐘 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }}

log4j的配置文件:

log4j.rootCategory=INFO, stdout , Rlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d %p [%t] %C.%M(%L) | %m%nlog4j.appender.R=org.apache.log4j.DailyRollingFileAppenderlog4j.appender.R.File=/home/hadoop/log4j/service.loglog4j.appender.R.DatePattern = '.'yyyy-MM-dd-HH-mmlog4j.appender.R.layout=org.apache.log4j.PatternLayoutlog4j.appender.R.layout.ConversionPattern=%d %p [%t] %C.%M(%L) | %m%nlog4j.logger.com.xxx=DEBUGlog4j.logger.controllers=DEBUGlog4j.logger.vo=DEBUGlog4j.logger.notifiers=DEBUGlog4j.logger.com.opensymphony.oscache=WARNlog4j.logger.net.sf.navigator=WARNlog4j.logger.org.apache.commons=WARNlog4j.logger.org.apache.struts=WARNlog4j.logger.org.displaytag=WARNlog4j.logger.org.springframework=WARNlog4j.logger.org.apache.velocity=FATAL

啟動java程序的shell腳本 start.sh

APP_HOME=/home/hadoop/myjarAPP_CLASSPATH=$APP_HOME/binjarList=$(ls $APP_CLASSPATH|grep jar)echo $jarListfor i in $jarListdo APP_CLASSPATH="$APP_CLASSPATH/$i":doneecho $APP_CLASSPATHexport CLASSPATH=$CLASSPATH:$APP_CLASSPATHecho $CLASSPATHjava -Xms50m -Xmx250m com.lijie.test.DataProductecho Linux Test End

定時拉取源日志的shell腳本 mvlog.sh

#! /bin/bashDIR=$(cd `dirname $0`; pwd)mydate=`date +%Y-%m-%d-%H-%M -d '-1 minutes'`logName="service.log"monitorDir="/home/hadoop/monitor/"filePath="${DIR}"/log4j/""fileName="${logName}"".""${mydate}"echo "文件地址:${filePath}"echo "文件名字:${fileName}"if [ -f "${monitorDir}""${fileName}" ]then echo "文件存在,刪除文件" rm -rf "${monitorDir}""${fileName}"fiecho "開始復制文件"cp "${filePath}${fileName}" "${monitorDir}${fileName}"".COMPLETED"echo "日志復制完成,更改名字"mv "${monitorDir}${fileName}"".COMPLETED" "${monitorDir}${fileName}"echo "日志改名完成"exit

flume的配置文件:

#agent名, source、channel、sink的名稱a1.sources = r1a1.channels = c1a1.sinks = k1#具體定義sourcea1.sources.r1.type = spooldira1.sources.r1.spoolDir = /home/hadoop/monitor#具體定義channela1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 100#具體定義sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = hdfs://192.168.80.123:9000/flume/%Y%m%da1.sinks.k1.hdfs.filePrefix = events-a1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.useLocalTimeStamp = true#不按照條數生成文件a1.sinks.k1.hdfs.rollCount = 0#HDFS上的文件達到128M時生成一個文件a1.sinks.k1.hdfs.rollSize = 134217728#HDFS上的文件達到60秒生成一個文件a1.sinks.k1.hdfs.rollInterval = 60#組裝source、channel、sinka1.sources.r1.channels = c1a1.sinks.k1.channel = c1

啟動flume的命令:

../bin/flume-ng agent -n a1 -c conf -f ./flume-conf.properties -Dflume.root.logger=DEBUG,console

crontab的配置

#首先crontab -e編輯下面的代碼然后保存* * * * * sh /home/hadoop/mvlog.sh#然后啟動crontab服務service crond start

準備工作進行好之后,執行java程序 sh ./start.sh

產生如下日志文件: 日志文件

日志的內容: 日志的內容

定時任務會拉取這個目錄下的日志到monitor目錄下,flume就會收集,手機完成后會在文件名添加.COMPLETED后綴: 被監控的目錄

hdfs的flume下面就會生成當天時間格式化的目錄,并且收集的數據會被put到該目錄下: hdfs

java代碼一直生成日志文件,crontab每隔一分鐘都會拉取日志到flume監控的目錄下面,flume就會把該文件收集到hdfs,這樣一個簡單的flume監控spoolDir日志到HDFS整個流程的小Demo就實現了。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 汶上县| 岳西县| 桃园县| 乾安县| 花垣县| 平定县| 平和县| 巴东县| 珲春市| 岳普湖县| 中山市| 射阳县| 新郑市| 太保市| 彭水| 定边县| 栾川县| 焦作市| 庆元县| 包头市| 厦门市| 云南省| 莎车县| 遂川县| 睢宁县| 贡山| 禹州市| 张北县| 濉溪县| 玉环县| 若尔盖县| 元阳县| 晋州市| 工布江达县| 宁武县| 宜兰县| 泸水县| 博乐市| 阿拉尔市| 尖扎县| 社会|