作者:京東物流 吳云濤
前言
提交一個DataSteam 的 Flink應用,需要經(jīng)過 StreamGraph、JobGraph、ExecutionGraph 三個階段的轉換生成可成執(zhí)行的有向無環(huán)圖(DAG),并在 Flink 集群上運行。而提交一個 Flink SQL 應用,其執(zhí)行流程也類似,只是多了一步使用 flink-table-planer 模塊從SQL轉換成 StreamGraph 的過程。以下是利用Flink的 StreamGraph 通過低代碼的方式,來實現(xiàn)StreamGraph的生成,并最終實現(xiàn) Flink 程序零代碼開發(fā)的解決方案。
一、Flink 相關概念
在Flink程序中,每個算子被稱作Operator,通過各個算子的處理最終得到期望的加工后數(shù)據(jù)。比如下面這段程序中,增加了Source, Fiter, Map, Sink 4個算子。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream dataStream = env.addSource(new FlinkKafkaConsumer("topic")); DataStream filteredStream = dataStream.filter(new FilterFunction() { @Override public boolean filter(Object value) throws Exception {return true;} }); DataStream mapedStream = filteredStream.map(new MapFunction() { @Override public Object map(Object value) throws Exception {return value;} }); mapedStream.addSink(new DiscardingSink()); env.execute("test-job");
StreamGraph
Flink的邏輯執(zhí)行圖,描述了整個流處理任務的流程和數(shù)據(jù)流轉遞規(guī)則,包括了數(shù)據(jù)源(Source)、轉換算子(Transform)、數(shù)據(jù)目的端(Sink)等元素,以及它們之間的依賴關系和傳輸規(guī)則。StreamGraph是通過Flink的API或者DSL來構建的向無環(huán)圖(DAG),它與JobGraph之間是一一對應的關系。StreamGraph中的頂點稱為streamNode,是用來表示Operator算子的類,包含了算子uid、并行度,是否共享slot(SlotSharingGroup)等信息。邊稱作streamEdge。通過StreamingJobGraphGenerator類生成JobGraph。

JobGraph
StreamGraph 經(jīng)過 flink-optimizer 模塊優(yōu)化后生成 JobGraph。生成 JobGraph 時,會將多個滿足條件的算子chain 鏈接到一起作為一個頂點(JobVertex), 在運行時對應1個 Task。Task 是 Flink 程序的基本執(zhí)行單元,任務調度時將Task分配到TaskManager上執(zhí)行。

ExecutionGraph
物理執(zhí)行圖是由JobGraph轉換而來,描述了整個流處理任務的物理執(zhí)行細節(jié),包括了任務的調度、任務的執(zhí)行順序、任務之間的數(shù)據(jù)傳輸、任務的狀態(tài)管理等。Task會在步驟中拆分為多個SubTask。對應Task中的每個并行度。

Physical Graph
PhysicalGraph是在執(zhí)行時的ExecutionGraph。ExecutionGraph中的每一個頂點ExecutionJobVertex都對應一個或多個頂點ExecutionVertex,它們是物理執(zhí)行圖中的節(jié)點。
二、畫布模式實現(xiàn)思路
實現(xiàn)流程
首先,我們采用畫布模式(拖拉拽方式)來實現(xiàn)Flink程序的組裝,將極大程度上方便我們復用部分加工的算子,最終實現(xiàn)零代碼的Flink應用開發(fā)。我們通過繪圖的方式,直接將內置的算子繪制在圖標上。如下所示:

構建有向無環(huán)圖(DAG),并持久化。通過拖拉拽的方式(畫布模式)構建你的Flink應用,后端的持久化存儲采用鄰接表方式。我們在 mysql 關系數(shù)據(jù)庫中將 Node(算子:Source、Sink、中間加工邏輯算子)存儲到 flink_node 表中;將邊存到一張 flink_realation 表中。
重新組將Flink作業(yè)
要組裝以上畫布模式的Flink應用,首先需要初始化好 StreamExecutionEnvironment 相關參數(shù),其次將上述表中的 flink_node 和flink_edge 轉化為DataStream,并將轉化出的 DataStream 合理地拼接成一個 DataStream API Flink 應用程序。
在將flink_node、flink_edge轉為為DataStream時選擇何種遍歷算法來組裝呢?我們知道有向無環(huán)圖的遍歷最常用的有:深度優(yōu)先遍歷(DFS)和廣度優(yōu)先遍歷(BFS)。這里我們采用了BFS算法+層序遍歷的方式,BFS便于在組裝的過程中將已visit到的node節(jié)點拼裝到其parent 的節(jié)點上。
總結
在實際的實現(xiàn)過程中,遇到的問題往往比以上復雜很多。比如需要將更多的信息存儲在node節(jié)點和edge邊上。node上需要存儲并行度、算子處理前后的表schema等;edge需要存儲keyby的字段、上下游之間的數(shù)據(jù)shuffle的方式等等。此外在內置的算子無法滿足用戶需求時,還需要考慮如何友好的支持自定義算子(UDF)的嵌入等問題。
審核編輯 黃宇
-
開發(fā)
+關注
關注
0文章
378瀏覽量
42148 -
代碼
+關注
關注
30文章
4968瀏覽量
73984
發(fā)布評論請先 登錄
隔空科技聯(lián)合涂鴉智能推出微波雷達感應燈零代碼實現(xiàn)方案
什么是零代碼平臺?
RA-RTT體驗零代碼點亮LED燈
什么是零代碼應用開發(fā)平臺?它有哪些功能模塊
實現(xiàn)零代碼開發(fā)還需要多長時間
零代碼開發(fā)平臺工作原理
零代碼與低代碼快速開發(fā)平臺有什么區(qū)別
零代碼開發(fā)平臺能夠給企業(yè)帶來哪些好處
淺談零代碼開發(fā)的價值在哪里
零代碼平臺和低代碼平臺分別適合開發(fā)哪些應用程序
零代碼開發(fā)平臺為什么會受到企業(yè)管理者的歡迎
零代碼如何實現(xiàn)造數(shù)據(jù)
基于圖遍歷的Flink任務畫布模式下零代碼開發(fā)實現(xiàn)方案
評論