天津11选5几点开始:Apache?Kafka簡介

Kafka是一個流處理服務平臺。其中,生產者(Producers)向主題(Topic)中發布消息,消費者(Consumers)讀取并處理發布到主題中的消息。Kafka的主題是已發布消息的日志副本集合,這些日志都具有時間戳??梢遠災魈飩蟹智?,以增加存儲容量并提高并行度。

如圖1所示,向同一個主題發布消息的不同生產者進程可以:
a) 將消息寫入特定的分區(藍色箭頭);或
b) 允許Kafka在可用的分區間均衡地分發/加載消息(橙色箭頭)。

此外,Kafka的各個副本作為一個已發布的消息,僅出現在一個分區中。

從消費者的角度看,消費者可以屬于一個消費者組(consumer group),也可以不屬于任何消費者組。如果不屬于任何消費者組,則所有訂閱了主題的消費者都能夠讀取發布在主題中的某一條消息(如圖1中的黑色箭頭c所示)。

如果屬于消費者組,則該組中的各消費者將被分配到一個或多個分區中進行讀取。如果組中的消費者數量超過了主題的分區數量,則一個消費者最多只能被分配到一個分區(如圖1中紅色箭頭d所示)。當所有分區都分配到了消費者時,部分消費者將閑置,直到需要時才會被使用。如果組中的消費者數量小于主題的分區數量,則一個消費者可以被分配到多個分區。在任何情況下,每組最多只能有一個消費者被分配到某個特定分區。Kafka知道,各分區的哪一條消息是最后發出的。新的訂閱者將從這條消息開始讀取,除非其定位到消息日志中靠前的位置。

Trafodion的Kafka消費者

本博客旨在介紹可以將Kafka數據導入Trafodion數據庫的技術,同時說明為什么常用的方法不一定是最佳的方法。

Kafka消費者是一個Java程序,可以“讀取”發布至Kafka的消息并根據需要對數據進行格式化,以便通過JDBC將數據“寫入”Trafodion的數據庫表。程序中的Kafka處理可以簡要地概括如下:

  • 實例化一個Kafka類(例如,KafkaConsumer),向程序提供輸入數據
  • 為該類設置屬性值(例如,主題名稱topic name、起始位置starting position、服務器位置broker location、超時限制timeout limits、zookeeper位置zookeeper location等)
  • 從主題中讀?。╬oll)消息。

通常,根據具有一個或多個存儲分區的主題來組織Kafka的消息發布,以便增加容量。Kafka會跟蹤已成功提交至各個分區的消息的最高偏移量(offset)。當在分區上重啟一個新的消費者進程時,消息提交將恢復到記錄的偏移量位置。關于更多信息,請參閱https://kafka.apache.org/documentation/。因此,主題、分區數量和偏移量共同組成了分區的“重啟點”。KafkaConsumer類屬性設置enable.auto.commit或對KafkaConsumer.commitSync()的顯式調用都會影響保存的實際偏移量值。

下面,我們來看看如何將Kafka消息導入Trafodion表。

Trafodion表

Trafodion表是一個SQL表,它是關系型SQL表到HBase表的映射。此處,我們使用了由以下DDL定義的表。

