webtail 能夠持續讀取一個文件,并將文件內容通過websocket,實時推送到web端,webtail文件讀取基于linux的inotify,所以沒有可移植性.
文件讀取
先介紹下背景,之前遇到過服務器上需要長時間tail一個日志,之前經常是通過一個終端連到服務器上,但是對于長時間觀察,就不太方便:老是要開著終端,沒法用手機等移動設備查看;多人共享查看比較麻煩,都需要登錄到服務器上。
webtail實現類似于tail,能夠持續讀取一個文件,并將文件內容通過websocket,實時推送到web端。webtail文件讀取基于linux的inotify,所以沒有可移植性,websocket使用基于asio的websocketpp,代碼維護在這里。
webtail代碼維護在https://gitorious.org/webtail/webtail,目前還是一個簡單可用的小應用,還有很多可以提升的地方。
本文介紹下文件讀取的部分.
文件讀取分為兩個部分,文件讀取和文件監控.
文件讀取:
由于unix的文件多次打開獨立維護file table,共享v-node table(參照APUE 3.10 file sharing)。因此只維護一個文件描述符和當前文件偏移,每次讀取通過fstat獲取文檔當前大小,和維護的文件偏移進行對比,如果小于文件大小,則讀取文件直到末尾,并輸出。
這里有幾點是模仿tailf的,首先是不修改文件訪問時間,這是通過在open系統調用中,增加O_NOATIME,按照man open的解釋:Do not update the file last access time (st_atime in the inode) when the file is read(2). This flag is intended for use by indexing or backup programs, where its use can significantly reduce the amount of disk activity. This flag may not be effective on all file systems. One example is NFS, where the server main?tains the access time. 啟動了這個參數,通過read調用讀取文件的時候,不會修改atime了。
其次是首次讀取的時候,讀取文件最后10行,由于websocket發送沒有做緩存,所以從web端沒法看見,這里代碼也是參照了tailf,代碼如下:
- char *buffer = new char[initLen * BUFSIZ];
- char *p = buffer;
- char lineBuffer[BUFSIZ];
- int readSize;
- int lineCount = 0;
- bool head = true;
- int i = 0, j = 0;
- while((readSize = ::read(fd, lineBuffer, BUFSIZ - 1)) > 0) {
- for(i = 0, j = 0; i < readSize; ++i) {
- // read line, save to buffer
- if(lineBuffer[i] == 'n') {
- std::memcpy(p, lineBuffer + j, i - j + 1);
- std::memset(p + i - j + 1, '', 1);
- j = i + 1;
- if(++lineCount >= initLen) {
- lineCount = 0;
- head = false;
- }
- p = buffer + (lineCount * BUFSIZ);
- }
- }
- // read break in the middle of line
- if(j < i) {
- // finished read all files
- if(readSize < BUFSIZ) {
- std::memcpy(p, lineBuffer + j, i - j + 1);
- std::memset(p + i - j + 1, '', 1);
- ++lineCount;
- } else if (j == 0){ // long line drop?
- continue;
- } else {
- // not finished, seek to line begin
- curPos = lseek(fd, j - i -1, SEEK_CUR);
- }
- }
- }
- std::string initReadResult;
- if(head) {
- for(i = 0; i < lineCount; ++i) {
- initReadResult += (buffer + i * BUFSIZ);
- }
- } else {
- for(i = lineCount; i < initLen; ++i) {
- initReadResult += (buffer + i * BUFSIZ);
- }
- for(i = 0; i < lineCount; ++i) {
- initReadResult += (buffer + i * BUFSIZ);
- } //Vevb.com
- }
- curPos = lseek(fd, 0, SEEK_CUR);
- delete buffer;
首先聲明一段用來保存最終n行的緩存,緩存最長行長度是BUFSIZ,這個在linux中的定義是8192字節,然后每次讀取BUFSIZ-1個字節,最后一個用來放,類似fgets的實現,解析其中的換行符,由于只打算在linux中使用,所以只解析n,最后根據狀態,從緩存中讀取最后n行數據到string中.
如果不是初始讀取,之前已經說過邏輯了,將fstat獲取到的文件大小和保存的當前偏移做比較,如果有新的內容,則讀取,沒有就直接返回。
文件監控:
文件監控直接通過了linux的inotify接口實現。這里沒有考慮移植性,也就沒像tailf那樣,通過宏來判斷是否支持inotify,如果不支持,降級使用循環輪尋的方式讀取。
inotify的使用還是比較方便的基本上就是:inotify_init,inotify_add_watch,然后配合read系統調用,獲取文件修改信息。因此實現也非常方便。
首先是在構造函數里面初始化inotify:inotifyFd = inotify_init();
然后提供一個watch接口,通過傳入前文描述的TFile對象和內容讀取的回調函數,添加對應文件的監控和回調,代碼如下:
- void FileWatcher::watch ( boost::shared_ptr< TFile > tFile, std::list< FileWatcher::ReadCallBack > callBackList )
- {
- if(!tFile->hasError() && !callBackList.empty()) {
- int wd = inotify_add_watch(inotifyFd, tFile->name().c_str(), IN_MODIFY);
- if(wd > 0) {
- tFileMap.insert(std::make_pair<int, boost::shared_ptr<TFile> >(wd, tFile));
- callBackMap.insert(std::make_pair<int, std::list<ReadCallBack> >(wd, callBackList));
- //init read
- std::string initContent = tFile->read();
- BOOST_FOREACH(ReadCallBack &callback, callBackList) {
- callback(initContent);
- }
- }
- }
- }
這里通過TFile的文件名,向內核注冊添加該文件的modify事件,并且在注冊成功之后,進行初始讀取(這里有個小問題,由于后面websocket端沒有做緩存,所以由于初始讀取的時候還沒有任何websocket客戶端連接,所以通過web無法讀取初始內容,也就是文件最后10行)。同時,這個類維護兩個hashmap,分別是監聽描述符wd->tFile和wd->callbacklist。
監聽完成后,就是啟動監聽,也就是通過讀取fd,感知被監聽文件的變更,由于這里只監聽了文件修改,那么讀取到這個事件之后,就可以對該文件進行增量讀取(前文已經描述了讀取方法),代碼如下:
- char * buffer = new char[inotifyBufferSize];
- while(!_interrupted) {
- if(read(inotifyFd, buffer, inotifyBufferSize) < 0) {
- if(errno == EINTR) {
- // interrupt
- delete buffer;
- return;
- }
- }
- struct inotify_event *event = ( struct inotify_event * ) buffer;
- int wd = event->wd;
- BOOST_AUTO(tFileIter, tFileMap.find(wd));
- if(tFileIter != tFileMap.end()) {
- boost::shared_ptr<TFile> tFile = tFileIter->second;
- std::string content = tFile->read();
- BOOST_AUTO(iter, callBackMap.find(wd));
- if(iter != callBackMap.end()) {
- std::list<ReadCallBack> callbacks = iter->second;
- BOOST_FOREACH(ReadCallBack &callback, callbacks) {
- callback(content);
- }
- }
- }
- }
- delete buffer;
這里參照inotify的文檔,首先讀取緩沖區大小設置為:static const int inotifyBufferSize = sizeof(struct inotify_event) + NAME_MAX + 1;
也就是inotify_event結構的長度,和名字最大值。由于inotify_event是變長字段(包含一個可選的文件名字段),所以這里采用了系統限制的文件名最大值NAME_MAX,這個宏在climits中定義,在linux中大小為255字節。
然后通過系統調用read,讀取文件描述符inotifyFd,這里如果沒有新的事件產生,read會進入阻塞狀態,節省系統資源。如果有返回,則處理返回的inotify_event對象(注意在監聽modify事件的時候,是沒有文件名的)。通過結構中的wd,從之前保存的hashmap中獲取對應的tFile對象進行增量讀取,然后再讀取wd對應的回調函數,將讀取內容返回。
這里有個小問題需要處理,就是如何中斷讀取。之前為了在gtest中能夠通過單元測試的方式進行測試,通過查看手冊可以知道,如果read調用被系統信號中斷,會標記錯誤碼為EINTR。所以,當讀取失敗的時候,可以通過對ERRNO檢查,判斷是否是信號中斷。
由于程序會一直運行,知道通過信號終止,所以析構變的不是很重要了。這里析構函數里面通過調用inotify_rm_watch將之前保存的wd全部去掉,然后通過close調用交inotify的文件描述符關閉即可:
- FileWatcher::~FileWatcher()
- {
- if(inotifyFd > 0) {
- boost::unordered::unordered_map<int, boost::shared_ptr<TFile> >::iterator iter;
- for(iter = tFileMap.begin(); iter != tFileMap.end(); ++iter) {
- inotify_rm_watch(inotifyFd, iter->first);
- }
- close(inotifyFd);
- }
- }
- websocket
前面介紹了服務器端如何監聽和增量讀取文件,這里通過基于boost asio的websocketpp,實現了一個簡單的websocket服務端,能夠和瀏覽器進行通信,將讀取到的文件通過websocket協議進行實時傳送.
關于websocket的簡單介紹,可以參考維基百科,websocket協議,在rfc6455中定義.
websocketpp對websocket和簡單的http都進行了比較好的封裝,只要實現幾個handler,就可以完成對連接、消息等的操作和控制,主要需要處理的,可能有以下的handler,代碼如下:
- typedef lib::function<void(connection_hdl)> open_handler;
- typedef lib::function<void(connection_hdl)> close_handler;
- typedef lib::function<void(connection_hdl)> http_handler;
- typedef lib::function<void(connection_hdl,message_ptr)> message_handler
分別處理連接創建,連接關閉,http請求和消息請求,其中connection_hdl是連接的weak_ptr.
這里對websocket使用很簡單,唯一的需求,就是維護已經建立的連接(既創建連接的時候記錄,關閉連接的時候移出),然后通過將自己的回調注冊到文件監控類中,實時的將消息推送到websocket客戶端。
首先,維護一個set,用來保存當前已經建立的所有連接:
- typedef std::set<websocketpp::connection_hdl,boost::owner_less<websocketpp::connection_hdl> > ConnectionSet;
然后在建立連接的時候插入到這個set中,代碼如下:
- void WebSocketServer::onOpen ( websocketpp::connection_hdl hdl )
- {
- boost::lock_guard<boost::mutex> lock(_mutex);
- _conns.insert(hdl);
- }
在連接關閉的時候,移除連接,代碼如下:
- void WebSocketServer::onClose ( websocketpp::connection_hdl hdl )
- {
- boost::lock_guard<boost::mutex> lock(_mutex);
- _conns.erase(hdl);
- }
另外,定義一個讓filewatcher調用的回調,代碼如下:
- void WebSocketServer::write ( const std::string& content )
- {
- boost::lock_guard<boost::mutex> lock(_mutex);
- BOOST_AUTO(it, _conns.begin());
- for(; it != _conns.end(); ++it) {
- _s.send(*it, content, websocketpp::frame::opcode::text);
- }
- }
這個回調很簡單,就是當有讀取到內容的時候,遍歷連接集合,向每個連接發送具體的內容,最后,為了方便用戶訪問,當發現是標準http請求的時候,我們返回一個簡單的html,用于顯示和建立websocket連接,如果不指定具體的http_handler,websocketpp在發現請求是http的時候,會返回http的426錯誤,Upgrade Required,代碼如下:
- void WebSocketServer::httpHandler ( websocketpp::connection_hdl hdl )
- {
- server::connection_ptr connPtr = _s.get_con_from_hdl(hdl);;
- connPtr->set_status(websocketpp::http::status_code::ok);
- connPtr->set_body(htmlContent);
- }
另外,還有一個小坑,按照websocketpp的example,在啟動的時候都是直接調用server的listen函數,而且使用的都是只有端口號的那個實現,實際使用過程中,發現這個只有端口號的實現,直接使用了ipv6協議,雖說如果本機同時支持ipv6和ipv4的情況下,兩個協議對應的端口都會監聽,但是遇到了服務器上關閉了ipv6,會導致boost asio拋出address_family_not_supported異常,導致應用被迫退出,為了兼容這種方式,對這個異常進行了抓取,重新降級嘗試ipv4協議,這樣能夠很好的在只有ipv4的服務器上進行使用,代碼如下:
- try{
- _s.listen(port);
- } catch(boost::system::system_error const& e) {
- if(e.code() == boost::asio::error::address_family_not_supported) {
- _s.listen(boost::asio::ip::tcp::v4(), port);
- }
- } catch (...) {
- throw;
- }
新聞熱點
疑難解答