整合營銷服務商

          電腦端+手機端+微信端=數據同步管理

          免費咨詢熱線:

          Spring 源碼-BeanFactoryPostProcessor怎么執行的(6)

          bstractApplicationContext提供的postProcessBeanFactory空方法

          postProcessBeanFactory這個方法沒名字跟BeanFactoryPostProcessor接口中的方法一樣,但是他的功能是提供給子類進行添加一些額外的功能,比如添加BeanPostProcessor接口的實現,或者定制一些其他的功能也是可以的,因為這個方法你可以拿到BeanFactory,自然是可以對他進行一些功能的定制的。

          這里看下Spring 提供的子類GenericWebApplicationContext是如何實現的:

          @Override
          protected void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
            if (this.servletContext != null) {
              beanFactory.addBeanPostProcessor(new ServletContextAwareProcessor(this.servletContext));
              beanFactory.ignoreDependencyInterface(ServletContextAware.class);
            }
            WebApplicationContextUtils.registerWebApplicationScopes(beanFactory, this.servletContext);
            WebApplicationContextUtils.registerEnvironmentBeans(beanFactory, this.servletContext);
          }
          

          這里他注冊了一個ServletContextAwreProcessor 到beanFactory中,ServletContexAwareProcessor是一個BeanPostProcessor接口的子類。

          重頭戲BeanFactoryPostProcessor

          接下來分析AbstractApplicationContext#refresh中的invokeBeanFactoryPostProcessors方法,這個方法用來注冊和執行BeanFactoryPostProcessor的。

          直接上源碼:

          protected void invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory beanFactory) {
            // 執行所有的BeanFactoryPostProcessor
            PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());
          
            // Detect a LoadTimeWeaver and prepare for weaving, if found in the meantime
            // (e.g. through an @Bean method registered by ConfigurationClassPostProcessor)
            // aop的處理
            if (beanFactory.getTempClassLoader() == null && beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
              beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
              beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
            }
          }
          

          重點在這里:

          PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());
          

          首先獲取BeanFactoryPostProcessor的集合,這里獲取到都是用戶在定制BeanFactory時add加入進去的,進入這個方法:

          public static void invokeBeanFactoryPostProcessors(
            ConfigurableListableBeanFactory beanFactory, List<BeanFactoryPostProcessor> beanFactoryPostProcessors) {
          
            // Invoke BeanDefinitionRegistryPostProcessors first, if any.
            // 已經處理的Bean
            Set<String> processedBeans = new HashSet<>();
            // 先進性外部BFPP的處理,并且判斷當前Factory是否是BeanDefinitionRegistry
            if (beanFactory instanceof BeanDefinitionRegistry) {
              BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
              // 保存BFPP的Bean
              List<BeanFactoryPostProcessor> regularPostProcessors = new ArrayList<>();
              // 保存BDRPP的Bean
              List<BeanDefinitionRegistryPostProcessor> registryProcessors = new ArrayList<>();
              // 開始處理外部傳入的BFPP
              for (BeanFactoryPostProcessor postProcessor : beanFactoryPostProcessors) {
                // 先處理BDRPP
                if (postProcessor instanceof BeanDefinitionRegistryPostProcessor) {
                  BeanDefinitionRegistryPostProcessor registryProcessor =
                    (BeanDefinitionRegistryPostProcessor) postProcessor;
                  // 直接調用BDRPP的接口方法,后面的postProcessBeanFactory 方法后面統一處理
                  registryProcessor.postProcessBeanDefinitionRegistry(registry);
                  // 加入到BFPP的集合中
                  registryProcessors.add(registryProcessor);
                }
                else {
                  // 加入到BDRPP的集合中
                  regularPostProcessors.add(postProcessor);
                }
              }
          
              // Do not initialize FactoryBeans here: We need to leave all regular beans
              // uninitialized to let the bean factory post-processors apply to them!
              // Separate between BeanDefinitionRegistryPostProcessors that implement
              // PriorityOrdered, Ordered, and the rest.
              // 保存當前的BDRPP
              List<BeanDefinitionRegistryPostProcessor> currentRegistryProcessors = new ArrayList<>();
          
              // First, invoke the BeanDefinitionRegistryPostProcessors that implement PriorityOrdered.
              // 按類型獲取BeanName
              String[] postProcessorNames =
                beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
              for (String ppName : postProcessorNames) {
                // 判斷當前的beanName是都是實現了PriorityOrdered
                if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
                  // 加入到當前注冊的BDRPP集合中
                  currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
                  // 加入到已經處理的bean集合中
                  processedBeans.add(ppName);
                }
              }
              // 對當前的BDRPP進行排序
              sortPostProcessors(currentRegistryProcessors, beanFactory);
              // 將當前的BDRPP全部加入到最前面定義的BDRPP的集合中
              registryProcessors.addAll(currentRegistryProcessors);
              // 執行當前的BDRPP的postProcessBeanDefinitionRegistry方法
              invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
              // 清空當前的BDRPP
              currentRegistryProcessors.clear();
          
              // Next, invoke the BeanDefinitionRegistryPostProcessors that implement Ordered.
              // 再次獲取bdrpp,因為上面的執行可能還會加入新的bdrpp進來
              postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
              for (String ppName : postProcessorNames) {
                // 判斷是否已經處理過,并且是否實現了Ordered接口
                if (!processedBeans.contains(ppName) && beanFactory.isTypeMatch(ppName, Ordered.class)) {
                  // 加入到當前的BDRPP的集合中
                  currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
                  // 添加到已經處理的集合中
                  processedBeans.add(ppName);
                }
              }
              // 排序
              sortPostProcessors(currentRegistryProcessors, beanFactory);
              // 加入到BDRPP集合中
              registryProcessors.addAll(currentRegistryProcessors);
              // 執行bdrpp的postProcessBeanDefinitionRegistry方法
              invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
              // 清空當前bdrpp集合
              currentRegistryProcessors.clear();
          
              // Finally, invoke all other BeanDefinitionRegistryPostProcessors until no further ones appear.
              boolean reiterate = true;
              // 循環去獲取BDRPP,然后進行排序、執行操作,直到所有的BDRPP全部執行完
              while (reiterate) {
                reiterate = false;
                // 獲取BDRPP
                postProcessorNames = beanFactory.getBeanNamesForType(BeanDefinitionRegistryPostProcessor.class, true, false);
                for (String ppName : postProcessorNames) {
                  // 如果已經處理過,就執行BDRPP,并且退出循環,否則繼續循環
                  if (!processedBeans.contains(ppName)) {
                    currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
                    processedBeans.add(ppName);
                    reiterate = true;
                  }
                }
                // 排序
                sortPostProcessors(currentRegistryProcessors, beanFactory);
                // 加入到BDRPP集合中
                registryProcessors.addAll(currentRegistryProcessors);
                // 執行bdrpp
                invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
                currentRegistryProcessors.clear();
              }
          
              // Now, invoke the postProcessBeanFactory callback of all processors handled so far.
              // 執行bdrpp 中的postProcessBeanFactory方法
              invokeBeanFactoryPostProcessors(registryProcessors, beanFactory);
              // 執行bfpp 中的postProcessBeanFactory方法
              invokeBeanFactoryPostProcessors(regularPostProcessors, beanFactory);
            }
          
            else {
              // 如果不是bdrpp,那么直接執行bfpp的postProcessBeanFactory
              // Invoke factory processors registered with the context instance.
              invokeBeanFactoryPostProcessors(beanFactoryPostProcessors, beanFactory);
            }
          
            // Do not initialize FactoryBeans here: We need to leave all regular beans
            // uninitialized to let the bean factory post-processors apply to them!
            // 獲取BFPP的beanName集合
            String[] postProcessorNames =
              beanFactory.getBeanNamesForType(BeanFactoryPostProcessor.class, true, false);
          
            // Separate between BeanFactoryPostProcessors that implement PriorityOrdered,
            // Ordered, and the rest.
            // 定義實現了PriorityOrdered的BFPP
            List<BeanFactoryPostProcessor> priorityOrderedPostProcessors = new ArrayList<>();
            // 定義實現了Ordered接口的集合
            //		List<String> orderedPostProcessorNames = new ArrayList<>();
            List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>();
            // 定義沒有排序的集合
            //		List<String> nonOrderedPostProcessorNames = new ArrayList<>();
            List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>();
            for (String ppName : postProcessorNames) {
              // 如果已經處理過了就不做處理
              if (processedBeans.contains(ppName)) {
                // skip - already processed in first phase above
              }
              else if (beanFactory.isTypeMatch(ppName, PriorityOrdered.class)) {
                priorityOrderedPostProcessors.add(beanFactory.getBean(ppName, BeanFactoryPostProcessor.class));
              }
              else if (beanFactory.isTypeMatch(ppName, Ordered.class)) {
                //				orderedPostProcessorNames.add(ppName);
                orderedPostProcessors.add(beanFactory.getBean(ppName,BeanFactoryPostProcessor.class));
              }
              else {
                //				nonOrderedPostProcessorNames.add(ppName);
                nonOrderedPostProcessors.add(beanFactory.getBean(ppName,BeanFactoryPostProcessor.class));
              }
            }
          
            // First, invoke the BeanFactoryPostProcessors that implement PriorityOrdered.
            // 排序
            sortPostProcessors(priorityOrderedPostProcessors, beanFactory);
            // 先執行PriorityOrdered接口的bfpp
            invokeBeanFactoryPostProcessors(priorityOrderedPostProcessors, beanFactory);
          
            // Next, invoke the BeanFactoryPostProcessors that implement Ordered.
            // 這里將上面獲取到Ordered接口的BFPP進行集合轉換,然后排序,然后執行,這里其實可以直接合并,
            // 在上述進行獲取時就放在這個orderedPostProcessors集合中
            //		List<BeanFactoryPostProcessor> orderedPostProcessors = new ArrayList<>(orderedPostProcessorNames.size());
            //		for (String postProcessorName : orderedPostProcessorNames) {
            //			orderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
            //		}
            sortPostProcessors(orderedPostProcessors, beanFactory);
            invokeBeanFactoryPostProcessors(orderedPostProcessors, beanFactory);
          
            // Finally, invoke all other BeanFactoryPostProcessors.
            // 處理沒有排序的
            //		List<BeanFactoryPostProcessor> nonOrderedPostProcessors = new ArrayList<>(nonOrderedPostProcessorNames.size());
            //		for (String postProcessorName : nonOrderedPostProcessorNames) {
            //			nonOrderedPostProcessors.add(beanFactory.getBean(postProcessorName, BeanFactoryPostProcessor.class));
            //		}
            invokeBeanFactoryPostProcessors(nonOrderedPostProcessors, beanFactory);
          
            // Clear cached merged bean definitions since the post-processors might have
            // modified the original metadata, e.g. replacing placeholders in values...
            // 清除緩存的元數據,因為經過BFPP的執行,可能BeanDefinition的屬性值已經個變化,比如使用占位符的屬性值
            beanFactory.clearMetadataCache();
          }
          

          這個方法大概很長,實際上就做了一下這么幾點事情:

          • 先執行外部傳入的BeanFactoryPostProcessor的實現
          • 處理時先處理BeanFactoryPostProcessor的子接口BeanDefinitionRegistryPostProcessor的實現
          • 處理BeanDefinitionRegistryPostProcessor實現的時候先處理實現了PriorityOrdered接口的實現
          • 處理完PriorityOrdered接口實現的類之后再處理實現了Ordered接口的實現
          • 處理完Ordered接口的實現類之后處理沒有排序的
          • 處理完BeanDefinitionRegistryPostProcessor的實現之后處理BeanFactoryPostProcessor的實現
          • 處理順序也是PriorityOreded,Ordered,沒有排序的

          這里大概邏輯就是這個,看起來可能不是很懂,畫個流程圖:

          通過流程圖可以簡化為:先遍歷執行外部傳入的BFPP,再執行BDRPP,再執行BFPP三部分,處理每一部分可能會進行排序操作,排序按照PriorityOrdered,Ordered,noSort進行排序再執行。

          這里解釋下BeanDefinitionRegistryPostProcessor,這個接口是BeanFactoryPostProcessor,它里面包含一個方法叫postProcessBeanDefinitionRegistry,這個方法非常重要,在實現類ConfigurationClassPostProcessor中就是使用這個方法進行注解的解析的,而且這個類也是實現SpringBoot自動裝配的關鍵。

          ConfigurationClassPostProcessor這個類是什么時候加入到Spring容器的呢?

          在我們啟動容器的時候,Spring會進行BeanDefinition的掃描,如果我們在xml配置文件中開啟了注解掃描:

          <context:component-scan base-package="com.redwinter.test"/>
          

          那么這個時候就會自動添加多個BeanDefinition到Spring容器中,beanName為org.springframework.context.annotation.internalConfigurationAnnotationProcessor,其他還有幾個:

          前面的文章 https://www.cnblogs.com/redwinter/p/16165878.html 講到自定義標簽,在spring解析xml時分為默認的命名空間和自定義的命名空間的,而context就是自定義的命名空間,這個標簽的解析器為ComponentScanBeanDefinitionParser,這個類中的parse方法就是解析邏輯處理:

          @Override
          @Nullable
          public BeanDefinition parse(Element element, ParserContext parserContext) {
            String basePackage = element.getAttribute(BASE_PACKAGE_ATTRIBUTE);
            basePackage = parserContext.getReaderContext().getEnvironment().resolvePlaceholders(basePackage);
            String[] basePackages = StringUtils.tokenizeToStringArray(basePackage,
                                                                      ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
            // Actually scan for bean definitions and register them.
            // 配置掃描器
            ClassPathBeanDefinitionScanner scanner = configureScanner(parserContext, element);
            // 掃描BeanDefinition,在指定的包下
            Set<BeanDefinitionHolder> beanDefinitions = scanner.doScan(basePackages);
            // 注冊組件
            registerComponents(parserContext.getReaderContext(), beanDefinitions, element);
          
            return null;
          }
          

          這個方法執行流程:

          • 創建一個配置掃描器
          • 掃描指定包下標有注解的類并解析為BeanDefinition
          • 執行registerComponents方法,注冊組件

          registerComponents方法里面就是添加ConfigurationClassPostProcessor的地方,由于代碼太多這里只貼部分代碼:

          // ...省略部分代碼
          Set<BeanDefinitionHolder> beanDefs = new LinkedHashSet<>(8);
          		// 判斷注冊器中個是否包含org.springframework.context.annotation.internalConfigurationAnnotationProcessor
          		// 不包含就加入一個ConfigurationClassPostProcessor的BeanDefinition
          		// 用于解析注解
          		if (!registry.containsBeanDefinition(CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME)) {
          			// 創建一個BeanDefinition為ConfigurationClassPostProcessor
          			RootBeanDefinition def = new RootBeanDefinition(ConfigurationClassPostProcessor.class);
          			def.setSource(source);
          			// 注冊一個beanName為org.springframework.context.annotation.internalConfigurationAnnotationProcessor
          			// 的BeanDefinition,class為ConfigurationClassPostProcessor
          			beanDefs.add(registerPostProcessor(registry, def, CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME));
          		}
          		// 創建一個AutowiredAnnotationBeanPostProcessor的BeanDefinition
          		// 用于自動裝配
          		if (!registry.containsBeanDefinition(AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME)) {
          			RootBeanDefinition def = new RootBeanDefinition(AutowiredAnnotationBeanPostProcessor.class);
          			def.setSource(source);
          			beanDefs.add(registerPostProcessor(registry, def, AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME));
          		}
          // ...省略部分代碼
          

          源碼中注冊了一個beanName為CONFIGURATION_ANNOTATION_PROCESSOR_BEAN_NAME常量的名字,這個常量就是org.springframework.context.annotation.internalConfigurationAnnotationProcessor,class為ConfigurationClassPostProcessor

          那注解的解析是如何進行解析的呢?由于篇幅過長,下一篇再來解析。


          如果本文對你有幫助,別忘記給我個3連 ,點贊,轉發,評論,,咱們下期見。

          收藏 等于白嫖,點贊才是真情。





          原文 https://www.cnblogs.com/redwinter/p/16196359.html

          做過微信或支付寶支付的童鞋,可能遇到過這種問題,就是填寫支付結果回調,就是在支付成功之后,支付寶要根據我們給的地址給我們進行通知,通知我們用戶是否支付成功,如果成功我們就要去處理下面相應的業務邏輯,如果在測試服務,那么這個回調地址我們就需要填寫測試服務的,如果發布到線上那么我們就需要改成線上的地址。

          針對上面的場景,我們一般都會通過如下的方式,進行一個動態配置,不需要每次去改,防止出現問題。

          public class PayTest {
          
              @Value("${spring.profiles.active}")
              private String environment;
          
              public Object notify(HttpServletRequest request) {
          
                  if ("prod".equals(environment)) {
                      // 正式環境
                  } else if ("test".equals(environment)) {
          
                      // 測試環境
                  }
                  return "SUCCESS";
              }
          }
          復制代碼

          上面的代碼看起來沒有一點問題,但是身為搬磚的我們咋可能這樣搬,姿勢不對呀!

          問題:

          擴展性太差,如果這個參數我們還需要在別的地方用到,那么我們是不是還要使用@Value的注解獲取一遍,假如有天我們的leader突然說嗎,test這個單詞看著太low了,換個高端一點的,換成dev,那么我們是不是要把項目中所有的test都要改過來,如果少還好,要是很多,那我們怕不是涼了。

          所以我們能不能將這些配置參數搞成一個全局的靜態變量,這樣的話我們直接飲用就好了,哪怕到時候真的要改,那我也只需要改動一處就好了。

          注意大坑

          有的朋友可能就比較自信了,那我直接加個static修飾下不就好了,如果你真是打算這樣做,那你就準備卷好鋪蓋走人吧。直接加static獲取到的值其實是一個null,至于原因,大家復習下類以及靜態變量變量的加載順序。

          @PostConstruct注解

          那么既然說出了問題,肯定就有解決方法,不然你以為我跟你玩呢。

          首先這個注解是由Java提供的,它用來修飾一個非靜態的void方法。它會在服務器加載Servlet的時候運行,并且只運行一次

          改造:

          @Component
          public class SystemConstant {
          
              public static String surroundings;
          
              @Value("${spring.profiles.active}")
              public String environment;
          
              @PostConstruct
              public void initialize() {
                  System.out.println("初始化環境...");
                  surroundings = this.environment;
              }
          }
          復制代碼

          結果:

          我們可以看到在項目啟動的時候進行了初始化


          到這里我們已經可以拿到當前運行的環境是測試還是正式,這樣就可以做到動態配置


          最后想說

          其實這個注解遠不止這點用處,像我之前寫的Redis工具類,我使用的是RedisTemplate操作Redis,導致寫出來的方法沒辦法用static修飾,每次使用Redis工具類只能先注入到容器然后再調用,使用了這個注解就可以完美的解決這種尷尬的問題。代碼如下。

          - 簡介

          這個系列是我學習Flink之后,想到加強一下我的FLink能力,所以開始一系列的自己擬定業務場景,進行開發。

          這里更類似筆記,而不是教學,所以不會特別細致,敬請諒解。

          這里是實戰的,具體一些環境,代碼基礎知識不會講解,例如docker,flink語法之類的,看情況做具體講解,所以需要一些技術門檻。

          2 - 準備

          • flink - 1.12.0
          • elasticsearch - 7.12
          • kafka - 2.12-2.5.0
          • kibana - 7.12
          • filebeat - 7.12

          這里就不做下載地址的分享了,大家自行下載吧。

          3 - 代碼

          Flink代碼

          maven pom依賴,別問為啥這么多依賴,問我就說不知道,你就復制吧。

          <?xml version="1.0" encoding="UTF-8"?>
          <project xmlns="http://maven.apache.org/POM/4.0.0"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
              <modelVersion>4.0.0</modelVersion>
          
              <groupId>com.iolo</groupId>
              <artifactId>flink_study</artifactId>
              <version>1.0.0</version>
              <!-- 指定倉庫位置,依次為aliyun、apache和cloudera倉庫 -->
              <repositories>
                  <repository>
                      <id>aliyun</id>
                      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                  </repository>
                  <repository>
                      <id>apache</id>
                      <url>https://repository.apache.org/content/repositories/snapshots/</url>
                  </repository>
                  <repository>
                      <id>cloudera</id>
                      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
                  </repository>
              </repositories>
          
              <properties>
                  <encoding>UTF-8</encoding>
                  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                  <maven.compiler.source>1.8</maven.compiler.source>
                  <maven.compiler.target>1.8</maven.compiler.target>
                  <java.version>1.8</java.version>
                  <scala.version>2.12</scala.version>
                  <flink.version>1.12.0</flink.version>
              </properties>
              <dependencies>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-clients_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-scala_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-java</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-streaming-scala_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-streaming-java_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <!-- blink執行計劃,1.11+默認的-->
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-planner-blink_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-common</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <!-- flink連接器-->
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-kafka_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-sql-connector-kafka_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-jdbc_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-csv</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-json</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
          
                  <dependency>
                      <groupId>org.apache.bahir</groupId>
                      <artifactId>flink-connector-redis_2.11</artifactId>
                      <version>1.0</version>
                      <exclusions>
                          <exclusion>
                              <artifactId>flink-streaming-java_2.11</artifactId>
                              <groupId>org.apache.flink</groupId>
                          </exclusion>
                          <exclusion>
                              <artifactId>flink-runtime_2.11</artifactId>
                              <groupId>org.apache.flink</groupId>
                          </exclusion>
                          <exclusion>
                              <artifactId>flink-core</artifactId>
                              <groupId>org.apache.flink</groupId>
                          </exclusion>
                          <exclusion>
                              <artifactId>flink-java</artifactId>
                              <groupId>org.apache.flink</groupId>
                          </exclusion>
                      </exclusions>
                  </dependency>
          
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-hive_2.12</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
          
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-shaded-hadoop-2-uber</artifactId>
                      <version>2.7.5-10.0</version>
                  </dependency>
          
                  <dependency>
                      <groupId>mysql</groupId>
                      <artifactId>mysql-connector-java</artifactId>
                      <version>5.1.38</version>
                  </dependency>
          
                  <!-- 日志 -->
                  <dependency>
                      <groupId>org.slf4j</groupId>
                      <artifactId>slf4j-log4j12</artifactId>
                      <version>1.7.7</version>
                      <scope>runtime</scope>
                  </dependency>
                  <dependency>
                      <groupId>log4j</groupId>
                      <artifactId>log4j</artifactId>
                      <version>1.2.17</version>
                      <scope>runtime</scope>
                  </dependency>
          
                  <dependency>
                      <groupId>com.alibaba</groupId>
                      <artifactId>fastjson</artifactId>
                      <version>1.2.44</version>
                  </dependency>
          
                  <dependency>
                      <groupId>org.projectlombok</groupId>
                      <artifactId>lombok</artifactId>
                      <version>1.18.2</version>
                      <scope>provided</scope>
                  </dependency>
          
                  <dependency>
                      <groupId>com.squareup.okhttp3</groupId>
                      <artifactId>okhttp</artifactId>
                      <version>4.9.1</version>
                  </dependency>
          
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
          
              </dependencies>
          
              <build>
                  <sourceDirectory>src/main/java</sourceDirectory>
                  <plugins>
                      <!-- 編譯插件 -->
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-compiler-plugin</artifactId>
                          <version>3.5.1</version>
                          <configuration>
                              <source>1.8</source>
                              <target>1.8</target>
                              <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                          </configuration>
                      </plugin>
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-surefire-plugin</artifactId>
                          <version>2.18.1</version>
                          <configuration>
                              <useFile>false</useFile>
                              <disableXmlReport>true</disableXmlReport>
                              <includes>
                                  <include>**/*Test.*</include>
                                  <include>**/*Suite.*</include>
                              </includes>
                          </configuration>
                      </plugin>
                      <!-- 打包插件(會包含所有依賴) -->
                      <plugin>
                          <groupId>org.apache.maven.plugins</groupId>
                          <artifactId>maven-shade-plugin</artifactId>
                          <version>2.3</version>
                          <executions>
                              <execution>
                                  <phase>package</phase>
                                  <goals>
                                      <goal>shade</goal>
                                  </goals>
                                  <configuration>
                                      <filters>
                                          <filter>
                                              <artifact>*:*</artifact>
                                              <excludes>
                                                  <!--
                                                  zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                                  <exclude>META-INF/*.SF</exclude>
                                                  <exclude>META-INF/*.DSA</exclude>
                                                  <exclude>META-INF/*.RSA</exclude>
                                              </excludes>
                                          </filter>
                                      </filters>
                                      <transformers>
                                          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                              <!-- 設置jar包的入口類(可選) -->
                                              <mainClass></mainClass>
                                          </transformer>
                                      </transformers>
                                  </configuration>
                              </execution>
                          </executions>
                      </plugin>
                  </plugins>
              </build>
          
          </project>

          下面是flink的,具體講解都在代碼里

          package com.iolo.flink.cases;
          
          import com.alibaba.fastjson.JSONObject;
          import com.iolo.common.util.DingDingUtil;
          import org.apache.flink.api.common.RuntimeExecutionMode;
          import org.apache.flink.api.common.functions.FlatMapFunction;
          import org.apache.flink.api.common.functions.RuntimeContext;
          import org.apache.flink.api.common.serialization.SimpleStringSchema;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.shaded.hadoop2.com.google.gson.Gson;
          import org.apache.flink.streaming.api.datastream.DataStreamSource;
          import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
          import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
          import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
          import org.apache.flink.util.Collector;
          import org.apache.http.HttpHost;
          import org.elasticsearch.action.index.IndexRequest;
          import org.elasticsearch.client.Requests;
          
          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;
          import java.util.Properties;
          
          /**
           * @author iolo
           * @date 2021/3/17
           * 監控日志實時報警
           * <p>
           * 準備環境
           * 1 - flink 1.12
           * 2 - kafka
           * 3 - filebeat
           * 4 - Springboot服務(可以產生各類級別日志的接口)
           * 5 - es+kibana
           * <p>
           * filebeat 監控Springboot服務日志 提交給kafka(主題sever_log_to_flink_consumer)
           * flink消費kafka主題日志 ,整理收集,如果遇到error日志發送郵件,或者發釘釘(這里可以調用Springboot服務,或者直接flink發送)
           * 然后將所有日志存入es 進行 kibana分析
           **/
          public class case_1_kafka_es_log {
              public static void main(String[] args) throws Exception {
                  // TODO env
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
          
                  Properties props = new Properties();
                  //集群地址
                  props.setProperty("bootstrap.servers", "127.0.0.1:9092");
                  //消費者組id
                  props.setProperty("group.id", "test-consumer-group");
                  //latest有offset記錄從記錄位置開始消費,沒有記錄從最新的/最后的消息開始消費
                  //earliest有offset記錄從記錄位置開始消費,沒有記錄從最早的/最開始的消息開始消費
                  props.setProperty("auto.offset.reset", "latest");
                  //會開啟一個后臺線程每隔5s檢測一下Kafka的分區情況,實現動態分區檢測
                  props.setProperty("flink.partition-discovery.interval-millis", "5000");
                  //自動提交(提交到默認主題,后續學習了Checkpoint后隨著Checkpoint存儲在Checkpoint和默認主題中)
                  props.setProperty("enable.auto.commit", "true");
                  //自動提交的時間間隔
                  props.setProperty("auto.commit.interval.ms", "2000");
          
                  // TODO source
                  FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("sever_log_to_flink_consumer", new SimpleStringSchema(), props);
          
                  DataStreamSource<String> ds = env.addSource(source);
          
                  // TODO transformation
                  SingleOutputStreamOperator<Tuple3<String, String, String>> result = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, String>>() {
                      @Override
                      public void flatMap(String s, Collector<Tuple3<String, String, String>> collector) throws Exception {
                          JSONObject json = (JSONObject) JSONObject.parse(s);
                          String timestamp = json.getString("@timestamp");
                          String message = json.getString("message");
                          String[] split = message.split(" ");
                          String level = split[3];
                          if ("[ERROR]".equalsIgnoreCase(level)) {
                              System.out.println("error!");
                              DingDingUtil.dingdingPost("error");
                          }
          
                          collector.collect(Tuple3.of(timestamp, level, message));
                      }
                  });
          
                  // TODO sink
                  result.print();
          
                  /**
                   * https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/elasticsearch.html
                   */
                  List<HttpHost> httpHosts = new ArrayList<>();
                  httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
                  ElasticsearchSink.Builder<Tuple3<String, String, String>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                          httpHosts,
                          new ElasticsearchSinkFunction<Tuple3<String, String, String>>() {
                              @Override
                              public void process(Tuple3<String, String, String> stringStringStringTuple3, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                                  Map<String, String> json = new HashMap<>();
                                  json.put("@timestamp", stringStringStringTuple3.f0);
                                  json.put("level", stringStringStringTuple3.f1);
                                  json.put("message", stringStringStringTuple3.f2);
                                  IndexRequest item = Requests.indexRequest()
                                          .index("my-log")
                                          .source(json);
                                  requestIndexer.add(item);
                              }
                          });
                  esSinkBuilder.setBulkFlushMaxActions(1);
                  result.addSink(esSinkBuilder.build());
                  // TODO execute
                  env.execute("case_1_kafka_es_log");
              }
          }

          其中為了告警通知,做了個釘釘自定義機器人通知,需要的可以去百度查看一下,很方便。

          https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2

          package com.iolo.common.util;
          
          import lombok.extern.slf4j.Slf4j;
          import okhttp3.MediaType;
          import okhttp3.OkHttpClient;
          import okhttp3.Request;
          import okhttp3.RequestBody;
          import okhttp3.Response;
          
          import java.io.IOException;
          
          /**
           * @author iolo
           * @date 2021/3/30
           * https://developers.dingtalk.com/document/app/custom-robot-access/title-jfe-yo9-jl2
           **/
          @Slf4j
          public class DingDingUtil {
              private static final String url = "https://oapi.dingtalk.com/robot/send?access_token=你自己的token替換";
          
              /**
               * 秘鑰token
               *
               * @param
               * @return java.lang.String
               * @author fengxinxin
               * @date 2021/3/30 下午5:03
               **/
              public static void dingdingPost(String text) throws Exception {
                  MediaType JSON = MediaType.parse("application/json");
                  OkHttpClient client = new OkHttpClient();
                  String json = "{\"msgtype\": \"text\",\"text\": {\"content\": \"FlinkLog:" + text + "\"}}";
                  RequestBody body = RequestBody.create(JSON, json);
                  Request request = new Request.Builder()
                          .url(url)
                          .post(body)
                          .build();
                  try (Response response = client.newCall(request).execute()) {
                      String responseBody = response.body().string();
                      log.info(responseBody);
          
                  } catch (IOException e) {
                      log.error(e.getMessage());
                  }
              }
          }

          然后可以直接在控制面板直接啟動這個main方法


          Springboot

          gitee地址直接下載,不做詳細講解

          接口地址 http://127.0.0.1:8080/test/log?level=error&count=10


          Kafka

          操作命令,這些命令都是在Kafka里的bin目錄下,Zookeeper是kafka自帶的那個

          # Zookeeper 啟動命令
          ./zookeeper-server-start.sh ../config/zookeeper.properties
          # Kafka 啟動命令
          ./kafka-server-start.sh ../config/server.properties
          # 創建 topic sever_log_to_flink_consumer
          ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sever_log_to_flink_consumer
          # 查看是否創建成功
          ./kafka-topics.sh --list --zookeeper localhost:2181
          # 這是生產者
          ./kafka-console-producer.sh --broker-list localhost:9092 --topic sever_log_to_flink_consumer
          # 這是消費者
          ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sever_log_to_flink_consumer --from-beginning

          Elasticsearch

          這里開始使用docker,具體環境可以自行搭建,并且以后docker的場景會越來越多,直接上命令。

          docker run \
          --name fxx-es \
          -p 9200:9200 \
          -p 9300:9300 \
          -v /Users/iOLO/dev/docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
          -e "discovery.type=single-node" \
          docker.elastic.co/elasticsearch/elasticsearch:7.12.0

          驗證


          Kibana

          docker run \
          --name fxx-kibana \
          --link fxx-es:elasticsearch \
          -p 5601:5601 \
          docker.elastic.co/kibana/kibana:7.12.0

          我這里去容器內部設置中文,你可以不做

          設置的方法,在配置文件kibana.yml增加i18n.locale: "zh-CN"

          驗證 地址 是 127.0.0.1:5601

          具體操作的時候進行圖文講解

          Filebeat

          下載地址 https://www.elastic.co/cn/downloads/beats/filebeat

          選擇自己電腦環境進行下載,我是MAC

          解壓之后修改配置文件里,直接上配置文件

          # ============================== Filebeat inputs ===============================
          
          filebeat.inputs:
          
          # Each - is an input. Most options can be set at the input level, so
          # you can use different inputs for various configurations.
          # Below are the input specific configurations.
          
          - type: log
          
            # Change to true to enable this input configuration. 這里是需要修改
            enabled: true
          
            # Paths that should be crawled and fetched. Glob based paths. 這里改成你本地下載那個Springboot的log文件地址
            paths:
              - /Users/iOLO/dev/Java/flinklog/logs/flink-log.*.log
          
          # ------------------------------ Kafka Output -------------------------------
          output.kafka:
            # initial brokers for reading cluster metadata kafka的連接地址,這是直接從官網粘貼過來的,
            # https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
            hosts: ["127.0.01:9092"]
          
            # message topic selection + partitioning 然后就是消費topic,其他都是官網的默認值,我就沒做改動
            topic: 'sever_log_to_flink_consumer'
            partition.round_robin:
              reachable_only: false
          
            required_acks: 1
            compression: gzip
            max_message_bytes: 1000000


          4 - 實戰

          環境和程序都準備好了之后,別忘了啟動Springboot服務

          然后通過請求接口服務 127.0.0.1:8080/test/log?level=error&count=10 來產生日志

          通過查看釘釘 看是否有報警信息


          釘釘成功!!!

          然后就是Kibana的操作

          直接上結果頁面

          然后就是操作步驟

          第一先去es選擇index

          第二步根據紅框點擊進去es查詢index頁面

          最后在輸入框里查詢你剛才的index ,咱們的代碼就是my-index,根據提示進行下一步,我這里已經創建過了,所以就不再演示。最后就可以會有之前頁面的效果。

          5 - 結束語

          整體就是這樣,很多人肯定會提出質疑,說直接filebeat+ELK 也能完成這類效果,好的,你別杠,我這是學習flink之后,然后自己出業務場景,進行flink的實戰總結,如果你有更好的方案,就干你的。

          然后如果大家有啥想要的,遇到的場景,都可以提出來,我會斟酌后進行采納進行實戰實施。

          最后感謝閱讀。


          主站蜘蛛池模板: 精品国产亚洲一区二区在线观看| 国产AⅤ精品一区二区三区久久| 免费精品一区二区三区第35| 国产精品乱码一区二区三| 精品国产一区二区三区久| 水蜜桃av无码一区二区| 久久人妻内射无码一区三区| 国产在线精品一区二区三区不卡| 国内精品视频一区二区八戒| 日韩人妻精品一区二区三区视频| 亚洲国产成人精品久久久国产成人一区二区三区综 | 一区二区三区在线免费观看视频| 亚洲av无码一区二区三区乱子伦| 亚洲一区二区三区在线播放 | 亚洲午夜精品一区二区公牛电影院| 国产精品熟女一区二区| 无码人妻一区二区三区免费看 | 精品国产乱子伦一区二区三区| 中文字幕一区二区三区永久 | 文中字幕一区二区三区视频播放| 亚洲第一区香蕉_国产a| 中文字幕一区二区三区精彩视频| 日韩精品中文字幕无码一区| 亚洲国产美女福利直播秀一区二区| 亚洲一区二区三区成人网站| 无码精品一区二区三区| 一区二区三区亚洲视频| 亚洲AV无码一区二区乱子伦| 中文字幕一区二区三区精彩视频 | 最新中文字幕一区二区乱码| 国产精品熟女视频一区二区| 日韩在线一区视频| 国产一区二区三区内射高清| 多人伦精品一区二区三区视频| 国产精品乱码一区二区三| 国产精品久久久久久一区二区三区| 亚洲一区二区无码偷拍| 国产suv精品一区二区6| 国产成人一区二区三区电影网站| 免费萌白酱国产一区二区| 波多野结衣免费一区视频|