如果要對表進行分區,則需要使用PRIMARY KEY或STORE BY子句(請參閱//trafodion.apache.org/docs/sql_reference/Trafodion_SQL_Reference_Manual.pdf。

此處,該行中的五列對應了發布的Kafka消息的五個字段。

JDBC技術

在如今的IT環境中,Kafka、Trafodion和Kafka消費者可能分別在不同的集群上運行。本次測試使用了一個四節點的集群,CDH 5.9和EsgynDB v2.3在集群的所有節點上運行,Kafka 2.11安裝在一個節點上,而Java消費者程序在另一個節點上運行。

Trafodion SQL支持INSERT和UPSERT DML語法。INSERT用于添加新行,UPSERT用于更新現有的匹配行(如果不存在匹配行,則添加新行)。我們比較了INSERT和UPSERT語法,還探索了INSERT-SELECT技術(使用用戶定義函數,可能適用于某些情況)。

諸如INSERT和UPSERT的DML語法會對數據庫進行更改。在JDBC中,數據庫更改受到事務(transaction)的?;?。在默認情況下,各DML語句都作為獨立的事務運行,除非連接被設置為僅在顯式請求commit()上終止事務。例如,在默認情況下,10個獨立的INSERT是10個事務,各INSERT執行之后,添加的各個新行立即可見。然而,如果設置了autoCommit=off,這10個INSERT還是會對表進行更新,但是只有當執行到第10個INSERT之后調用了commit()時,才能訪問新添加的10行?;瘓浠八?,如果在執行第6條INSERT之前,程序發生故障:在第一種情況下,之前新添加的5行依然可見;在第二種情況下,之前添加的行均不可訪問。在JDBC中,一次性提交一組記錄稱為批處理(batching)。

本次測試都是在這些技術的基礎上進行的,以下的代碼片段是這些技術的使用示例。

測試結果

在實際的操作中,Kafka的消息到達率覆蓋了流式應用到涓流饋送。如果要比較數據攝入技術的差異,使用較大的數據量可以產生較好的對比效果。因此,我們向主題的單個分區發布了300,000條144字節的消息,每次測試都使用相同的Kafka消息。

關于目標Trafodion表,請見上文。

哪條語句的性能更好?是INSERT還是UPSERT?表1顯示了測試的屬性設置和相應的結果。

作為基準參考,每次都要對行的添加進行計時。因此,相比逐條發送記錄,批量發送記錄塊的好處是顯而易見的。

很明顯,無論塊因子是多少,批量UPSERT的性能總是優于INSERT。插入300,000行的最快運行時間僅為7.8秒。根據消息大小、表的列數量、記錄數量等其他因素的不同,統計數據會有所不同,但是UPSERT的性能始終優于INSERT。在我們的測試中,當批大小為4000時,UPSERT的性能最佳。您可能需要反復測試,才能找到應用程序的“性能較優”設置。

添加所有消息所需的運行時間是通過編程的方式捕獲的,該總運行時間包括了Kafka poll的1秒超時值(消息流耗盡時)。如果設置較低的超時值,UPSERT的性能數據將進一步提高。

為何會有這樣的性能區別?因為INSERT語句的SQL語義要求在插入新行之前檢查是否已存在該行。由于Trafodion表是HBase表,因此該操作的HBase接口一次僅允許一行。相反,由于UPSERT語句覆寫所有現有的行,因此不需要進行檢查。另外,UPSERT的HBase接口可以接受一組行。

關于數據加載的Trafodion技術,請參閱https://cwiki.apache.org/confluence/display/TRAFODION/Data+Loading//trafodion.apache.org/docs/load_transform/index.html#introduction-insert-types(Trafodion加載和轉換指南)。

關于JDBC批處理的評價:在這些測試中,由于已經創建了Kafka數據,因此在用數據填充批次時沒有延遲(延遲不利于計時數據)。在操作中,Kafka數據流的到達率可能是不規則的。無論將批大小設置為多少,在達到批處理限制之前總是會發生事務延遲。您可以通過類似于以下的方法減輕事務延遲:降低批大小,進行更頻繁的提交;或在發生一定次數的Kafka poll超時(即,無數據)之后,提交當前批的行——實際上,就是根據Kafka的輸入負載,動態地調整批大小。

另一種方法?

UPSERT語法有另一種變體——UPSERT USING LOAD。

根據Trafodion SQL參考手冊//trafodion.apache.org/docs/sql_reference/Trafodion_SQL_Reference_Manual.pdf,該操作在沒有事務的情況下發生,用于將數據導入空表。該語句的典型用法為:

UPSERT USING LOAD INTO target_table SELECT * FROM source_table

如何將其應用于Kafka場景?源表是什么?

通過Trafodion的用戶定義函數(UDF)架構,即可實現應用。用戶編碼的Java過程(TMUDF)遵循指定的編碼協議,以生成一個虛擬的行列組(即,一個表,該表可以在SQL查詢的任何位置使用)。在這種情況下,我們需要一個TMUDF將具有消息字段的Kafka消息轉換為多行的數據列。關于Trafodion UDF架構的更多信息,請參閱。https://cwiki.apache.org/confluence/display/TRAFODION/Tutorial%3A+The+object-oriented+UDF+interface。

編寫了一個運行正常的Kafka消費者程序,大部分的代碼就已經完成。UDF中不需要進行數據庫處理(只有部分Kafka需要)。為了符合UDF的編程模型,要進行一些修改。因此,創建一個TMUDF用于讀取Kafka主題的一個分區。

我們并沒有深究具體的UDF邏輯,但是已經證明了Kafka日志中的300,000條消息被讀取并生成了300,000行(每行包含5列)。消費者程序使用的UPSERT語句如下:

UPSERT USING LOAD INTO blogdata SELECT * FROM udf(kafread( …))

其中,UDF代碼使用值列表,用于Kafka連接。

當在消費者程序中執行了該語句時,結果如表2所示:

在本例中,使用UDF是一項原子操作(所有或全部沒有)。與可以被打斷/恢復并進行不斷控制的行/批插入邏輯不同,UDF Insert-Select是不能被打斷的。

因此,雖然可以接受加快加載速度,但是進行恢復和重啟是不利的(Kafka消息隊列的位置信息無法通過UDF傳回至調用者)。

結論

對于消息速率較低的Kafka應用程序,每次通過INSERT或UPSERT向Trafodion數據庫插入或更新消息可能是差不多的。但是對于消息速率較高的Kafka應用程序,UPSERT的性能比INSERT更佳。即使JDBC批處理可以提高INSERT的頻次,但是UPSERT真正的優勢是其面向集合的處理。以往的習慣可能很難改變,但是在考慮通過INSERT插入數據時,也不要忘記還可以選擇UPSERT。

關于消費者程序的源代碼和Kafka UDF,請參閱https://github.com/esgyn/code-examples/tree/master/kafkablog。