成人精品毛片_久久精品男人的天堂_久久午夜影院_国产视频亚洲视频

當前位置: 首頁 安卓手游資訊 手游資訊

kafka副本參數

Kafka動態調整topic副本因子replication-factor

實際項目中我們可能在創建topic時沒有設置好正確的replication-factor,導致kafka集群雖然是高可用的,但是該topic在有broker宕機時,可能發生無法使用的情況。topic一旦使用又不能輕易刪除重建,因此動態增加副本因子就成為最終的選擇。

原因分析:

假設我們有3個kafka broker分別brokerA、brokerB、brokerC.

如何動態給已經創建的topic添加replication-factor?

可能很多人想使用kafka-topics.sh腳本,那么事情情況如何了?

截圖

可以看出kafka-topics.sh不能用來增加副本因子replication-factor。實際應該使用kafka bin目錄下面的kafka-reassign-partitions.sh。

a,首先我們配置topic的副本,保存為json文件()

我們想把yqtopic01的部分設置為3,(我的kafka集群有3個broker,id分別為0,1,2), json文件名稱為increase-replication-factor.json

{"version":1,

"partitions":[

{"topic":"yqtopic01","partition":0,"replicas":[0,1,2]},

{"topic":"yqtopic01","partition":1,"replicas":[0,1,2]},

{"topic":"yqtopic01","partition":2,"replicas":[0,1,2]}

]}

b,然后執行腳本

./kafka-reassign-partitions.sh-zookeeper 127.0.0.1:2181--reassignment-json-file increase-replication-factor.json--execute

kafka-reassign-partitions.sh執行截圖

我們可以通過執行

kafka-topics.sh--describe--zookeeper localhost:2181--topic yqtopic01查看現在該topic的副本因子。

總結

所有文檔官方文檔最權威。

摘錄如下:

Increasing replication factor

Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the--execute option to increase the replication factor of the specified partitions.

For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.

The first step is to hand craft the custom reassignment plan in a json file:

Then, use the json file with the--execute option to start the reassignment process:

The--verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json(used with the--execute option) should be used with the--verify option:

You can also verify the increase in replication factor with the kafka-topics tool:

如何確定Kafka的分區數,key和consumer線程數

一、客戶端/服務器端需要使用的內存就越多

先說說客戶端的情況。Kafka 0.8.2之后推出了Java版的全新的producer,這個producer有個參數batch.size,默認是16KB。它會為每個分區緩存消息,一旦滿了就打包將消息批量發出??瓷先ミ@是個能夠提升性能的設計。不過很顯然,因為這個參數是分區級別的,如果分區數越多,這部分緩存所需的內存占用也會更多。假設你有10000個分區,按照默認設置,這部分緩存需要占用約157MB的內存。而consumer端呢?我們拋開獲取數據所需的內存不說,只說線程的開銷。如果還是假設有10000個分區,同時consumer線程數要匹配分區數(大部分情況下是最佳的消費吞吐量配置)的話,那么在consumer client就要創建10000個線程,也需要創建大約10000個Socket去獲取分區數據。這里面的線程切換的開銷本身已經不容小覷了。

服務器端的開銷也不小,如果閱讀Kafka源碼的話可以發現,服務器端的很多組件都在內存中維護了分區級別的緩存,比如controller,FetcherManager等,因此分區數越多,這種緩存的成本越久越大。

二、文件句柄的開銷

每個分區在底層文件系統都有屬于自己的一個目錄。該目錄下通常會有兩個文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager會為每個broker都保存這兩個文件句柄(file handler)。很明顯,如果分區數越多,所需要保持打開狀態的文件句柄數也就越多,最終可能會突破你的ulimit-n的限制。

三、降低高可用性

Kafka通過副本(replica)機制來保證高可用。具體做法就是為每個分區保存若干個副本(replica_factor指定副本數)。每個副本保存在不同的broker上。期中的一個副本充當leader副本,負責處理producer和consumer請求。其他副本充當follower角色,由Kafka controller負責保證與leader的同步。如果leader所在的broker掛掉了,contorller會檢測到然后在zookeeper的幫助下重選出新的leader——這中間會有短暫的不可用時間窗口,雖然大部分情況下可能只是幾毫秒級別。但如果你有10000個分區,10個broker,也就是說平均每個broker上有1000個分區。此時這個broker掛掉了,那么zookeeper和controller需要立即對這1000個分區進行leader選舉。比起很少的分區leader選舉而言,這必然要花更長的時間,并且通常不是線性累加的。如果這個broker還同時是controller情況就更糟了。

