KoP 架構(gòu)介紹
KoP 是 Kafka on Pulsar 的簡(jiǎn)寫,顧名思義就是如何在 Pulsar 上實(shí)現(xiàn)對(duì) Kafka 數(shù)據(jù)的讀寫。KoP 將 Kafka 協(xié)議處理插件引入 Pulsar Broker 來(lái)實(shí)現(xiàn) Apache Pulsar 對(duì) Apache Kafka 協(xié)議的支持。將 KoP 協(xié)議處理插件添加到現(xiàn)有 Pulsar 集群后,用戶不用修改代碼就可以將現(xiàn)有的 Kafka 應(yīng)用程序和服務(wù)遷移到 Pulsar。
Apache Pulsar 主要特點(diǎn)如下:
利用企業(yè)級(jí)多租戶特性簡(jiǎn)化運(yùn)營(yíng)。
避免數(shù)據(jù)搬遷,簡(jiǎn)化操作。
利用 Apache BookKeeper 和分層存儲(chǔ)持久保留事件流。
利用 Pulsar Functions 進(jìn)行無(wú)服務(wù)器化事件處理。
KoP 架構(gòu)如下圖,通過(guò)圖可以看到 KoP 引入一個(gè)新的協(xié)議處理插件,該協(xié)議處理插件利用 Pulsar 的現(xiàn)有組件(例如 Topic 發(fā)現(xiàn)、分布式日志庫(kù)-ManagedLedger、cursor 等)來(lái)實(shí)現(xiàn) Kafka 傳輸協(xié)議。

