一個Apache Storm v2.0流計算入門項目的開發、測試和運行(IDEA/Maven)
關于流計算框架Apache Storm最新版的安裝,可以參考:
流計算框架-最新版Apache Storm v2.0單機模式安裝詳細步驟
流計算框架Apache Storm核心概念、架構設計
應用名稱:firststorm
storm-client 依賴包信息,添加到項目的pom.xml文件中。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
<version>2.0.0</version>
</dependency>
maven會自動下載相關依賴并放到Maven Dependencies下,這些jar包可以點擊下拉查看,并且會自動添加到項目classpath中,作為編譯使用,等jar包全部下載完畢,現在開始編寫具體的計算邏輯了,在這個項目中我們把所有的類都建立在包com.rickie.bigdata.firststorm下。
storm提供了兩種運行模式:本地模式(Local Mode)和分布式模式。本地模式針對開發調試storm topologies非常有用。
因為多數程序開發者都是使用windows系統進行程序開發,如果在本機不安裝storm環境的情況下,如何在本地開發、調試storm程序呢?你可以參考本文提供的解決方案。
如下來自Storm 官方文檔:
http://storm.apache.org/releases/2.0.0-SNAPSHOT/Local-mode.html
Local Mode
Local mode simulates a Storm cluster in process and is useful for developing and testing topologies. Running topologies in local mode is similar to running topologies on a cluster.
To run a topology in local mode you have two options. The most common option is to run your topology with storm local instead of storm jar.
This will bring up a local simulated cluster and force all interactions with nimbus to go through the simulated cluster instead of going to a separate process.
If you want to do some automated testing but without actually launching a storm cluster you can use the same classes internally that storm local does.
To do this you first need to pull in the dependencies needed to access these classes. For the java API you should depend on storm-server as a test dependency.
To create an in-process cluster, simply use the LocalCluster class.
如上文所述,使用本地模式(Local Mode),需要先引入storm-server 依賴包。
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-server</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>
引入的storm-server 依賴包。
在本地模式上運行topology類似在一個集群上運行topology。
創建一個本地集群,大致代碼如下所示:
完整代碼可以參考下面。
(1)首先建立RandomSpout類作為數據源,并且繼承于父類BaseRichSpout,確定后可以看到系統自動補全3個方法:nextTuple,open和declareOutputFields。
我們現在就需要重寫這3個方法,open方法是數據源的初始化,nextTuple的作用是把Tuple發送至下游,declareOutputFields用來定義輸出字段,下面我們手動分配一個數組,并且隨機取里面的元素,代碼如下:
package com.rickie.bigdata;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Map;
import java.util.Random;
public class RandomSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private static String[] words = {"Rickie", "Hadoop", "MapReduce", "Storm", "Spark", "Spark Streaming", "Flink"};
@Override
public void open(Map<String, Object> map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
String word = words[new Random().nextInt(words.length)];
collector.emit(new Values(word));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomString"));
}
}
(2)然后新建一個類SenqueceBolt,繼承于BaseBasicBolt類,并且重寫方法execute和declareOutputFields。
這個類就是用于執行具體的作業,準確的說是execute方法用來執行相關的計算,這里只是簡單的輸出,代碼如下:
package com.rickie.bigdata;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
public class SequenceBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String word = (String) tuple.getValue(0);
String out = "Hello " + word +"!";
System.out.println(out);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
(3)最后建立一個類FirstStorm。
這個類是主類,在main方法中定義Topology,并且綜合設置Spout和Bolt,從而調用其中的方法,這里流式計算時間設置為30s,代碼如下:
package com.rickie.bigdata;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
public class FirstStorm
{
public static void main( String[] args ) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSpout());
builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(false);
String name = "firststorm";
if(args != null && args.length >0){
name = args[0];
conf.setNumWorkers(3);
try {
StormSubmitter.submitTopology(name, conf, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
try(LocalCluster cluster = new LocalCluster()) {
cluster.submitTopology("firststorm", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("firststorm");
cluster.shutdown();
}
}
System.out.println( "Well done!" );
}
}
可以用本地模式運行,在IDEA中直接運行即可,方便開發調試。
在Console可以看到如下輸出信息:
接下來我們將這個項目放到Storm服務器集群中運行。
storm jar 命令用于啟動一個Topology。
(1)以本地模式(Local Mode)運行
storm local firststorm-1.0-SNAPSHOT.jar com.rickie.bigdata.FirstStorm
運行結果和之前IDEA本地模式運行輸出類似。
(2)以分布式模式運行
storm jar firststorm-1.0-SNAPSHOT.jar com.rickie.bigdata.FirstStorm RickieStorm
可以查看worker日志,看到SequenceBolt 線程輸出的信息。只有kill 這個topology,日志輸出信息才會終止。
在Storm架構中,Topology代表的并不是確定的作業,而是持續的計算過程,在確定的業務邏輯處理框架下,輸入數據源源不斷地進入系統,經過流式處理后以較低的延遲產生輸出。如果不主動結束這個Topology或者關閉Storm集群,那么數據處理的過程就會持續地進行下去。
另外需要注意的是:由于在分布式模式下運行,worker工作在獨立的進程中,因此無法直接在storm jar命令行輸出窗口,看到上述SequenceBolt組件的輸出信息。
(3)storm list 查看正在運行的topologies和它們的狀態
storm list
也可以通過訪問http://192.168.56.103:8080/ storm server,查看并操作正在運行的 topology。
(4)kill正在運行的topology
storm kill [storm name]
運行“storm kill”這個命令,僅僅只是調用Nimbus的Thirft接口去kill掉相對應的Topology。
Nimbus接受到kill命令,會將”kill”事務應用到topology上,修改Topology的狀態為”killed”以及將“remove”事件列入到未來幾秒鐘的計劃中,即未來幾秒后會觸發remove時間。這里的kill實際上停止相關的Worker。
默認kill的等待時間是Topology消息的超時時間,但是可以通過storm kill命令中的-w標志對其進行重寫,設置了以上參數之后,topology會在指定的等待時間停止運行。這樣給了Topology一個機會在shutdown workers之后完成當前沒有處理完成的任務;刪除Topology以及清理zookeeper中的分配信息和靜態信息;清理存儲在本地的心跳dir和jar/configs。
現在,第一個Storm入門項目的開發和測試運行都完畢了,更復雜的流計算邏輯模式也基本相同,主要就是Maven項目中出現了更復雜的模塊和調用,整個運行的流程其實都是差不多的。恭喜你,現在算是步入Storm流式計算的殿堂的大門了。
程鏈接:https://www.itwangzi.cn/4907.html
Storm 實現了低延遲,還做不到高吞吐,也不能在故障發生時準確地處理計算狀態;Spark Streaming通過采用微批處理方法實現了高吞吐和容錯性,但是犧牲了低延遲和實時處理能力,也不能使窗口與自然時間相匹配,并且表現力欠佳。而flink就是目前為止的最佳答案。
我們在選擇一個新的技術框架的時候,首先考慮的是他的應用場景,再牛逼的框架沒有應用場景也是一無是處,當然牛逼的框架大多都是基于某一個或者某一類應用場景而產生,而flink主要應用于以下三個場景:
Flink 主要應用場景有三類:
1.Event-driven Applications【事件驅動】
2.Data Analytics Applications【分析】
3.Data Pipeline Applications【管道式ETL】
上圖包含兩塊:
Traditional transaction Application(傳統事務應用)
Event-driven Applications(事件驅動應用)
Traditional transaction Application執行流程:
比如點擊流Events可以通過Application寫入Transaction DB(數據庫),同時也可以通過Application從Transaction DB將數據讀出,并進行處理,當處理結果達到一個預警值就會觸發一個Action動作,這種方式一般為事后諸葛亮。
Event-driven Applications執行流程:
比如采集的數據Events可以不斷的放入消息隊列,Flink應用會不斷ingest(消費)消息隊列中的數據,Flink 應用內部維護著一段時間的數據(state),隔一段時間會將數據持久化存儲(Persistent sstorage),防止Flink應用死掉。Flink應用每接受一條數據,就會處理一條數據,處理之后就會觸發(trigger)一個動作(Action),同時也可以將處理結果寫入外部消息隊列中,其他Flink應用再消費。
典型的事件驅動類應用:
1.欺詐檢測(Fraud detection)
2.異常檢測(Anomaly detection)
3.基于規則的告警(Rule-based alerting)
4.業務流程監控(Business process monitoring)
5.Web應用程序(社交網絡)
Data Analytics Applications包含Batch analytics(批處理分析)和Streaming analytics(流處理分析)。
Batch analytics可以理解為周期性查詢:
比如Flink應用凌晨從Recorded Events中讀取昨天的數據,然后做周期查詢運算,最后將數據寫入Database或者HDFS,或者直接將數據生成報表供公司上層領導決策使用。
Streaming analytics可以理解為連續性查詢:
比如實時展示雙十一天貓銷售GMV,用戶下單數據需要實時寫入消息隊列,Flink 應用源源不斷讀取數據做實時計算,然后不斷的將數據更新至Database或者K-VStore,最后做大屏實時展示。
Data Pipeline Applications包含Periodic (周期性)ETL和Data Pipeline(管道)
Periodic ETL:比如每天凌晨周期性的啟動一個Flink ETL Job,讀取傳統數據庫中的數據,然后做ETL,最后寫入數據庫和文件系統。
Data Pipeline:比如啟動一個Flink 實時應用,數據源(比如數據庫、Kafka)中的數據不斷的通過Flink Data Pipeline流入或者追加到數據倉庫(數據庫或者文件系統),或者Kafka消息隊列。
阿里在Flink的應用主要包含四個模塊:實時監控、實時報表、流數據分析和實時倉庫。
實時監控:
用戶行為預警、app crash 預警、服務器攻擊預警
對用戶行為或者相關事件進行實時監測和分析,基于風控規則進行預警
實時報表:
雙11、雙12等活動直播大屏
對外數據產品:生意參謀等
數據化運營
流數據分析:
實時計算相關指標反饋及時調整決策
內容投放、無線智能推送、實時個性化推薦等
實時倉庫:
數據實時清洗、歸并、結構化
數倉的補充和優化
欺詐檢測
背景:
假設你是一個電商公司,經常搞運營活動,但收效甚微,經過細致排查,發現原來是羊毛黨在薅平臺的羊毛,把補給用戶的補貼都薅走了,錢花了不少,效果卻沒達到。怎么辦呢?
你可以做一個實時的異常檢測系統,監控用戶的高危行為,及時發現高危行為并采取措施,降低損失。
系統流程:
1.用戶的行為經由app 上報或web日志記錄下來,發送到一個消息隊列里去;
2.然后流計算訂閱消息隊列,過濾出感興趣的行為,比如:購買、領券、瀏覽等;
3.流計算把這個行為特征化;
4.流計算通過UDF調用外部一個風險模型,判斷這次行為是否有問題(單次行為);
5.流計算里通過CEP功能,跨多條記錄分析用戶行為(比如用戶先做了a,又做了b,又做了3次c),整體識別是否有風險;
6.綜合風險模型和CEP的結果,產出預警信息。
《大數據和人工智能交流》頭條號向廣大初學者新增C 、Java 、Python 、Scala、javascript 等目前流行的計算機、大數據編程語言,希望大家以后關注本頭條號更多的內容。
一、搭建zookeeper集群
注意:為了大家學習的方便,這里在一臺機器上搭建zookeeper集群,在一個機器搭建集群和在多臺機器搭建集群原理是相同的。搭建單節點(一臺主機放3個服務【1個leader,2個flower】)zk集群
在/home建立zk1、zk2、zk3三個目錄,3個目錄搭建過程類似。
(一)搭建過程
【1】搭建節點1
(1)在zk1建立zkdata ,在zkdata下存放myid文件
[root@node1 zkdata]# more myid
1
(2) tar -zxvf zookeeper-3.4.5.tar.gz
[root@node1 zk1]# ls
zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz
(3)修改配置文件
[root@node1 zk1]# cd zookeeper-3.4.5
[root@node1 zookeeper-3.4.5]# cd conf
[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg
[root@node1 conf]# vi zoo.cfg
注意:修改的配置信息
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
dataDir=/home/zk1/zkdata
server.1=node1:2888:3888
server.2=node1:2889:3889
server.3=node1:2890:3890
【2】搭建節點2
(1)在zk1建立zkdata ,在zkdata下存放myid文件
[root@node1 zkdata]# more myid
2
(2) tar -zxvf zookeeper-3.4.5.tar.gz
[root@node1 zk1]# ls
zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz
(3)修改配置文件
[root@node1 zk1]# cd zookeeper-3.4.5
[root@node1 zookeeper-3.4.5]# cd conf
[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg
[root@node1 conf]# vi zoo.cfg
注意:修改的配置信息
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2182
dataDir=/home/zk2/zkdata
server.1=node1:2888:3888
server.2=node1:2889:3889
server.3=node1:2890:3890
【3】搭建節點3
(1)在zk1建立zkdata ,在zkdata下存放myid文件
[root@node1 zkdata]# more myid
3
(2) tar -zxvf zookeeper-3.4.5.tar.gz
[root@node1 zk1]# ls
zkdata zookeeper-3.4.5 zookeeper-3.4.5.tar.gz
(3)修改配置文件
[root@node1 zk1]# cd zookeeper-3.4.5
[root@node1 zookeeper-3.4.5]# cd conf
[root@node1 zookeeper-3.4.5]# mv zoo_sample.cfg zoo.cfg
[root@node1 conf]# vi zoo.cfg
注意:修改的配置信息
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2183
dataDir=/home/zk3/zkdata
server.1=node1:2888:3888
server.2=node1:2889:3889
server.3=node1:2890:3890
(二)、啟動集群
1、啟動各個
[root@node1 bin]# ./zkServer.sh start
JMX enabled by default
Using config: /home/zk1/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... ./zkServer.sh: line 103: [: /tmp/zookeeper: binary operator expected
STARTED
[root@node1 bin]#
2、在zk1、zk2、zk3各個目錄查看節點狀態
[root@node1 bin]# ./zkServer.sh status
JMX enabled by default
Using config: /home/zk1/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: follower
二、搭建storm集群
(一)、準備工作
1、zk必須正常,3個節點聯網通信正常。我這里是3個節點:
192.168.100.1是主節點
192.168.100.2是從節點
192.168.100.3是從節點
將包上傳-->解壓 -->改名-->這里改為storm
[root@node1 storm001]# ls
apache-storm-0.9.3.tar.gz storm
2、修改配置文件
[root@node1 storm001]# cd storm
[root@node1 storm]# ls
bin conf examples lib logback NOTICE README.markdown SECURITY.md
CHANGELOG.md DISCLAIMER external LICENSE logs public RELEASE storm.jar
[root@node1 storm]# cd conf
[root@node1 conf]# ls
storm_env.ini storm.yaml
[root@node1 conf]# vi storm.yaml
storm.zookeeper.servers:
- "192.168.100.1"
- "192.168.100.1"
- "192.168.100.1"
nimbus.host: "192.168.100.1"
storm.local.dir: "/usr/local/storm/tmp"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
解釋:
(1)storm.local.dir:存儲目錄
(2)nimbus.host: "192.168.100.1"代表選擇主機
(3)supervisor.slots.ports:對于每個supervisor機器,通過這項配置可以決定運行多少個worker進程在
這臺機器上,每個worker使用一個單獨的port來接受消息。supervisor并不會立即啟動這個4個worker進程,
當接收到分配的任務的時候會啟動,具體啟動多少個worker根據我們Topology在這個supervisor需要幾個worker
來確定
3、把storm目錄拷貝到其它2個節點
[root@node1 storm001]# ls
apache-storm-0.9.3.tar.gz storm
scp -r storm node2:/home/storm001
scp -r storm node3:/home/storm001
(二)、啟動集群
1、啟動主節點nimbus
[root@node1 bin]# ls
storm storm.cmd storm-config.cmd
[root@node1 bin]# ./storm nimbus
可以使用linux命令將其放到后臺./storm nimbus >/dev/null 2>&1 &
在主節點通過web前端訪問的進程
./storm ui
./storm ui >/dev/null 2>&1 &
2、啟動從節點node2
./storm supervisor
./storm supervisor >/dev/null 2>&1 &
從節點需要啟動日志進程
./storm logviewer
3、啟動從節點node3
./storm supervisor
./storm supervisor >/dev/null 2>&1 &
從節點需要啟動日志進程
./storm logviewer
4、進入storm的ui界面:
http://192.168.100.1:8080/index.html
5、將java開發的storm應用打包上傳服務器
bin/storm jar storm.jar com.test.A001
《大數據和人工智能交流》的宗旨
1、將大數據和人工智能的專業數學:概率數理統計、線性代數、決策論、優化論、博弈論等數學模型變得通俗易懂。
2、將大數據和人工智能的專業涉及到的數據結構和算法:分類、聚類 、回歸算法、概率等算法變得通俗易懂。
3、最新的高科技動態:數據采集方面的智能傳感器技術;醫療大數據智能決策分析;物聯網智慧城市等等。
根據初學者需要會有C語言、Java語言、Python語言、Scala函數式等目前主流計算機語言。
根據讀者的需要有和人工智能相關的計算機科學與技術、電子技術、芯片技術等基礎學科通俗易懂的文章。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。