說了這么多“廢話”,很多人肯定已經不耐煩了。那你說到底要怎么確定分區數呢?答案就是:視情況而定。基本上你還是需要通過一系列實驗和測試來確定。當然測試的依據應該是吞吐量。雖然LinkedIn這篇文章做了Kafka的基準測試,但它的結果其實對你意義不大,因為不同的硬件、軟件、負載情況測試出來的結果必然不一樣。我經常碰到的問題類似于,官網說每秒能到10MB,為什么我的producer每秒才1MB?——且不說硬件條件,最后發現他使用的消息體有1KB,而官網的基準測試是用100B測出來的,因此根本沒有可比性。不過你依然可以遵循一定的步驟來嘗試確定分區數:創建一個只有1個分區的topic,然后測試這個topic的producer吞吐量和consumer吞吐量。假設它們的值分別是Tp和Tc,單位可以是MB/s。然后假設總的目標吞吐量是Tt,那么分區數= Tt/ max(Tp, Tc)

Tp表示producer的吞吐量。測試producer通常是很容易的,因為它的邏輯非常簡單,就是直接發送消息到Kafka就好了。Tc表示consumer的吞吐量。測試Tc通常與應用的關系更大,因為Tc的值取決于你拿到消息之后執行什么操作,因此Tc的測試通常也要麻煩一些。

Kafka并不能真正地做到線性擴展(其實任何系統都不能),所以你在規劃你的分區數的時候最好多規劃一下,這樣未來擴展時候也更加方便。

消息-分區的分配

默認情況下,Kafka根據傳遞消息的key來進行分區的分配,即hash(key)% numPartitions,如下圖所示:

def partition(key: Any, numPartitions: Int): Int={

Utils.abs(key.hashCode)% numPartitions

}

這就保證了相同key的消息一定會被路由到相同的分區。如果你沒有指定key,那么Kafka是如何確定這條消息去往哪個分區的呢?

復制代碼

if(key== null){//如果沒有指定key

val id= sendPartitionPerTopicCache.get(topic)//先看看Kafka有沒有緩存的現成的分區Id

id match{

case Some(partitionId)=>

partitionId//如果有的話直接使用這個分區Id就好了

case None=>//如果沒有的話,

val availablePartitions= topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)//找出所有可用分區的leader所在的broker

if(availablePartitions.isEmpty)

throw new LeaderNotAvailableException("No leader for any partition in topic"+ topic)

val index= Utils.abs(Random.nextInt)% availablePartitions.size//從中隨機挑一個

val partitionId= availablePartitions(index).partitionId

sendPartitionPerTopicCache.put(topic, partitionId)//更新緩存以備下一次直接使用

partitionId

}

}

復制代碼

可以看出,Kafka幾乎就是隨機找一個分區發送無key的消息,然后把這個分區號加入到緩存中以備后面直接使用——當然了,Kafka本身也會清空該緩存(默認每10分鐘或每次請求topic元數據時)

如何設定consumer線程數

我個人的觀點,如果你的分區數是N,那么最好線程數也保持為N,這樣通常能夠達到最大的吞吐量。超過N的配置只是浪費系統資源,因為多出的線程不會被分配到任何分區。讓我們來看看具體Kafka是如何分配的。

topic下的一個分區只能被同一個consumer group下的一個consumer線程來消費,但反之并不成立,即一個consumer線程可以消費多個分區的數據,比如Kafka提供的ConsoleConsumer,默認就只是一個線程來消費所有分區的數據?!鋵岰onsoleConsumer可以使用通配符的功能實現同時消費多個topic數據,但這和本文無關。

再討論分配策略之前,先說說KafkaStream——它是consumer的關鍵類,提供了遍歷方法用于consumer程序調用實現數據的消費。其底層維護了一個阻塞隊列,所以在沒有新消息到來時,consumer是處于阻塞狀態的,表現出來的狀態就是consumer程序一直在等待新消息的到來?!惝斎豢梢耘渲贸蓭С瑫r的consumer,具體參看參數consumer.timeout.ms的用法。

下面說說Kafka提供的兩種分配策略: range和roundrobin,由參數partition.assignment.strategy指定,默認是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。舉個例子就明白了,假設你有10個分區,P0~ P9,consumer線程數是3, C0~ C2,那么每個線程都分配哪些分區呢?

C0消費分區 0, 1, 2, 3

C1消費分區 4, 5, 6

C2消費分區 7, 8, 9

具體算法就是:

復制代碼

val nPartsPerConsumer= curPartitions.size/ curConsumers.size//每個consumer至少保證消費的分區數

