一、简介
环境:
1、JDK 1.8
2、SpringBoot
3、华为Kafka集群 1.1.0
Kafka集群用户名密码的连接集群简单,Kerberos认证集群的网上几乎没有,这里我把对接的重要点分享一下。
二、集成
1、能到手上的资料
Kafka集群一般不是我们安装的,当我们需要对接到Kafka集群的时候,对方会提供给我们的东西有:
- 密钥文件:user.keytab
- principal:xxxxxx
- 加密协议security.protocol: SASL_SSL或SASL_PLAINTEXT
- jaas.conf(如果没有,我们也可以根据以上内容构建一个)
2、动手
2.1、程序配置
通过官网、网上的资料,我们最后的配置文件如下:
spring:
##########################################################################
############# kafka 配置
##########################################################################
kafka:
# kafka实例的broker地址和端口
bootstrap-servers: 100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
# 生产者配置
producer:
# 重试次数,则客户端会将发送失败的记录重新发送
retries: 1
# 16K
batch-size: 16384
# #32M
buffer-memory: 33554432
# 发送确认参数: 0:发送后不管, 1:发送后Partition Leader消息落盘, all: 所有的副本都ok才返回
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 消费者组
group-id: Consumer-test
# 动提交
enable-auto-commit: true
# 偏移量的方式:
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从新产生的该分区下的数据消费
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
jaas:
enabled: true
login-module: com.sun.security.auth.module.Krb5LoginModule
control-flag: required
options:
"useKeyTab": true
"debug": true
"useTicketCache": false
"storeKey": true
"keyTab": "/etc/user.keytab"
"principal": xxxxx
properties:
# 加密协议,目前支持 SASL_SSL、SASL_PLAINTEXT 协议
"security.protocol": SASL_PLAINTEXT
# 域名
"kerberos.domain.name": topinfo.com
# 服务名
"sasl.kerberos.service.name": kafka
关注: jaas 属性的配置,"keyTab" 的路径配置。
2.2、其他配置
我们还需要在指定 java.security.auth.login.config 的配置,
网上说 可以通过 System.setProperty("java.security.auth.login.config", /etc/jaas/root.jaas.conf)
而我是选择在Tomccat的/bin/catalina.sh中添加一下内容:
JAVA_OPTS=" $JAVA_OPTS -Djava.security.auth.login.config=/etc/jaas/root.jaas.conf"
3、异常排查
错误信息: Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
错误详情:
2022-08-30 21:20:52.052 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO (org.apache.kafka.common.network.Selector:?) -
[Consumer clientId=collect-Consumer-3, groupId=collect-Consumer] Failed authentication with topinfo/11.11.11.20
(An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)])
occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly.
You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm` Kafka Client will go to AUTHENTICATION_FAILED state.)
解决办法:
更换kafka-clients,使用华为依赖仓库里的kafka-clients,下载kafka-clients,我们下载kafka-clients-2.4.0-hw-ei-311006.jar版本的,亲测可用。
华为仓库地址: https://repo.huaweicloud.com/...
我这里给出我的maven:
<!-- spring kafka 排查依赖的kafka而引入华为的kafak包 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.4.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 华为 组件 kafka start -->
<dependency>
<groupId>com.huawei</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-311006.jar</systemPath>
</dependency>
<dependency>
<groupId>com.huawei</groupId>
<artifactId>kafka</artifactId>
<version>2.11</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/kafka_2.11-1.1.0.jar</systemPath>
</dependency>
<dependency>
<groupId>com.huawei</groupId>
<artifactId>kafka-streams-examples</artifactId>
<version>1.1.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/kafka-streams-examples-1.1.0.jar</systemPath>
</dependency>
<dependency>
<groupId>com.huawei</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/kafka-streams-1.1.0.jar</systemPath>
</dependency>
<!-- 华为 组件 kafka end -->
参考资料: https://www.baojieearth.cn/po...