如何為Kafka集群選擇合適的主題和分區數量
如何決定kafka集群中topic,partition的數量,這是許多kafka用戶經常遇到的問題。本文列舉闡述幾個重要的決定因素,以提供一些參考。
分區多吞吐量更高
一個話題topic的各個分區partiton之間是并行的。在producer和broker方面,寫不同的分區是完全并行的。因此一些昂貴的操作比如壓縮,可以獲得更多的資源,因為有多個進程。在consumer方面,一個分區的數據可以由一個consumer線程在拉去數據。分區多,并行的consumer(同一個消費組)也可以多。因此通常,分區越多吞吐量越高。
基于吞吐量可以獲得一個粗略的計算公式。先測量得到在只有一個分區的情況下,Producer的吞吐量(P)和Consumer的吞吐量(C)。那如果總的目標吞吐量是T的話,max(T/P,T/C)就是需要的最小分區數。在單分區的情況下,Producer的吞吐量可以通過一些配置參數,比如bath的大小、副本的數量、壓縮格式、ack類型來測得。而Consumer的吞吐量通常取決于應用程序處理每一天消息邏輯。這些都是需要切合實際測量。
隨著時間推移數據量的增長可能會需要增加分區。有一點Producer者發布消息通過key取哈希后映射分發到一個指定的分區,當分區數發生變化后,會帶來key和分區映射關系發生變化。可能某些應用程序依賴key和分區映射關系,映射關系變化了,程序就需要做相應的調整。為了避免這種key和分區關系帶來的應用程序修改。所以在分區的時候盡量提前考慮,未來一年或兩年的對分區數據量的要求。
除了吞吐量,還有一些其他的因素,在定分區的數目時是值得考慮的。在某些情況下,太多的分區也可能會產生負面影響。
分區多需要的打開的文件句柄也多
每個分區都映射到broker上的一個目錄,每個log片段都會有兩個文件(一個是索引文件,另一個是實際的數據文件)。分區越多所需要的文件句柄也就越多,可以通過配置操作系統的參數增加打開文件句柄數。
分區多增加了不可用風險
kafka支持主備復制,具備更高的可用性和持久性。一個分區(partition)可以有多個副本,這些副本保存在不同的broker上。每個分區的副本中都會有一個作為Leader。當一個broker失敗時,Leader在這臺broker上的分區都會變得不可用,kafka會自動移除Leader,再其他副本中選一個作為新的Leader。Producer和Consumer都只會與Leader相連。
一般情況下,當一個broker被正常關機時,controller主動地將Leader從正在關機的broker上移除。移動一個Leader只需要幾毫秒。然當broker出現異常導致關機時,不可用會與分區數成正比。假設一個boker上有2000個分區,每個分區有2個副本,那這樣一個boker大約有1000個Leader,當boker異常宕機,會同時有1000個分區變得不可用。假設恢復一個分區需要5ms,1000個分區就要5s。
分區越多,在broker異常宕機的情況,恢復所需時間會越長,不可用風險會增加。
分區多會增加點到點的延遲
這個延遲需要體現在兩個boker間主備數據同步。在默認情況下,兩個boker只有一個線程負責數據的復制。
根據經驗,每個boker上的分區限制在100*b*r內(b指集群內boker的數量,r指副本數量)。
分區多會增加客戶端的內存消耗
kafka0.8.2后有個比較好的特色,新的Producer可以允許用戶設置一個緩沖區,緩存一定量的數據。當緩沖區數據到達設定量或者到時間,數據會從緩存區刪除發往broker。如果分區很多,每個分區都緩存一定量的數據量在緩沖區,很可能會占用大量的內存,甚至超過系統內存。
Consumer也存在同樣的問題,會從每個分區拉一批數據回來,分區越多,所需內存也就越大。
根據經驗,應該給每個分區分配至少幾十KB的內存。
總結
在通常情況下,增加分區可以提供kafka集群的吞吐量。也應該意識到集群的總分區數或是單臺服務器上的分區數過多,會增加不可用及延遲的風險。
Kafka 源碼解析之 Topic 的新建/擴容/刪除
[TOC]
本篇接著講述 Controller的功能方面的內容,在 Kafka中,一個 Topic的新建、擴容或者刪除都是由 Controller來操作的,本篇文章也是主要聚焦在 Topic的操作處理上(新建、擴容、刪除),實際上 Topic的創建在 Kafka源碼解析之 topic創建過程(三)中已經講述過了,本篇與前面不同的是,本篇主要是從 Controller角度來講述,而且是把新建、擴容、刪除這三個 Topic級別的操作放在一起做一個總結。
這里把 Topic新建與擴容放在一起講解,主要是因為無論 Topic是新建還是擴容,在 Kafka內部其實都是 Partition的新建,底層的實現機制是一樣的,Topic的新建與擴容的整體流程如下圖所示:
Topic新建與擴容觸發條件的不同如下所示:
下面開始詳細講述這兩種情況。
Topic擴容
Kafka提供了 Topic擴容工具,假設一個 Topic(topic_test)只有一個 partition,這時候我們想把它擴容到兩個 Partition,可以通過下面兩個命令來實現:
這兩種方法的區別是:第二種方法直接指定了要擴容的 Partition 2的副本需要分配到哪臺機器上,這樣的話我們可以精確控制到哪些 Topic放下哪些機器上。
無論是使用哪種方案,上面兩條命令產生的結果只有一個,將 Topic各個 Partition的副本寫入到 ZK對應的節點上,這樣的話/brokers/topics/topic_test節點的內容就會發生變化,PartitionModificationsListener監聽器就會被觸發,該監聽器的處理流程如下:
其 doHandleDataChange()方法的處理流程如下:
下面我們看下 onNewPartitionCreation()方法,其實現如下:
關于 Partition的新建,總共分了以下四步:
經過上面幾個階段,一個 Partition算是真正創建出來,可以正常進行讀寫工作了,當然上面只是講述了 Controller端做的內容,Partition副本所在節點對 LeaderAndIsr請求會做更多的工作,這部分會在后面關于 LeaderAndIsr請求的處理中只能夠詳細講述。
Topic新建
Kafka也提供了 Topic創建的工具,假設我們要創建一個名叫 topic_test,Partition數為2的 Topic,創建的命令如下:
跟前面的類似,方法二是可以精確控制新建 Topic每個 Partition副本所在位置,Topic創建的本質上是在/brokers/topics下新建一個節點信息,并將 Topic的分區詳情寫入進去,當/brokers/topics有了新增的 Topic節點后,會觸發 TopicChangeListener監聽器,其實現如下:
只要/brokers/topics下子節點信息有變化(topic新增或者刪除),TopicChangeListener都會被觸發,其 doHandleChildChange()方法的處理流程如下:
接著看下 onNewTopicCreation()方法實現
上述方法主要做了兩件事:
onNewPartitionCreation()的實現在前面 Topic擴容部分已經講述過,這里不再重復,最好參考前面流程圖來梳理 Topic擴容和新建的整個過程。
Kafka Topic刪除這部分的邏輯是一個單獨線程去做的,這個線程是在 Controller啟動時初始化和啟動的。
TopicDeletionManager初始化
TopicDeletionManager啟動實現如下所示:
TopicDeletionManager啟動時只是初始化了一個 DeleteTopicsThread線程,并啟動該線程。TopicDeletionManager這個類從名字上去看,它是 Topic刪除的管理器,它是如何實現 Topic刪除管理呢,這里先看下該類的幾個重要的成員變量:
前面一小節,簡單介紹了 TopicDeletionManager、DeleteTopicsThread的啟動以及它們之間的關系,這里我們看下一個 Topic被設置刪除后,其處理的整理流程,簡單做了一個小圖,如下所示:
這里先簡單講述上面的流程,當一個 Topic設置為刪除后:
先看下 DeleteTopicsListener的實現,如下:
其 doHandleChildChange()的實現邏輯如下:
看下 Topic刪除線程 DeleteTopicsThread的實現,如下所示:
doWork()方法處理邏輯如下:
先看下 onTopicDeletion()方法,這是 Topic最開始刪除時的實現,如下所示:
Topic的刪除的真正實現方法還是在 startReplicaDeletion()方法中,Topic刪除時,會先調用 onPartitionDeletion()方法刪除所有的 Partition,然后在 Partition刪除時,執行 startReplicaDeletion()方法刪除該 Partition的副本,該方法的實現如下:
該方法的執行邏輯如下:
在將副本狀態從 OfflineReplica轉移成 ReplicaDeletionStarted時,會設置一個回調方法 deleteTopicStopReplicaCallback(),該方法會將刪除成功的 Replica設置為 ReplicaDeletionSuccessful狀態,刪除失敗的 Replica設置為 ReplicaDeletionIneligible狀態(需要根據 StopReplica請求處理的過程,看下哪些情況下 Replica會刪除失敗,這個會在后面講解)。
下面看下這個方法 completeDeleteTopic(),當一個 Topic的所有 Replica都刪除成功時,即其狀態都在 ReplicaDeletionSuccessful時,會調用這個方法,如下所示:
當一個 Topic所有副本都刪除后,會進行如下處理:
至此,一個 Topic算是真正刪除完成。
怎么設置kafka topic數據存儲時間
1、Kafka創建topic命令很簡單,一條命令足矣:bin/kafka-topics.sh--create--zookeeper localhost:2181--replication-factor 3--partitions 3--topic test。
2.此命令將創建一個名為test的topic,其中有三個分區,每個分區需要分配三個副本。
三。topic創建主要分為兩部分:命令行controller邏輯部分。
四。后臺邏輯將監聽zookeeper下對應的目錄節點。一旦啟動topic創建命令,它將創建一個新的數據節點并觸發后臺創建邏輯。
五個。確定分區副本分配方案(即,將每個分區副本分配給哪個代理);創建zookeeper節點并將此方案寫入/brokers/topics/<topic>節點。
五個。確定分區副本分配方案(即每個分區的副本分配給哪個分區)broker上);創建zookeeper節點,把這個方案寫入/brokers/topics/<topic>節點下。
6、Kafka controller這一部分的主要任務是:創建分區;創建副本;為每個分區選擇leaderISR;;更新各種緩存。