91欧美超碰AV自拍|国产成年人性爱视频免费看|亚洲 日韩 欧美一厂二区入|人人看人人爽人人操aV|丝袜美腿视频一区二区在线看|人人操人人爽人人爱|婷婷五月天超碰|97色色欧美亚州A√|另类A√无码精品一级av|欧美特级日韩特级

您好,歡迎來電子發(fā)燒友網(wǎng)! ,新用戶?[免費注冊]

您的位置:電子發(fā)燒友網(wǎng)>源碼下載>數(shù)值算法/人工智能>

在Spark Streaming中實現(xiàn)快速狀態(tài)流處理

大小:0.5 MB 人氣: 2017-10-12 需要積分:1
許多復(fù)雜流處理流水線程序必須將狀態(tài)保持一段時間,例如,如果你想實時了解網(wǎng)站用戶行為,你需要將網(wǎng)站上各“用戶會話(user session)”信息保存為持久狀態(tài)并根據(jù)用戶的行為對這一狀態(tài)進行持續(xù)更新。這種有狀態(tài)的流計算可以在Spark Streaming中使用updateStateByKey 方法實現(xiàn)。
  在Spark 1.6 中,我們通過使用新API mapWithState極大地增強對狀態(tài)流處理的支持。該新的API提供了通用模式的內(nèi)置支持,而在以前使用updateStateByKey 方法來實現(xiàn)這一相同功能(如會話超時)需要進行手動編碼和優(yōu)化。因此,mapWithState 方法較之于updateStateByKey方法,有十倍之多的性能提升。在本博文當中,我們將對mapWithState方法進行深入講解,同時提前感受后續(xù)新版本中將加入的特性。
  使用mapWithState方法進行狀態(tài)流處理
  Spark Streaming中最強大的特性之一是簡單的狀態(tài)流處理API及相關(guān)聯(lián)的本地化、可容錯的狀態(tài)管理能力。開發(fā)人員僅需要指定狀態(tài)的結(jié)構(gòu)和更新邏輯,Spark Streaming便能夠接管集群中狀態(tài)的分發(fā)、管理,在程序出錯時自動進行恢復(fù)并提供端到端的容錯保障。盡管現(xiàn)有DStream中updateStateByKey方法能夠允許用戶執(zhí)行狀態(tài)計算,但使用mapWithState方法能夠讓用戶更容易地表達程序邏輯,同時讓性能提升10倍之多。讓我們通過一個例子對mapWithState方法的優(yōu)勢進行闡述。
  假設(shè)我們要根據(jù)用戶歷史動作對某一網(wǎng)站的用戶行為進行實時分析,對各個用戶,我們需要保持用戶動作的歷史信息,然后根據(jù)這些歷史信息得到用戶的行為模型并輸出到下游的數(shù)據(jù)存儲當中。
  在Spark Streaming中構(gòu)建此應(yīng)用程序時,我們首先需要獲取用戶動作流作為輸入(例如通過Kafka或Kinesis),然后使用mapWithState 方法對輸入進行轉(zhuǎn)換操作以生成用戶模型流,最后將處理后的數(shù)據(jù)流保存到數(shù)據(jù)存儲當中。
  在Spark Streaming中實現(xiàn)快速狀態(tài)流處理
  在Spark Streaming中使用狀態(tài)流處理進行用戶會話維護
  mapWithState方法可以通過下面的抽象方式進行理解,假設(shè)它是將用戶動作和當前用戶會話作為輸入的一個算子(operator),基于某個輸入動作,該算子能夠有選擇地更新用戶會話,然后輸出更新后的用戶模型作為下游操作的輸入。開發(fā)人員在定義mapWithState方法時可以指定該更新函數(shù)。
  現(xiàn)在讓我們轉(zhuǎn)入到具體代碼來說明,首先我們定義狀態(tài)數(shù)據(jù)結(jié)構(gòu)及狀態(tài)更新函數(shù):
  def stateUpdateFunction( userId: UserId, newData: UserAction, stateData: State[UserSession]): UserModel = { val currentSession = stateData.get()// 獲取當前會話數(shù)據(jù) val updatedSession = 。..// 使用newData計算更新后的會話 stateData.update(updatedSession) // 更新會話數(shù)據(jù) val userModel = 。..// 使用updatedSession計算模型 returnuserModel // 將模型發(fā)送給下游操作 }
  然后,在用戶動作DStream上定義mapWithState 方法,通過創(chuàng)建StateSpec對象來實現(xiàn),該對象中包含所有前述指定的操作。
  // 用去動作構(gòu)成的Stream,用戶ID作為key val userActions = 。..// key-value元組(UserId, UserAction)構(gòu)成的stream // 待提交的數(shù)據(jù)流 val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction))
  mapWithState的新特性和性能改進
  通過前面的例子,我們已經(jīng)明白其使用方式,現(xiàn)在讓我們再深入理解使用該新的API所帶來的特定優(yōu)勢。
  1. 原生支持會話超時
  許多基于會話的應(yīng)用程序要求具備超時機制,當某個會話在一定的時間內(nèi)(如用戶沒有顯式地注銷而結(jié)束會話)沒有接收到新數(shù)據(jù)時就應(yīng)該將其關(guān)閉,與使用updateStateByKey方法時需要手動進行編碼實現(xiàn)所不同的是,開發(fā)人員可以通過mapWithState方法直接指定其超時時間。
  userActions.mapWithState(StateSpec.function(stateUpdateFunction).timeout(Minutes(10)))
  除超時機制外,開發(fā)人員也可以設(shè)置程序啟動時的分區(qū)模式和初始狀態(tài)信息。
  2. 任意數(shù)據(jù)都能夠發(fā)送到下游
  與updateStateByKey方法不同,任意數(shù)據(jù)都可以通過狀態(tài)更新函數(shù)將數(shù)據(jù)發(fā)送到下游操作,這一點已經(jīng)在前面的例子中有說明(例如通過用戶會話狀態(tài)返回用戶模型),此外,最新狀態(tài)的快照也能夠被訪問。
  val userSessionSnapshots = userActions.mapWithState(statSpec).snapshotStream()
  變量userSessionSnapshots 為一個DStream,其中各個RDD為各批(batch)數(shù)據(jù)處理后狀態(tài)更新會話的快照,該DStream與updateStateByKey方法返回的DStream是等同的。
  3. 更高的性能
  最后,與updateStateByKey方法相比,使用mapWithState方法能夠得到6倍的低延遲同時維護的key狀態(tài)數(shù)量要多10倍,這一性能提升和擴展性可從后面的基準測試結(jié)果得到驗證,所有的結(jié)果全部在時間間隔為1秒的batch和相同大小的集群中生成。
  下圖比較的是mapWithState 方法和updateStateByKey 方法處理1秒的batch所消耗的平均時間,在本例中,我們?yōu)橥瑯訑?shù)量(從0.25~1百萬)的key保存其狀態(tài),然后以同樣的速率(30k個更新/s)對其進行更新,如下圖所示,mapWithState方法比updateStateByKey方法的處理時間快8倍,從而允許更低的端到端延遲。

非常好我支持^.^

(0) 0%

不好我反對

(0) 0%

在Spark Streaming中實現(xiàn)快速狀態(tài)流處理下載

相關(guān)電子資料下載

      發(fā)表評論

      用戶評論
      評價:好評中評差評

      發(fā)表評論,獲取積分! 請遵守相關(guān)規(guī)定!

      ?