在大數(shù)據(jù)時(shí)代,數(shù)據(jù)規(guī)模變得越來越大。由于數(shù)據(jù)的增長(zhǎng)速度和非結(jié)構(gòu)化的特性,常用的軟硬件工具已無法在用戶可容忍的時(shí)間內(nèi)對(duì)數(shù)據(jù)進(jìn)行采集、管理和處理。本文主要介紹如何在阿里云上使用Kafka和Storm搭建大規(guī)模消息分發(fā)和實(shí)時(shí)數(shù)據(jù)流處理系統(tǒng),以及這個(gè)過程中主要遭遇的一些挑戰(zhàn)。實(shí)踐主要立足建立一套汽車狀態(tài)實(shí)時(shí)監(jiān)控系統(tǒng),可以在阿里云上立即進(jìn)行部署。
一、實(shí)時(shí)大數(shù)據(jù)處理利器——Storm和Kafka
大數(shù)據(jù)時(shí)代,隨著可獲取數(shù)據(jù)的渠道增多,比如常見的電子商務(wù)、網(wǎng)絡(luò)、傳感器的數(shù)據(jù)流、太空數(shù)據(jù)等,數(shù)據(jù)規(guī)模也變得越來越大;同時(shí),不同的渠道往往產(chǎn)生更多的數(shù)據(jù)類型,這些衍生的數(shù)據(jù)增長(zhǎng)非常之快,規(guī)模非常之大。大數(shù)據(jù)時(shí)代各個(gè)機(jī)構(gòu)可謂是坐擁金山,然而目前大數(shù)據(jù)技術(shù)的應(yīng)用卻仍然存在眾多挑戰(zhàn),主要出現(xiàn)在數(shù)據(jù)收集、存儲(chǔ)、處理和可視化幾個(gè)過程。
1. 數(shù)據(jù)收集
Gartner的Merv Adrian對(duì)大數(shù)據(jù)有這樣一個(gè)定義:“大數(shù)據(jù)讓常用硬件軟件工具無法在用戶可容忍時(shí)間內(nèi)對(duì)數(shù)據(jù)進(jìn)行采集、管理和處理。”[1]麥肯錫全球研究院在2011年5月也有這樣一個(gè)概念:“大數(shù)據(jù)是指超出典型數(shù)據(jù)庫(kù)軟件工具采集、存儲(chǔ)、管理和分析能力的數(shù)據(jù)集。”[2]從上面的定義可以看出,大數(shù)據(jù)最大的挑戰(zhàn)在于如何在有限時(shí)間內(nèi)對(duì)數(shù)據(jù)進(jìn)行處理和分析,并得到有用信息。
2. 數(shù)據(jù)處理
大數(shù)據(jù)處理中最著名的工具是Hadoop,不過它并不是一套實(shí)時(shí)系統(tǒng)。為了解決這個(gè)問題,計(jì)算機(jī)工程師們又開發(fā)了Storm和Kafka。Apache Storm是一套開源的分布式實(shí)時(shí)計(jì)算系統(tǒng)。最早由Nathan Marz[3]開發(fā),在被Twitter收購(gòu)后開源,并在2014年9月起成為Apache頂級(jí)開源項(xiàng)目。Storm被廣泛用于各種商業(yè)網(wǎng)站,包括Twitter、Yelp、Groupon、百度、淘寶等。Storm的使用場(chǎng)景非常廣泛,例如實(shí)時(shí)分析、在線機(jī)器學(xué)習(xí)、連續(xù)計(jì)算、分部署RPC、ET|等。Storm有著非?斓奶幚硭俣,單節(jié)點(diǎn)可以達(dá)到百萬個(gè)元組每秒,此外它還具有高擴(kuò)展、容錯(cuò)、保證數(shù)據(jù)處理等特性。圖1是Storm的一個(gè)簡(jiǎn)單的架構(gòu)。
圖1 Storm架構(gòu)
Apache Kafka也是一個(gè)開源的系統(tǒng),旨在提供一個(gè)統(tǒng)一的,高吞吐、低延遲的分布式消息處理平臺(tái)來對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行處理。它最早由LinkedIn開發(fā),開源于2011年并被貢獻(xiàn)給了Apache。Kafka區(qū)別于傳統(tǒng)RabbitMQ、Apache ActiveMQ等消息系統(tǒng)的地方主要在于:分布式系統(tǒng)特性,易于擴(kuò)展;為發(fā)布和訂閱提供高吞吐量;支持多訂閱,可以自動(dòng)平衡消費(fèi)者;可以將消息持久化到磁盤,可以用于批量消費(fèi),例如ETL等。
圖2 Kafka架構(gòu)
二、在阿里云上部署Storm和Kafka
我們需要設(shè)計(jì)一個(gè)實(shí)時(shí)車輛監(jiān)控系統(tǒng),這個(gè)系統(tǒng)要將汽車駕駛過程中實(shí)時(shí)的位置,速度,轉(zhuǎn)速,油耗以及轉(zhuǎn)速發(fā)送到系統(tǒng)中,從而可以實(shí)時(shí)計(jì)算出車流量和污染物排放量。該系統(tǒng)的目標(biāo)是要能同事支持10萬輛車同時(shí)發(fā)送消息,在最高峰能滿足100萬輛車。為了實(shí)現(xiàn)如此規(guī)模的消息分發(fā)和吞吐,我們基于Kafka和Storm來設(shè)計(jì)實(shí)現(xiàn)。同時(shí)為了滿足高擴(kuò)展性,我們將Storm和Kafka分別部署到不同的服務(wù)器上,如果需要更多的計(jì)算能力,可以隨時(shí)通過創(chuàng)建新的服務(wù)器的方式來完成。此外為了滿足高可用性,每臺(tái)相同功能的服務(wù)器也需要至少部署2臺(tái),這樣一旦一臺(tái)服務(wù)器出現(xiàn)問題,另外一臺(tái)服務(wù)器也可以持續(xù)提供服務(wù)。
在實(shí)體服務(wù)器上部署Storm和Kafka等系統(tǒng)涉及到大量服務(wù)器集群和軟件的安裝部署,這個(gè)過程需要花費(fèi)大量時(shí)間,而
云計(jì)算則很好的彌補(bǔ)了這一點(diǎn)——提供各種虛擬服務(wù)器和鏡像功能,加快基礎(chǔ)設(shè)施和軟件的部署過程。
圖3 車聯(lián)網(wǎng)監(jiān)控系統(tǒng)架構(gòu)
我們需要2臺(tái)服務(wù)器來構(gòu)建Kafka代理服務(wù)器,在Storm中還需要2臺(tái)服務(wù)器來運(yùn)行Spout和2個(gè)Bolt,另外在Redis層則需要2臺(tái)服務(wù)器來部署緩存,再加上2臺(tái)服務(wù)器作為Web服務(wù)器。服務(wù)器架構(gòu)圖如圖4所示。
圖4 車聯(lián)網(wǎng)監(jiān)控系統(tǒng)架構(gòu)
在部署車聯(lián)網(wǎng)監(jiān)控系統(tǒng)之前,我們首先需要在每臺(tái)服務(wù)器上部署相應(yīng)的軟件,包括Git、Libzmq、Java、G++等,用于代碼編譯和相關(guān)軟件安裝?梢允褂肧SH連接到相應(yīng)的機(jī)器。用戶名密碼則會(huì)由阿里云以郵件或者短消息的方式提供。
在車聯(lián)網(wǎng)實(shí)時(shí)監(jiān)控系統(tǒng)中,我們需要部署4種不同類型的服務(wù)器,分別是網(wǎng)站前臺(tái)服務(wù)器、Kafka服務(wù)器、Storm服務(wù)器和緩存服務(wù)器,以滿足上面提到的高擴(kuò)展性的要求。在每一種類型的服務(wù)器部署完成之后,都可以通過阿里云鏡像的功能,創(chuàng)建一個(gè)能隨時(shí)使用的鏡像,這樣在擴(kuò)展服務(wù)器的時(shí)候就不需要重新安裝軟件,直接通過鏡像創(chuàng)建服務(wù)器就可以了。
以下命令需要在所有服務(wù)器上運(yùn)行以安裝相應(yīng)的軟件:
以下命令安裝在緩存服務(wù)器和Kafka服務(wù)器上:
另外,我們還需要在Storm的服務(wù)器安裝maven和lein用于代碼編譯:
在Kafka服務(wù)器上安裝Kafka:
對(duì)于Storm和Kafka的安裝,到這一步已基本完成,接下去需要分別創(chuàng)建鏡像。創(chuàng)建鏡像的方法是先創(chuàng)建阿里云快照,然后通過將快照轉(zhuǎn)換為鏡像的方式完成。具體步驟如下:
在阿里云的管理界面選擇云服務(wù)器,隨后選擇該服務(wù)器的磁盤列表,點(diǎn)擊創(chuàng)建快照。
輸入快照名稱并確認(rèn)。
阿里云會(huì)自動(dòng)為云服務(wù)器的系統(tǒng)盤創(chuàng)建快照,當(dāng)創(chuàng)建完成以后,會(huì)出現(xiàn)“創(chuàng)建自定義鏡像”按鈕。
點(diǎn)擊“創(chuàng)建自定義鏡像”的按鈕,阿里云就會(huì)將這個(gè)快照轉(zhuǎn)換為鏡像,可以在阿里云ECS管理界面的自定義鏡像欄中看到。
接下來,我們通過鏡像可以直接創(chuàng)建相同配置的ECS服務(wù)器。
圖5 從自定義鏡像中創(chuàng)建云服務(wù)器
當(dāng)然,在自動(dòng)擴(kuò)展實(shí)現(xiàn)上,云服務(wù)并不需要用戶去手動(dòng)執(zhí)行,這里我們使用阿里云的ECS REST API自動(dòng)通過鏡像創(chuàng)建機(jī)器?梢詤⒖家韵翽ython代碼,自動(dòng)創(chuàng)建阿里云ECS虛擬機(jī):
三、基于Storm和Kafka的車輛信息實(shí)時(shí)監(jiān)控系統(tǒng)打造
接下來做的就是將車輛信息實(shí)時(shí)監(jiān)控系統(tǒng)部署到系統(tǒng)中。這個(gè)系統(tǒng)演示了如何編寫一個(gè)Storm的Topology,從Kafka消息系統(tǒng)中將信息讀取出來。我們使用Kafka的客戶端模擬從世界各地發(fā)送車輛實(shí)時(shí)信息給Kafka集群,然后Storm Topology會(huì)把這些消息通過Bolts將坐標(biāo)轉(zhuǎn)換為Json對(duì)象,并且使用GeoJSON在Bing Map上顯示車輛的實(shí)時(shí)位置、溫度、轉(zhuǎn)速以及速度等等信息。Topology還會(huì)將信息寫到Redis緩存中,然后Node.js通過socket.io讀取Redis中的信息,并且使用d3js顯示在頁(yè)面上。
首先,我們需要編寫Kafka 生產(chǎn)者的部分代碼,主要是模擬讀取汽車的實(shí)時(shí)數(shù)據(jù)并向Kafka集群進(jìn)行發(fā)送,我們實(shí)現(xiàn)了一個(gè)KafkaCarDataProducer類,通過配置ProducerConfig來創(chuàng)建一個(gè)Producer對(duì)象來發(fā)送數(shù)據(jù)。它可以用來連接到Zookeeper,或者直接是Kafka 代理。例如:kafkaclient.cloudapp.net:2181或者0:kafkaclient.cloudapp.net:9092。代碼中我們根據(jù)不同的連接字符串設(shè)置不同配置。偽代碼如下:
然后就可以直接通過下面代碼來發(fā)送消息:
接下來我們需要編寫3個(gè)Storm類,首先是創(chuàng)建Storm的Topology,這個(gè)類叫KafkaCarTopology,我們創(chuàng)建了一個(gè)叫car的topic,然后定義本機(jī)一個(gè)hosts和Zookeeper hosts,最后創(chuàng)建一個(gè)Spout,叫做KafkaSpout,然后添加ParseCarDataBolt連接到KafkaSout,再創(chuàng)建一個(gè)RedisCarBolt,用于將結(jié)果寫入Redis緩存。最后根據(jù)參數(shù)創(chuàng)建3個(gè)Worker,提交Storm Topology。
在這個(gè)拓?fù)浣Y(jié)構(gòu)中,我們有2個(gè)Bolt用于數(shù)據(jù)的處理,第一個(gè)叫ParserCarDataBolt,這個(gè)Bolt主要將Kafka傳出的消息轉(zhuǎn)換為Json格式,它繼承BaseBasicBolt,在execute函數(shù)中通過collector提交數(shù)據(jù),同時(shí)重載了declareOutputFields函數(shù),通知下一個(gè)Bolt的數(shù)據(jù)格式。代碼如下:
數(shù)據(jù)會(huì)被寫入RedisCarBolt,再寫入到Redis緩存中。它繼承自BaseRichBolt,需要重載prepare和excute方法來處理消息元組。此外還需要重載prepare和cleanup函數(shù),幾個(gè)關(guān)鍵的函數(shù)如下:
最后我們還需要編寫一些Node.js的代碼,保證在頁(yè)面上通過socket.io進(jìn)行通訊,實(shí)時(shí)將最終數(shù)據(jù)從Redis里面讀取出來,并在BingMap上顯示。
到此為止,一個(gè)簡(jiǎn)單的車輛信息實(shí)時(shí)監(jiān)控系統(tǒng)就實(shí)現(xiàn)了,我們通過bash腳本進(jìn)行編譯,并安裝到相應(yīng)的服務(wù)器上,比如下列代碼需要被安裝在Storm的服務(wù)器上:
有一點(diǎn)需要注意的是,由于在編譯過程中需要自動(dòng)下載Storm庫(kù),在阿里云的國(guó)內(nèi)機(jī)房的虛擬機(jī)很有可能需要設(shè)置代理進(jìn)行。設(shè)置代理的方法也很簡(jiǎn)單,通過對(duì)lein命令增加以下參數(shù)就可以了:http_proxy=http://URL:PORT
接著我們?cè)诰W(wǎng)頁(yè)上訪問http://webhostname或者運(yùn)行node.js的服務(wù)器,就會(huì)看到下面的網(wǎng)頁(yè),同時(shí)發(fā)現(xiàn)網(wǎng)頁(yè)將同步刷新汽車的實(shí)時(shí)位置、速度、轉(zhuǎn)速等。
圖6 車聯(lián)網(wǎng)監(jiān)控系統(tǒng)演示頁(yè)面
四、對(duì)車聯(lián)網(wǎng)監(jiān)控系統(tǒng)的性能測(cè)試
接下來我們對(duì)這個(gè)系統(tǒng)進(jìn)行了一個(gè)簡(jiǎn)單的吞吐量測(cè)試。我們只有1個(gè)Topic,使用5個(gè)partition、3個(gè)worker、1個(gè)Spout和2個(gè)Bolt,在一臺(tái)2核2GB的ECS上運(yùn)行。我們使用了另外4臺(tái)客戶端,每個(gè)客戶端有4核8G內(nèi)存,分別啟動(dòng)40個(gè)線程不斷向這個(gè)系統(tǒng)實(shí)時(shí)發(fā)送汽車信息,模擬160臺(tái)汽車發(fā)送的情況,其消息發(fā)送數(shù)量和CPU占用率情況如圖7所示。
圖7 車聯(lián)網(wǎng)監(jiān)控系統(tǒng)性能分析
從圖7中可以看出,平均每輛汽車客戶端會(huì)模擬每秒給系統(tǒng)發(fā)送了1000條消息,總的吞吐量達(dá)到16萬條左右,此時(shí)平均的CPU占用率大約在30%左右。如果系統(tǒng)是完全線性的,在系統(tǒng)CPU占用率達(dá)到90%的情況下,大約能處理48萬條消息。不過實(shí)際情況中,在阿里云ECS上,卻發(fā)現(xiàn)CPU達(dá)到50%以后,就不再上升,而客戶端發(fā)送消息的延時(shí)也逐步增加。
經(jīng)過分析以后發(fā)現(xiàn),由于ECS的磁盤性能無法和物理機(jī)的SSD磁盤相比,所以在Kafka消息大量寫入磁盤的過程中,吞吐量下降,磁盤讀寫負(fù)擔(dān)變得非常大。這時(shí)我們?cè)黾恿薑afka的Broker和Storm的Spout的數(shù)量,將消息分布式地分發(fā)到多臺(tái)ECS上,從而實(shí)現(xiàn)了消息吞吐量的線性增加。
在這個(gè)系統(tǒng)中,我們不推薦使用大核和大內(nèi)存的機(jī)器,而推薦使用多臺(tái)2核2GB的服務(wù)器分布式地處理消息。這也是云計(jì)算處理大數(shù)據(jù)的原則所在,使用橫向擴(kuò)展而不用縱向擴(kuò)展。
五、結(jié)論
至此我們介紹了利用Storm和Kafka實(shí)現(xiàn)大數(shù)據(jù)的實(shí)時(shí)處理,并且介紹了如何在云上通過鏡像快速地創(chuàng)建這套系統(tǒng)。此外,我們還介紹了如何對(duì)Storm、Kafka、Redis以及Node.js開發(fā)出一個(gè)實(shí)時(shí)的車輛信息監(jiān)控系統(tǒng)。這個(gè)系統(tǒng)能夠?qū)崿F(xiàn)高性能、大吞吐量和高并發(fā)。當(dāng)然,隨著大數(shù)據(jù)的快速發(fā)展,我們相信還會(huì)有越來越多好的工具和產(chǎn)品出現(xiàn)在市場(chǎng)上,到那時(shí)我們從大數(shù)據(jù)中獲取有效的信息將會(huì)變得更加容易和便捷。有了云計(jì)算的幫助,開發(fā)的周期也會(huì)變得越來越短。
核心關(guān)注:拓步ERP系統(tǒng)平臺(tái)是覆蓋了眾多的業(yè)務(wù)領(lǐng)域、行業(yè)應(yīng)用,蘊(yùn)涵了豐富的ERP管理思想,集成了ERP軟件業(yè)務(wù)管理理念,功能涉及供應(yīng)鏈、成本、制造、CRM、HR等眾多業(yè)務(wù)領(lǐng)域的管理,全面涵蓋了企業(yè)關(guān)注ERP管理系統(tǒng)的核心領(lǐng)域,是眾多中小企業(yè)信息化建設(shè)首選的ERP管理軟件信賴品牌。
轉(zhuǎn)載請(qǐng)注明出處:拓步ERP資訊網(wǎng)http://www.ezxoed.cn/
本文標(biāo)題:在云上搭建大規(guī)模實(shí)時(shí)數(shù)據(jù)流處理系統(tǒng)
本文網(wǎng)址:http://www.ezxoed.cn/html/consultation/10839717486.html