val nConsumersWithExtraPart= curPartitions.size% curConsumers.size//還剩下多少個分區需要單獨分配給開頭的線程們

...

for(consumerThreadId<- consumerThreadIdSet){//對于每一個consumer線程

val myConsumerPosition= curConsumers.indexOf(consumerThreadId)//算出該線程在所有線程中的位置,介于[0, n-1]

assert(myConsumerPosition>= 0)

// startPart就是這個線程要消費的起始分區數

val startPart= nPartsPerConsumer* myConsumerPosition+ myConsumerPosition.min(nConsumersWithExtraPart)

// nParts就是這個線程總共要消費多少個分區

val nParts= nPartsPerConsumer+(if(myConsumerPosition+ 1> nConsumersWithExtraPart) 0 else 1)

...

}

復制代碼

針對于這個例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart為10%3=1,說明每個線程至少保證3個分區,還剩下1個分區需要單獨分配給開頭的若干個線程。這就是為什么C0消費4個分區,后面的2個線程每個消費3個分區,具體過程詳見下面的Debug截圖信息:

ctx.myTopicThreadIds

nPartsPerConsumer= 10/ 3= 3

nConsumersWithExtraPart= 10% 3= 1

第一次:

myConsumerPosition= 1

startPart= 1* 3+ min(1, 1)= 4---也就是從分區4開始讀

nParts= 3+(if(1+ 1> 1) 0 else 1)= 3讀取3個分區,即4,5,6

第二次:

myConsumerPosition= 0

startPart= 3* 0+ min(1, 0)=0---從分區0開始讀

nParts= 3+(if(0+ 1> 1) 0 else 1)= 4讀取4個分區,即0,1,2,3

第三次:

myConsumerPosition= 2

startPart= 3* 2+ min(2, 1)= 7---從分區7開始讀

nParts= 3+ if(2+ 1> 1) 0 else 1)= 3讀取3個分區,即7, 8, 9

至此10個分區都已經分配完畢

說到這里,經常有個需求就是我想讓某個consumer線程消費指定的分區而不消費其他的分區。坦率來說,目前Kafka并沒有提供自定義分配策略。做到這點很難,但仔細想一想,也許我們期望Kafka做的事情太多了,畢竟它只是個消息引擎,在Kafka中加入消息消費的邏輯也許并不是Kafka該做的事情。

Kafka - 副本機制詳解

所謂的副本機制(Replication),也可以稱之為備份機制,通常是指分布式系統在多臺網絡互聯的機器上保存有相同的數據拷貝。副本機制有什么好處呢?

這些優點都是在分布式系統教科書中最常被提及的,但是有些遺憾的是,對于 Apache Kafka而言,目前只能享受到副本機制帶來的第 1個好處,也就是提供數據冗余實現高可用性和高持久性。我會在這一講后面的內容中,詳細解釋 Kafka沒能提供第 2點和第 3點好處的原因。

不過即便如此,副本機制依然是 Kafka設計架構的核心所在,它也是 Kafka確保系統高可用和消息高持久性的重要基石。

在討論具體的副本機制之前,我們先花一點時間明確一下副本的含義。

我們之前談到過,Kafka是有主題概念的,而每個主題又進一步劃分成若干個分區。副本的概念實際上是在分區層級下定義的,每個分區配置有若干個副本。

所謂副本(Replica),本質就是一個只能追加寫消息的提交日志。根據 Kafka副本機制的定義,同一個分區下的所有副本保存有相同的消息序列,這些副本分散保存在不同的 Broker上,從而能夠對抗部分 Broker宕機帶來的數據不可用。

在實際生產環境中,每臺 Broker都可能保存有各個主題下不同分區的不同副本,單個 Broker上存有成百上千個副本的現象是非常正常的。

接下來我們來看一張圖,它展示的是一個有 3臺 Broker的 Kafka集群上的副本分布情況。從這張圖中,我們可以看到,主題 1分區 0的 3個副本分散在 3臺 Broker上,其他主題分區的副本也都散落在不同的 Broker上,從而實現數據冗余。

既然分區下能夠配置多個副本,而且這些副本的內容還要一致,那么很自然的一個問題就是:我們該如何確保副本中所有的數據都是一致的呢?特別是對 Kafka而言,當生產者發送消息到某個主題后,消息是如何同步到對應的所有副本中的呢?針對這個問題,最常見的解決方案就是采用基于領導者(Leader-based)的副本機制。Apache Kafka就是這樣的設計。

基于領導者的副本機制的工作原理如下圖所示,我來簡單解釋一下這張圖里面的內容。

