国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

zookeeper節點Watch機制實例展示

2019-11-14 23:18:50
字體:
來源:轉載
供稿:網友
zookeeper節點Watch機制實例展示

znode以某種方式發生變化時,“觀察”(watch)機制可以讓客戶端得到通知.可以針對ZooKeeper服務的“操作”來設置觀察,該服務的其他操作可以觸發觀察.

實現Watcher,復寫PRocess方法,處理收到的變更

    /**     * Watcher Server,處理收到的變更     * @param watchedEvent     */    @Override    public void process(WatchedEvent watchedEvent) {        LOG.info("收到事件通知:" + watchedEvent.getState() );        if ( Event.KeeperState.SyncConnected == watchedEvent.getState() ) {            connectedSemaphore.countDown();        }    }

如下實例展示操作節點變化:

public class ZkWatchAPI implements Watcher {    public static final Logger LOG = LoggerFactory.getLogger(ZkWatchAPI.class);    private static final int session_TIMEOUT = 10000;    private ZooKeeper zk = null;    private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );    /**     * 連接Zookeeper     * @param connectString  Zookeeper服務地址     */    public void connectionZookeeper(String connectString){        connectionZookeeper(connectString,SESSION_TIMEOUT);    }    /**     * <p>連接Zookeeper</p>     * <pre>     *     [關于connectString服務器地址配置]     *     格式: 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181     *     這個地址配置有多個ip:port之間逗號分隔,底層操作     *     ConnectStringParser connectStringParser =  new ConnectStringParser(“192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181”);     *     這個類主要就是解析傳入地址列表字符串,將其它保存在一個ArrayList中     *     ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();     *     接下去,這個地址列表會被進一步封裝成StaticHostProvider對象,并且在運行過程中,一直是這個對象來維護整個地址列表。     *     ZK客戶端將所有Server保存在一個List中,然后隨機打亂(這個隨機過程是一次性的),并且形成一個環,具體使用的時候,從0號位開始一個一個使用。     *     因此,Server地址能夠重復配置,這樣能夠彌補客戶端無法設置Server權重的缺陷,但是也會加大風險。     *     *     [客戶端和服務端會話說明]     *     ZooKeeper中,客戶端和服務端建立連接后,會話隨之建立,生成一個全局唯一的會話ID(Session ID)。     *     服務器和客戶端之間維持的是一個長連接,在SESSION_TIMEOUT時間內,服務器會確定客戶端是否正常連接(客戶端會定時向服務器發送heart_beat,服務器重置下次SESSION_TIMEOUT時間)。     *     因此,在正常情況下,Session一直有效,并且ZK集群所有機器上都保存這個Session信息。     *     在出現網絡或其它問題情況下(例如客戶端所連接的那臺ZK機器掛了,或是其它原因的網絡閃斷),客戶端與當前連接的那臺服務器之間連接斷了,     *     這個時候客戶端會主動在地址列表(實例化ZK對象的時候傳入構造方法的那個參數connectString)中選擇新的地址進行連接。     *     *     [會話時間]     *     客戶端并不是可以隨意設置這個會話超時時間,在ZK服務器端對會話超時時間是有限制的,主要是minSessionTimeout和maxSessionTimeout這兩個參數設置的。     *     如果客戶端設置的超時時間不在這個范圍,那么會被強制設置為最大或最小時間。 默認的Session超時時間是在2 * tickTime ~ 20 * tickTime     * </pre>     * @param connectString  Zookeeper服務地址     * @param sessionTimeout Zookeeper連接超時時間     */    public void connectionZookeeper(String connectString, int sessionTimeout){        this.releaseConnection();        try {            // ZK客戶端允許我們將ZK服務器的所有地址都配置在這里            zk = new ZooKeeper(connectString, sessionTimeout, this );            // 使用CountDownLatch.await()的線程(當前線程)阻塞直到所有其它擁有CountDownLatch的線程執行完畢(countDown()結果為0)            connectedSemaphore.await();        } catch ( InterruptedException e ) {            LOG.error("連接創建失敗,發生 InterruptedException , e " + e.getMessage(), e);        } catch ( IOException e ) {            LOG.error( "連接創建失敗,發生 IOException , e " + e.getMessage(), e );        }    }    /**     * <p>創建zNode節點, String create(path<節點路徑>, data[]<節點內容>, List(ACL訪問控制列表), CreateMode<zNode創建類型>) </p><br/>     * <pre>     *     節點創建類型(CreateMode)     *     1、PERSISTENT:持久化節點     *     2、PERSISTENT_SEQUENTIAL:順序自動編號持久化節點,這種節點會根據當前已存在的節點數自動加 1     *     3、EPHEMERAL:臨時節點客戶端,session超時這類節點就會被自動刪除     *     4、EPHEMERAL_SEQUENTIAL:臨時自動編號節點     * </pre>     * @param path zNode節點路徑     * @param data zNode數據內容     * @return 創建成功返回true, 反之返回false.     */    public boolean createPath( String path, String data ) {        try {            String zkPath =  this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);            LOG.info( "節點創建成功, Path: " + zkPath + ", content: " + data );            return true;        } catch ( KeeperException e ) {            LOG.error( "節點創建失敗, 發生KeeperException! path: " + path + ", data:" + data                    + ", errMsg:" + e.getMessage(), e );        } catch ( InterruptedException e ) {            LOG.error( "節點創建失敗, 發生 InterruptedException! path: " + path + ", data:" + data                    + ", errMsg:" + e.getMessage(), e );        }        return false;    }    /**     * <p>刪除一個zMode節點, void delete(path<節點路徑>, stat<數據版本號>)</p><br/>     * <pre>     *     說明     *     1、版本號不一致,無法進行數據刪除操作.     *     2、如果版本號與znode的版本號不一致,將無法刪除,是一種樂觀加鎖機制;如果將版本號設置為-1,不會去檢測版本,直接刪除.     * </pre>     * @param path zNode節點路徑     * @return 刪除成功返回true,反之返回false.     */    public boolean deletePath( String path ){        try {            this.zk.delete(path,-1);            LOG.info( "節點刪除成功, Path: " + path);            return true;        } catch ( KeeperException e ) {            LOG.error( "節點刪除失敗, 發生KeeperException! path: " + path                    + ", errMsg:" + e.getMessage(), e );        } catch ( InterruptedException e ) {            LOG.error( "節點刪除失敗, 發生 InterruptedException! path: " + path                    + ", errMsg:" + e.getMessage(), e );        }        return false;    }    /**     * <p>更新指定節點數據內容, Stat setData(path<節點路徑>, data[]<節點內容>, stat<數據版本號>)</p>     * <pre>     *     設置某個znode上的數據時如果為-1,跳過版本檢查     * </pre>     * @param path zNode節點路徑     * @param data zNode數據內容     * @return 更新成功返回true,返回返回false     */    public boolean writeData( String path, String data){        try {            Stat stat = this.zk.setData(path, data.getBytes(), -1);            LOG.info( "更新數據成功, path:" + path + ", stat: " + stat );            return true;        } catch (KeeperException e) {            LOG.error( "更新數據失敗, 發生KeeperException! path: " + path + ", data:" + data                    + ", errMsg:" + e.getMessage(), e );        } catch (InterruptedException e) {            LOG.error( "更新數據失敗, 發生InterruptedException! path: " + path + ", data:" + data                    + ", errMsg:" + e.getMessage(), e );        }        return false;    }    /**     * <p>讀取指定節點數據內容,byte[] getData(path<節點路徑>, watcher<監視器>, stat<數據版本號>)</p>     * @param path zNode節點路徑     * @return 節點存儲的值,有值返回,無值返回null     */    public String readData( String path ){        String data = null;        try {            data = new String( this.zk.getData( path, false, null ) );            LOG.info( "讀取數據成功, path:" + path + ", content:" + data);        } catch (KeeperException e) {            LOG.error( "讀取數據失敗,發生KeeperException! path: " + path                    + ", errMsg:" + e.getMessage(), e );        } catch (InterruptedException e) {            LOG.error( "讀取數據失敗,發生InterruptedException! path: " + path                    + ", errMsg:" + e.getMessage(), e );        }        return  data;    }    /**     * <p>獲取某個節點下的所有子節點,List getChildren(path<節點路徑>, watcher<監視器>)該方法有多個重載</p>     * @param path zNode節點路徑     * @return 子節點路徑集合 說明,這里返回的值為節點名     * <pre>     *     eg.     *     /node     *     /node/child1     *     /node/child2     *     getChild( "node" )戶的集合中的值為["child1","child2"]     * </pre>     *     *     *     * @throws KeeperException     * @throws InterruptedException     */    public List<String> getChild( String path ){        try{            List<String> list=this.zk.getChildren( path, false );            if(list.isEmpty()){                LOG.info( "中沒有節點" + path );            }            return list;        }catch (KeeperException e) {            LOG.error( "讀取子節點數據失敗,發生KeeperException! path: " + path                    + ", errMsg:" + e.getMessage(), e );        } catch (InterruptedException e) {            LOG.error( "讀取子節點數據失敗,發生InterruptedException! path: " + path                    + ", errMsg:" + e.getMessage(), e );        }        return null;    }    /**     * <p>判斷某個zNode節點是否存在, Stat exists(path<節點路徑>, watch<并設置是否監控這個目錄節點,這里的 watcher 是在創建 ZooKeeper 實例時指定的 watcher>)</p>     * @param path zNode節點路徑     * @return 存在返回true,反之返回false     */    public boolean isExists( String path ){        try {            Stat stat = this.zk.exists( path, false );            return null != stat;        } catch (KeeperException e) {            LOG.error( "讀取數據失敗,發生KeeperException! path: " + path                    + ", errMsg:" + e.getMessage(), e );        } catch (InterruptedException e) {            LOG.error( "讀取數據失敗,發生InterruptedException! path: " + path                    + ", errMsg:" + e.getMessage(), e );        }        return false;    }    /**     * Watcher Server,處理收到的變更     * @param watchedEvent     */    @Override    public void process(WatchedEvent watchedEvent) {        LOG.info("收到事件通知:" + watchedEvent.getState() );        if ( Event.KeeperState.SyncConnected == watchedEvent.getState() ) {            connectedSemaphore.countDown();        }    }    /**     * 關閉ZK連接     */    public void releaseConnection() {        if ( null != zk ) {            try {                this.zk.close();            } catch ( InterruptedException e ) {                LOG.error("release connection error ," + e.getMessage() ,e);            }        }    }    public static void main(String [] args){        // 定義父子類節點路徑        String rootPath = "/nodeRoot";        String child1Path = rootPath + "/nodeChildren1";        String child2Path = rootPath + "/nodeChildren2";        ZkWatchAPI zkWatchAPI = new ZkWatchAPI();        // 連接zk服務器        zkWatchAPI.connectionZookeeper("192.168.155.47:2181");        // 創建節點數據        if ( zkWatchAPI.createPath( rootPath, "<父>節點數據" ) ) {            System.out.println( "節點[" + rootPath + "]數據內容[" + zkWatchAPI.readData( rootPath ) + "]" );        }        // 創建子節點, 讀取 + 刪除        if ( zkWatchAPI.createPath( child1Path, "<父-子(1)>節點數據" ) ) {            System.out.println( "節點[" + child1Path + "]數據內容[" + zkWatchAPI.readData( child1Path ) + "]" );            zkWatchAPI.deletePath(child1Path);            System.out.println( "節點[" + child1Path + "]刪除值后[" + zkWatchAPI.readData( child1Path ) + "]" );        }        // 創建子節點, 讀取 + 修改        if ( zkWatchAPI.createPath( child2Path, "<父-子(2)>節點數據" ) ) {            System.out.println( "節點[" + child2Path + "]數據內容[" + zkWatchAPI.readData( child2Path ) + "]" );            zkWatchAPI.writeData( child2Path, "<父-子(2)>節點數據,更新后的數據" );            System.out.println( "節點[" + child2Path+ "]數據內容更新后[" + zkWatchAPI.readData( child2Path ) + "]" );        }        // 獲取子節點        List<String> childPaths = zkWatchAPI.getChild(rootPath);        if(null != childPaths){            System.out.println( "節點[" + rootPath + "]下的子節點數[" + childPaths.size() + "]" );            for(String childPath : childPaths){                System.out.println(" |--節點名[" +  childPath +  "]");            }        }        // 判斷節點是否存在        System.out.println( "檢測節點[" + rootPath + "]是否存在:" + zkWatchAPI.isExists(rootPath)  );        System.out.println( "檢測節點[" + child1Path + "]是否存在:" + zkWatchAPI.isExists(child1Path)  );        System.out.println( "檢測節點[" + child2Path + "]是否存在:" + zkWatchAPI.isExists(child2Path)  );        zkWatchAPI.releaseConnection();    }}
View Code

代碼運行結果:

   [     74]   INFO - rg.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.155.47/192.168.155.47:2181, initiating session     [     97]   INFO - rg.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.155.47/192.168.155.47:2181, sessionid = 0x24c11eded7f000b, negotiated timeout = 10000     [     99]   INFO -       com.test.zk.ZkWatchAPI - 收到事件通知:SyncConnected     [    119]   INFO -       com.test.zk.ZkWatchAPI - 節點創建成功, Path: /nodeRoot, content: <父>節點數據     [    130]   INFO -       com.test.zk.ZkWatchAPI - 讀取數據成功, path:/nodeRoot, content:<父>節點數據     節點[/nodeRoot]數據內容[<父>節點數據]     [    140]   INFO -       com.test.zk.ZkWatchAPI - 節點創建成功, Path: /nodeRoot/nodeChildren1, content: <父-子(1)>節點數據     [    145]   INFO -       com.test.zk.ZkWatchAPI - 讀取數據成功, path:/nodeRoot/nodeChildren1, content:<父-子(1)>節點數據     節點[/nodeRoot/nodeChildren1]數據內容[<父-子(1)>節點數據]     [    156]   INFO -       com.test.zk.ZkWatchAPI - 節點刪除成功, Path: /nodeRoot/nodeChildren1     [    171]  ERROR -       com.test.zk.ZkWatchAPI - 讀取數據失敗,發生KeeperException! path: /nodeRoot/nodeChildren1, errMsg:KeeperErrorCode = NoNode for /nodeRoot/nodeChildren1 ...     節點[/nodeRoot/nodeChildren1]刪除值后[null]     [    185]   INFO -       com.test.zk.ZkWatchAPI - 節點創建成功, Path: /nodeRoot/nodeChildren2, content: <父-子(2)>節點數據     [    200]   INFO -       com.test.zk.ZkWatchAPI - 讀取數據成功, path:/nodeRoot/nodeChildren2, content:<父-子(2)>節點數據     節點[/nodeRoot/nodeChildren2]數據內容[<父-子(2)>節點數據]     [    213]   INFO -       com.test.zk.ZkWatchAPI - 更新數據成功, path:/nodeRoot/nodeChildren2, stat: 21474836549,21474836550,1426235422098,1426235422123,1,0,0,0,43,0,21474836549     [    228]   INFO -       com.test.zk.ZkWatchAPI - 讀取數據成功, path:/nodeRoot/nodeChildren2, content:<父-子(2)>節點數據,更新后的數據     節點[/nodeRoot/nodeChildren2]數據內容更新后[<父-子(2)>節點數據,更新后的數據]     節點[/nodeRoot]下的子節點數[1]     |--節點名[nodeChildren2]     檢測節點[/nodeRoot]是否存在:true     檢測節點[/nodeRoot/nodeChildren1]是否存在:false     檢測節點[/nodeRoot/nodeChildren2]是否存在:true     [    319]   INFO - rg.apache.zookeeper.ClientCnxn - EventThread shut down     [    319]   INFO - org.apache.zookeeper.ZooKeeper - Session: 0x24c11eded7f000b closed

客戶端命令行查看數據:

轉載請注明出處:[http://m.survivalescaperooms.com/dennisit/p/4340746.html]


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 元朗区| 苗栗县| 台湾省| 贵南县| 彭阳县| 大姚县| 墨脱县| 开原市| 西安市| 兖州市| 邢台县| 新沂市| 兴和县| 霸州市| 平定县| 家居| 合作市| 铁力市| 通许县| 凉城县| 建始县| 铁力市| 托克逊县| 潼关县| 乳山市| 周至县| 乐东| 娱乐| 乌拉特前旗| 南昌市| 裕民县| 铜川市| 德阳市| 寿阳县| 咸阳市| 镇江市| 双流县| 阿克陶县| 静安区| 修文县| 伊金霍洛旗|