Kafka in Production: High-Availability Cluster Deployment and Operations in Practice
Architecture Diagram
┌──────────────────────────────────────────────────────────────────┐
│                  Kafka Production Architecture                   │
├──────────────────────────────────────────────────────────────────┤
│   ┌───────────┐      ┌───────────┐      ┌───────────┐            │
│   │ Producer1 │      │ Producer2 │      │ Producer3 │            │
│   └─────┬─────┘      └─────┬─────┘      └─────┬─────┘            │
│         └──────────────────┼──────────────────┘                  │
│                            │                                     │
│   ┌──────────────────────────────────────────────────────────┐   │
│   │                      Kafka Cluster                       │   │
│   │   Broker1        Broker2        Broker3        Broker4   │   │
│   │   192.168.1.11   192.168.1.12   192.168.1.13  192.168.1.14   │
│   │   Port: 9092     Port: 9092     Port: 9092    Port: 9092 │   │
│   └──────────────────────────────────────────────────────────┘   │
│                            │                                     │
│   ┌──────────────────────────────────────────────────────────┐   │
│   │                    ZooKeeper Cluster                     │   │
│   │   ZK1            ZK2            ZK3                      │   │
│   │   192.168.1.21   192.168.1.22   192.168.1.23             │   │
│   │   Port: 2181     Port: 2181     Port: 2181               │   │
│   └──────────────────────────────────────────────────────────┘   │
│                            │                                     │
│   ┌───────────┐      ┌───────────┐      ┌───────────┐            │
│   │ Consumer1 │      │ Consumer2 │      │ Consumer3 │            │
│   │ (Group A) │      │ (Group B) │      │ (Group C) │            │
│   └───────────┘      └───────────┘      └───────────┘            │
│   ┌──────────────────────────────────────────────────────────┐   │
│   │                     Monitoring Stack                     │   │
│   │   Prometheus      Grafana         Kafka Manager          │   │
│   │   (metrics)       (dashboards)    (administration)       │   │
│   └──────────────────────────────────────────────────────────┘   │
└──────────────────────────────────────────────────────────────────┘
Introduction
As a distributed streaming platform, Apache Kafka plays the central message-middleware role in modern big-data architectures. This article covers, from an operations engineer's perspective, the key techniques for running Kafka in production: deployment, configuration tuning, and monitoring and operations. Through hands-on cases and code examples, it aims to help operations teams build a stable, efficient Kafka cluster.
1. Automated Kafka Cluster Deployment
1.1 ZooKeeper Cluster Deployment Script
#!/bin/bash
# Automated ZooKeeper cluster deployment script
set -e

ZK_VERSION="3.8.1"
ZK_NODES=("192.168.1.21" "192.168.1.22" "192.168.1.23")
ZK_DATA_DIR="/data/zookeeper"
ZK_LOG_DIR="/logs/zookeeper"

# Create the zookeeper service account
useradd -r -s /bin/false zookeeper

# Download and install ZooKeeper
install_zookeeper() {
    cd /tmp
    wget https://archive.apache.org/dist/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz
    tar -xzf apache-zookeeper-${ZK_VERSION}-bin.tar.gz
    mv apache-zookeeper-${ZK_VERSION}-bin /opt/zookeeper
    chown -R zookeeper:zookeeper /opt/zookeeper
}

# Configure ZooKeeper
configure_zookeeper() {
    local node_id=$1
    local node_ip=$2
    # Create data and log directories
    mkdir -p ${ZK_DATA_DIR} ${ZK_LOG_DIR}
    chown -R zookeeper:zookeeper ${ZK_DATA_DIR} ${ZK_LOG_DIR}
    # Set the node ID
    echo ${node_id} > ${ZK_DATA_DIR}/myid
    # Generate zoo.cfg (the embedded config did not survive extraction,
    # so this is a reconstructed standard baseline)
    cat > /opt/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=${ZK_DATA_DIR}
dataLogDir=${ZK_LOG_DIR}
clientPort=2181
server.1=192.168.1.21:2888:3888
server.2=192.168.1.22:2888:3888
server.3=192.168.1.23:2888:3888
EOF
    # Install a systemd unit (also a reconstructed standard baseline)
    cat > /etc/systemd/system/zookeeper.service <<EOF
[Unit]
Description=Apache ZooKeeper
After=network.target
[Service]
Type=forking
User=zookeeper
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
[Install]
WantedBy=multi-user.target
EOF
    systemctl daemon-reload && systemctl enable --now zookeeper
}
ZooKeeper serves as Kafka's coordination service and needs an odd number of nodes to form a quorum for high availability (three nodes tolerate one failure, five tolerate two). The automation script above produces a standardized ZooKeeper environment quickly; the sketch below verifies that each node has actually joined the quorum.
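A minimal verification sketch, assuming the four-letter-word commands are whitelisted on the servers (4lw.commands.whitelist=srvr in zoo.cfg); the host list mirrors the addresses above:

#!/usr/bin/env python3
# Query each ZooKeeper node with the 'srvr' four-letter command and
# print the mode it reports (leader / follower).
import socket

ZK_NODES = ["192.168.1.21", "192.168.1.22", "192.168.1.23"]

def zk_mode(host, port=2181, timeout=3):
    """Send 'srvr' and extract the 'Mode:' line from the response."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(b"srvr")
        data = b""
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                break
            data += chunk
    for line in data.decode("utf-8", errors="replace").splitlines():
        if line.startswith("Mode:"):
            return line.split(":", 1)[1].strip()
    return "unknown"

if __name__ == "__main__":
    for node in ZK_NODES:
        try:
            print(f"{node}: {zk_mode(node)}")
        except OSError as e:
            print(f"{node}: unreachable ({e})")

A healthy three-node ensemble reports exactly one leader and two followers.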
1.2 Kafka Cluster Deployment Configuration
#!/bin/bash
# Kafka cluster deployment script
set -e
KAFKA_VERSION="2.8.2"
KAFKA_NODES=("192.168.1.11" "192.168.1.12" "192.168.1.13" "192.168.1.14")
KAFKA_DATA_DIR="/data/kafka"
KAFKA_LOG_DIR="/logs/kafka"

# Install Kafka
install_kafka() {
    cd /tmp
    wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_2.13-${KAFKA_VERSION}.tgz
    tar -xzf kafka_2.13-${KAFKA_VERSION}.tgz
    mv kafka_2.13-${KAFKA_VERSION} /opt/kafka
    # Create the kafka service account
    useradd -r -s /bin/false kafka
    chown -R kafka:kafka /opt/kafka
    # Create data directories
    mkdir -p ${KAFKA_DATA_DIR} ${KAFKA_LOG_DIR}
    chown -R kafka:kafka ${KAFKA_DATA_DIR} ${KAFKA_LOG_DIR}
}

# Generate the broker configuration (the embedded server.properties and
# unit file did not survive extraction; these are reconstructed baselines)
generate_kafka_config() {
    local broker_id=$1
    local node_ip=$2
    cat > /opt/kafka/config/server.properties <<EOF
broker.id=${broker_id}
listeners=PLAINTEXT://${node_ip}:9092
advertised.listeners=PLAINTEXT://${node_ip}:9092
log.dirs=${KAFKA_DATA_DIR}
num.partitions=6
default.replication.factor=3
min.insync.replicas=2
log.retention.hours=168
zookeeper.connect=192.168.1.21:2181,192.168.1.22:2181,192.168.1.23:2181
EOF
    cat > /etc/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache Kafka
After=network.target zookeeper.service
[Service]
User=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
EOF
    systemctl daemon-reload && systemctl enable --now kafka
}
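Once every broker is up, a quick smoke test from a client machine confirms the cluster answers metadata requests. A minimal sketch assuming the kafka-python client (pip install kafka-python); any client library works equally well:

#!/usr/bin/env python3
# Post-deployment smoke test: connect to the cluster and dump
# topic/partition metadata.
from kafka import KafkaConsumer

BOOTSTRAP = ["192.168.1.11:9092", "192.168.1.12:9092",
             "192.168.1.13:9092", "192.168.1.14:9092"]

consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP)
topics = consumer.topics()  # forces a metadata fetch from the cluster
print(f"cluster reachable, {len(topics)} topic(s) visible")
for topic in sorted(topics):
    partitions = consumer.partitions_for_topic(topic)
    print(f"  {topic}: partitions={sorted(partitions or [])}")
consumer.close()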
2. Production Performance Tuning
2.1 Producer Performance Tuning
#!/usr/bin/env python3
# Tuned Kafka producer configuration
from kafka import KafkaProducer
import json
import time
from concurrent.futures import ThreadPoolExecutor

class OptimizedKafkaProducer:
    def __init__(self, bootstrap_servers, topic):
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Performance tuning
            batch_size=16384,            # batch size in bytes
            linger_ms=10,                # wait up to 10 ms to fill a batch
            buffer_memory=33554432,      # 32 MB send buffer
            compression_type='snappy',   # compression codec
            max_in_flight_requests_per_connection=5,
            retries=3,                   # retry count
            retry_backoff_ms=100,
            request_timeout_ms=30000,
            # Serialization
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8')
        )

    def send_message_sync(self, key, value):
        """Send a message synchronously."""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            record_metadata = future.get(timeout=10)
            return {
                'topic': record_metadata.topic,
                'partition': record_metadata.partition,
                'offset': record_metadata.offset
            }
        except Exception as e:
            print(f"Failed to send message: {e}")
            return None

    def send_message_async(self, key, value, callback=None):
        """Send a message asynchronously."""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            if callback:
                future.add_callback(callback)
            return future
        except Exception as e:
            print(f"Failed to send message: {e}")
            return None

    def batch_send_performance_test(self, message_count=100000):
        """Bulk-send throughput test."""
        start_time = time.time()
        # Submit sends concurrently from a thread pool
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for i in range(message_count):
                message = {
                    'id': i,
                    'timestamp': time.time(),
                    'data': f'test_message_{i}',
                    'source': 'performance_test'
                }
                future = executor.submit(self.send_message_async, str(i), message)
                futures.append(future)
            # Wait for every submit call to return
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"Send error: {e}")
        # Block until every buffered message has actually been delivered
        self.producer.flush()
        end_time = time.time()
        duration = end_time - start_time
        throughput = message_count / duration
        print(f"Sent {message_count} messages")
        print(f"Elapsed: {duration:.2f} s")
        print(f"Throughput: {throughput:.2f} msg/s")

    def close(self):
        self.producer.close()

# Usage example
if __name__ == "__main__":
    producer = OptimizedKafkaProducer(
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092'],
        topic='performance_test'
    )
    # Run the throughput test
    producer.batch_send_performance_test(50000)
    producer.close()
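To confirm the batching and compression settings are actually paying off, you can read the client-side metrics kafka-python collects. A minimal sketch; the metric group and names ('producer-metrics', 'batch-size-avg', 'record-send-rate', 'request-latency-avg') are assumptions and may vary by kafka-python version:

# Inspect client-side producer metrics after a test run (hypothetical
# helper; metric names vary by client version).
def print_producer_stats(wrapper):
    metrics = wrapper.producer.metrics()  # nested dict: group -> name -> value
    prod = metrics.get('producer-metrics', {})
    for name in ('batch-size-avg', 'record-send-rate', 'request-latency-avg'):
        if name in prod:
            print(f"{name}: {prod[name]:.2f}")

If batch-size-avg stays far below batch_size, raising linger_ms is usually the cheaper lever than raising batch_size itself.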
2.2 Consumer Performance Tuning
#!/usr/bin/env python3
# Tuned Kafka consumer configuration
from kafka import KafkaConsumer
import json
import time
from concurrent.futures import ThreadPoolExecutor

class OptimizedKafkaConsumer:
    def __init__(self, topics, group_id, bootstrap_servers):
        self.topics = topics
        self.group_id = group_id
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # Performance tuning
            fetch_min_bytes=1024,            # minimum bytes per fetch
            fetch_max_wait_ms=500,           # maximum broker wait time
            max_poll_records=500,            # records per poll() call
            max_poll_interval_ms=300000,     # max interval between polls
            session_timeout_ms=30000,        # group session timeout
            heartbeat_interval_ms=10000,     # heartbeat interval
            # Consumption policy
            auto_offset_reset='earliest',
            enable_auto_commit=False,        # commit offsets manually
            # Deserialization
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None
        )

    def consume_messages_batch(self, batch_size=100, timeout=5000):
        """Consume messages in batches."""
        message_batch = []
        try:
            # Pull a batch of messages
            message_pack = self.consumer.poll(timeout_ms=timeout)
            for topic_partition, messages in message_pack.items():
                for message in messages:
                    message_batch.append({
                        'topic': message.topic,
                        'partition': message.partition,
                        'offset': message.offset,
                        'key': message.key,
                        'value': message.value,
                        'timestamp': message.timestamp
                    })
                    if len(message_batch) >= batch_size:
                        # Process a full batch
                        self.process_message_batch(message_batch)
                        message_batch = []
            # Process the remainder
            if message_batch:
                self.process_message_batch(message_batch)
            # Commit offsets manually after processing
            self.consumer.commit()
        except Exception as e:
            print(f"Consume error: {e}")

    def process_message_batch(self, messages):
        """Process a batch of messages concurrently."""
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = []
            for message in messages:
                future = executor.submit(self.process_single_message, message)
                futures.append(future)
            # Wait for every message in the batch
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"Processing error: {e}")

    def process_single_message(self, message):
        """Process a single message."""
        try:
            # Simulate business logic
            time.sleep(0.001)
            # Log the processed record
            print(f"Processed: topic={message['topic']}, "
                  f"partition={message['partition']}, "
                  f"offset={message['offset']}")
        except Exception as e:
            print(f"Single-message error: {e}")

    def start_consuming(self):
        """Start the consume loop."""
        print(f"Consuming topics: {self.topics}")
        try:
            while True:
                self.consume_messages_batch()
        except KeyboardInterrupt:
            print("Stopping consumer")
        finally:
            self.consumer.close()

# Usage example
if __name__ == "__main__":
    consumer = OptimizedKafkaConsumer(
        topics=['performance_test'],
        group_id='performance_consumer_group',
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092']
    )
    consumer.start_consuming()
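Within one consumer group, throughput scales by adding consumer instances up to the topic's partition count; beyond that, extra instances sit idle. A minimal scaling sketch using the class above (an assumption: one consumer per thread, since a KafkaConsumer instance must not be shared across threads):

# Run several consumers in the same group, one per thread; Kafka assigns
# each a disjoint set of partitions (at most one consumer per partition).
import threading

def run_consumer():
    OptimizedKafkaConsumer(
        topics=['performance_test'],
        group_id='performance_consumer_group',
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092']
    ).start_consuming()

if __name__ == "__main__":
    workers = [threading.Thread(target=run_consumer, daemon=True) for _ in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()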
3. Monitoring and Operations Automation
3.1 Kafka Cluster Monitoring Script
#!/bin/bash
# Kafka cluster monitoring script
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
ALERT_EMAIL="admin@company.com"
LOG_FILE="/var/log/kafka_monitor.log"

# Check overall cluster state
check_kafka_cluster() {
    echo "$(date): checking Kafka cluster state" >> $LOG_FILE
    # Count reachable brokers
    broker_list=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_BROKERS} 2>/dev/null | grep -c "id:")
    if [ "$broker_list" -lt 3 ]; then
        echo "ALERT: too few reachable Kafka brokers: $broker_list" | mail -s "Kafka Cluster Alert" $ALERT_EMAIL
        echo "$(date): ALERT - too few reachable brokers: $broker_list" >> $LOG_FILE
    fi
}

# Check topic health
check_topic_health() {
    echo "$(date): checking topic health" >> $LOG_FILE
    # List all topics
    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    for topic in $topics; do
        # Describe the topic
        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic $topic)
        # Count offline partitions (no leader)
        offline_partitions=$(echo "$topic_desc" | grep -c "Leader: -1")
        if [ "$offline_partitions" -gt 0 ]; then
            echo "ALERT: topic $topic has $offline_partitions offline partition(s)" | mail -s "Kafka Topic Alert" $ALERT_EMAIL
            echo "$(date): ALERT - topic $topic offline partitions: $offline_partitions" >> $LOG_FILE
        fi
    done
}

# Check consumer-group lag
check_consumer_lag() {
    echo "$(date): checking consumer-group lag" >> $LOG_FILE
    # List consumer groups
    consumer_groups=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    for group in $consumer_groups; do
        # Describe the group
        group_desc=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group $group)
        # LAG is column 6 in Kafka 2.x describe output
        # (GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...)
        max_lag=$(echo "$group_desc" | awk 'NR>1 {print $6}' | grep -v "-" | sort -n | tail -1)
        if [ -n "$max_lag" ] && [ "$max_lag" -gt 10000 ]; then
            echo "ALERT: consumer group $group max lag: $max_lag" | mail -s "Kafka Consumer Lag Alert" $ALERT_EMAIL
            echo "$(date): ALERT - consumer group $group lag too high: $max_lag" >> $LOG_FILE
        fi
    done
}

# Collect performance metrics
collect_metrics() {
    echo "$(date): collecting Kafka metrics" >> $LOG_FILE
    # Per-broker JVM process stats over SSH
    for broker in 192.168.1.11 192.168.1.12 192.168.1.13; do
        kafka_pid=$(ssh $broker "pgrep -f kafka")
        if [ -n "$kafka_pid" ]; then
            # Memory usage
            memory_usage=$(ssh $broker "ps -p $kafka_pid -o %mem --no-headers")
            echo "$(date): broker $broker memory usage: $memory_usage%" >> $LOG_FILE
            # CPU usage
            cpu_usage=$(ssh $broker "ps -p $kafka_pid -o %cpu --no-headers")
            echo "$(date): broker $broker CPU usage: $cpu_usage%" >> $LOG_FILE
        fi
    done
}

# Main monitoring loop
while true; do
    check_kafka_cluster
    check_topic_health
    check_consumer_lag
    collect_metrics
    sleep 300  # run every 5 minutes
done
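The shell checks above are easy to run from cron, but for the Prometheus/Grafana side of the architecture a programmatic lag probe is handier to scrape. A minimal sketch with kafka-python (an assumption; exporter wiring omitted) that diffs each partition's committed offset against the log end offset:

#!/usr/bin/env python3
# Programmatic consumer-lag probe: committed offset vs. log end offset
# per partition. Output could feed a Prometheus exporter.
from kafka import KafkaConsumer, TopicPartition

BOOTSTRAP = ["192.168.1.11:9092", "192.168.1.12:9092"]

def group_lag(group_id, topic):
    consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP, group_id=group_id,
                             enable_auto_commit=False)
    partitions = [TopicPartition(topic, p)
                  for p in consumer.partitions_for_topic(topic) or []]
    end_offsets = consumer.end_offsets(partitions)  # log end offset per partition
    lag = {}
    for tp in partitions:
        committed = consumer.committed(tp) or 0  # None if never committed
        lag[tp.partition] = end_offsets[tp] - committed
    consumer.close()
    return lag

if __name__ == "__main__":
    for partition, value in sorted(group_lag("performance_consumer_group",
                                             "performance_test").items()):
        print(f"partition {partition}: lag={value}")

The numbers should match kafka-consumer-groups.sh --describe; the Python version is just easier to turn into a long-running exporter.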
3.2 Automated Operations Script
#!/usr/bin/env python3
# Kafka automated operations script
import subprocess
import json
import os
from datetime import datetime
import logging

class KafkaOperations:
    def __init__(self, kafka_home, brokers):
        self.kafka_home = kafka_home
        self.brokers = brokers
        self.logger = self.setup_logger()

    def setup_logger(self):
        """Configure logging to file and stdout."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/kafka_operations.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)
    def create_topic(self, topic_name, partitions=3, replication_factor=2):
        """Create a topic."""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--create",
                "--topic", topic_name,
                "--partitions", str(partitions),
                "--replication-factor", str(replication_factor)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Created topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Topic creation failed: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Topic creation error: {e}")
            return False

    def delete_topic(self, topic_name):
        """Delete a topic."""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--delete",
                "--topic", topic_name
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Deleted topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Topic deletion failed: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Topic deletion error: {e}")
            return False

    def increase_partitions(self, topic_name, new_partition_count):
        """Increase a topic's partition count."""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--alter",
                "--topic", topic_name,
                "--partitions", str(new_partition_count)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Increased topic {topic_name} to {new_partition_count} partitions")
                return True
            else:
                self.logger.error(f"Partition increase failed: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Partition increase error: {e}")
            return False
    def rebalance_partitions(self, topic_name, broker_list="0,1,2,3"):
        """Rebalance a topic's partitions across the given brokers."""
        try:
            topics_file = f"/tmp/topics-{topic_name}.json"
            reassignment_file = f"/tmp/reassignment-{topic_name}.json"
            # Step 1: write the topics-to-move file the tool expects
            with open(topics_file, 'w') as f:
                json.dump({"version": 1, "topics": [{"topic": topic_name}]}, f)
            # Step 2: generate a candidate reassignment plan
            cmd_generate = [
                f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
                "--bootstrap-server", self.brokers,
                "--topics-to-move-json-file", topics_file,
                "--broker-list", broker_list,
                "--generate"
            ]
            gen = subprocess.run(cmd_generate, capture_output=True, text=True)
            if gen.returncode != 0:
                self.logger.error(f"Plan generation failed: {gen.stderr}")
                return False
            # On Kafka 2.x the proposed plan is the last line of the output
            with open(reassignment_file, 'w') as f:
                f.write(gen.stdout.strip().splitlines()[-1])
            # Step 3: execute the reassignment
            cmd_execute = [
                f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
                "--bootstrap-server", self.brokers,
                "--reassignment-json-file", reassignment_file,
                "--execute"
            ]
            exe = subprocess.run(cmd_execute, capture_output=True, text=True)
            if exe.returncode != 0:
                self.logger.error(f"Reassignment execution failed: {exe.stderr}")
                return False
            self.logger.info(f"Started rebalancing topic: {topic_name}")
            return True
        except Exception as e:
            self.logger.error(f"Rebalance error: {e}")
            return False
    def backup_consumer_offsets(self, group_id):
        """Back up a consumer group's offsets to a timestamped file."""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
                "--bootstrap-server", self.brokers,
                "--describe",
                "--group", group_id
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                os.makedirs("/backup", exist_ok=True)  # ensure the target dir exists
                backup_file = f"/backup/consumer_offsets_{group_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
                with open(backup_file, 'w') as f:
                    f.write(result.stdout)
                self.logger.info(f"Backed up offsets for group {group_id} to {backup_file}")
                return True
            else:
                self.logger.error(f"Offset backup failed: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Offset backup error: {e}")
            return False

# Usage example
if __name__ == "__main__":
    kafka_ops = KafkaOperations(
        kafka_home="/opt/kafka",
        brokers="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
    )
    # Create a topic
    kafka_ops.create_topic("test_topic", partitions=6, replication_factor=3)
    # Increase partitions
    kafka_ops.increase_partitions("test_topic", 12)
    # Back up consumer offsets
    kafka_ops.backup_consumer_offsets("test_consumer_group")
4. High Availability and Failure Recovery
4.1 Cluster Health Check
#!/bin/bash
# Kafka cluster health check and automated recovery
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"

# Detect under-replicated partitions and restore preferred leaders
check_and_fix_isr() {
    echo "Checking for under-replicated partitions..."
    isr_problem=0
    # List all topics
    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    for topic in $topics; do
        # Describe the topic
        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic $topic)
        # Compare replica count vs. ISR count on each partition line
        isr_issues=$(echo "$topic_desc" | awk '/Leader:/ {
            for (i = 1; i <= NF; i++) {
                if ($i == "Replicas:") replicas = $(i + 1);
                if ($i == "Isr:")      isr      = $(i + 1);
            }
            if (split(isr, a, ",") < split(replicas, b, ",")) print "under-replicated";
        }')
        if [ -n "$isr_issues" ]; then
            echo "Topic $topic has under-replicated partitions"
            isr_problem=1
        fi
    done
    # Trigger a preferred-replica election across all partitions
    # (kafka-leader-election.sh requires --partition when --topic is given,
    # so use --all-topic-partitions here)
    if [ "$isr_problem" -eq 1 ]; then
        ${KAFKA_HOME}/bin/kafka-leader-election.sh --bootstrap-server ${KAFKA_BROKERS} --election-type preferred --all-topic-partitions
    fi
}

# Automated failure recovery
auto_recovery() {
    echo "Running automated recovery..."
    # Restart any broker whose service is down
    for broker in 192.168.1.11 192.168.1.12 192.168.1.13; do
        if ! ssh $broker "systemctl is-active kafka" > /dev/null 2>&1; then
            echo "Restarting broker: $broker"
            ssh $broker "systemctl restart kafka"
            sleep 30
        fi
    done
    # Check and repair ISR state
    check_and_fix_isr
    # Validate the cluster state
    validate_cluster_state
}

validate_cluster_state() {
    echo "Validating cluster state..."
    # Count online brokers
    online_brokers=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_BROKERS} 2>/dev/null | grep -c "id:")
    if [ "$online_brokers" -eq 3 ]; then
        echo "Cluster recovered: all brokers online"
    else
        echo "Cluster recovery failed; online brokers: $online_brokers"
        return 1
    fi
}

# Run the health check and recovery
auto_recovery
Summary
A production Kafka deployment touches several key areas: cluster architecture design, performance parameter tuning, monitoring, and operations automation. With the approach described here, operations engineers can build a stable, efficient Kafka cluster. The key points: size the cluster sensibly, tune configuration parameters methodically, build out complete monitoring and alerting, and have a reliable failure-recovery strategy. In a real production environment you will still need to optimize for your specific workload and keep monitoring and improving performance, to guarantee the stability and reliability of the messaging service.
Original title: Kafka生產(chǎn)環(huán)境應(yīng)用方案:高可用集群部署與運(yùn)維實(shí)戰(zhàn)
Source: WeChat official account magedu-Linux (馬哥Linux運(yùn)維). Please credit the source when republishing.