第一,在 Kafka中,副本分成兩類:領導者副本(Leader Replica)和追隨者副本(Follower Replica)。每個分區在創建時都要選舉一個副本,稱為領導者副本,其余的副本自動稱為追隨者副本。

第二,Kafka的副本機制比其他分布式系統要更嚴格一些。在 Kafka中,追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產者的讀寫請求。所有的請求都必須由領導者副本來處理,或者說,所有的讀寫請求都必須發往領導者副本所在的 Broker,由該 Broker負責處理。追隨者副本不處理客戶端請求,它唯一的任務就是從領導者副本異步拉取消息,并寫入到自己的提交日志中,從而實現與領導者副本的同步。

第三,當領導者副本掛掉了,或者說領導者副本所在的 Broker宕機時,Kafka依托于 ZooKeeper提供的監控功能能夠實時感知到,并立即開啟新一輪的領導者選舉,從追隨者副本中選一個作為新的領導者。老 Leader副本重啟回來后,只能作為追隨者副本加入到集群中。

你一定要特別注意上面的第二點,即追隨者副本是不對外提供服務的。還記得剛剛我們談到副本機制的好處時,說過 Kafka沒能提供讀操作橫向擴展以及改善局部性嗎?具體的原因就在于此。

對于客戶端用戶而言,Kafka的追隨者副本沒有任何作用,它既不能像 MySQL那樣幫助領導者副本“抗讀”,也不能實現將某些副本放到離客戶端近的地方來改善數據局部性。

既然如此,Kafka為什么要這樣設計呢?其實這種副本機制有兩個方面的好處。

1.方便實現“Read-your-writes”。

所謂 Read-your-writes,顧名思義就是,當你使用生產者 API向 Kafka成功寫入消息后,馬上使用消費者 API去讀取剛才生產的消息。

舉個例子,比如你平時發微博時,你發完一條微博,肯定是希望能立即看到的,這就是典型的 Read-your-writes場景。如果允許追隨者副本對外提供服務,由于副本同步是異步的,因此有可能出現追隨者副本還沒有從領導者副本那里拉取到最新的消息,從而使得客戶端看不到最新寫入的消息。

2.方便實現單調讀(Monotonic Reads)。

什么是單調讀呢?就是對于一個消費者用戶而言,在多次消費消息時,它不會看到某條消息一會兒存在一會兒不存在。

如果允許追隨者副本提供讀服務,那么假設當前有 2個追隨者副本 F1和 F2,它們異步地拉取領導者副本數據。倘若 F1拉取了 Leader的最新消息而 F2還未及時拉取,此時如果有一個消費者先從 F1讀取消息之后又從 F2拉取消息,它可能會看到這樣的現象:第一次消費時看到的最新消息在第二次消費時不見了,這就不是單調讀一致性。如果所有的讀請求都是由 Leader來處理,那么 Kafka就很容易實現單調讀一致性。

我們剛剛反復說過,追隨者副本不提供服務,只是定期地異步拉取領導者副本中的數據而已。既然是異步的,就存在著不可能與 Leader實時同步的風險。在探討如何正確應對這種風險之前,我們必須要精確地知道同步的含義是什么。或者說,Kafka要明確地告訴我們,追隨者副本到底在什么條件下才算與 Leader同步。

基于這個想法,Kafka引入了 In-sync Replicas,也就是所謂的 ISR副本集合。ISR中的副本都是與 Leader同步的副本,相反,不在 ISR中的追隨者副本就被認為是與 Leader不同步的。到底什么副本能夠進入到 ISR中呢?

我們首先要明確的是,Leader副本天然就在 ISR中。也就是說, ISR不只是追隨者副本集合,它必然包括 Leader副本。甚至在某些情況下,ISR只有 Leader這一個副本。

能夠進入到 ISR的追隨者副本要滿足一定的條件。至于是什么條件,我先賣個關子,我們先來一起看看下面這張圖。

圖中有 3個副本:1個領導者副本和 2個追隨者副本。Leader副本當前寫入了 10條消息,Follower1副本同步了其中的 6條消息,而 Follower2副本只同步了其中的 3條消息。請你思考一下,對于這 2個追隨者副本,你覺得哪個追隨者副本與 Leader不同步?

答案是,要根據具體情況來定。換成英文,就是那句著名的“It depends”。看上去好像 Follower2的消息數比 Leader少了很多,它是最有可能與 Leader不同步的。的確是這樣的,但僅僅是可能。

這張圖中的 2個 Follower副本都有可能與 Leader不同步,但也都有可能與 Leader同步。也就是說,Kafka判斷 Follower是否與 Leader同步的標準,不是看相差的消息數,而是另有“玄機”。

