Spring 消费 Kafka 消息
消费者配置:
/**
 * Spring configuration for the "zdry" Kafka consumer.
 *
 * <p>Only active when the classpath resource {@code /special-run.txt} exists.
 * All settings are read from {@code kafka.zdry.consumer.*} properties and handed
 * to the Kafka client as-is (most of them as plain strings).
 */
@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaConsumerConfig {

    // ---- listener container settings ----
    @Value("${kafka.zdry.consumer.autoStartup}")
    private Boolean autoStartup;

    // ---- connection / subscription ----
    @Value("${kafka.zdry.consumer.servers}")
    private String servers;
    @Value("${kafka.zdry.consumer.topic}")
    private String topic;
    @Value("${kafka.zdry.consumer.group.id}")
    private String groupId;

    // ---- raw Kafka client options, forwarded unchanged ----
    @Value("${kafka.zdry.consumer.enable.auto.commit}")
    private String enableAutoCommit;
    @Value("${kafka.zdry.consumer.auto.commit.interval.ms}")
    private String autoCommitIntervalMs;
    @Value("${kafka.zdry.consumer.session.timeout.ms}")
    private String sessionTimeoutMs;
    @Value("${kafka.zdry.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.zdry.consumer.max.poll.records}")
    private String maxPollRecords;

    @Value("${kafka.zdry.consumer.concurrency}")
    private Integer concurrency;

    /**
     * Batch listener container factory for person-track messages, registered
     * under the bean name {@code zdry_person_track}.
     *
     * @return a concurrent container factory with batch listening enabled,
     *         a poll timeout of 1500 and the configured concurrency / auto-startup
     */
    @Bean("zdry_person_track")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();

        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(concurrency);
        containerFactory.getContainerProperties().setPollTimeout(1500);
        containerFactory.setBatchListener(true);
        containerFactory.setAutoStartup(autoStartup);

        return containerFactory;
    }

    /**
     * Builds the consumer factory from {@link #consumerConfigs()}.
     *
     * @return a new {@code DefaultKafkaConsumerFactory} for String keys and values
     */
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> clientSettings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(clientSettings);
    }

    /**
     * Assembles the Kafka consumer client properties from the injected values.
     *
     * @return a fresh mutable map of consumer properties; keys and values are
     *         both deserialized with {@code StringDeserializer}
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> settings = new HashMap<>();

        // Deserializers
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Cluster and consumer group
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Offset handling
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        settings.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

        // Session and polling
        settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

        return settings;
    }
}
消费监听:
/**
 * Batch listener for the configured topic, using the {@code zdry_person_track}
 * container factory.
 *
 * <p>Logs the batch size, then parses every record value as a JSON object and
 * collects the results into a {@code JSONArray}. The array is not used further
 * within this method.
 *
 * @param records the batch of records delivered by the container
 */
@KafkaListener(topics = "${kafka.zdry.consumer.topic}", containerFactory = "zdry_person_track")
public void consumeToJson(List<ConsumerRecord<String, String>> records){
    log.info("count={}", records.size());

    JSONArray parsedBatch = new JSONArray();
    for (ConsumerRecord<String, String> entry : records) {
        String payload = entry.value();
        parsedBatch.add(JSON.parseObject(payload));
    }
}
推送消息到 Kafka
生产者kafka配置:
/**
 * Spring configuration for the "zdry" Kafka producer.
 *
 * <p>Only active when the classpath resource {@code /special-run.txt} exists.
 * The broker list is read from the same property the consumer uses
 * ({@code kafka.zdry.consumer.servers}).
 */
@Configuration
@EnableKafka
@ConditionalOnResource(resources = "/special-run.txt")
public class ZdryKafkaProducerConfig {

    @Value("${kafka.zdry.consumer.servers}")
    private String servers;

    /**
     * Creates the Kafka template registered as {@code zdryProducerTemplate}.
     *
     * <p>NOTE(review): the template is typed with {@code List<Object>} values while
     * {@link #producerProps()} configures {@code StringSerializer} for values.
     * Consider aligning the declared value type with the serializer; confirm
     * against all injection points before changing the signature.
     *
     * @return a template backed by a new {@code DefaultKafkaProducerFactory}
     */
    @Bean("zdryProducerTemplate")
    public KafkaTemplate<String, List<Object>> createTemplate(){
        ProducerFactory<String, List<Object>> producerFactory =
                new DefaultKafkaProducerFactory<>(producerProps());
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * Assembles the Kafka producer client properties.
     *
     * @return a fresh mutable map of producer properties: no retries, 16384-byte
     *         batches, 100 ms linger, 33554432 bytes of buffer memory, and
     *         {@code StringSerializer} for both keys and values
     */
    public Map<String, Object> producerProps() {
        Map<String, Object> settings = new HashMap<>();

        // Serializers
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Cluster
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);

        // Delivery and batching
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        return settings;
    }
}
产生消息并推送到 Kafka
// Kafka template used to publish messages. Declared as a raw type here, so no
// compile-time check is made on the key/value types passed to send().
@Autowired
private KafkaTemplate zdryProducerTemplate;
// Destination topic, read from the property kafka.zdry.consumer.push.topic.
@Value("${kafka.zdry.consumer.push.topic}")
private String pushTopic;
方法:
try{
// Serialize `warn` to a JSON string and hand it to the template for pushTopic.
zdryProducerTemplate.send(pushTopic,JSON.toJSONString(warn));
}catch(Exception e){
// NOTE(review): the failure is only printed to stderr and then dropped;
// consider reporting it through the application's logger instead.
e.printStackTrace();
}
// flush() runs whether or not the send above threw.
zdryProducerTemplate.flush();
yml 配置
# Settings read by the zdry Kafka consumer/producer configuration classes via
# ${kafka.zdry.consumer.*} placeholders. Nesting must match those property keys.
kafka:
  ## Persons of interest: capture trajectory
  zdry:
    consumer:
      autoStartup: true
      servers: x.x.x.x:6667
      topic: person_track
      group.id: aa_1
      enable.auto.commit: true
      auto.commit.interval.ms: 100
      session.timeout.ms: 10000
      auto.offset.reset: earliest
      max.poll.records: 100
      concurrency: 1
      # Topic the producer publishes to (kafka.zdry.consumer.push.topic)
      push.topic: ai_warn
相关文章
暂无评论...