和同樣是計(jì)算框架的MaPReduce相比,Mapreduce集群上運(yùn)行的是Job,而Storm集群上運(yùn)行的是Topology。但是Job在運(yùn)行結(jié)束之后會(huì)自行結(jié)束,Topology卻只能被手動(dòng)的kill掉,否則會(huì)一直運(yùn)行下去。
Storm集群中有兩種節(jié)點(diǎn),一種是控制節(jié)點(diǎn)(Nimbus節(jié)點(diǎn)),另一種是工作節(jié)點(diǎn)(Supervisor節(jié)點(diǎn))。所有Topology任務(wù)的提交必須在Storm客戶端節(jié)點(diǎn)上進(jìn)行(需要配置~/.storm/storm.yaml文件),由Nimbus節(jié)點(diǎn)分配給其他Supervisor節(jié)點(diǎn)進(jìn)行處理。Nimbus節(jié)點(diǎn)首先將提交的Topology進(jìn)行分片,分成一個(gè)個(gè)的Task,并將Task和Supervisor相關(guān)的信息提交到zookeeper集群上,Supervisor會(huì)去zookeeper集群上認(rèn)領(lǐng)自己的Task,通知自己的Worker進(jìn)程進(jìn)行Task的處理。總體的Topology處理流程圖為:

每個(gè)Topology都由Spout和Bolt組成,在Spout和Bolt傳遞信息的基本單位叫做Tuple,由Spout發(fā)出的連續(xù)不斷的Tuple及其在相應(yīng)Bolt上處理的子Tuple連起來(lái)稱為一個(gè)Steam,每個(gè)Stream的命名是在其首個(gè)Tuple被Spout發(fā)出的時(shí)候,此時(shí)Storm會(huì)利用內(nèi)部的Ackor機(jī)制保證每個(gè)Tuple可靠的被處理。
而Tuple可以理解成鍵值對(duì),其中,鍵就是在定義在declareStream方法中的Fields字段,而值就是在emit方法中發(fā)送的Values字段。
2 Configuration在運(yùn)行Topology之前,可以通過(guò)一些參數(shù)的配置來(lái)調(diào)節(jié)運(yùn)行時(shí)的狀態(tài),參數(shù)的配置是通過(guò)Storm框架部署目錄下的conf/storm.yaml文件來(lái)完成的。在次文件中可以配置運(yùn)行時(shí)的Storm本地目錄路徑、運(yùn)行時(shí)Worker的數(shù)目等。
在代碼中,也可以設(shè)置Config的一些參數(shù),但是優(yōu)先級(jí)是不同的,不同位置配置Config參數(shù)的優(yōu)先級(jí)順序?yàn)椋?/p>
default.yaml<storm.yaml<topology內(nèi)部的configuration<內(nèi)部組件的special configuration<外部組件的special configuration
在storm.yaml中常用的幾個(gè)選項(xiàng)為:
配置選項(xiàng)名稱 | 配置選項(xiàng)作用 |
topology.max.task.parallelism | 每個(gè)Topology運(yùn)行時(shí)最大的executor數(shù)目 |
topology.workers | 每個(gè)Topology運(yùn)行時(shí)的worker的默認(rèn)數(shù)目,若在代碼中設(shè)置,則此選項(xiàng)值被覆蓋 |
storm.zookeeper.servers | zookeeper集群的節(jié)點(diǎn)列表 |
storm.local.dir | Storm用于存儲(chǔ)jar包和臨時(shí)文件的本地存儲(chǔ)目錄 |
storm.zookeeper.root | Storm在zookeeper集群中的根目錄,默認(rèn)是“/” |
ui.port | Storm集群的UI地址端口號(hào),默認(rèn)是8080 |
nimbus.host: | Nimbus節(jié)點(diǎn)的host |
supervisor.slots.ports | Supervisor節(jié)點(diǎn)的worker占位槽,集群中的所有Topology公用這些槽位數(shù),即使提交時(shí)設(shè)置了較大數(shù)值的槽位數(shù),系統(tǒng)也會(huì)按照當(dāng)前集群中實(shí)際剩余的槽位數(shù)來(lái)進(jìn)行分配,當(dāng)所有的槽位數(shù)都分配完時(shí),新提交的Topology只能等待,系統(tǒng)會(huì)一直監(jiān)測(cè)是否有空余的槽位空出來(lái),如果有,就再次給新提交的Topology分配 |
supervisor.worker.timeout.secs | Worker的超時(shí)時(shí)間,單位為秒,超時(shí)后,Storm認(rèn)為當(dāng)前worker進(jìn)程死掉,會(huì)重新分配其運(yùn)行著的task任務(wù) |
drpc.servers | 在使用drpc服務(wù)時(shí),drpc server的服務(wù)器列表 |
drpc.port | 在使用drpc服務(wù)時(shí),drpc server的服務(wù)端口 |
Spout是Stream的消息產(chǎn)生源, Spout組件的實(shí)現(xiàn)可以通過(guò)繼承BaseRichSpout類或者其他*Spout類來(lái)完成,也可以通過(guò)實(shí)現(xiàn)IRichSpout接口來(lái)實(shí)現(xiàn)。
需要根據(jù)情況實(shí)現(xiàn)Spout類中重要的幾個(gè)方法有:

當(dāng)一個(gè)Task被初始化的時(shí)候會(huì)調(diào)用此open方法。一般都會(huì)在此方法中對(duì)發(fā)送Tuple的對(duì)象SpoutOutputCollector和配置對(duì)象TopologyContext初始化。
示例如下:
1 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {2 3 _collector = collector;4 5 }3.2 declareOutputFields方法此方法用于聲明當(dāng)前Spout的Tuple發(fā)送流。Stream流的定義是通過(guò)OutputFieldsDeclare.declareStream方法完成的,其中的參數(shù)包括了發(fā)送的域Fields。
示例如下:
1 public void declareOutputFields(OutputFieldsDeclarer declarer) {2 3 declarer.declare(new Fields("Word"));4 5 }3.3 getComponentConfiguration方法此方法用于聲明針對(duì)當(dāng)前組件的特殊的Configuration配置。
示例如下:
1 public Map<String, Object> getComponentConfiguration() { 2 3 if(!_isDistributed) { 4 5 Map<String, Object> ret = new HashMap<String, Object>(); 6 7 ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3); 8 9 return ret;10 11 } else {12 13 return null;14 15 }16 17 }這里便是設(shè)置了Topology中當(dāng)前Component的線程數(shù)量上限。
3.4 nextTuple方法這是Spout類中最重要的一個(gè)方法。發(fā)射一個(gè)Tuple到Topology都是通過(guò)這個(gè)方法來(lái)實(shí)現(xiàn)的。
示例如下:
1 public void nextTuple() { 2 3 Utils.sleep(100); 4 5 final String[] words = new String[] {"twitter","facebook","google"}; 6 7 final Random rand = new Random(); 8 9 final String word = words[rand.nextInt(words.length)];10 11 _collector.emit(new Values(word));12 13 }這里便是從一個(gè)數(shù)組中隨機(jī)選取一個(gè)單詞作為Tuple,然后通過(guò)_collector發(fā)送到Topology。
另外,除了上述幾個(gè)方法之外,還有ack、fail和close方法等。Storm在監(jiān)測(cè)到一個(gè)Tuple被成功處理之后會(huì)調(diào)用ack方法,處理失敗會(huì)調(diào)用fail方法,這兩個(gè)方法在BaseRichSpout類中已經(jīng)被隱式的實(shí)現(xiàn)了。
4 BoltsBolt類接收由Spout或者其他上游Bolt類發(fā)來(lái)的Tuple,對(duì)其進(jìn)行處理。Bolt組件的實(shí)現(xiàn)可以通過(guò)繼承BasicRichBolt類或者IRichBolt接口來(lái)完成。
Bolt類需要實(shí)現(xiàn)的主要方法有:

