Integrating Flink with Spring Boot
2021-08-07
山间草夫
Flink framework: integrating Flink with Spring Boot
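First, why integrate Flink with Spring Boot? Spring Boot makes it easier to pull other frameworks into a job, and Spring's DI and IoC make beans more convenient to use. Integrating plain Spring directly works just as well.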
How it works
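A plain Spring container is usually started with AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(AppConfig.class);
That single call starts the container. As for Spring Boot, anyone who has read its source will know that the equivalent call is
SpringApplication.run(AppConfig.class, args);
So all that is needed is to start Spring Boot before starting Flink.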
Code
The example integrates Flink with Spring Boot and Redisson and writes each event's id into Redis. Repository: https://gitee.com/imomoda/flink-sprint-boot
- Spring Boot startup utility class
package io.github.jeesk.flink.config;

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Import;
// RedissonProperties and RedisProperties also need imports; their packages depend on
// the starters used by the project and are not shown in the original post.

@SpringBootApplication(scanBasePackages = {"io.github.jeesk.flink"})
@Import(SpringUtil.class)
@Slf4j
@EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
public class SpringBootApplicationUtil {

    // Non-null once the Spring context has been started in this JVM.
    static SpringApplication springBootApplication = null;
    static SpringApplicationBuilder springApplicationBuilder = null;

    // synchronized plus the null check make this safe to call from main() and from every open().
    public static synchronized void run(String[] args) {
        if (springBootApplication != null) {
            return;
        }
        springApplicationBuilder = new SpringApplicationBuilder(SpringBootApplicationUtil.class);
        // The profile is hard-coded here; it could be passed on the command line instead.
        springApplicationBuilder.profiles("dev");
        // No web server is needed inside a Flink job.
        springApplicationBuilder.web(WebApplicationType.NONE);
        springBootApplication = springApplicationBuilder.build();
        // run(args) also exposes the command-line arguments as a property source.
        springBootApplication.run(args);
    }
}
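The guard in run matters because the method is called both from main and from every open(); when several parallel subtasks share one TaskManager JVM, each of them calls it. Below is a minimal, dependency-free sketch of that start-once pattern. OnceBootstrap and its counter are hypothetical names that are not part of the repository, and the counter only stands in for the real Spring start-up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for SpringBootApplicationUtil: counts how often the
// expensive start-up actually runs instead of starting a Spring context.
public class OnceBootstrap {

    private static final AtomicInteger startCount = new AtomicInteger();
    private static boolean started = false;

    // synchronized + flag: only the first caller performs the start-up.
    public static synchronized void run(String[] args) {
        if (started) {
            return;
        }
        startCount.incrementAndGet(); // placeholder for springApplication.run(args)
        started = true;
    }

    public static int startCount() {
        return startCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int subtasks = 8; // assumed number of parallel subtasks in one JVM
        ExecutorService pool = Executors.newFixedThreadPool(subtasks);
        CountDownLatch done = new CountDownLatch(subtasks);
        for (int i = 0; i < subtasks; i++) {
            // Each thread plays the role of one subtask calling open().
            pool.submit(() -> {
                run(new String[0]);
                done.countDown();
            });
        }
        done.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println("start-up executed " + startCount() + " time(s)");
    }
}
```

Without the synchronized keyword two threads could both see the flag as false and start two contexts, which is why the utility class keeps the lock even though the check looks trivial.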
- Flink job
package io.github.jeesk.flink;

import cn.hutool.extra.spring.SpringUtil;
import io.github.jeesk.flink.config.SpringBootApplicationUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.springframework.data.redis.core.StringRedisTemplate;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        // Carry the command-line arguments to the tasks as one space-joined string.
        Configuration configuration = new Configuration();
        if (args != null) {
            configuration.setString("args", String.join(" ", args));
        }

        // Start Spring Boot in the client JVM before building the job.
        SpringBootApplicationUtil.run(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register the configuration as global job parameters so that open() can read it;
        // the Configuration passed to open() does not contain values set in main().
        env.getConfig().setGlobalJobParameters(configuration);

        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");

        DataStream<Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("fraud-detector");

        alerts
                .addSink(new AlertSink())
                .name("send-alerts");

        env.execute("Fraud Detection");
    }

    public static class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

        // Looked up in open(); transient so it is never part of the serialized function.
        private transient StringRedisTemplate redisTemplate;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Initialize the bean: start Spring in this task's JVM (a no-op if already started).
            String joined = getRuntimeContext().getExecutionConfig()
                    .getGlobalJobParameters().toMap().getOrDefault("args", "");
            String[] springArgs = joined.isEmpty() ? new String[0] : joined.split(" ");
            SpringBootApplicationUtil.run(springArgs);
            redisTemplate = SpringUtil.getBean(StringRedisTemplate.class);
        }

        @Override
        public void processElement(
                Transaction transaction,
                Context context,
                Collector<Alert> collector) throws Exception {
            Alert alert = new Alert();
            alert.setId(transaction.getAccountId());
            // Put the id into Redis.
            redisTemplate.opsForSet().add("tmpKey", String.valueOf(alert.getId()));
            collector.collect(alert);
        }
    }
}
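The job above hands the command-line arguments to the operators as a single space-joined string. One detail is easy to miss when decoding it: in Java, "".split(" ") returns an array holding one empty string, not an empty array, so an operator may end up passing a blank argument to Spring. Below is a small sketch of an encode/decode pair that avoids this. ArgsCodec is a hypothetical helper that is not in the repository, the argument values are only examples, and the approach assumes that no single argument contains whitespace.

```java
import java.util.Arrays;

// Hypothetical helper: round-trips a String[] through one space-joined string.
public class ArgsCodec {

    // Join the arguments; a null array is treated as "no arguments".
    public static String encode(String[] args) {
        return args == null ? "" : String.join(" ", args);
    }

    // Split the joined string back, returning an empty array for blank input
    // instead of the single empty string that "".split(" ") would produce.
    public static String[] decode(String joined) {
        if (joined == null || joined.trim().isEmpty()) {
            return new String[0];
        }
        return joined.trim().split("\\s+");
    }

    public static void main(String[] unused) {
        String[] original = {"--spring.profiles.active=dev", "--example.key=value"};
        String[] roundTrip = decode(encode(original));
        System.out.println("round trip equal: " + Arrays.equals(original, roundTrip));
        System.out.println("naive split of empty string: " + "".split(" ").length + " element(s)");
        System.out.println("decode of empty string: " + decode("").length + " element(s)");
    }
}
```

If arguments may contain spaces, a joined string is no longer enough and a structured carrier (one configuration key per argument, for example) would be the safer design.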
- Logback or Log4j: Flink can log through either one. This demo uses Logback, which requires the following steps.
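- Server side: put the Logback jars (log4j-over-slf4j-1.7.15.jar, logback-classic-1.2.3.jar, logback-core-1.2.3.jar) into the lib directory of the Flink installation, then delete the Log4j jars from lib (log4j-1.2.17.jar and slf4j-log4j12-1.7.15.jar). If the purpose of these jars is unclear, the article "JAVA 常见日志依赖处理细节" (details of handling common Java logging dependencies) explains them.
- In the project's pom file, exclude the Log4j artifacts: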
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
    <!-- exclude log4j -->
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.1</version>
    <!-- exclude log4j -->
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
    <!--<scope>provided</scope>-->
</dependency>
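After moving jars around and adding exclusions, it can help to confirm which logging classes the JVM can actually see. The sketch below uses only Class.forName from the JDK. LoggingClasspathProbe is a hypothetical helper that is not in the repository; the two class names it checks are the Logback logger class and the logger factory shipped in slf4j-log4j12. It only reports what is visible to the class loader that loaded the probe, so it would need to run inside the job to reflect the job's classpath.

```java
// Hypothetical helper: checks whether a class is visible without initializing it.
public class LoggingClasspathProbe {

    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate and load the class, run no static initializers.
            Class.forName(className, false, LoggingClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Expected after the steps above: Logback visible, the Log4j binding gone.
        System.out.println("Logback present: "
                + isOnClasspath("ch.qos.logback.classic.Logger"));
        System.out.println("slf4j-log4j12 binding present: "
                + isOnClasspath("org.slf4j.impl.Log4jLoggerFactory"));
    }
}
```

If both lines print true, two SLF4J bindings are competing and one of the exclusions or jar deletions has probably been missed.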
- To change Flink's Logback configuration, edit the following three files in Flink's conf directory:
logback-console.xml, logback-session.xml, logback.xml