馨提示:要看高清無碼套圖,請使用手機打開并單擊圖片放大查看。
1.文檔編寫目的
Kafka從0.8版本以后出了新的API接口,用于異步方式發送消息,性能優于舊的API,本篇文章主要使用新的API接口進行測試。繼上一篇文章
如何通過Cloudera Manager為Kafka啟用Kerberos及使用
,本篇文章主要講述如何使用Java連接Kerberos的Kafka集群生產和消費消息。
1.環境準備
2.創建Java工程
3.編寫生產消息代碼
4.編寫消費消息代碼
5.測試
1.RedHat7.2
2.CM和CDH版本為5.11.2
3.Kafka2.2.0-0.10.2
1.Intellij已安裝且正常運行
2.Maven環境正常
2.環境準備
1.創建topic,test3有3個replication,3個partition
[ec2-user@ip-172-31-22-86~]$ kafka-topics --create --zookeeper ip-172-31-22-86.ap-southeast-1.compute.internal:2181 --replication-factor 3 --partitions 3 --topic test3
2.krb5.conf配置(直接使用CDH集群的Kerberos配置)
# Configuration snippets may beplaced in this directory as well
includedir /etc/krb5.conf.d/
[logging]
default=FILE:/var/log/krb5libs.log
kdc=FILE:/var/log/krb5kdc.log
admin_server=FILE:/var/log/kadmind.log
[libdefaults]
dns_lookup_realm=false
ticket_lifetime=24h
renew_lifetime=7d
forwardable=true
rdns=false
default_realm=CLOUDERA.COM
#default_ccache_name=KEYRING:persistent:%{uid}
[realms]
CLOUDERA.COM={
kdc=ip-172-31-22-86.ap-southeast-1.compute.internal
admin_server=ip-172-31-22-86.ap-southeast-1.compute.internal
}
[domain_realm]
.ip-172-31-22-86.ap-southeast-1.compute.internal=CLOUDERA.COM
ip-172-31-22-86.ap-southeast-1.compute.internal=CLOUDERA.COM
3.Kerberos的keytab文件
使用kadmin為Kerberos賬號生成keytab,fayson.keytab文件生成在當前目錄下。
[ec2-user@ip-172-31-22-86~]$ sudo kadmin.local
Authenticating as principal hdfs/admin@CLOUDERA.COM with password.
kadmin.local: xst -norandkey -k fayson.keytab fayson@CLOUDERA.COM
...
kadmin.local: exit
[ec2-user@ip-172-31-22-86~]$
4.jaas-cache.conf配置文件
KafkaClient{
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/Volumes/Transcend/keytab/fayson.keytab"
principal="fayson@CLOUDERA.COM";
};
5.在當前開發環境下配置集群的主機信息到hosts文件
在/etc/hosts文件中添加
提示:Fayson使用的AWS環境,所以使用公網IP和hostname對應。如果你的開發環境可以直連Hadoop集群,可以直接配置Hadoop內網IP和hostname對應即可。
3.創建Java工程
1.使用Intellij創建Java Maven工程
2.在pom.xml配置文件中增加Kafka API的Maven依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency>
4.編寫生產消息代碼
package com.cloudera;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by fayson on 2017/10/24.
*/
public class MyProducer {
public static String TOPIC_NAME ="test3";
public static void main(String[] args){
System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");
System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
// System.setProperty("sun.security.krb5.debug","true");
Properties props=new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
Producer<String,String> producer=new KafkaProducer<String,String>(props);
for (int i=0; i < 10; i++) {
String key="key-"+ i;
String message="Message-"+ i;
ProducerRecord record=new ProducerRecord<String, String>(TOPIC_NAME, key, message);
producer.send(record);
System.out.println(key + "----"+ message);
}
producer.close();
}
}
5.編寫消費消息代碼
package com.cloudera;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
/**
* Created by fayson on 2017/10/24.
*/
public class MyConsumer {
private static String TOPIC_NAME ="test3";
public static void main(String[] args){
System.setProperty("java.security.krb5.conf", "/Volumes/Transcend/keytab/krb5.conf");
System.setProperty("java.security.auth.login.config", "/Volumes/Transcend/keytab/jaas-cache.conf");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
Properties props=new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip-172-31-21-45.ap-southeast-1.compute.internal:9092,ip-172-31-26-102.ap-southeast-1.compute.internal:9020,ip-172-31-26-80.ap-southeast-1.compute.internal:9020");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String,String>(props);
TopicPartition partition0=new TopicPartition(TOPIC_NAME, 0);
TopicPartition partition1=new TopicPartition(TOPIC_NAME, 1);
TopicPartition partition2=new TopicPartition(TOPIC_NAME, 2);
consumer.assign(Arrays.asList(partition0,partition1, partition2));
ConsumerRecords<String,String> records=null;
while (true){
try {
Thread.sleep(10000l);
System.out.println();
records=consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Receivedmessage: (" + record.key() + "," + record.value() + ") at offset " + record.offset());
}
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
6.代碼測試
1.執行消費程序,消費topic為test3的所有partition消息
啟動成功,等待消費test3的消息
2.執行生產消息程序,向test3的topic生產消息
向test3的topic發送的消息
3.查看消費程序讀取到的消息
7.總結
在開發環境下通過Java代碼直接連接到已啟用Kerberos的Kafka集群時,則需要將krb5.conf和jaas.conf配置加載到程序運行環境中。至于使用Kerberos密碼的方式Fayson也不會。
測試使用的topic有3個partiton,如果沒有將所有的broker列表配置到bootstrap.servers中,會導致部分消息丟失。
參考文檔:
http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
http://kafka.apache.org/documentation/#api
為天地立心,為生民立命,為往圣繼絕學,為萬世開太平。
溫馨提示:要看高清無碼套圖,請使用手機打開并單擊圖片放大查看。
您可能還想看
安裝
CENTOS6.5安裝CDH5.12.1(一)
CENTOS6.5安裝CDH5.12.1(二)
CENTOS7.2安裝CDH5.10和Kudu1.2(一)
CENTOS7.2安裝CDH5.10和Kudu1.2(二)
如何在CDH中安裝Kudu&Spark2&Kafka
如何升級Cloudera Manager和CDH
如何卸載CDH(附一鍵卸載github源碼)
如何遷移Cloudera Manager節點
如何在Windows Server2008搭建DNS服務并配置泛域名解析
安全
如何在CDH集群啟用Kerberos
如何在Hue中使用Sentry
如何在CDH啟用Kerberos的情況下安裝及使用Sentry(一)
如何在CDH啟用Kerberos的情況下安裝及使用Sentry(二)
如何在CDH未啟用認證的情況下安裝及使用Sentry
如何使用Sentry管理Hive外部表權限
如何使用Sentry管理Hive外部表(補充)
如何在Kerberos與非Kerberos的CDH集群BDR不可用時復制數據
Windows Kerberos客戶端配置并訪問CDH
數據科學
如何在CDSW中使用R繪制直方圖
如何使用Python Impyla客戶端連接Hive和Impala
如何在CDH集群安裝Anaconda&搭建Python私有源
如何使用CDSW在CDH中分布式運行所有R代碼
如何使用CDSW在CDH集群通過sparklyr提交R的Spark作業
如何使用R連接Hive與Impala
如何在Redhat中安裝R的包及搭建R的私有源
如何在Redhat中配置R環境
什么是sparklyr
其他
CDH網絡要求(Lenovo參考架構)
大數據售前的中年危機
如何實現CDH元數據庫MySQL的主備
如何在CDH中使用HPLSQL實現存儲過程
如何在Hive&Impala中使用UDF
Hive多分隔符支持示例
推薦關注Hadoop實操,第一時間,分享更多Hadoop干貨,歡迎轉發和分享。
原創文章,歡迎轉載,轉載請注明:轉載自微信公眾號Hadoop實操
不管是那種編程語言, 代碼的注釋都是必備的語法功能, 并且一個好的程序的指標之一,就是能有一個好的注釋。 那 Java 中的注釋是怎么定義的呢? 我們來說說。
單行注釋: Java 中最簡單的注釋方法, 使用兩個反斜杠 // 就可以了。注釋的內容從 // 開始。
舉個例子:
單行注釋不僅可以注釋備注信息, 并且也可以注釋代碼內容。
// 返回一個字符串
//return "苗子說全棧 ";
return "";
在學習初期可以使用 System.out 輸出字符串進行一些調試, 初學可以考慮這個, 后期學習日志框架之后, 推薦使用日志的方式輸出調試信息。這里只是演示單行注釋的用法。
單行注釋的推薦寫法就是寫在代碼的上一行中。
多行注釋:Java 中使用以 /* 開始, 以 */ 結束的注釋方式。
舉個例子:
需要注意:
對于多行注釋開始和結束中間不要再有結束符。為了好看會在注釋內容首字母寫入 * 舉例說明:
//正確的
/*
* 多行注釋
*/
//錯誤的實例
/*
*這個是多行注釋
/*
*多行注釋內容
*/
*/ 這里就錯誤了。
匹配規則就是 /* 與之對應的最近的 */ 結束符。
除了單行注釋和多行注釋,還有一種是文檔注釋。
編寫任何的代碼,都少不了編寫程序的文檔, 怎么高效的編寫文檔內容,并且文檔的內容能夠隨著版本的更新進行更新。Java 語言中有一種注釋規則, 就是文檔注釋類型。對于文檔注釋的內容,是可以通過 Java 內置命令行工具 javadoc 生成對應的文檔內容的。并且Java 的 API 也是基于這種機制生成的文檔。
使用文檔注釋的方式 以 /** 開頭 */ 結尾。包含在這之內的內容就是文檔注釋的內容。 而且針對文檔注釋有一系列的標記。該注釋一般會放在 類、方法、變量、常量上。
演示案例:
這里就使用了文檔的注釋方法, 并且使用了 @author 作者標記, @since 該類從哪個版本開始實現的。
在方法上我們使用了 @param 參數標記, 主要是給參數加上一個說明。
標記分類 “類標記” 和 “方法標記”常用的有以下列表,只是通用這樣并不是非要這樣寫,這樣寫可以減少再次溝通的成本。
當然也有通用的用法, 可以標記上面說的四種類型。
把上面的源碼生成以下文檔, 我們看一下大概的內容。
javadoc -encoding utf-8 -d docs Hello.java
進入到 docs 打開index.html。效果如下:
這樣你開發的這個文件的開發文檔就編寫完成。非常 easy, 非常 nice ! 我們來看看 Java 17 的 Object.java 的源碼。
找到安裝路徑, 然后根目錄中找到 lib/src.zip 這個壓縮包就是 JDK 17 的源碼。
解壓這個 src.zip 文件。 因為現在 JDK 都是基于模塊開發。 找到基礎模塊 java.base 并進入到 java.lang 目錄, 并找到 Object.java 類。這個是類中之王。 除了基礎的類所有的類都默認實現該類。
不需要理解這里面的含義, 只需要理解注釋的使用方式。 package 是包名, 后續會針對這個單獨開一章, 你可以認為這是一個文件的層次劃分。主要是為了解決類名重復的問題。package 上的內容是針對 Java 的開源協議的一個說明。 以后你如果想開源軟件, 可以參考這種方式編寫你的開源協議內容。
進入到 Java 17 的文檔 API 官方頁面。
地址為:https://docs.oracle.com/en/java/javase/17/docs/api/index.html
同樣的方式進入到 java.base 的模塊中。 Java.base 是 Java SE 的基礎 API。
進入到 Package 的 java.lang 中。并翻閱到 All Classes and Interfaces 中, 找到 Object。
或者直接進入該地址: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Object.html
可以看到如下界面:
可以看到從版面UI, 還是風格都和 javadoc 生成的一模一樣, 從這里也能看出來 Java 的 api 文檔,就是使用這種方式進行生成的。 從源碼中直接獲得注釋, 這樣你的注釋才顯得可靠有據。
從該類的描述中我們也能看到 說是 Object 的這個類是所有的類的根類。 也就是后面說所有的對象類都有一個超類。也都繼承了該類的相關方法和實現。
扯得有點遠了。 先知道有 3 種注釋方法, 知道有 Java 的 API 存在, 這篇內容對于初學的你來說。 也就足夠了。
后續更多內容。 關注我。
Kafka 最佳實踐,涉及
Kafka 能夠對接到 Spark、Flink、Flume 等多個主流的流數據處理技術。利用 Kafka 高吞吐量的特點,客戶可以通過 Kafka 建立傳輸通道,把應用側的海量數據傳輸到流數據處理引擎中,數據經過處理分析后,可支持后端大數據分析,AI 模型訓練等多種業務。
Kafka 最常用也是我最熟悉的場景是日志分析系統。典型的實現方式是在客戶端部署 日志收集器(如 Fluentd、Filebeat 或者 Logstash 等)進行日志采集,并將數據發送到 Kafka,之后通過后端的 ES 等進行數據運算,再搭建一個展示層如 Kibana 進行統計分析數據的展示。
隨著有價值的用例的出現,物聯網(IoT)正得到越來越多的關注。然而,一個關鍵的挑戰是整合設備和機器來實時和大規模地處理數據。Apache Kafka?及其周邊的生態系統,包括Kafka Connect、Kafka Streams,已經成為集成和處理這類數據集的首選技術。
Kafka 已經被用于許多物聯網部署,包括消費者物聯網和工業物聯網(IIoT)。大多數場景都需要可靠、可伸縮和安全的端到端集成,從而支持實時的雙向通信和數據處理。一些具體的用例是:
具體的實現架構如下圖所示:
生產者需要設置 request.required.acks=ALL,服務端主節點寫成功且備節點同步成功才 返回 Response。
消費者接收消息后,應先進行對應業務操作,隨后再進行 commit 標識消息已被處理,通過這種處理方式可以確保一條消息在業務處理失敗時,能夠重新被消費。注意消費者的 enable.auto.commit 參數需要設置為 False,確保 commit 動作手工控制。
保障一條消息最多投放一次,需要設置 request.required.acks=0,同時設置 retries=0。這里的原理是生產者遇到任何異常都不重試,并且不考慮 broker 是否響應寫入成功。
保障一條消息最多被消費一次,需要消費者在收到消息后先進行 commit 標識消息已被處理,隨后再進行對應業務操作。這里的原理是消費者不需要管實際業務的處理結果,拿到消息以后立刻 commit 告訴 broker 消息處理成功。 注意消費者的 enable.auto.commit 參數需要設置為 False,確保 commit 動作手工控制。
Kafka 0.11 版本起新增了冪等消息的語義,通過設置 enable.idempotence=true 參數,可以實現單個分區的消息冪等。
如果 Topic 涉及多個分區或者需要多條消息封裝成一個事務保障冪等,則需要增加 Transaction 控制,樣例如下:
// 開啟冪等控制參數
producerProps.put("enbale.idempotence", "true");
// 初始化事務
producer.initTransactions();
// 設置事務 ID
producerProps.put("transactional.id", "id-001");
try{
// 開始事務,并在事務中發送 2 條消息
producer.beginTranscation();
producer.send(record0);
producer.send(record1);
// 提交事務
producer.commitTranscation();
} catch( Exception e ) {
producer.abortTransaction();
producer.close();
}
需要設置 isolation.level=read_committed,并設置 enable.auto.commit=false,確保消費者只消費生產者已經提交事務的消息,消費者業務需要確保事務性避免重復處理消息,比如說把消息持久化到數據庫,然后向服務端提交 commit。
At Least Once 是最常用的語義,可確保消息只多不少的發送和消費,性能和可靠性上有較好的平衡,可以作為默認選用的模式。業務側也可以通過在消息體加入唯一的業務主鍵自行保障冪等性,在消費側確保同一個業務主鍵的消息只被處理一次。
Exactly Once 語義一般用絕對不容許重復的關鍵業務,典型案例是訂單和支付相關場景。
At Most Once 語義一般用在非關鍵業務,業務對于消息丟失并不敏感,只需要盡量確保消息成功生產消費即可。典型使用 At Most Once 語義的場景是消息通知,出現少量遺漏消息影響不大,相比之下重復發送通知會造成較壞的用戶體驗。
以下匯總了通過 partition 調優性能建議考慮的維度,建議您根據理論分析配合壓力測試對系統整體性能進行調優。
考慮維度 | 說明 |
吞吐量 | 增加 partition 的數量可以消息消費的并發度,當系統瓶頸在于消費端,而消費端又可以水平擴展的時候,增加 partition 可以增加系統吞吐量。 在 Kafka 內部每個 Topic 下的每個 partition 都是一個獨立的消息處理通道 , 一個 partition 內的消息只能被同時被一個 consumer group 消費,當 consumer group 數量多于partition的數量時,多余的 consumer group 會出現空閑。 |
消息順序 | Kafka 可以保障一個 partition 內的消息順序性,partition 之間的消息順序無法保證,增加 partition 的時候需要考慮消息順序對業務的影響。 |
實例 Partition 上限 | Partition 增加會消耗底層更多的內存,IO 和文件句柄等資源。在規劃 Topic 的 partition 數量時需要考慮 Kafka 集群能支持的 partition 上限。 |
生產者,消費者與 partition 的關系說明。
如果 Topic 設置了多個分區,生產者發送消息時需要先確認往哪個分區發送。在給同一個分區發送多條消息時,Producer 客戶端會將相關消息打包成一個 Batch,批量發送到服務端。一般情況下,小 Batch 會導致 Producer 客戶端產生大量請求,造成請求隊列在客戶端和服務端的排隊,從而整體推高了消息發送和消費延遲。
一個合適的 batch 大小,可以減少發送消息時客戶端向服務端發起的請求次數,在整體上提高消息發送的吞吐和延遲。
Batch 參數說明如下:
參數 | 說明 |
batch.size | 發往每個分區(Partition)的消息緩存量(消息內容的字節數之和,不是條數)。達到設置的數值時,就會觸發一次網絡請求,然后 Producer 客戶端把消息批量發往服務器。 |
linger.ms | 每條消息在緩存中的最長時間。若超過這個時間,Producer 客戶端就會忽略 batch.size 的限制,立即把消息發往服務器。 |
buffer.memory | 所有緩存消息的總體大小超過這個數值后,就會觸發把消息發往服務器,此時會忽略 batch.size 和 linger.ms 的限制。buffer.memory 的默認數值是 32MB,對于單個 Producer 而言,可以保證足夠的性能。 |
Batch 相關參數值的選擇并沒有通用的方法,建議針對性能敏感的業務場景進行壓測調優。
Kafka 生產者與服務端發送消息時有批量發送的機制,只有發送到相同 Partition 的消息才會被放到同一個 Batch 中。在大批量發送場景,如果消息散落到多個 Partition 當中就可能會形成多個小 Batch,導致批量發送機制失效而降低性能。
Kafka 默認選擇分區的策略如下
場景 | 策略 |
消息指定 Key | 對消息的 Key 進行哈希,然后根據哈希結果選擇分區,保證相同 Key 的消息會發送到同一個分區。 |
消息沒有指定 Key | 默認策略是循環使用主題的所有分區,將消息以輪詢的方式發送到每一個分區上。 |
從默認機制可見 partition 的選擇隨機性很強,因此在大批量傳輸的場景下,推薦設置 partitioner.class參數,指定自定義的分區選擇算法實現 粘性分區。
其中一種實現方法是在固定的時間段內使用同一個 partition,過一段時間切換到下一個分區,避免數據散落到多個不同 partition。
Kafka 會在同一個 partition 內保障消息順序,如果 Topic 存在多個 partition 則無法確保全局順序。如果需要保障全局順序,則需要控制 partition 數量為 1 個。
消息隊列 Kafka 的消息有 Key(消息標識)和 Value(消息內容)兩個字段。為了便于追蹤,建議為消息設置一個唯一的 Key。之后可以通過 Key 追蹤某消息,打印發送日志和消費日志,了解該消息的生產和消費情況。
分布式環境下,由于網絡等原因,消息偶爾會出現發送失敗的情況,其原因可能是消息已經發送成功但是 ACK 機制失敗或者消息確實沒有發送成功。默認的參數能滿足大部分場景,但可以根據業務需求,按需設置以下重試參數:
參數 | 說明 |
retries | 重試次數,默認值為 3,但對于數據丟失零容忍的應用而言,請考慮設置為 Integer.MAX_VALUE(有效且最大)。 |
retry.backoff.ms | 重試間隔,建議設置為 1000。 |
? 注意:
如果希望實現 At Most Once 語義,重試需要關閉。
Spark Streaming 是 Spark Core 的一個擴展,用于高吞吐且容錯地處理持續性的數據,目前支持的外部輸入有 Kafka、Flume、HDFS/S3、Kinesis、Twitter 和 TCP socket。
Spark Streaming 將連續數據抽象成 DStream(Discretized Stream),而 DStream 由一系列連續的 RDD(彈性分布式數據集)組成,每個 RDD 是一定時間間隔內產生的數據。使用函數對 DStream 進行處理其實即為對這些 RDD 進行處理。
使用 Spark Streaming 作為 Kafka 的數據輸入時,可支持 Kafka 穩定版本與實驗版本:
Kafka Version | spark-streaming-kafka-0.8 | spark-streaming-kafka-0.10 |
Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
Api Maturity | Deprecated | Stable |
Language Support | Scala、Java、Python | Scala、Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit Api | No | Yes |
Dynamic Topic Subscription | No | Yes |
本次實踐使用 0.10.2.1 版本的 Kafka 依賴。
創建 Kafka 集群的步驟略,再創建一個名為 test 的 Topic。
Centos6.8 系統
package | version |
sbt | 0.13.16 |
hadoop | 2.7.3 |
spark | 2.1.0 |
protobuf | 2.5.0 |
ssh | CentOS 默認安裝 |
Java | 1.8 |
具體安裝步驟略,包括以下步驟:
這里使用 0.10.2.1 版本的 Kafka 依賴。
name :="Producer Example"
version :="1.0"
scalaVersion :="2.11.8"
libraryDependencies +="org.apache.kafka" % "kafka-clients" % "0.10.2.1"
import java.util.Properties
import org.apache.kafka.clients.producer._
object ProducerExample extends App {
val props=new Properties()
props.put("bootstrap.servers", "172.0.0.1:9092") //實例信息中的內網 IP 與端口
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer=new KafkaProducer[String, String](props)
val TOPIC="test" //指定要生產的 Topic
for(i<- 1 to 50){
val record=new ProducerRecord(TOPIC, "key", s"hello $i") //生產 key 是"key",value 是 hello i 的消息
producer.send(record)
}
val record=new ProducerRecord(TOPIC, "key", "the end "+new java.util.Date)
producer.send(record)
producer.close() //最后要斷開
}
更多有關 ProducerRecord 的用法請參考 ProducerRecord 文檔。
####### DirectStream
name :="Consumer Example"
version :="1.0"
scalaVersion :="2.11.8"
libraryDependencies +="org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies +="org.apache.spark" %% "spark-streaming" % "2.1.0"
libraryDependencies +="org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
def main(args: Array[String]) {
val kafkaParams=Map[String, Object](
"bootstrap.servers" -> "172.0.0.1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_stream_test1",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> "false"
)
val sparkConf=new SparkConf()
sparkConf.setMaster("local")
sparkConf.setAppName("Kafka")
val ssc=new StreamingContext(sparkConf, Seconds(5))
val topics=Array("spark_test")
val offsets : Map[TopicPartition, Long]=Map()
for (i <- 0 until 3){
val tp=new TopicPartition("spark_test", i)
offsets.updated(tp , 0L)
}
val stream=KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
println("directStream")
stream.foreachRDD{ rdd=>
//輸出獲得的消息
rdd.foreach{iter=>
val i=iter.value
println(s"${i}")
}
//獲得offset
val offsetRanges=rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter=>
val o: OffsetRange=offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
####### RDD
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import collection.JavaConversions._
import Array._
object Kafka {
def main(args: Array[String]) {
val kafkaParams=Map[String, Object](
"bootstrap.servers" -> "172.0.0.1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "spark_stream",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val sc=new SparkContext("local", "Kafka", new SparkConf())
val java_kafkaParams : java.util.Map[String, Object]=kafkaParams
//按順序向 parition 拉取相應 offset 范圍的消息,如果拉取不到則阻塞直到超過等待時間或者新生產消息達到拉取的數量
val offsetRanges=Array[OffsetRange](
OffsetRange("spark_test", 0, 0, 5),
OffsetRange("spark_test", 1, 0, 5),
OffsetRange("spark_test", 2, 0, 5)
)
val range=KafkaUtils.createRDD[String, String](
sc,
java_kafkaParams,
offsetRanges,
PreferConsistent
)
range.foreach(rdd=>println(rdd.value))
sc.stop()
}
}
更多 kafkaParams 用法參考 kafkaParams 文檔。
Apache Flume 是一個分布式、可靠、高可用的日志收集系統,支持各種各樣的數據來源(如 HTTP、Log 文件、JMS、監聽端口數據等),能將這些數據源的海量日志數據進行高效收集、聚合、移動,最后存儲到指定存儲系統中(如 Kafka、分布式文件系統、Solr 搜索服務器等)。
Flume 基本結構如下:
Flume 以 agent 為最小的獨立運行單位。一個 agent 就是一個 JVM,單個 agent 由 Source、Sink 和 Channel 三大組件構成。
Flume 與 Kafka
把數據存儲到 HDFS 或者 HBase 等下游存儲模塊或者計算模塊時需要考慮各種復雜的場景,例如并發寫入的量以及系統承載壓力、網絡延遲等問題。Flume 作為靈活的分布式系統具有多種接口,同時提供可定制化的管道。
在生產處理環節中,當生產與處理速度不一致時,Kafka 可以充當緩存角色。Kafka 擁有 partition 結構以及采用 append 追加數據,使 Kafka 具有優秀的吞吐能力;同時其擁有 replication 結構,使 Kafka 具有很高的容錯性。
所以將 Flume 和 Kafka 結合起來,可以滿足生產環境中絕大多數要求。
Kafka 可作為 Source 或者 Sink 端對消息進行導入或者導出。
配置 kafka 作為消息來源,即將自己作為消費者,從 Kafka 中拉取數據傳入到指定 Sink 中。主要配置選項如下:
配置項 | 說明 |
channels | 自己配置的 Channel |
type | 必須為:org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | Kafka Broker 的服務器地址 |
kafka.consumer.group.id | 作為 Kafka 消費端的 Group ID |
kafka.topics | Kafka 中數據來源 Topic |
batchSize | 每次寫入 Channel 的大小 |
batchDurationMillis | 每次寫入最大間隔時間 |
示例:
tier1.sources.source1.type=org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels=channel1
tier1.sources.source1.batchSize=5000
tier1.sources.source1.batchDurationMillis=2000
tier1.sources.source1.kafka.bootstrap.servers=localhost:9092
tier1.sources.source1.kafka.topics=test1, test2
tier1.sources.source1.kafka.consumer.group.id=custom.g.id
更多內容請參考 Apache Flume 官網。
配置 Kafka 作為內容接收方,即將自己作為生產者,推到 Kafka Server 中等待后續操作。主要配置選項如下:
配置項 | 說明 |
channel | 自己配置的 Channel |
type | 必須為:org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | Kafka Broker 的服務器 |
kafka.topics | Kafka 中數據來源 Topic |
kafka.flumeBatchSize | 每次寫入的 Bacth 大小 |
kafka.producer.acks | Kafka 生產者的生產策略 |
示例:
a1.sinks.k1.channel=c1
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=mytopic
a1.sinks.k1.kafka.bootstrap.servers=localhost:9092
a1.sinks.k1.kafka.flumeBatchSize=20
a1.sinks.k1.kafka.producer.acks=1
更多內容請參考 Apache Flume 官網。
Storm 是一個分布式實時計算框架,能夠對數據進行流式處理和提供通用性分布式 RPC 調用,可以實現處理事件亞秒級的延遲,適用于對延遲要求比較高的實時數據處理場景。
在 Storm 的集群中有兩種節點,控制節點Master Node和工作節點Worker Node。Master Node上運行Nimbus進程,用于資源分配與狀態監控。Worker Node上運行Supervisor進程,監聽工作任務,啟動executor執行。整個 Storm 集群依賴zookeeper負責公共數據存放、集群狀態監聽、任務分配等功能。
用戶提交給 Storm 的數據處理程序稱為topology,它處理的最小消息單位是tuple,一個任意對象的數組。topology由spout和bolt構成,spout是產生tuple的源頭,bolt可以訂閱任意spout或bolt發出的tuple進行處理。
Storm 可以把 Kafka 作為spout,消費數據進行處理;也可以作為bolt,存放經過處理后的數據提供給其它組件消費。
Centos6.8系統
package | version |
maven | 3.5.0 |
storm | 2.1.0 |
ssh | 5.3 |
Java | 1.8 |
pom.xml 配置如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>storm</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>ExclamationTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
topology 代碼:
//TopologyKafkaProducerSpout.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
import java.util.Properties;
public class TopologyKafkaProducerSpout {
//申請的kafka實例ip:port
private final static String BOOTSTRAP_SERVERS="xx.xx.xx.xx:xxxx";
//指定要將消息寫入的topic
private final static String TOPIC="storm_test";
public static void main(String[] args) throws Exception {
//設置producer屬性
//函數參考:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
//屬性參考:http://kafka.apache.org/0102/documentation.html
Properties properties=new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("acks", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//創建寫入kafka的bolt,默認使用fields("key" "message")作為生產消息的key和message,也可以在FieldNameBasedTupleToKafkaMapper()中指定
KafkaBolt kafkaBolt=new KafkaBolt()
.withProducerProperties(properties)
.withTopicSelector(new DefaultTopicSelector(TOPIC))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
TopologyBuilder builder=new TopologyBuilder();
//一個順序生成消息的spout類,輸出field是sentence
SerialSentenceSpout spout=new SerialSentenceSpout();
AddMessageKeyBolt bolt=new AddMessageKeyBolt();
builder.setSpout("kafka-spout", spout, 1);
//為tuple加上生產到kafka所需要的fields
builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
//寫入kafka
builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");
Config config=new Config();
if (args !=null && args.length > 0) {
//集群模式,用于打包jar,并放到storm運行
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
} else {
//本地模式
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
創建一個順序生成消息的 spout 類:
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.UUID;
public class SerialSentenceSpout extends BaseRichSpout {
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector=spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
//生產一個UUID字符串發送給下一個組件
spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
}
為 tuple 加上 key、message 兩個字段,當 key 為 null 時,生產的消息均勻分配到各個 partition,指定了 key 后將按照 key 值 hash 到特定 partition 上:
//AddMessageKeyBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class AddMessageKeyBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//取出第一個filed值
String messae=tuple.getString(0);
//System.out.println(messae);
//發送給下一個組件
basicOutputCollector.emit(new Values(null, messae));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//創建發送給下一個組件的schema
outputFieldsDeclarer.declare(new Fields("key", "message"));
}
}
使用 trident 類生成 topology:
//TopologyKafkaProducerTrident.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Properties;
public class TopologyKafkaProducerTrident {
//申請的kafka實例ip:port
private final static String BOOTSTRAP_SERVERS="xx.xx.xx.xx:xxxx";
//指定要將消息寫入的topic
private final static String TOPIC="storm_test";
public static void main(String[] args) throws Exception {
//設置producer屬性
//函數參考:https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
//屬性參考:http://kafka.apache.org/0102/documentation.html
Properties properties=new Properties();
properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.put("acks", "1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//設置Trident
TridentKafkaStateFactory stateFactory=new TridentKafkaStateFactory()
.withProducerProperties(properties)
.withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
//設置使用fields("key", "value")作為消息寫入 不像FieldNameBasedTupleToKafkaMapper有默認值
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));
TridentTopology builder=new TridentTopology();
//一個批量產生句子的spout,輸出field為sentence
builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
.each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
.partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());
Config config=new Config();
if (args !=null && args.length > 0) {
//集群模式,用于打包jar,并放到storm運行
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
} else {
//本地模式
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("test", config, builder.build());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
private static class AddMessageKey extends BaseFunction {
@Override
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
//取出第一個filed值
String messae=tridentTuple.getString(0);
//System.out.println(messae);
//發送給下一個組件
//tridentCollector.emit(new Values(Integer.toString(messae.hashCode()), messae));
tridentCollector.emit(new Values(null, messae));
}
}
}
創建一個批量生成消息的 spout 類:
//TridentSerialSentenceSpout.java
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.UUID;
public class TridentSerialSentenceSpout implements IBatchSpout {
private final int batchCount;
public TridentSerialSentenceSpout(int batchCount) {
this.batchCount=batchCount;
}
@Override
public void open(Map map, TopologyContext topologyContext) {
}
@Override
public void emitBatch(long l, TridentCollector tridentCollector) {
Utils.sleep(1000);
for(int i=0; i < batchCount; i++){
tridentCollector.emit(new Values(UUID.randomUUID().toString()));
}
}
@Override
public void ack(long l) {
}
@Override
public void close() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf=new Config();
conf.setMaxTaskParallelism(1);
return conf;
}
@Override
public Fields getOutputFields() {
return new Fields("sentence");
}
}
//TopologyKafkaConsumerSpout.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
public class TopologyKafkaConsumerSpout {
//申請的kafka實例ip:port
private final static String BOOTSTRAP_SERVERS="xx.xx.xx.xx:xxxx";
//指定要將消息寫入的topic
private final static String TOPIC="storm_test";
public static void main(String[] args) throws Exception {
//設置重試策略
KafkaSpoutRetryService kafkaSpoutRetryService=new KafkaSpoutRetryExponentialBackoff(
KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
Integer.MAX_VALUE,
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
);
ByTopicRecordTranslator<String, String> trans=new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"));
//設置consumer參數
//函數參考http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
//參數參考http://kafka.apache.org/0102/documentation.html
KafkaSpoutConfig spoutConfig=KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
.setProp(new HashMap<String, Object>(){{
put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //設置group
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //設置session超時
put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //設置請求超時
}})
.setOffsetCommitPeriodMs(10_000) //設置自動確認時間
.setFirstPollOffsetStrategy(LATEST) //設置拉取最新消息
.setRetry(kafkaSpoutRetryService)
.setRecordTranslator(trans)
.build();
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
builder.setBolt("bolt", new BaseRichBolt(){
private OutputCollector outputCollector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector=outputCollector;
}
@Override
public void execute(Tuple tuple) {
System.out.println(tuple.getStringByField("value"));
outputCollector.ack(tuple);
}
}, 1).shuffleGrouping("kafka-spout");
Config config=new Config();
config.setMaxSpoutPending(20);
if (args !=null && args.length > 0) {
config.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
}
else {
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("test", config, builder.createTopology());
Utils.sleep(20000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
//TopologyKafkaConsumerTrident.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
public class TopologyKafkaConsumerTrident {
//申請的kafka實例ip:port
private final static String BOOTSTRAP_SERVERS="xx.xx.xx.xx:xxxx";
//指定要將消息寫入的topic
private final static String TOPIC="storm_test";
public static void main(String[] args) throws Exception {
ByTopicRecordTranslator<String, String> trans=new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
new Fields("topic", "partition", "offset", "key", "value"));
//設置consumer參數
//函數參考http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
//參數參考http://kafka.apache.org/0102/documentation.html
KafkaTridentSpoutConfig spoutConfig=KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
.setProp(new HashMap<String, Object>(){{
put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //設置group
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //設置自動確認
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //設置session超時
put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //設置請求超時
}})
.setFirstPollOffsetStrategy(LATEST) //設置拉取最新消息
.setRecordTranslator(trans)
.build();
TridentTopology builder=new TridentTopology();
// Stream spoutStream=builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); //事務型
Stream spoutStream=builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
spoutStream.each(spoutStream.getOutputFields(), new BaseFunction(){
@Override
public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
System.out.println(tridentTuple.getStringByField("value"));
tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
}
}, new Fields("message"));
Config conf=new Config();
conf.setMaxSpoutPending(20);conf.setNumWorkers(1);
if (args !=null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
}
else {
StormTopology stormTopology=builder.build();
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("test", conf, stormTopology);
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();stormTopology.clear();
}
}
}
使用 mvn package 編譯后,可以提交到本地集群進行 debug 測試,也可以提交到正式集群進行運行。
storm jar your_jar_name.jar topology_name
storm jar your_jar_name.jar topology_name tast_name
Logstash 是一個開源的日志處理工具,可以從多個源頭收集數據、過濾收集的數據并對數據進行存儲作為其他用途。
Logstash 靈活性強,擁有強大的語法分析功能,插件豐富,支持多種輸入和輸出源。Logstash 作為水平可伸縮的數據管道,與 Elasticsearch 和 Kibana 配合,在日志收集檢索方面功能強大。
Logstash 數據處理可以分為三個階段:inputs → filters → outputs。
同時 Logstash 支持編碼解碼,可以在 inputs 和 outputs 端指定格式。
? 注意:
Logstash 過濾消耗資源,如果部署在生產 server 上會影響其性能。
創建一個名為 logstash_test的 Topic。
Beats 平臺 集合了多種單一用途數據采集器。這些采集器安裝后可用作輕量型代理,從成百上千或成千上萬臺機器向目標發送采集數據。
Beats 有多種采集器,您可以根據自身的需求下載對應的采集器。本文以 Filebeat(輕量型日志采集器)為例,向您介紹 Filebeat 接入 Kafka 的操作指方法,及接入后常見問題的解決方法。
創建一個名為 test的 Topic。
進入 Filebeat 的安裝目錄,創建配置監控文件 filebeat.yml。
##=======Filebeat prospectors==========filebeat.prospectors:
- input_type: log
## 此處為監聽文件路徑
paths:
- /var/log/messages
##=======Outputs=========##------------------ kafka -------------------------------------
output.kafka:
version:0.10.2 // 根據不同 Kafka 集群版本配置
# 設置為Kafka實例的接入地址
hosts: ["xx.xx.xx.xx:xxxx"]
# 設置目標topic的名稱
topic: 'test'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: none
max_message_bytes: 1000000
# SASL 需要配置下列信息,如果不需要則下面兩個選項可不配置
username: "yourinstance#yourusername" //username 需要拼接實例ID和用戶名
password: "yourpassword"
{"@timestamp":"2017-09-29T10:01:27.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka1","offset":500,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
{"@timestamp":"2017-09-29T10:01:30.936Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka2","offset":508,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
{"@timestamp":"2017-09-29T10:01:33.937Z","beat":{"hostname":"10.193.9.26","name":"10.193.9.26","version":"5.6.2"},"input_type":"log","message":"ckafka3","offset":516,"source":"/data/ryanyyang/hcmq/beats/filebeat-5.6.2-linux-x86_64/testlog","type":"log"}
如果您需要進行 SALS/PLAINTEXT 配置,則需要配置用戶名與密碼。 在 Kafka 配置區域新增加 username 和 password 配置即可。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。