此方法和Spout中的open方法類似,為Bolt提供了OutputCollector,用來(lái)從Bolt中發(fā)送Tuple。Bolt中Tuple的發(fā)送可以在prepare方法中、execute方法中、cleanup等方法中進(jìn)行,一般都是些在execute中。
示例如下:
1 public void prepare(Map conf, TopologyContext context, OutputCollector collector) {2 3 _collector = collector;4 5 }4.2 declareOutputFields方法用于聲明當(dāng)前Bolt發(fā)送的Tuple中包含的字段,和Spout中類似。
示例如下:
1 public void declareOutputFields(OutputFieldsDeclarer declarer) {2 3 declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));4 5 }此例說(shuō)明當(dāng)前Bolt類發(fā)送的Tuple包含了三個(gè)字段:"obj", "count", "actualWindowLengthInSeconds"。
4.3 getComponentConfiguration方法和Spout類一樣,在Bolt中也可以有g(shù)etComponentConfiguration方法。
示例如下:
1 public Map<String, Object> getComponentConfiguration() {2 3 Map<String, Object> conf = new HashMap<String, Object>();4 5 conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);6 7 return conf;8 9 }此例定義了從系統(tǒng)組件“_system”的“_tick”流中發(fā)送Tuple到當(dāng)前Bolt的頻率,當(dāng)系統(tǒng)需要每隔一段時(shí)間執(zhí)行特定的處理時(shí),就可以利用這個(gè)系統(tǒng)的組件的特性來(lái)完成。
4.4 execute方法這是Bolt中最關(guān)鍵的一個(gè)方法,對(duì)于Tuple的處理都可以放到此方法中進(jìn)行。具體的發(fā)送也是通過(guò)emit方法來(lái)完成的。此時(shí),有兩種情況,一種是emit方法中有兩個(gè)參數(shù),另一個(gè)種是有一個(gè)參數(shù)。
(1)emit有一個(gè)參數(shù):此唯一的參數(shù)是發(fā)送到下游Bolt的Tuple,此時(shí),由上游發(fā)來(lái)的舊的Tuple在此隔斷,新的Tuple和舊的Tuple不再屬于同一棵Tuple樹(shù)。新的Tuple另起一個(gè)新的Tuple樹(shù)。
(2)emit有兩個(gè)參數(shù):第一個(gè)參數(shù)是舊的Tuple的輸入流,第二個(gè)參數(shù)是發(fā)往下游Bolt的新的Tuple流。此時(shí),新的Tuple和舊的Tuple是仍然屬于同一棵Tuple樹(shù),即,如果下游的Bolt處理Tuple失敗,則會(huì)向上傳遞到當(dāng)前Bolt,當(dāng)前Bolt根據(jù)舊的Tuple流繼續(xù)往上游傳遞,申請(qǐng)重發(fā)失敗的Tuple。保證Tuple處理的可靠性。
這兩種情況要根據(jù)自己的場(chǎng)景來(lái)確定。
示例如下:
1 public void execute(Tuple tuple) { 2 3 _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 4 5 _collector.ack(tuple); 6 7 } 8 9 public void execute(Tuple tuple) {10 11 _collector.emit(new Values(tuple.getString(0) + "!!!"));12 13 }此外還有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法類似,都是在當(dāng)前Component關(guān)閉時(shí)調(diào)用,但是針對(duì)實(shí)時(shí)計(jì)算來(lái)說(shuō),除非一些特殊的場(chǎng)景要求以外,這兩個(gè)方法一般都很少用到。
5 Stream grouping上文中介紹了Topology的基本組件Spout和Bolt,在Topology中,數(shù)據(jù)流Tuple的處理就是不斷的通過(guò)調(diào)用不同的Spout和Bolt來(lái)完成的。不同的Bolt和Spout的上下游關(guān)系是通過(guò)在入口類中定義的。示例如下:
1 builder = new TopologyBuilder(); 2 3 builder.setSpout(spoutId, new TestWordSpout(), 5); 4 5 builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); 6 7 builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj")); 8 builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); 9此例中的builder是TopologyBuilder對(duì)象,通過(guò)它的createTopology方法可以創(chuàng)建一個(gè)Topology對(duì)象,同時(shí)此builder還要定義當(dāng)前Topology中用到的Spout和Bolt對(duì)象,分別通過(guò)setSpout方法和setBolt方法來(lái)完成。
setSpout方法和setBolt方法中的第一個(gè)參數(shù)是當(dāng)前的Component組件的Stream流ID號(hào);第二個(gè)參數(shù)是具體的Component實(shí)現(xiàn)類的構(gòu)造;第三個(gè)參數(shù)是當(dāng)前Component的并行執(zhí)行的線程數(shù)目,Storm會(huì)根據(jù)這個(gè)數(shù)字的累加和來(lái)確定Topology的Task數(shù)目。最后的小尾巴*Grouping是指的一個(gè)Stream應(yīng)如何分配數(shù)據(jù)給Bolt上面的Task。目前Storm的Stream Grouping有如下幾種:
(1)ShuffleGrouping:隨機(jī)分組,隨機(jī)分發(fā)Stream中的tuple,保證每個(gè)Bolt的Task接收Tuple數(shù)量大致一致;
(2)FieldsGrouping:按照字段分組,保證相同字段的Tuple分配到同一個(gè)Task中;
(3)AllGrouping:廣播發(fā)送,每一個(gè)Task都會(huì)受到所有的Tuple;
(4)GlobalGrouping:全局分組,所有的Tuple都發(fā)送到同一個(gè)Task中,此時(shí)一般將當(dāng)前Component的并發(fā)數(shù)目設(shè)置為1;
(5)NonGrouping:不分組,和ShuffleGrouping類似,當(dāng)前Task的執(zhí)行會(huì)和它的被訂閱者在同一個(gè)線程中執(zhí)行;
(6)DirectGrouping:直接分組,直接指定由某個(gè)Task來(lái)執(zhí)行Tuple的處理,而且,此時(shí)必須有emitDirect方法來(lái)發(fā)送;
(7) localOrShuffleGrouping:和ShuffleGrouping類似,若Bolt有多個(gè)Task在同一個(gè)進(jìn)程中,Tuple會(huì)隨機(jī)發(fā)給這些Task。
不同的的Grouping,需要根據(jù)不同的場(chǎng)景來(lái)具體設(shè)定,不一而論。
6 Topology運(yùn)行6.1 Topology運(yùn)行方式Topology的運(yùn)行可以分為本地模式和分布式模式,模式的設(shè)置可以在配置文件中設(shè)定,也可以在代碼中設(shè)置。
(1)本地運(yùn)行的提交方式:
1 LocalCluster cluster = new LocalCluster();2 3 cluster.submitTopology(topologyName, conf, topology);4 5 cluster.killTopology(topologyName);6 7 cluster.shutdown();
(2)分布式提交方式:
StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());
需要注意的是,在Storm代碼編寫(xiě)完成之后,需要打包成jar包放到Nimbus中運(yùn)行,打包的時(shí)候,不需要把依賴的jar都打進(jìn)去,否則如果把依賴的storm.jar包打進(jìn)去的話,運(yùn)行時(shí)會(huì)出現(xiàn)重復(fù)的配置文件錯(cuò)誤導(dǎo)致Topology無(wú)法運(yùn)行。因?yàn)門opology運(yùn)行之前,會(huì)加載本地的storm.yaml配置文件。
在Nimbus運(yùn)行的命令如下:
storm jarStormTopology.jar maincalss args
6.2 Topology運(yùn)行流程有幾點(diǎn)需要說(shuō)明的地方:
(1)Storm提交后,會(huì)把代碼首先存放到Nimbus節(jié)點(diǎn)的inbox目錄下,之后,會(huì)把當(dāng)前Storm運(yùn)行的配置生成一個(gè)stormconf.ser文件放到Nimbus節(jié)點(diǎn)的stormdist目錄中,在此目錄中同時(shí)還有序列化之后的Topology代碼文件;
(2)在設(shè)定Topology所關(guān)聯(lián)的Spouts和Bolts時(shí),可以同時(shí)設(shè)置當(dāng)前Spout和Bolt的executor數(shù)目和task數(shù)目,默認(rèn)情況下,一個(gè)Topology的task的總和是和executor的總和一致的。之后,系統(tǒng)根據(jù)worker的數(shù)目,盡量平均的分配這些task的執(zhí)行。worker在哪個(gè)supervisor節(jié)點(diǎn)上運(yùn)行是由storm本身決定的;
(3)任務(wù)分配好之后,Nimbes節(jié)點(diǎn)會(huì)將任務(wù)的信息提交到zookeeper集群,同時(shí)在zookeeper集群中會(huì)有workerbeats節(jié)點(diǎn),這里存儲(chǔ)了當(dāng)前Topology的所有worker進(jìn)程的心跳信息;
(4)Supervisor節(jié)點(diǎn)會(huì)不斷的輪詢zookeeper集群,在zookeeper的assignments節(jié)點(diǎn)中保存了所有Topology的任務(wù)分配信息、代碼存儲(chǔ)目錄、任務(wù)之間的關(guān)聯(lián)關(guān)系等,Supervisor通過(guò)輪詢此節(jié)點(diǎn)的內(nèi)容,來(lái)領(lǐng)取自己的任務(wù),啟動(dòng)worker進(jìn)程運(yùn)行;
(5)一個(gè)Topology運(yùn)行之后,就會(huì)不斷的通過(guò)Spouts來(lái)發(fā)送Stream流,通過(guò)Bolts來(lái)不斷的處理接收到的Stream流,Stream流是無(wú)界的。
最后一步會(huì)不間斷的執(zhí)行,除非手動(dòng)結(jié)束Topology。
6.3 Topology方法調(diào)用流程Topology中的Stream處理時(shí)的方法調(diào)用過(guò)程如下:

