Background: a recent project required Flink, so I dug into it, stepping through one pitfall after another until the job could run standalone and also be submitted to a cluster. This post records those pitfalls. Development environment: IDEA + Spring Boot (2.3.5.RELEASE) + Kafka (2.8.1) + MySQL (8.0.26). Without further ado, here is the working code.
The code below implements the logic that marks a device as offline when it has not uploaded data within a given time interval.
一、Creating the project application
/**
 * Flink job submission application
 *
 * @author wangfenglei
 */
@SpringBootApplication(scanBasePackages = {"com.wfl.firefighting.flink", "com.wfl.firefighting.util"})
public class DataAnalysisFlinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(DataAnalysisFlinkApplication.class, args);
    }
}
二、The device-status computation body: receive data from Kafka, run the calculation in a KeyedProcessFunction, then emit offline devices to a MySQL sink that updates the device status
/**
 * Read data from Kafka, compute which devices are offline, and write the result to MySQL
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceDataKafkaSource {
    private static final Logger log = LoggerFactory.getLogger(DeviceDataKafkaSource.class);
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String kafkaServer;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String loginConfig;
    @Value("${customer.flink.cal-device-status-topic}")
    private String topic;
    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Entry method
     *
     * @throws Exception on failure
     */
    @PostConstruct
    public void execute() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(1);
        Properties properties = new Properties();
        // Kafka broker IPs or host names, comma-separated
        properties.setProperty("bootstrap.servers", kafkaServer);
        // Kafka consumer group.id
        properties.setProperty("group.id", "data-analysis-flink-devicestatus");
        // Kafka SASL authentication mechanism: PLAIN
        properties.setProperty("sasl.mechanism", "PLAIN");
        // Kafka security protocol: SASL_PLAINTEXT
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        // Kafka login username and password
        properties.setProperty("sasl.jaas.config", loginConfig);
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
        stream.print().setParallelism(1);
        DataStream<String> deviceStatus = stream
                // extract only the device serial number
                .map(data -> CommonConstant.GSON.fromJson(data, MsgData.class).getDevSn())
                // key by device serial number
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                // check whether new data arrived within the interval; if not, emit the device as offline
                .process((CalDeviceOfflineFunction) applicationContext.getBean("calDeviceOfflineFunction"));
        // write to the database
        deviceStatus.addSink((SinkFunction) applicationContext.getBean("deviceStatusSink"));
        // submit the job
        new Thread(() -> {
            try {
                env.execute("deviceStatusFlinkJob");
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }).start();
    }
}
Notes:
1. @ConditionalOnProperty acts as an on/off switch controlling whether the job runs; later this module can host several Flink jobs and submit only those whose switches are enabled.
2. Spring Boot's @PostConstruct annotation makes the job run automatically when the application starts.
3. The job is submitted from a separate Thread; otherwise env.execute() blocks and the application hangs in "Flink executing" during startup.
4. For logging, use slf4j so it matches the logging jars that Flink itself depends on; that way the job also logs correctly when running on a Flink cluster:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);
5. The Kafka connection has login authentication enabled; the client settings are in application.yml. For the server-side Kafka authentication setup, see the official documentation; I may write it up in a later post.
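For completeness: the map step above deserializes each Kafka message with Gson and keeps only the device serial number. Neither CommonConstant nor MsgData is shown in this post; below is a minimal sketch of what they are assumed to look like (the devSn field is inferred from the getDevSn() call, everything else is an assumption):

import com.google.gson.Gson;

// Hypothetical holder for the shared Gson instance referenced as CommonConstant.GSON.
class CommonConstant {
    static final Gson GSON = new Gson();
}

// Hypothetical message model: only the field actually read above (devSn) is shown.
class MsgData {
    private String devSn;

    public String getDevSn() {
        return devSn;
    }

    public void setDevSn(String devSn) {
        this.devSn = devSn;
    }
}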
三、Computing device offline status
/**
 * The KeyedProcessFunction maintains one state entry per device serial number and emits the
 * serial numbers that were not updated within the interval:
 * for each record, CalDeviceOfflineFunction refreshes the last-seen timestamp in the state
 * and registers a processing-time timer for last-seen + interval.
 * When a timer fires, the stored last-modified timestamp is compared with the timer's
 * timestamp; if they match, nothing arrived during the interval (i.e. the device uploaded
 * no data), so its serial number is emitted.
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class CalDeviceOfflineFunction extends KeyedProcessFunction<String, String, String> {
    private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);
    /**
     * Per-key state maintained by the ProcessFunction
     */
    private ValueState<DeviceLastDataTimestamp> deviceState;
    /**
     * Firing time of the registered timer
     */
    private ValueState<Long> timerState;
    @Autowired
    private DeviceService deviceService;

    @Override
    public void open(Configuration parameters) throws Exception {
        deviceState = getRuntimeContext().getState(new ValueStateDescriptor<>("deviceState", DeviceLastDataTimestamp.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerState", Long.class));
    }

    /**
     * Invoked for every element
     *
     * @param value input element
     * @param ctx   context
     * @param out   collector for output
     * @throws Exception on failure
     */
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        log.info("++++++++++++++flink received deviceSn={}", value);
        // look up the current state for this key
        DeviceLastDataTimestamp current = deviceState.value();
        if (current == null) {
            current = new DeviceLastDataTimestamp();
            current.key = value;
            current.lastDataTime = ctx.timestamp();
        }
        Long currentTimerState = timerState.value();
        if (null == currentTimerState) {
            // initialize to -1
            timerState.update(-1L);
        }
        if (-1 != timerState.value()) {
            // delete the previously registered timer before registering a new one
            ctx.timerService().deleteProcessingTimeTimer(timerState.value());
        }
        long interval = deviceService.getDeviceOfflineInterval(value);
        // store this record's timestamp in the state
        current.lastDataTime = ctx.timestamp();
        // store the offline-detection interval
        current.interval = interval;
        // write the state back
        deviceState.update(current);
        // remember the new timer's firing time
        timerState.update(current.lastDataTime + interval);
        // register the new timer
        ctx.timerService().registerProcessingTimeTimer(current.lastDataTime + interval);
    }

    /**
     * Invoked when a timer fires
     *
     * @param timestamp the firing time of this timer
     * @param ctx       timer context
     * @param out       collector for output
     * @throws Exception on failure
     */
    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<String> out) throws Exception {
        // fetch this device's state
        DeviceLastDataTimestamp result = deviceState.value();
        // timestamp is the timer's firing time; if it equals lastDataTime + interval,
        // no message from this device was received during the interval
        if (timestamp == result.lastDataTime + result.interval) {
            // emit the device serial number
            out.collect(result.key);
            // log the result for verification
            log.info("==================" + result.key + " is offline");
        }
    }

    /**
     * State class holding the timestamp of the device's last upload
     */
    static class DeviceLastDataTimestamp {
        public String key;
        public long lastDataTime;
        public long interval;
    }
}
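deviceService.getDeviceOfflineInterval(devSn) above comes from the data-analysis-service module and is not shown in this post. Below is a minimal sketch of what such a lookup might look like; the JdbcTemplate usage and the offline_interval_ms column are assumptions, not the project's actual implementation or schema:

import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

/**
 * Hypothetical sketch of the device service used by CalDeviceOfflineFunction.
 */
@Service
public class DeviceService {
    private final JdbcTemplate jdbcTemplate;

    public DeviceService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Offline threshold for a device in milliseconds; falls back to 10 minutes.
     * Table and column names here are assumed for illustration.
     */
    public long getDeviceOfflineInterval(String devSn) {
        try {
            Long interval = jdbcTemplate.queryForObject(
                    "select offline_interval_ms from biz_device where dev_sn = ?",
                    Long.class, devSn);
            return interval != null ? interval : 10 * 60 * 1000L;
        } catch (EmptyResultDataAccessException e) {
            // unknown device: use the default interval
            return 10 * 60 * 1000L;
        }
    }
}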
四、Updating the device's offline status
/**
 * Write data to MySQL
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceStatusSink extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(DeviceStatusSink.class);
    @Value("${spring.datasource.dynamic.datasource.master.url}")
    private String datasourceUrl;
    @Value("${spring.datasource.dynamic.datasource.master.username}")
    private String userName;
    @Value("${spring.datasource.dynamic.datasource.master.password}")
    private String password;
    @Value("${spring.datasource.dynamic.datasource.master.driver-class-name}")
    private String driverClass;
    private Connection conn = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // load the driver and open the connection
        try {
            Class.forName(driverClass);
            conn = DriverManager.getConnection(datasourceUrl, userName, password);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }

    @Override
    public void invoke(String deviceSn, Context context) {
        String sql = "update biz_device t set t.status=2 where t.dev_sn=?";
        // try-with-resources closes the statement after every invocation
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, deviceSn);
            ps.executeUpdate();
            log.info("update biz_device t set t.status=2 where t.dev_sn={}", deviceSn);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }

    /**
     * Close the connection when the task ends
     *
     * @throws Exception on failure
     */
    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}
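To sanity-check the sink without Kafka, a throwaway local job can push a fixed serial number through the same bean. A minimal sketch, assuming it runs somewhere with the ApplicationContext injected and that "SN001" exists in biz_device:

// Minimal local smoke test for DeviceStatusSink ("SN001" is an assumed serial number).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements("SN001")
        .addSink((DeviceStatusSink) applicationContext.getBean("deviceStatusSink"));
env.execute("deviceStatusSinkSmokeTest");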
五、application.yml configuration
server:
  port: 8099
spring:
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    druid:
      stat-view-servlet:
        enabled: true
        loginUsername: admin
        loginPassword: 123456
        allow:
      web-stat-filter:
        enabled: true
    dynamic:
      druid: # global druid settings; most values match the defaults (only the parameters below are supported -- do not set ones you do not understand)
        # connection pool settings
        # initial, minimum and maximum pool size
        initial-size: 5
        min-idle: 5
        maxActive: 20
        # timeout for acquiring a connection
        maxWait: 60000
        # interval between eviction runs that close idle connections, in milliseconds
        timeBetweenEvictionRunsMillis: 60000
        # minimum time a connection stays in the pool, in milliseconds
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # enable PSCache and set its size per connection
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        # monitoring filters; without 'stat' the monitoring UI cannot aggregate SQL, 'wall' is the SQL firewall
        filters: stat,wall,slf4j
        # enable mergeSql and slow-SQL logging via connectProperties
        connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
      datasource:
        master:
          url: jdbc:mysql://127.0.0.1:3306/fire?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # Kafka broker address; multiple brokers allowed
    producer:
      retries: 1 # with a value > 0, the client retries failed sends
      # number of messages per batch
      batch-size: 16384
      buffer-memory: 33554432
      # serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # raise the maximum size of messages pushed to Kafka
      properties:
        max.request.size: 52428800
    consumer:
      group-id: data-analysis-flink
      # commit offsets manually to guarantee messages are actually consumed
      enable-auto-commit: false
      # start from the latest offset (use earliest to consume from the beginning)
      auto-offset-reset: latest
      # consumer group
      #group-id: dev
    properties:
      # session timeout for missing heartbeats; set it generously so debugging sessions do not time out
      session:
        timeout:
          ms: 60000
      heartbeat:
        interval:
          ms: 30000
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="root" password="root";'
# custom settings
customer:
  # Flink-related settings
  flink:
    # whether to enable the device-status computation
    cal-device-status: true
    cal-device-status-topic: device-upload-data
六、The pom.xml file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.wfl.firefighting</groupId>
<artifactId>data-analysis</artifactId>
<version>1.0.0</version>
</parent>
<groupId>com.wfl.firefighting</groupId>
<artifactId>data-analysis-flink</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.wfl.firefighting</groupId>
<artifactId>data-analysis-service</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.wfl.firefighting</groupId>
<artifactId>data-analysis-model</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
<version>10.10.1</version>
</dependency>
<!-- druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>
<!-- dynamic datasource -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>2.5.4</version>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>
<!-- flink dependencies: begin -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<!-- flink kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<!-- flink JSON format -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.13.1</version>
</dependency>
<!-- flink JDBC connector for MySQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<!-- flink dependencies: end -->
</dependencies>
<build>
<finalName>data-analysis-flink</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Notes:
1. For local execution, where the job is not submitted to a Flink server, spring-boot-maven-plugin is sufficient and the jar can be run directly with java -jar:
<build>
    <finalName>data-analysis-flink</finalName>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <!-- specify the main class -->
            <configuration>
                <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <!-- package all dependencies into the generated jar -->
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
A jar built with spring-boot-maven-plugin fails with class-not-found errors when submitted to a Flink cluster, because Spring Boot packages classes under BOOT-INF, which the Flink server cannot see. A jar built with maven-shade-plugin can both be run with java -jar and be submitted to the Flink server.
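Cluster submission is then the usual flink run; the jar path below is an assumption based on the finalName configured above:

flink run -c com.wfl.firefighting.flink.DataAnalysisFlinkApplication target/data-analysis-flink.jar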
2. If the jar built with maven-shade-plugin is submitted to the server, Spring Boot's default logback must be excluded, otherwise server-side execution fails with Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Exclude it as follows:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
For local java -jar execution, the following excludes must be commented out in the build section, otherwise startup fails with java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory:
<!--<exclude>org.slf4j:*</exclude>-->
<!--<exclude>log4j:*</exclude>-->
3. When packaging with maven-shade-plugin, the following transformers must be added, otherwise the build fails with Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ManifestResourceTransformer:
<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
        <resource>META-INF/spring.handlers</resource>
        <resource>reference.conf</resource>
    </transformer>
    <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
        <resource>META-INF/spring.factories</resource>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
        <resource>META-INF/spring.schemas</resource>
    </transformer>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
    </transformer>
</transformers>
七、Execution results:
1. Local execution
2. Submission to a Flink cluster
八、Other pitfalls
1. Error: The RemoteEnvironment cannot be instantiated when running in a pre-defined context
Fix: obtain the current execution environment via StreamExecutionEnvironment.getExecutionEnvironment() instead of creating a remote environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2. Error: Insufficient number of network buffers: required 65, but only 38 available. The total number of network buffers is currently set to 2048 of 32768 bytes each.
Fix: reduce the job's parallelism (for an alternative, see the sketch after the code line below):
env.setParallelism(1);
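If the job genuinely needs higher parallelism, an alternative (not used here) is to enlarge the TaskManager's network memory in flink-conf.yaml; the values below are illustrative, not recommendations:

# flink-conf.yaml -- illustrative values, tune to your memory budget
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 1gb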
3. Error: Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'
Fix: set the following in the Flink configuration file flink-conf.yaml:
classloader.check-leaked-classloader: false