Kafka核心組件之控制器和協調器
[TOC]
我們已經知道Kafka的集群由n個的broker所組成,每個broker就是一個kafka的實例或者稱之為kafka的服務。其實控制器也是一個broker,控制器也叫leader broker。
他除了具有一般broker的功能外,還負責分區leader的選取,也就是負責選舉partition的leader replica。
kafka每個broker啟動的時候,都會實例化一個KafkaController,并將broker的id注冊到zookeeper,集群在啟動過程中,通過選舉機制選舉出其中一個broker作為leader,也就是前面所說的控制器。
包括集群啟動在內,有三種情況觸發控制器選舉:
1、集群啟動
2、控制器所在代理發生故障
3、zookeeper心跳感知,控制器與自己的session過期
按照慣例,先看圖。我們根據下圖來講解集群啟動時,控制器選舉過程。
假設此集群有三個broker,同時啟動。
(一)3個broker從zookeeper獲取/controller臨時節點信息。/controller存儲的是選舉出來的leader信息。此舉是為了確認是否已經存在leader。
(二)如果還沒有選舉出leader,那么此節點是不存在的,返回-1。如果返回的不是-1,而是leader的json數據,那么說明已經有leader存在,選舉結束。
(三)三個broker發現返回-1,了解到目前沒有leader,于是均會觸發向臨時節點/controller寫入自己的信息。最先寫入的就會成為leader。
(四)假設broker 0的速度最快,他先寫入了/controller節點,那么他就成為了leader。而broker1、broker2很不幸,因為晚了一步,他們在寫/controller的過程中會拋出ZkNodeExistsException,也就是zk告訴他們,此節點已經存在了。
經過以上四步,broker 0成功寫入/controller節點,其它broker寫入失敗了,所以broker 0成功當選leader。
此外zk中還有controller_epoch節點,存儲了leader的變更次數,初始值為0,以后leader每變一次,該值+1。所有向控制器發起的請求,都會攜帶此值。如果控制器和自己內存中比較,請求值小,說明kafka集群已經發生了新的選舉,此請求過期,此請求無效。如果請求值大于控制器內存的值,說明已經有新的控制器當選了,自己已經退位,請求無效。kafka通過controller_epoch保證集群控制器的唯一性及操作的一致性。
由此可見,Kafka控制器選舉就是看誰先爭搶到/controller節點寫入自身信息。
控制器的初始化,其實是初始化控制器所用到的組件及監聽器,準備元數據。
前面提到過每個broker都會實例化并啟動一個KafkaController。KafkaController和他的組件關系,以及各個組件的介紹如下圖:
圖中箭頭為組件層級關系,組件下面還會再初始化其他組件。可見控制器內部還是有些復雜的,主要有以下組件:
1、ControllerContext,此對象存儲了控制器工作需要的所有上下文信息,包括存活的代理、所有主題及分區分配方案、每個分區的AR、leader、ISR等信息。
2、一系列的listener,通過對zookeeper的監聽,觸發相應的操作,黃色的框的均為listener
3、分區和副本狀態機,管理分區和副本。
4、當前代理選舉器ZookeeperLeaderElector,此選舉器有上位和退位的相關回調方法。
5、分區leader選舉器,PartitionLeaderSelector
6、主題刪除管理器,TopicDeletetionManager
7、leader向broker批量通信的ControllerBrokerRequestBatch。緩存狀態機處理后產生的request,然后統一發送出去。
8、控制器平衡操作的KafkaScheduler,僅在broker作為leader時有效。
Kafka集群的一些重要信息都記錄在ZK中,比如集群的所有代理節點、主題的所有分區、分區的副本信息(副本集、主副本、同步的副本集)。每個broker都有一個控制器,為了管理整個集群Kafka選利用zk選舉模式,為整個集群選舉一個“中央控制器”或”主控制器“,控制器其實就是一個broker節點,除了一般broker功能外,還具有分區首領選舉功能。中央控制器管理所有節點的信息,并通過向ZK注冊各種監聽事件來管理整個集群節點、分區的leader的選舉、再平衡等問題。外部事件會更新ZK的數據,ZK中的數據一旦發生變化,控制器都要做不同的響應處理。
故障轉移其實就是leader所在broker發生故障,leader轉移為其他的broker。轉移的過程就是重新選舉leader的過程。
重新選舉leader后,需要為該broker注冊相應權限,調用的是ZookeeperLeaderElector的onControllerFailover()方法。在這個方法中初始化和啟動了一系列的組件來完成leader的各種操作。具體如下,其實和控制器初始化有很大的相似度。
1、注冊分區管理的相關監聽器
2、注冊主題管理的相關監聽
3、注冊代理變化監聽器
4、重新初始化ControllerContext,
5、啟動控制器和其他代理之間通信的ControllerChannelManager
6、創建用于刪除主題的TopicDeletionManager對象,并啟動。
7、啟動分區狀態機和副本狀態機
8、輪詢每個主題,添加監聽分區變化的PartitionModificationsListener
9、如果設置了分區平衡定時操作,那么創建分區平衡的定時任務,默認300秒檢查并執行。
除了這些組件的啟動外,onControllerFailover方法中還做了如下操作:
1、/controller_epoch值+1,并且更新到ControllerContext
2、檢查是否出發分區重分配,并做相關操作
3、檢查需要將優先副本選為leader,并做相關操作
4、向kafka集群所有代理發送更新元數據的請求。
下面來看leader權限被取消時,調用的方法onControllerResignation
1、該方法中注銷了控制器的權限。取消在zookeeper中對于分區、副本感知的相應監聽器的監聽。
2、關閉啟動的各個組件
3、最后把ControllerContext中記錄控制器版本的數值清零,并設置當前broker為RunnignAsBroker,變為普通的broker。
通過對控制器啟動過程的學習,我們應該已經對kafka工作的原理有了了解,核心是監聽zookeeper的相關節點,節點變化時觸發相應的操作。
有新的broker加入集群時,稱為代理上線。當broker關閉,推出集群時,稱為代理下線。
代理上線:
1、新代理啟動時向/brokers/ids寫數據
2、BrokerChangeListener監聽到變化。對新上線節點調用controllerChannelManager.addBroker(),完成新上線代理網絡層初始化
3、調用KafkaController.onBrokerStartup()處理
3.5恢復因新代理上線暫停的刪除主題操作線程
代理下線:
1、查找下線節點集合
2、輪詢下線節點,調用controllerChannelManager.removeBroker(),關閉每個下線節點網絡連接。清空下線節點消息隊列,關閉下線節點request請求
3、輪詢下線節點,調用KafkaController.onBrokerFailure處理
4、向集群全部存活代理發送updateMetadataRequest請求
顧名思義,協調器負責協調工作。本節所講的協調器,是用來協調消費者工作分配的。簡單點說,就是消費者啟動后,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉起來,全有賴于協調器。
主要的協調器有如下兩個:
1、消費者協調器(ConsumerCoordinator)
2、組協調器(GroupCoordinator)
kafka引入協調器有其歷史過程,原來consumer信息依賴于zookeeper存儲,當代理或消費者發生變化時,引發消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊群效應和腦裂問題。
為了解決這些問題,kafka引入了協調器。服務端引入組協調器(GroupCoordinator),消費者端引入消費者協調器(ConsumerCoordinator)。每個broker啟動的時候,都會創建GroupCoordinator實例,管理部分消費組(集群負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer實例化時,同時實例化一個ConsumerCoordinator對象,負責同一個消費組下各個消費者和服務端組協調器之前的通信。如下圖:
消費者協調器,可以看作是消費者做操作的代理類(其實并不是),消費者很多操作通過消費者協調器進行處理。
消費者協調器主要負責如下工作:
1、更新消費者緩存的MetaData
2、向組協調器申請加入組
3、消費者加入組后的相應處理
4、請求離開消費組
5、向組協調器提交偏移量
6、通過心跳,保持組協調器的連接感知。
7、被組協調器選為leader的消費者的協調器,負責消費者分區分配。分配結果發送給組協調器。
8、非leader的消費者,通過消費者協調器和組協調器同步分配結果。
消費者協調器主要依賴的組件和說明見下圖:
可以看到這些組件和消費者協調器擔負的工作是可以對照上的。
組協調器負責處理消費者協調器發過來的各種請求。它主要提供如下功能:
組協調器在broker啟動的時候實例化,每個組協調器負責一部分消費組的管理。它主要依賴的組件見下圖:
這些組件也是和組協調器的功能能夠對應上的。具體內容不在詳述。
下圖展示了消費者啟動選取leader、入組的過程。
消費者入組的過程,很好的展示了消費者協調器和組協調器之間是如何配合工作的。leader consumer會承擔分區分配的工作,這樣kafka集群的壓力會小很多。同組的consumer通過組協調器保持同步。消費者和分區的對應關系持久化在kafka內部主題。
消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪里開始消費。如果整個環境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分區變化后,消費者不再對應原來的分區,而每個消費者的offset也沒有同步到服務器,這樣就無法接著前任的工作繼續進行了。
因此只有把消費偏移量定期發送到服務器,由GroupCoordinator集中式管理,分區重分配后,各個消費者從GroupCoordinator讀取自己對應分區的offset,在新的分區上繼續前任的工作。
下圖展示了不提交offset到服務端的問題:
開始時,consumer 0消費partition 0和1,后來由于新的consumer 2入組,分區重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由于consumer之間是不能通訊的,所有consumer2并不知道從哪里開始自己的消費。
因此consumer需要定期提交自己消費的offset到服務端,這樣在重分區操作后,每個consumer都能在服務端查到分配給自己的partition所消費到的offset,繼續消費。
由于kafka有高可用和橫向擴展的特性,當有新的分區出現或者新的消費入組后,需要重新分配消費者對應的分區,所以如果偏移量提交的有問題,會重復消費或者丟消息。偏移量提交的時機和方式要格外注意!!
1、自動提交偏移量
設置 enable.auto.commit為true,設定好周期,默認5s。消費者每次調用輪詢消息的poll()方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。
這樣做很方便,但是會帶來重復消費的問題。假如最近一次偏移量提交3s后,觸發了再均衡,服務器端存儲的還是上次提交的偏移量,那么再均衡結束后,新的消費者會從最后一次提交的偏移量開始拉取消息,此3s內消費的消息會被重復消費。
2、手動提交偏移量
設置 enable.auto.commit為false。程序中手動調用commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。
commitSync()是同步提交偏移量,主程序會一直阻塞,偏移量提交成功后才往下運行。這樣會限制程序的吞吐量。如果降低提交頻次,又很容易發生重復消費。
這里我們可以使用commitAsync()異步提交偏移量。只管提交,而不會等待broker返回提交結果
commitSync只要沒有發生不可恢復錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因為重試提交時,可能已經有其它更大偏移量已經提交成功了,如果此時重試提交成功,那么更小的偏移量會覆蓋大的偏移量。那么如果此時發生再均衡,新的消費者將會重復消費消息。
Kafka - 副本機制詳解
所謂的副本機制(Replication),也可以稱之為備份機制,通常是指分布式系統在多臺網絡互聯的機器上保存有相同的數據拷貝。副本機制有什么好處呢?
這些優點都是在分布式系統教科書中最常被提及的,但是有些遺憾的是,對于 Apache Kafka而言,目前只能享受到副本機制帶來的第 1個好處,也就是提供數據冗余實現高可用性和高持久性。我會在這一講后面的內容中,詳細解釋 Kafka沒能提供第 2點和第 3點好處的原因。
不過即便如此,副本機制依然是 Kafka設計架構的核心所在,它也是 Kafka確保系統高可用和消息高持久性的重要基石。
在討論具體的副本機制之前,我們先花一點時間明確一下副本的含義。
我們之前談到過,Kafka是有主題概念的,而每個主題又進一步劃分成若干個分區。副本的概念實際上是在分區層級下定義的,每個分區配置有若干個副本。
所謂副本(Replica),本質就是一個只能追加寫消息的提交日志。根據 Kafka副本機制的定義,同一個分區下的所有副本保存有相同的消息序列,這些副本分散保存在不同的 Broker上,從而能夠對抗部分 Broker宕機帶來的數據不可用。
在實際生產環境中,每臺 Broker都可能保存有各個主題下不同分區的不同副本,單個 Broker上存有成百上千個副本的現象是非常正常的。
接下來我們來看一張圖,它展示的是一個有 3臺 Broker的 Kafka集群上的副本分布情況。從這張圖中,我們可以看到,主題 1分區 0的 3個副本分散在 3臺 Broker上,其他主題分區的副本也都散落在不同的 Broker上,從而實現數據冗余。
既然分區下能夠配置多個副本,而且這些副本的內容還要一致,那么很自然的一個問題就是:我們該如何確保副本中所有的數據都是一致的呢?特別是對 Kafka而言,當生產者發送消息到某個主題后,消息是如何同步到對應的所有副本中的呢?針對這個問題,最常見的解決方案就是采用基于領導者(Leader-based)的副本機制。Apache Kafka就是這樣的設計。
基于領導者的副本機制的工作原理如下圖所示,我來簡單解釋一下這張圖里面的內容。
第一,在 Kafka中,副本分成兩類:領導者副本(Leader Replica)和追隨者副本(Follower Replica)。每個分區在創建時都要選舉一個副本,稱為領導者副本,其余的副本自動稱為追隨者副本。
第二,Kafka的副本機制比其他分布式系統要更嚴格一些。在 Kafka中,追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產者的讀寫請求。所有的請求都必須由領導者副本來處理,或者說,所有的讀寫請求都必須發往領導者副本所在的 Broker,由該 Broker負責處理。追隨者副本不處理客戶端請求,它唯一的任務就是從領導者副本異步拉取消息,并寫入到自己的提交日志中,從而實現與領導者副本的同步。
第三,當領導者副本掛掉了,或者說領導者副本所在的 Broker宕機時,Kafka依托于 ZooKeeper提供的監控功能能夠實時感知到,并立即開啟新一輪的領導者選舉,從追隨者副本中選一個作為新的領導者。老 Leader副本重啟回來后,只能作為追隨者副本加入到集群中。
你一定要特別注意上面的第二點,即追隨者副本是不對外提供服務的。還記得剛剛我們談到副本機制的好處時,說過 Kafka沒能提供讀操作橫向擴展以及改善局部性嗎?具體的原因就在于此。
對于客戶端用戶而言,Kafka的追隨者副本沒有任何作用,它既不能像 MySQL那樣幫助領導者副本“抗讀”,也不能實現將某些副本放到離客戶端近的地方來改善數據局部性。
既然如此,Kafka為什么要這樣設計呢?其實這種副本機制有兩個方面的好處。
1.方便實現“Read-your-writes”。
所謂 Read-your-writes,顧名思義就是,當你使用生產者 API向 Kafka成功寫入消息后,馬上使用消費者 API去讀取剛才生產的消息。
舉個例子,比如你平時發微博時,你發完一條微博,肯定是希望能立即看到的,這就是典型的 Read-your-writes場景。如果允許追隨者副本對外提供服務,由于副本同步是異步的,因此有可能出現追隨者副本還沒有從領導者副本那里拉取到最新的消息,從而使得客戶端看不到最新寫入的消息。
2.方便實現單調讀(Monotonic Reads)。
什么是單調讀呢?就是對于一個消費者用戶而言,在多次消費消息時,它不會看到某條消息一會兒存在一會兒不存在。
如果允許追隨者副本提供讀服務,那么假設當前有 2個追隨者副本 F1和 F2,它們異步地拉取領導者副本數據。倘若 F1拉取了 Leader的最新消息而 F2還未及時拉取,此時如果有一個消費者先從 F1讀取消息之后又從 F2拉取消息,它可能會看到這樣的現象:第一次消費時看到的最新消息在第二次消費時不見了,這就不是單調讀一致性。如果所有的讀請求都是由 Leader來處理,那么 Kafka就很容易實現單調讀一致性。
我們剛剛反復說過,追隨者副本不提供服務,只是定期地異步拉取領導者副本中的數據而已。既然是異步的,就存在著不可能與 Leader實時同步的風險。在探討如何正確應對這種風險之前,我們必須要精確地知道同步的含義是什么。或者說,Kafka要明確地告訴我們,追隨者副本到底在什么條件下才算與 Leader同步。
基于這個想法,Kafka引入了 In-sync Replicas,也就是所謂的 ISR副本集合。ISR中的副本都是與 Leader同步的副本,相反,不在 ISR中的追隨者副本就被認為是與 Leader不同步的。到底什么副本能夠進入到 ISR中呢?
我們首先要明確的是,Leader副本天然就在 ISR中。也就是說, ISR不只是追隨者副本集合,它必然包括 Leader副本。甚至在某些情況下,ISR只有 Leader這一個副本。
能夠進入到 ISR的追隨者副本要滿足一定的條件。至于是什么條件,我先賣個關子,我們先來一起看看下面這張圖。
圖中有 3個副本:1個領導者副本和 2個追隨者副本。Leader副本當前寫入了 10條消息,Follower1副本同步了其中的 6條消息,而 Follower2副本只同步了其中的 3條消息。請你思考一下,對于這 2個追隨者副本,你覺得哪個追隨者副本與 Leader不同步?
答案是,要根據具體情況來定。換成英文,就是那句著名的“It depends”。看上去好像 Follower2的消息數比 Leader少了很多,它是最有可能與 Leader不同步的。的確是這樣的,但僅僅是可能。
這張圖中的 2個 Follower副本都有可能與 Leader不同步,但也都有可能與 Leader同步。也就是說,Kafka判斷 Follower是否與 Leader同步的標準,不是看相差的消息數,而是另有“玄機”。
這個標準就是 Broker端參數 replica.lag.time.max.ms參數值。這個參數的含義是 Follower副本能夠落后 Leader副本的最長時間間隔,當前默認值是 10秒。這就是說,只要一個 Follower副本落后 Leader副本的時間不連續超過 10秒,那么 Kafka就認為該 Follower副本與 Leader是同步的,即使此時 Follower副本中保存的消息明顯少于 Leader副本中的消息。
我們在前面說過,Follower副本唯一的工作就是不斷地從 Leader副本拉取消息,然后寫入到自己的提交日志中。如果這個同步過程的速度持續慢于 Leader副本的消息寫入速度,那么在 replica.lag.time.max.ms時間后,此 Follower副本就會被認為是與 Leader副本不同步的,因此不能再放入 ISR中。此時,Kafka會自動收縮 ISR集合,將該副本“踢出”ISR。
倘若該副本后面慢慢地追上了 Leader的進度,那么它是能夠重新被加回 ISR的。這也表明,ISR是一個動態調整的集合,而非靜態不變的。
既然 ISR是可以動態調整的,那么自然就可以出現這樣的情形:ISR為空。因為 Leader副本天然就在 ISR中,如果 ISR為空了,就說明 Leader副本也“掛掉”了,Kafka需要重新選舉一個新的 Leader。可是 ISR是空,此時該怎么選舉新 Leader呢?
Kafka把所有不在 ISR中的存活副本都稱為非同步副本。非同步副本落后 Leader太多,如果選擇這些副本作為新 Leader,就可能出現數據的丟失。畢竟,這些副本中保存的消息遠遠落后于老 Leader中的消息。在 Kafka中,選舉這種副本的過程稱為 Unclean領導者選舉。 Broker端參數 unclean.leader.election.enable控制是否允許 Unclean領導者選舉。
開啟 Unclean領導者選舉可能會造成數據丟失,但好處是,它使得分區 Leader副本一直存在,不至于停止對外提供服務,因此提升了高可用性。禁止 Unclean領導者選舉的好處在于維護了數據的一致性,避免了消息丟失,但犧牲了高可用性。
如果你聽說過 CAP理論的話,你一定知道,一個分布式系統通常只能同時滿足一致性(Consistency)、可用性(Availability)、分區容錯性(Partition tolerance)中的兩個。顯然,在這個問題上,Kafka賦予你選擇 C或 A的權利。
你可以根據你的實際業務場景決定是否開啟 Unclean領導者選舉。我強烈建議你不要開啟它,畢竟我們還可以通過其他的方式來提升高可用性。如果為了這點兒高可用性的改善,犧牲了數據一致性,那就非常不值當了。
如何確定Kafka的分區數,key和consumer線程數
一、客戶端/服務器端需要使用的內存就越多
先說說客戶端的情況。Kafka 0.8.2之后推出了Java版的全新的producer,這個producer有個參數batch.size,默認是16KB。它會為每個分區緩存消息,一旦滿了就打包將消息批量發出。看上去這是個能夠提升性能的設計。不過很顯然,因為這個參數是分區級別的,如果分區數越多,這部分緩存所需的內存占用也會更多。假設你有10000個分區,按照默認設置,這部分緩存需要占用約157MB的內存。而consumer端呢?我們拋開獲取數據所需的內存不說,只說線程的開銷。如果還是假設有10000個分區,同時consumer線程數要匹配分區數(大部分情況下是最佳的消費吞吐量配置)的話,那么在consumer client就要創建10000個線程,也需要創建大約10000個Socket去獲取分區數據。這里面的線程切換的開銷本身已經不容小覷了。
服務器端的開銷也不小,如果閱讀Kafka源碼的話可以發現,服務器端的很多組件都在內存中維護了分區級別的緩存,比如controller,FetcherManager等,因此分區數越多,這種緩存的成本越久越大。
二、文件句柄的開銷
每個分區在底層文件系統都有屬于自己的一個目錄。該目錄下通常會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會為每個broker都保存這兩個文件句柄(file handler)。很明顯,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit-n的限制。
三、降低高可用性
Kafka通過副本(replica)機制來保證高可用。具體做法就是為每個分區保存若干個副本(replica_factor指定副本數)。每個副本保存在不同的broker上。期中的一個副本充當leader副本,負責處理producer和consumer請求。其他副本充當follower角色,由Kafka controller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然后在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分情況下可能只是幾毫秒級別。但如果你有10000個分區,10個broker,也就是說平均每個broker上有1000個分區。此時這個broker掛掉了,那么zookeeper和controller需要立即對這1000個分區進行leader選舉。比起很少的分區leader選舉而言,這必然要花更長的時間,并且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。
說了這么多“廢話”,很多人肯定已經不耐煩了。那你說到底要怎么確定分區數呢?答案就是:視情況而定。基本上你還是需要通過一系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對你意義不大,因為不同的硬件、軟件、負載情況測試出來的結果必然不一樣。我經常碰到的問題類似于,官網說每秒能到10MB,為什么我的producer每秒才1MB?——且不說硬件條件,最后發現他使用的消息體有1KB,而官網的基準測試是用100B測出來的,因此根本沒有可比性。不過你依然可以遵循一定的步驟來嘗試確定分區數:創建一個只有1個分區的topic,然后測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然后假設總的目標吞吐量是Tt,那么分區數= Tt/ max(Tp, Tc)
Tp表示producer的吞吐量。測試producer通常是很容易的,因為它的邏輯非常簡單,就是直接發送消息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關系更大,因為Tc的值取決于你拿到消息之后執行什么操作,因此Tc的測試通常也要麻煩一些。
Kafka并不能真正地做到線性擴展(其實任何系統都不能),所以你在規劃你的分區數的時候最好多規劃一下,這樣未來擴展時候也更加方便。
消息-分區的分配
默認情況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key)% numPartitions,如下圖所示:
def partition(key: Any, numPartitions: Int): Int={
Utils.abs(key.hashCode)% numPartitions
}
這就保證了相同key的消息一定會被路由到相同的分區。如果你沒有指定key,那么Kafka是如何確定這條消息去往哪個分區的呢?
復制代碼
if(key== null){//如果沒有指定key
val id= sendPartitionPerTopicCache.get(topic)//先看看Kafka有沒有緩存的現成的分區Id
id match{
case Some(partitionId)=>
partitionId//如果有的話直接使用這個分區Id就好了
case None=>//如果沒有的話,
val availablePartitions= topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)//找出所有可用分區的leader所在的broker
if(availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic"+ topic)
val index= Utils.abs(Random.nextInt)% availablePartitions.size//從中隨機挑一個
val partitionId= availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)//更新緩存以備下一次直接使用
partitionId
}
}
復制代碼
可以看出,Kafka幾乎就是隨機找一個分區發送無key的消息,然后把這個分區號加入到緩存中以備后面直接使用——當然了,Kafka本身也會清空該緩存(默認每10分鐘或每次請求topic元數據時)
如何設定consumer線程數
我個人的觀點,如果你的分區數是N,那么最好線程數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的線程不會被分配到任何分區。讓我們來看看具體Kafka是如何分配的。
topic下的一個分區只能被同一個consumer group下的一個consumer線程來消費,但反之并不成立,即一個consumer線程可以消費多個分區的數據,比如Kafka提供的ConsoleConsumer,默認就只是一個線程來消費所有分區的數據。——其實ConsoleConsumer可以使用通配符的功能實現同時消費多個topic數據,但這和本文無關。
再討論分配策略之前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用于consumer程序調用實現數據的消費。其底層維護了一個阻塞隊列,所以在沒有新消息到來時,consumer是處于阻塞狀態的,表現出來的狀態就是consumer程序一直在等待新消息的到來。——你當然可以配置成帶超時的consumer,具體參看參數consumer.timeout.ms的用法。
下面說說Kafka提供的兩種分配策略: range和roundrobin,由參數partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分區,P0~ P9,consumer線程數是3, C0~ C2,那么每個線程都分配哪些分區呢?
C0消費分區 0, 1, 2, 3
C1消費分區 4, 5, 6
C2消費分區 7, 8, 9
具體算法就是:
復制代碼
val nPartsPerConsumer= curPartitions.size/ curConsumers.size//每個consumer至少保證消費的分區數
val nConsumersWithExtraPart= curPartitions.size% curConsumers.size//還剩下多少個分區需要單獨分配給開頭的線程們
...
for(consumerThreadId<- consumerThreadIdSet){//對于每一個consumer線程
val myConsumerPosition= curConsumers.indexOf(consumerThreadId)//算出該線程在所有線程中的位置,介于[0, n-1]
assert(myConsumerPosition>= 0)
// startPart就是這個線程要消費的起始分區數
val startPart= nPartsPerConsumer* myConsumerPosition+ myConsumerPosition.min(nConsumersWithExtraPart)
// nParts就是這個線程總共要消費多少個分區
val nParts= nPartsPerConsumer+(if(myConsumerPosition+ 1> nConsumersWithExtraPart) 0 else 1)
...
}
復制代碼
針對于這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart為10%3=1,說明每個線程至少保證3個分區,還剩下1個分區需要單獨分配給開頭的若干個線程。這就是為什么C0消費4個分區,后面的2個線程每個消費3個分區,具體過程詳見下面的Debug截圖信息:
ctx.myTopicThreadIds
nPartsPerConsumer= 10/ 3= 3
nConsumersWithExtraPart= 10% 3= 1
第一次:
myConsumerPosition= 1
startPart= 1* 3+ min(1, 1)= 4---也就是從分區4開始讀
nParts= 3+(if(1+ 1> 1) 0 else 1)= 3讀取3個分區,即4,5,6
第二次:
myConsumerPosition= 0
startPart= 3* 0+ min(1, 0)=0---從分區0開始讀
nParts= 3+(if(0+ 1> 1) 0 else 1)= 4讀取4個分區,即0,1,2,3
第三次:
myConsumerPosition= 2
startPart= 3* 2+ min(2, 1)= 7---從分區7開始讀
nParts= 3+ if(2+ 1> 1) 0 else 1)= 3讀取3個分區,即7, 8, 9
至此10個分區都已經分配完畢
說到這里,經常有個需求就是我想讓某個consumer線程消費指定的分區而不消費其他的分區。坦率來說,目前Kafka并沒有提供自定義分配策略。做到這點很難,但仔細想一想,也許我們期望Kafka做的事情太多了,畢竟它只是個消息引擎,在Kafka中加入消息消費的邏輯也許并不是Kafka該做的事情。