有幾點(diǎn)需要說(shuō)明的地方:
(1)每個(gè)組件(Spout或者Bolt)的構(gòu)造方法和declareOutputFields方法都只被調(diào)用一次。
(2)open方法、prepare方法的調(diào)用是多次的。入口函數(shù)中設(shè)定的setSpout或者setBolt里的并行度參數(shù)指的是executor的數(shù)目,是負(fù)責(zé)運(yùn)行組件中的task的線程 的數(shù)目,此數(shù)目是多少,上述的兩個(gè)方法就會(huì)被調(diào)用多少次,在每個(gè)executor運(yùn)行的時(shí)候調(diào)用一次。相當(dāng)于一個(gè)線程的構(gòu)造方法。
(3)nextTuple方法、execute方法是一直被運(yùn)行的,nextTuple方法不斷的發(fā)射Tuple,Bolt的execute不斷的接收Tuple進(jìn)行處理。只有這樣不斷地運(yùn)行,才會(huì)產(chǎn) 生無(wú)界的Tuple流,體現(xiàn)實(shí)時(shí)性。相當(dāng)于線程的run方法。
(4)在提交了一個(gè)topology之后,Storm就會(huì)創(chuàng)建spout/bolt實(shí)例并進(jìn)行序列化。之后,將序列化的component發(fā)送給所有的任務(wù)所在的機(jī)器(即Supervisor節(jié) 點(diǎn)),在每一個(gè)任務(wù)上反序列化component。
(5)Spout和Bolt之間、Bolt和Bolt之間的通信,是通過(guò)zeroMQ的消息隊(duì)列實(shí)現(xiàn)的。
(6)上圖沒(méi)有列出ack方法和fail方法,在一個(gè)Tuple被成功處理之后,需要調(diào)用ack方法來(lái)標(biāo)記成功,否則調(diào)用fail方法標(biāo)記失敗,重新處理這個(gè)Tuple。
6.4 Topology并行度在Topology的執(zhí)行單元里,有幾個(gè)和并行度相關(guān)的概念。
(1)worker:每個(gè)worker都屬于一個(gè)特定的Topology,每個(gè)Supervisor節(jié)點(diǎn)的worker可以有多個(gè),每個(gè)worker使用一個(gè)單獨(dú)的端口,它對(duì)Topology中的每個(gè)component運(yùn)行一個(gè)或者多個(gè)executor線程來(lái)提供task的運(yùn)行服務(wù)。
(2)executor:executor是產(chǎn)生于worker進(jìn)程內(nèi)部的線程,會(huì)執(zhí)行同一個(gè)component的一個(gè)或者多個(gè)task。
(3)task:實(shí)際的數(shù)據(jù)處理由task完成,在Topology的生命周期中,每個(gè)組件的task數(shù)目是不會(huì)發(fā)生變化的,而executor的數(shù)目卻不一定。executor數(shù)目小于等于task的數(shù)目,默認(rèn)情況下,二者是相等的。
在運(yùn)行一個(gè)Topology時(shí),可以根據(jù)具體的情況來(lái)設(shè)置不同數(shù)量的worker、task、executor,而設(shè)置的位置也可以在多個(gè)地方。
(1)worker設(shè)置:
(1.1)可以通過(guò)設(shè)置yaml中的topology.workers屬性
(1.2)在代碼中通過(guò)Config的setNumWorkers方法設(shè)定
(2)executor設(shè)置:
通過(guò)在Topology的入口類中setBolt、setSpout方法的最后一個(gè)參數(shù)指定,不指定的話,默認(rèn)為1;
(3)task設(shè)置:
(3.1) 默認(rèn)情況下,和executor數(shù)目一致;
(3.2)在代碼中通過(guò)TopologyBuilder的setNumTasks方法設(shè)定具體某個(gè)組件的task數(shù)目;
6.5 終止Topology通過(guò)在Nimbus節(jié)點(diǎn)利用如下命令來(lái)終止一個(gè)Topology的運(yùn)行:
storm killtopologyName
kill之后,可以通過(guò)UI界面查看topology狀態(tài),會(huì)首先變成KILLED狀態(tài),在清理完本地目錄和zookeeper集群中的和當(dāng)前Topology相關(guān)的信息之后,此Topology就會(huì)徹底消失了。
7 Topology跟蹤Topology提交后,可以在Nimbus節(jié)點(diǎn)的web界面查看,默認(rèn)的地址是http://Nimbusip:8080。
8 Storm應(yīng)用上面給出了如何編寫(xiě)Storm框架任務(wù)Topology的方法,那么在哪些場(chǎng)景下能夠使用Storm框架呢?下面介紹Storm框架的幾個(gè)典型的應(yīng)用場(chǎng)景。
(1)利用Storm框架的DRPC進(jìn)行大量的函數(shù)并行調(diào)用,即實(shí)現(xiàn)分布式的RPC;
(2)利用Storm框架的Transaction Topology,可以進(jìn)行實(shí)時(shí)性的批量更新或者查詢數(shù)據(jù)庫(kù)操作或者應(yīng)用需要同一批內(nèi)的消息以及批與批之間的消息并行處理這樣的場(chǎng)景,此時(shí)Topology中只能有一個(gè)TrasactionalSpout;
(3)利用滑動(dòng)窗口的邏輯結(jié)合Storm框架來(lái)計(jì)算得出某段時(shí)間內(nèi)的售出量最多的產(chǎn)品、購(gòu)買者最多的TopN地區(qū)等;
(4)精確的廣告推送,在用戶瀏覽產(chǎn)品的時(shí)候,將瀏覽記錄實(shí)時(shí)性的搜集,發(fā)送到Bolt,由Bolt來(lái)根據(jù)用戶的賬戶信息(如果有的話)完成產(chǎn)品的分類統(tǒng)計(jì),產(chǎn)品的相關(guān)性查詢等邏輯計(jì)算之后,將計(jì)算結(jié)果推送給用戶;
(5)實(shí)時(shí)日志的處理,Storm可以和一個(gè)分布式存儲(chǔ)結(jié)合起來(lái),實(shí)時(shí)性的從多個(gè)數(shù)據(jù)源發(fā)送數(shù)據(jù)到處理邏輯Bolts,Bolts完成一些邏輯處理之后,交給分布式存儲(chǔ)框架進(jìn)行存儲(chǔ),此時(shí),Spout可以是多個(gè);
(6)實(shí)時(shí)性的監(jiān)控輿論熱點(diǎn),比如針對(duì)某個(gè)關(guān)鍵詞,在用戶查詢的時(shí)候,產(chǎn)生數(shù)據(jù)源Spout,結(jié)合語(yǔ)義分析等,由Bolt來(lái)完成查詢關(guān)鍵詞的統(tǒng)計(jì)分析,匯總當(dāng)前的輿論熱點(diǎn);
(7)數(shù)據(jù)流的實(shí)時(shí)聚合操作。
新聞熱點(diǎn)
疑難解答
圖片精選
網(wǎng)友關(guān)注