postProcessBeanFactory這個方法沒名字跟BeanFactoryPostProcessor接口中的方法一樣,但是他的功能是提供給子類進行添加一些額外的功能,比如添加BeanPostProcessor接口的實現,或者定制一些其他的功能也是可以的,因為這個方法你可以拿到BeanFactory,自然是可以對他進行一些功能的定制的。
這里看下Spring 提供的子類GenericWebApplicationContext是如何實現的:
@Override
protected void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
if (this.servletContext != null) {
beanFactory.addBeanPostProcessor(new ServletContextAwareProcessor(this.servletContext));
beanFactory.ignoreDependencyInterface(ServletContextAware.class);
}
WebApplicationContextUtils.registerWebApplicationScopes(beanFactory, this.servletContext);
WebApplicationContextUtils.registerEnvironmentBeans(beanFactory, this.servletContext);
}
這里他注冊了一個ServletContextAwreProcessor 到beanFactory中,ServletContexAwareProcessor是一個BeanPostProcessor接口的子類。
接下來分析AbstractApplicationContext#refresh中的invokeBeanFactoryPostProcessors方法,這個方法用來注冊和執行BeanFactoryPostProcessor的。
直接上源碼:
protected void invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory beanFactory) {
// 執行所有的BeanFactoryPostProcessor
PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());
// Detect a LoadTimeWeaver and prepare for weaving, if found in the meantime
// (e.g. through an @Bean method registered by ConfigurationClassPostProcessor)
// aop的處理
if (beanFactory.getTempClassLoader() == null && beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
}
}
重點在這里:
PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());
首先獲取BeanFactoryPostProcessor的集合,這里獲取到都是用戶在定制BeanFactory時add加入進去的,進入這個方法:
public static void invokeBeanFactoryPostProcessors(
ConfigurableListableBeanFactory beanFactory, List<BeanFactoryPostProcessor> beanFactoryPostProcessors) {
// Invoke BeanDefinitionRegistryPostProcessors first, if any.
// 已經處理的Bean
Set<String> processedBeans = new HashSet<>();
// 先進性外部BFPP的處理,并且判斷當前Factory是否是BeanDefinitionRegistry
if (beanFactory instanceof BeanDefinitionRegistry) {
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
// 保存BFPP的Bean
List<BeanFactoryPostProcessor> regularPostProcessors = new ArrayList<>();
// 保存BDRPP的Bean
List<BeanDefinitionRegistryPostProcessor> registryProcessors = new ArrayList<>();
// 開始處理外部傳入的BFPP
for (BeanFactoryPostProcessor postProcessor : beanFactoryPostProcessors) {
// 先處理BDRPP
if (postProcessor instanceof BeanDefinitionRegistryPostProcessor) {
BeanDefinitionRegistryPostProcessor registryProcessor =
(BeanDefinitionRegistryPostProcessor) postProcessor;
// 直接調用BDRPP的接口方法,后面的postProcessBeanFactory 方法后面統一處理
registryProcessor.postProcessBeanDefinitionRegistry(registry);
// 加入到BFPP的集合中
registryProcessors.add(registryProcessor);
}
else {
// 加入到BDRPP的集合中
regularPostProcessors.add(postProcessor);
}
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let the bean factory post-processors apply to them!
// Separate between BeanDefinitionRegistryPostProcessors that implement
// PriorityOrdered, Ordered, and the rest.
// 保存當前的BDRPP
List<BeanDefinitionRegistryPostProcessor> currentRegistryProcessors = new ArrayList<>();
// First, invoke the BeanDefinitionRegistryPostProcessors that implement PriorityOrdered.
// 按類型獲取BeanName
String[] postProcessorNames =
beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
for (String ppName : postProcessorNames) {
// 判斷當前的beanName是都是實現了PriorityOrdered
if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
// 加入到當前注冊的BDRPP集合中
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
// 加入到已經處理的bean集合中
processedBeans.add(ppName);
}
}
// 對當前的BDRPP進行排序
sortPostProcessors(currentRegistryProcessors, beanFactory);
// 將當前的BDRPP全部加入到最前面定義的BDRPP的集合中
registryProcessors.addAll(currentRegistryProcessors);
// 執行當前的BDRPP的postProcessBeanDefinitionRegistry方法
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
// 清空當前的BDRPP
currentRegistryProcessors.clear();
// Next, invoke the BeanDefinitionRegistryPostProcessors that implement Ordered.
// 再次獲取bdrpp,因為上面的執行可能還會加入新的bdrpp進來
postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
for (String ppName : postProcessorNames) {
// 判斷是否已經處理過,并且是否實現了Ordered接口
if (!processedBeans.contains(ppName) && beanFactory.isTypeMatch(ppName, Ordered.class)) {
// 加入到當前的BDRPP的集合中
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
// 添加到已經處理的集合中
processedBeans.add(ppName);
}
}
// 排序
sortPostProcessors(currentRegistryProcessors, beanFactory);
// 加入到BDRPP集合中
registryProcessors.addAll(currentRegistryProcessors);
// 執行bdrpp的postProcessBeanDefinitionRegistry方法
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
// 清空當前bdrpp集合
currentRegistryProcessors.clear();
// Finally, invoke all other BeanDefinitionRegistryPostProcessors until no further ones appear.
boolean reiterate = true;
// 循環去獲取BDRPP,然后進行排序、執行操作,直到所有的BDRPP全部執行完
while (reiterate) {
reiterate = false;
// 獲取BDRPP
postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
for (String ppName : postProcessorNames) {
// 如果已經處理過,就執行BDRPP,并且退出循環,否則繼續循環
if (!processedBeans.contains(ppName)) {
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
processedBeans.add(ppName);
reiterate = true;
}
}
// 排序
sortPostProcessors(currentRegistryProcessors, beanFactory);
// 加入到BDRPP集合中
registryProcessors.addAll(currentRegistryProcessors);
// 執行bdrpp
invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
currentRegistryProcessors.clear();
}
// Now, invoke the postProcessBeanFactory callback of all processors handled so far.
// 執行bdrpp 中的postProcessBeanFactory方法
invokeBeanFactoryPostProcessors(registryProcessors, beanFactory);
// 執行bfpp 中的postProcessBeanFactory方法
invokeBeanFactoryPostProcessors(regularPostProcessors, beanFactory);
}
else {
// 如果不是bdrpp,那么直接執行bfpp的postProcessBeanFactory
// Invoke factory processors registered with the context instance.
invokeBeanFactoryPostProcessors(beanFactoryPostProcessors, beanFactory);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let the bean factory post-processors apply to them!
// 獲取BFPP的beanName集合
String[] postProcessorNames =
beanFactory.getBeanNamesForType(BeanFactoryPostProcessor.class, true, false);
// Separate between BeanFactoryPostProcessors that implement PriorityOrdered,
// Ordered, and the rest.
// 定義實現了PriorityOrdered的BFPP
List<BeanFactoryPostProcessor> priorityOrderedPostProcessors = new ArrayList<>();
// 定義實現了Ordered接口的集合
// List<String> orderedPostProcessorNames = new ArrayList<>();
List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>();
// 定義沒有排序的集合
// List<String> nonOrderedPostProcessorNames = new ArrayList<>();
List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>();
for (String ppName : postProcessorNames) {
// 如果已經處理過了就不做處理
if (processedBeans.contains(ppName)) {
// skip - already processed in first phase above
}
else if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
priorityOrderedPostProcessors.add(beanFactory.getBean(ppName, BeanFactoryPostProcessor.class));
}
else if (beanFactory.isTypeMatch(ppName, Ordered.class)) {
// orderedPostProcessorNames.add(ppName);
orderedPostProcessors.add(beanFactory.getBean(ppName,BeanFactoryPostProcessor.class));
}
else {
// nonOrderedPostProcessorNames.add(ppName);
nonOrderedPostProcessors.add(beanFactory.getBean(ppName,BeanFactoryPostProcessor.class));
}
}
// First, invoke the BeanFactoryPostProcessors that implement PriorityOrdered.
// 排序
sortPostProcessors(priorityOrderedPostProcessors, beanFactory);
// 先執行PriorityOrdered接口的bfpp
invokeBeanFactoryPostProcessors(priorityOrderedPostProcessors, beanFactory);
// Next, invoke the BeanFactoryPostProcessors that implement Ordered.
// 這里將上面獲取到Ordered接口的BFPP進行集合轉換,然后排序,然后執行,這里其實可以直接合并,
// 在上述進行獲取時就放在這個orderedPostProcessors集合中
// List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>(orderedPostProcessorNames.size());
// for (String postProcessorName : orderedPostProcessorNames) {
// orderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
// }
sortPostProcessors(orderedPostProcessors, beanFactory);
invokeBeanFactoryPostProcessors(orderedPostProcessors, beanFactory);
// Finally, invoke all other BeanFactoryPostProcessors.
// 處理沒有排序的
// List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>(nonOrderedPostProcessorNames.size());
// for (String postProcessorName : nonOrderedPostProcessorNames) {
// nonOrderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
// }
invokeBeanFactoryPostProcessors(nonOrderedPostProcessors, beanFactory);
// Clear cached merged bean definitions since the post-processors might have
// modified the original metadata, e.g. replacing placeholders in values...
// 清除緩存的元數據,因為經過BFPP的執行,可能BeanDefinition的屬性值已經個變化,比如使用占位符的屬性值
beanFactory.clearMetadataCache();
}
這個方法大概很長,實際上就做了一下這么幾點事情:
這里大概邏輯就是這個,看起來可能不是很懂,畫個流程圖:
通過流程圖可以簡化為:先遍歷執行外部傳入的BFPP,再執行BDRPP,再執行BFPP三部分,處理每一部分可能會進行排序操作,排序按照PriorityOrdered,Ordered,noSort進行排序再執行。
這里解釋下BeanDefinitionRegistryPostProcessor,這個接口是BeanFactoryPostProcessor,它里面包含一個方法叫postProcessBeanDefinitionRegistry,這個方法非常重要,在實現類ConfigurationClassPostProcessor中就是使用這個方法進行注解的解析的,而且這個類也是實現SpringBoot自動裝配的關鍵。
ConfigurationClassPostProcessor這個類是什么時候加入到Spring容器的呢?
在我們啟動容器的時候,Spring會進行BeanDefinition的掃描,如果我們在xml配置文件中開啟了注解掃描:
<context:component-scan base-package="com.redwinter.test"/>
那么這個時候就會自動添加多個BeanDefinition到Spring容器中,beanName為org.springframework.context.annotation.internalConfigurationAnnotationProcessor,其他還有幾個:
前面的文章 https://www.cnblogs.com/redwinter/p/16165878.html 講到自定義標簽,在spring解析xml時分為默認的命名空間和自定義的命名空間的,而context就是自定義的命名空間,這個標簽的解析器為ComponentScanBeanDefinitionParser,這個類中的parse方法就是解析邏輯處理:
@Override
@Nullable
public BeanDefinition parse(Element element, ParserContext parserContext) {
String basePackage = element.getAttribute(BASE_PACKAGE_ATTRIBUTE);
basePackage = parserContext.getReaderContext().getEnvironment().resolvePlaceholders(basePackage);
String[] basePackages = StringUtils.tokenizeToStringArray(basePackage,
ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
// Actually scan for bean definitions and register them.
// 配置掃描器
ClassPathBeanDefinitionScanner scanner = configureScanner(parserContext, element);
// 掃描BeanDefinition,在指定的包下
Set<BeanDefinitionHolder> beanDefinitions = scanner.doScan(basePackages);
// 注冊組件
registerComponents(parserContext.getReaderContext(), beanDefinitions, element);
return null;
}
這個方法執行流程:
registerComponents方法里面就是添加ConfigurationClassPostProcessor的地方,由于代碼太多這里只貼部分代碼:
// ...省略部分代碼
Set<BeanDefinitionHolder> beanDefs = new LinkedHashSet<>(8);
// 判斷注冊器中個是否包含org.springframework.context.annotation.internalConfigurationAnnotationProcessor
// 不包含就加入一個ConfigurationClassPostProcessor的BeanDefinition
// 用于解析注解
if (!registry.containsBeanDefinition(CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME)) {
// 創建一個BeanDefinition為ConfigurationClassPostProcessor
RootBeanDefinition def = new RootBeanDefinition(ConfigurationClassPostProcessor.class);
def.setSource(source);
// 注冊一個beanName為org.springframework.context.annotation.internalConfigurationAnnotationProcessor
// 的BeanDefinition,class為ConfigurationClassPostProcessor
beanDefs.add(registerPostProcessor(registry, def, CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME));
}
// 創建一個AutowiredAnnotationBeanPostProcessor的BeanDefinition
// 用于自動裝配
if (!registry.containsBeanDefinition(AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME)) {
RootBeanDefinition def = new RootBeanDefinition(AutowiredAnnotationBeanPostProcessor.class);
def.setSource(source);
beanDefs.add(registerPostProcessor(registry, def, AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME));
}
// ...省略部分代碼
源碼中注冊了一個beanName為CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME常量的名字,這個常量就是org.springframework.context.annotation.internalConfigurationAnnotationProcessor,class為ConfigurationClassPostProcessor
那注解的解析是如何進行解析的呢?由于篇幅過長,下一篇再來解析。
如果本文對你有幫助,別忘記給我個3連 ,點贊,轉發,評論,,咱們下期見。
收藏 等于白嫖,點贊才是真情。
原文 https://www.cnblogs.com/redwinter/p/16196359.html
做過微信或支付寶支付的童鞋,可能遇到過這種問題,就是填寫支付結果回調,就是在支付成功之后,支付寶要根據我們給的地址給我們進行通知,通知我們用戶是否支付成功,如果成功我們就要去處理下面相應的業務邏輯,如果在測試服務,那么這個回調地址我們就需要填寫測試服務的,如果發布到線上那么我們就需要改成線上的地址。
針對上面的場景,我們一般都會通過如下的方式,進行一個動態配置,不需要每次去改,防止出現問題。
public class PayTest {
@Value("${spring.profiles.active}")
private String environment;
public Object notify(HttpServletRequest request) {
if ("prod".equals(environment)) {
// 正式環境
} else if ("test".equals(environment)) {
// 測試環境
}
return "SUCCESS";
}
}
復制代碼
上面的代碼看起來沒有一點問題,但是身為搬磚的我們咋可能這樣搬,姿勢不對呀!
擴展性太差,如果這個參數我們還需要在別的地方用到,那么我們是不是還要使用@Value的注解獲取一遍,假如有天我們的leader突然說嗎,test這個單詞看著太low了,換個高端一點的,換成dev,那么我們是不是要把項目中所有的test都要改過來,如果少還好,要是很多,那我們怕不是涼了。
所以我們能不能將這些配置參數搞成一個全局的靜態變量,這樣的話我們直接飲用就好了,哪怕到時候真的要改,那我也只需要改動一處就好了。
有的朋友可能就比較自信了,那我直接加個static修飾下不就好了,如果你真是打算這樣做,那你就準備卷好鋪蓋走人吧。直接加static獲取到的值其實是一個null,至于原因,大家復習下類以及靜態變量變量的加載順序。
那么既然說出了問題,肯定就有解決方法,不然你以為我跟你玩呢。
首先這個注解是由Java提供的,它用來修飾一個非靜態的void方法。它會在服務器加載Servlet的時候運行,并且只運行一次。
@Component
public class SystemConstant {
public static String surroundings;
@Value("${spring.profiles.active}")
public String environment;
@PostConstruct
public void initialize() {
System.out.println("初始化環境...");
surroundings = this.environment;
}
}
復制代碼
我們可以看到在項目啟動的時候進行了初始化
到這里我們已經可以拿到當前運行的環境是測試還是正式,這樣就可以做到動態配置
其實這個注解遠不止這點用處,像我之前寫的Redis工具類,我使用的是RedisTemplate操作Redis,導致寫出來的方法沒辦法用static修飾,每次使用Redis工具類只能先注入到容器然后再調用,使用了這個注解就可以完美的解決這種尷尬的問題。代碼如下。
這個系列是我學習Flink之后,想到加強一下我的FLink能力,所以開始一系列的自己擬定業務場景,進行開發。
這里更類似筆記,而不是教學,所以不會特別細致,敬請諒解。
這里是實戰的,具體一些環境,代碼基礎知識不會講解,例如docker,flink語法之類的,看情況做具體講解,所以需要一些技術門檻。
這里就不做下載地址的分享了,大家自行下載吧。
maven pom依賴,別問為啥這么多依賴,問我就說不知道,你就復制吧。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.iolo</groupId>
<artifactId>flink_study</artifactId>
<version>1.0.0</version>
<!-- 指定倉庫位置,依次為aliyun、apache和cloudera倉庫 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.12.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- blink執行計劃,1.11+默認的-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink連接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<artifactId>flink-streaming-java_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-runtime_2.11</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-core</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-java</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- 編譯插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- 打包插件(會包含所有依賴) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 設置jar包的入口類(可選) -->
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
下面是flink的,具體講解都在代碼里
package com.iolo.flink.cases;
import com.alibaba.fastjson.JSONObject;
import com.iolo.common.util.DingDingUtil;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.hadoop2.com.google.gson.Gson;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* @author iolo
* @date 2021/3/17
* 監控日志實時報警
* <p>
* 準備環境
* 1 - flink 1.12
* 2 - kafka
* 3 - filebeat
* 4 - Springboot服務(可以產生各類級別日志的接口)
* 5 - es+kibana
* <p>
* filebeat 監控Springboot服務日志 提交給kafka(主題sever_log_to_flink_consumer)
* flink消費kafka主題日志 ,整理收集,如果遇到error日志發送郵件,或者發釘釘(這里可以調用Springboot服務,或者直接flink發送)
* 然后將所有日志存入es 進行 kibana分析
**/
public class case_1_kafka_es_log {
public static void main(String[] args) throws Exception {
// TODO env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
Properties props = new Properties();
//集群地址
props.setProperty("bootstrap.servers", "127.0.0.1:9092");
//消費者組id
props.setProperty("group.id", "test-consumer-group");
//latest有offset記錄從記錄位置開始消費,沒有記錄從最新的/最后的消息開始消費
//earliest有offset記錄從記錄位置開始消費,沒有記錄從最早的/最開始的消息開始消費
props.setProperty("auto.offset.reset", "latest");
//會開啟一個后臺線程每隔5s檢測一下Kafka的分區情況,實現動態分區檢測
props.setProperty("flink.partition-discovery.interval-millis", "5000");
//自動提交(提交到默認主題,后續學習了Checkpoint后隨著Checkpoint存儲在Checkpoint和默認主題中)
props.setProperty("enable.auto.commit", "true");
//自動提交的時間間隔
props.setProperty("auto.commit.interval.ms", "2000");
// TODO source
FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("sever_log_to_flink_consumer", new SimpleStringSchema(), props);
DataStreamSource<String> ds = env.addSource(source);
// TODO transformation
SingleOutputStreamOperator<Tuple3<String, String, String>> result = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, String>>() {
@Override
public void flatMap(String s, Collector<Tuple3<String, String, String>> collector) throws Exception {
JSONObject json = (JSONObject) JSONObject.parse(s);
String timestamp = json.getString("@timestamp");
String message = json.getString("message");
String[] split = message.split(" ");
String level = split[3];
if ("[ERROR]".equalsIgnoreCase(level)) {
System.out.println("error!");
DingDingUtil.dingdingPost("error");
}
collector.collect(Tuple3.of(timestamp, level, message));
}
});
// TODO sink
result.print();
/**
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/elasticsearch.html
*/
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<Tuple3<String, String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<Tuple3<String, String, String>>() {
@Override
public void process(Tuple3<String, String, String> stringStringStringTuple3, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
Map<String, String> json = new HashMap<>();
json.put("@timestamp", stringStringStringTuple3.f0);
json.put("level", stringStringStringTuple3.f1);
json.put("message", stringStringStringTuple3.f2);
IndexRequest item = Requests.indexRequest()
.index("my-log")
.source(json);
requestIndexer.add(item);
}
});
esSinkBuilder.setBulkFlushMaxActions(1);
result.addSink(esSinkBuilder.build());
// TODO execute
env.execute("case_1_kafka_es_log");
}
}
其中為了告警通知,做了個釘釘自定義機器人通知,需要的可以去百度查看一下,很方便。
https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2
package com.iolo.common.util;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import java.io.IOException;
/**
* @author iolo
* @date 2021/3/30
* https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2
**/
@Slf4j
public class DingDingUtil {
private static final String url = "https://oapi.dingtalk.com/robot/send?access_token=你自己的token替換";
/**
* 秘鑰token
*
* @param
* @return java.lang.String
* @author fengxinxin
* @date 2021/3/30 下午5:03
**/
public static void dingdingPost(String text) throws Exception {
MediaType JSON = MediaType.parse("application/json");
OkHttpClient client = new OkHttpClient();
String json = "{\"msgtype\": \"text\",\"text\": {\"content\": \"FlinkLog:" + text + "\"}}";
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
String responseBody = response.body().string();
log.info(responseBody);
} catch (IOException e) {
log.error(e.getMessage());
}
}
}
然后可以直接在控制面板直接啟動這個main方法
gitee地址直接下載,不做詳細講解
接口地址 http://127.0.0.1:8080/test/log?level=error&count=10
操作命令,這些命令都是在Kafka里的bin目錄下,Zookeeper是kafka自帶的那個
# Zookeeper 啟動命令
./zookeeper-server-start.sh ../config/zookeeper.properties
# Kafka 啟動命令
./kafka-server-start.sh ../config/server.properties
# 創建 topic sever_log_to_flink_consumer
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sever_log_to_flink_consumer
# 查看是否創建成功
./kafka-topics.sh --list --zookeeper localhost:2181
# 這是生產者
./kafka-console-producer.sh --broker-list localhost:9092 --topic sever_log_to_flink_consumer
# 這是消費者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sever_log_to_flink_consumer --from-beginning
這里開始使用docker,具體環境可以自行搭建,并且以后docker的場景會越來越多,直接上命令。
docker run \
--name fxx-es \
-p 9200:9200 \
-p 9300:9300 \
-v /Users/iOLO/dev/docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-e "discovery.type=single-node" \
docker.elastic.co/elasticsearch/elasticsearch:7.12.0
驗證
docker run \
--name fxx-kibana \
--link fxx-es:elasticsearch \
-p 5601:5601 \
docker.elastic.co/kibana/kibana:7.12.0
我這里去容器內部設置中文,你可以不做
設置的方法,在配置文件kibana.yml增加i18n.locale: "zh-CN"
驗證 地址 是 127.0.0.1:5601
具體操作的時候進行圖文講解
下載地址 https://www.elastic.co/cn/downloads/beats/filebeat
選擇自己電腦環境進行下載,我是MAC
解壓之后修改配置文件里,直接上配置文件
# ============================== Filebeat inputs ===============================
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
# Change to true to enable this input configuration. 這里是需要修改
enabled: true
# Paths that should be crawled and fetched. Glob based paths. 這里改成你本地下載那個Springboot的log文件地址
paths:
- /Users/iOLO/dev/Java/flinklog/logs/flink-log.*.log
# ------------------------------ Kafka Output -------------------------------
output.kafka:
# initial brokers for reading cluster metadata kafka的連接地址,這是直接從官網粘貼過來的,
# https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
hosts: ["127.0.01:9092"]
# message topic selection + partitioning 然后就是消費topic,其他都是官網的默認值,我就沒做改動
topic: 'sever_log_to_flink_consumer'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
環境和程序都準備好了之后,別忘了啟動Springboot服務
然后通過請求接口服務 127.0.0.1:8080/test/log?level=error&count=10 來產生日志
通過查看釘釘 看是否有報警信息
釘釘成功!!!
然后就是Kibana的操作
直接上結果頁面
然后就是操作步驟
第一先去es選擇index
第二步根據紅框點擊進去es查詢index頁面
最后在輸入框里查詢你剛才的index ,咱們的代碼就是my-index,根據提示進行下一步,我這里已經創建過了,所以就不再演示。最后就可以會有之前頁面的效果。
整體就是這樣,很多人肯定會提出質疑,說直接filebeat+ELK 也能完成這類效果,好的,你別杠,我這是學習flink之后,然后自己出業務場景,進行flink的實戰總結,如果你有更好的方案,就干你的。
然后如果大家有啥想要的,遇到的場景,都可以提出來,我會斟酌后進行采納進行實戰實施。
最后感謝閱讀。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。