這個標準就是 Broker端參數 replica.lag.time.max.ms參數值。這個參數的含義是 Follower副本能夠落后 Leader副本的最長時間間隔,當前默認值是 10秒。這就是說,只要一個 Follower副本落后 Leader副本的時間不連續超過 10秒,那么 Kafka就認為該 Follower副本與 Leader是同步的,即使此時 Follower副本中保存的消息明顯少于 Leader副本中的消息。

我們在前面說過,Follower副本唯一的工作就是不斷地從 Leader副本拉取消息,然后寫入到自己的提交日志中。如果這個同步過程的速度持續慢于 Leader副本的消息寫入速度,那么在 replica.lag.time.max.ms時間后,此 Follower副本就會被認為是與 Leader副本不同步的,因此不能再放入 ISR中。此時,Kafka會自動收縮 ISR集合,將該副本“踢出”ISR。

倘若該副本后面慢慢地追上了 Leader的進度,那么它是能夠重新被加回 ISR的。這也表明,ISR是一個動態調整的集合,而非靜態不變的。

既然 ISR是可以動態調整的,那么自然就可以出現這樣的情形:ISR為空。因為 Leader副本天然就在 ISR中,如果 ISR為空了,就說明 Leader副本也“掛掉”了,Kafka需要重新選舉一個新的 Leader??墒?ISR是空,此時該怎么選舉新 Leader呢?

Kafka把所有不在 ISR中的存活副本都稱為非同步副本。非同步副本落后 Leader太多,如果選擇這些副本作為新 Leader,就可能出現數據的丟失。畢竟,這些副本中保存的消息遠遠落后于老 Leader中的消息。在 Kafka中,選舉這種副本的過程稱為 Unclean領導者選舉。 Broker端參數 unclean.leader.election.enable控制是否允許 Unclean領導者選舉。

開啟 Unclean領導者選舉可能會造成數據丟失,但好處是,它使得分區 Leader副本一直存在,不至于停止對外提供服務,因此提升了高可用性。禁止 Unclean領導者選舉的好處在于維護了數據的一致性,避免了消息丟失,但犧牲了高可用性。

如果你聽說過 CAP理論的話,你一定知道,一個分布式系統通常只能同時滿足一致性(Consistency)、可用性(Availability)、分區容錯性(Partition tolerance)中的兩個。顯然,在這個問題上,Kafka賦予你選擇 C或 A的權利。

你可以根據你的實際業務場景決定是否開啟 Unclean領導者選舉。我強烈建議你不要開啟它,畢竟我們還可以通過其他的方式來提升高可用性。如果為了這點兒高可用性的改善,犧牲了數據一致性,那就非常不值當了。

標簽: kafka 副本 參數

聲明:

1、本文來源于互聯網,所有內容僅代表作者本人的觀點,與本網站立場無關,作者文責自負。

2、本網站部份內容來自互聯網收集整理,對于不當轉載或引用而引起的民事紛爭、行政處理或其他損失,本網不承擔責任。

3、如果有侵權內容、不妥之處,請第一時間聯系我們刪除,請聯系

手游對比

  1. 倚天神龍手游VS天下長安手游iOS版
  2. 赤血屠龍BT版VS我拼齒輪賊6
  3. 大洪水的故事VS流行戰爭大作戰
  4. 仙訣手游(暫未上線)VS封仙之封神演義手游
  5. 紫仙皓月VS網易非人學院
  6. 云創攻速VS賽馬娘日服
  7. 青云傳傲劍遮天手游VS螞蟻之地
  8. 傾世月神安卓版VS布朗熊快跑
  9. 高爆熱血傳奇VS出擊吧比卡丘手游
  10. 機甲騎士團VS顏色圖形匹配
  11. 裁決詩人手機版VS獵魔騎士團安卓版
  12. 饑餓島游戲VS仍在江湖官方版
主站蜘蛛池模板: 沾益县| 黄骅市| 黑山县| 丰顺县| 仙居县| 子洲县| 扎鲁特旗| 青铜峡市| 永吉县| 乌兰浩特市| 冕宁县| 辽宁省| 灵武市| 修文县| 南召县| 怀宁县| 茂名市| 綦江县| 泰顺县| 瑞昌市| 抚远县| 白银市| 荔浦县| 铁岭市| 汝城县| 武定县| 铁力市| 义马市| 北海市| 沁阳市| 双峰县| 原阳县| 冷水江市| 仙居县| 手机| 齐齐哈尔市| 崇仁县| 仁寿县| 安化县| 玛曲县| 聊城市|