Routine Load 訂閱 Pulsar 數(shù)據(jù)思路
Apache Doris Routine Load 支持了將 Kafka 數(shù)據(jù)接入 Apache Doris,并保障了數(shù)據(jù)接入過(guò)程中的事務(wù)性操作。Apache Pulsar 定位為一個(gè)云原生時(shí)代企業(yè)級(jí)的消息發(fā)布和訂閱系統(tǒng),已經(jīng)在很多線上服務(wù)使用。那么 Apache Pulsar 用戶如何將數(shù)據(jù)接入 Apache Doris 呢,答案是通過(guò) KoP 實(shí)現(xiàn)。
由于 KoP 直接在 Pulsar 側(cè)提供了對(duì) Kafka 的兼容,那么對(duì)于 Apache Doris 來(lái)說(shuō)可以像使用 Kafka 一樣使用 Plusar。整個(gè)過(guò)程對(duì)于 Apache Doris 來(lái)說(shuō)無(wú)需任務(wù)改變,就能將 Pulsar 數(shù)據(jù)接入 Apache Doris,并且可以獲得 Routine Load 的事務(wù)性保障。
--------------------------
| Apache Doris |
| --------------- |
| | Routine Load | |
| --------------- |
--------------------------
|Kafka Protocol(librdkafka)
------------v--------------
| --------------- |
| | KoP | |
| --------------- |
| Apache Pulsar |
--------------------------
操作實(shí)踐
Pulsar Standalone 安裝環(huán)境準(zhǔn)備:
JDK 安裝:略
下載 Pulsar 二進(jìn)制包,并解壓:
#下載
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#解壓并進(jìn)入安裝目錄
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0
組件編譯和安裝
1. 下載 KoP 源碼
git clone https://github.com/streamnative/kop.git
cd kop
2. 編譯 KoP 項(xiàng)目
mvn clean install -DskipTests
3. protocols 配置:在解壓后的 apache-pulsar 目錄下創(chuàng)建 protocols文 件夾,并把編譯好的 nar 包復(fù)制到 protocols 文件夾中。
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
4. 添加后的結(jié)果查看
[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar
KoP 配置添加
1. 在 standalone.conf 或者 broker.conf 添加如下配置
#kop適配的協(xié)議
messagingProtocols=kafka
#kop 的NAR文件路徑
protocolHandlerDirectory=。/protocols
#是否允許自動(dòng)創(chuàng)建topic
allowAutoTopicCreationType=partitioned
2. 添加如下服務(wù)監(jiān)聽配置
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0
kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.
# If it’s not configured, it will be the same with `kafkaListeners` config by default
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
當(dāng)出現(xiàn)如下錯(cuò)誤:
java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
添加如下配置,開啟 transactionCoordinatorEnabled
kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true
Pulsar 啟動(dòng)
#前臺(tái)啟動(dòng)
#bin/pulsar standalone
#后臺(tái)啟動(dòng)
pulsar-daemon start standalone
創(chuàng)建 Doris 數(shù)據(jù)庫(kù)和建表
#進(jìn)入Doris
mysql -u root -h 127.0.0.1 -P 9030
# 創(chuàng)建數(shù)據(jù)庫(kù)
create database pulsar_doris;
#切換數(shù)據(jù)庫(kù)
use pulsar_doris;
#創(chuàng)建clicklog表
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
`clickTime` DATETIME NOT NULL COMMENT “點(diǎn)擊時(shí)間”,
`type` String NOT NULL COMMENT “點(diǎn)擊類型”,
`id` VARCHAR(100) COMMENT “唯一id”,
`user` VARCHAR(100) COMMENT “用戶名稱”,
`city` VARCHAR(50) COMMENT “所在城市”
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
“replication_allocation” = “tag.location.default: 1”
);
創(chuàng)建 Routine Load 任務(wù)
CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
“desired_concurrent_number”=“3”,
“max_batch_interval” = “20”,
“max_batch_rows” = “300000”,
“max_batch_size” = “209715200”,
“strict_mode” = “false”,
“format” = “json”
)
FROM KAFKA
(
“kafka_broker_list” = “127.0.0.1:9092”,
“kafka_topic” = “test”,
“property.group.id” = “doris”
);
上述命令中的參數(shù)解釋如下:
pulsar_doris :Routine Load 任務(wù)所在的數(shù)據(jù)庫(kù)
load_from_pulsar_test:Routine Load 任務(wù)名稱
clicklog:Routine Load 任務(wù)的目標(biāo)表,也就是配置 Routine Load 任務(wù)將數(shù)據(jù)導(dǎo)入到 Doris 哪個(gè)表中。
strict_mode:導(dǎo)入是否為嚴(yán)格模式,這里設(shè)置為 False。
format:導(dǎo)入數(shù)據(jù)的類型,這里配置為 Json。
kafka_broker_list:Kafka Broker 服務(wù)的地址
kafka_broker_list:Kafka Topic 名稱,也就是同步哪個(gè) Topic 上的數(shù)據(jù)。
property.group.id:消費(fèi)組 ID
數(shù)據(jù)導(dǎo)入和測(cè)試
1. 數(shù)據(jù)導(dǎo)入 構(gòu)造一個(gè) ClickLog 的數(shù)據(jù)結(jié)構(gòu),并調(diào)用 Kafka 的 Producer 發(fā)送 5000 萬(wàn)條數(shù)據(jù)到 Pulsar。 ClickLog 數(shù)據(jù)結(jié)構(gòu)如下:
public class ClickLog {
private String id;
private String user;
private String city;
private String clickTime;
private String type;
。.. //省略getter和setter
}
消息構(gòu)造和發(fā)送的核心代碼邏輯如下:
String strDateFormat = “yyyy-MM-dd HHss”;
@Autowired
private Producer producer;
try {
for(int j =0 ; j《50000;j++){
int batchSize = 1000;
for(int i = 0 ; i《batchSize ;i++){
ClickLog clickLog = new ClickLog();
clickLog.setId(UUID.randomUUID().toString());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
clickLog.setClickTime(simpleDateFormat.format(new Date()));
clickLog.setType(“webset”);
clickLog.setUser(“user”+ new Random().nextInt(1000) +i);
producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
}
}
} catch (Exception e) {
e.printStackTrace();
}
2. ROUTINE LOAD 任務(wù)查看執(zhí)行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;命令,查看導(dǎo)入任務(wù)的狀態(tài)。
mysql》 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;
*************************** 1. row ***************************
Id: 87873
Name: load_from_pulsar_test
CreateTime: 2022-05-31 1234
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:pulsar_doris
TableName: clicklog1
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {“partitions”:“*”,“columnToColumnExpr”:“clickTime,id,type,user”,“maxBatchIntervalS”:“20”,“whereExpr”:“*”,“dataFormat”:“json”,“timezone”:“Europe/London”,“send_batch_parallelism”:“1”,“precedingFilter”:“*”,“mergeType”:“APPEND”,“format”:“json”,“json_root”:“”,“maxBatchSizeBytes”:“209715200”,“exec_mem_limit”:“2147483648”,“strict_mode”:“false”,“jsonpaths”:“”,“deleteCondition”:“*”,“desireTaskConcurrentNum”:“3”,“maxErrorNum”:“0”,“strip_outer_array”:“false”,“currentTaskConcurrentNum”:“1”,“execMemLimit”:“2147483648”,“num_as_string”:“false”,“fuzzy_parse”:“false”,“maxBatchRows”:“300000”}
DataSourceProperties: {“topic”:“test”,“currentKafkaPartitions”:“0”,“brokerList”:“127.0.0.1:9092”}
CustomProperties: {“group.id”:“doris”,“kafka_default_offsets”:“OFFSET_END”,“client.id”:“doris.client”}
Statistic: {“receivedBytes”:5739001913,“runningTxns”:[],“errorRows”:0,“committedTaskNum”:168,“l(fā)oadedRows”:50000000,“l(fā)oadRowsRate”:23000,“abortedTaskNum”:1,“errorRowsAfterResumed”:0,“totalRows”:50000000,“unselectedRows”:0,“receivedBytesRate”:2675000,“taskExecuteTimeMs”:2144799}
Progress: {“0”:“51139566”}
Lag: {“0”:0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
ERROR:
No query specified
從上面結(jié)果可以看到 totalRows 為 50000000,errorRows 為 0。說(shuō)明數(shù)據(jù)不丟不重的導(dǎo)入 Apache Doris 了。
3. 數(shù)據(jù)統(tǒng)計(jì)驗(yàn)證執(zhí)行如下命令統(tǒng)計(jì)表中的數(shù)據(jù),發(fā)現(xiàn)統(tǒng)計(jì)的結(jié)果也是 50000000,符合預(yù)期。
mysql》 select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql》
通過(guò) KoP 我們實(shí)現(xiàn)了將 Apache Pulsar 數(shù)據(jù)無(wú)縫接入 Apache Doris ,無(wú)需對(duì) Routine Load 任務(wù)進(jìn)行任何修改,并保障了數(shù)據(jù)導(dǎo)入過(guò)程中的事務(wù)性。與此同時(shí),Apache Doris 社區(qū)已經(jīng)啟動(dòng)了 Apache Pulsar 原生導(dǎo)入支持的設(shè)計(jì),相信在不久后就可以直接訂閱 Pulsar 中的消息數(shù)據(jù),并保證數(shù)據(jù)導(dǎo)入過(guò)程中的 Exactly-Once 語(yǔ)義。
審核編輯:郭婷
-
服務(wù)器
+關(guān)注
關(guān)注
14文章
10251瀏覽量
91480 -
代碼
+關(guān)注
關(guān)注
30文章
4967瀏覽量
73958
原文標(biāo)題:如何將Pulsar數(shù)據(jù)快速且無(wú)縫接入Apache Doris
文章出處:【微信號(hào):OSC開源社區(qū),微信公眾號(hào):OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
工業(yè)數(shù)據(jù)中臺(tái)支持接入MySQL數(shù)據(jù)庫(kù)嗎
如何將某個(gè)函數(shù)或變量放在固定的地址 ?
請(qǐng)問(wèn)編譯程序時(shí)如何將數(shù)據(jù)放入Flash固定地址?
臺(tái)灣偉斯掃碼槍通過(guò)RS232轉(zhuǎn)Profinet網(wǎng)關(guān)接入西門子1200 PLC的完整指南
工業(yè)自動(dòng)化通信方案:臺(tái)灣偉斯掃碼槍通過(guò)RS232轉(zhuǎn)Profinet網(wǎng)關(guān)接入西門子S7-1200 PLC系統(tǒng)詳解
使用NVIDIA GPU加速Apache Spark中Parquet數(shù)據(jù)掃描
Modbus TCP轉(zhuǎn)Profibus網(wǎng)關(guān)如何快速把流量計(jì)接入到DCS?
如何將K230 image轉(zhuǎn)成jpg?
485自由口轉(zhuǎn)profibus網(wǎng)關(guān)快速配置案例
用MCP將百度地圖能力輕松接入DeepSeek
NVIDIA加速的Apache Spark助力企業(yè)節(jié)省大量成本
利用KoP如何將Pulsar數(shù)據(jù)快速且無(wú)縫接入Apache Doris
評(píng)論