Preface
How Kafka works

We call the side that publishes messages the producer, the side that subscribes to messages the consumer, and the storage layer sitting between them the broker. With these three roles the overall picture looks roughly like this:
The producer generates data and hands it to the broker for storage; when the consumer needs to consume data, it fetches it from the broker and then runs whatever processing it needs on it.
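To make the three roles concrete, here is a minimal sketch that publishes and consumes one record with the plain kafka-clients API rather than the Spring integration used in the rest of this article; the broker address localhost:9092, the topic demo and the group id demo-group are assumptions for illustration only.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PlainKafkaDemo {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        // The producer hands a record to the broker, which stores it.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("demo", "hello"));
        }

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "demo-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        // The consumer pulls the stored records back from the broker.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("demo"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("consumed: " + record.value());
            }
        }
    }
}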
1. Add the spring-kafka dependency
Import the spring-kafka package in pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>SpringBootKafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootKafka</name>
    <description>SpringBootKafka</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>central</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/repository/public/</url>
            <layout>default</layout>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Write the configuration file
Configure Kafka in src/main/resources/application.yml, covering both the producer and the consumer:
spring:
  kafka:
    bootstrap-servers: 192.168.110.105:9092
    #streams:
      #application-id: my-streams-app
    consumer:
      group-id: myGroupId
      auto-offset-reset: latest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 5
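The examples below all publish to a topic named test. If that topic does not already exist on the broker and broker-side auto topic creation is turned off, Spring Kafka can create it at application startup from a NewTopic bean. This is a minimal sketch, not part of the original project; the config class name, partition count and replica count are assumptions:

package com.example.springbootkafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Spring Boot's auto-configured KafkaAdmin picks up NewTopic beans
    // and creates the corresponding topics on the broker at startup.
    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test")
                .partitions(1)
                .replicas(1)
                .build();
    }
}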
3. Write the producer
Messages are sent with org.springframework.kafka.core.KafkaTemplate. Sending is done asynchronously here, and the result of the send is picked up once it completes:
package com.example.springbootkafka.service;

import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

@Slf4j
@Service
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    public void sendMessage(String message) {
        log.info("KafkaProducer message:{}", message);
        Future<SendResult<String, String>> future = kafkaTemplate.send("test", message);
        // Wrap the future in a CompletableFuture and handle the send result asynchronously.
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                future.get(); // wait for the original future to complete
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                // send failed
                System.out.println("Error occurred: " + ex.getMessage());
            } else {
                // send succeeded
                System.out.println("Completed successfully");
            }
        });
    }

    public void sendUser(User user) throws JsonProcessingException {
        // Serialize the User to JSON so it can be sent with the String serializer.
        String userJson = objectMapper.writeValueAsString(user);
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", userJson);
        // Alternatively: future.addCallback(success -> ..., failure -> ...);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                future.get(); // wait for the original future to complete
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        completableFuture.whenComplete((result, ex) -> {
            if (ex != null) {
                // send failed
                System.out.println("Error occurred: " + ex.getMessage());
            } else {
                // send succeeded
                System.out.println("Completed successfully");
            }
        });
    }
}
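The producer above serializes a com.example.springbootkafka.entity.User, which the article never shows. Judging from the builder calls in the controller below, a minimal sketch of that entity could look like the following; the Lombok annotations and the exact field set are assumptions:

package com.example.springbootkafka.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// Simple payload object; Lombok generates getters/setters, builder and constructors.
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private String name;
    private String dept;
}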
4. Write the consumer
Messages are received by a method annotated with org.springframework.kafka.annotation.KafkaListener:
package com.example.springbootkafka.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test", groupId = "myGroupId")
    public void consume(String message) {
        System.out.println("Received message: " + message);
        log.info("KafkaConsumer message:{}", message);
    }
}
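Because sendUser publishes the User as a JSON string, a listener can also map the payload back to a User with Jackson. This is an optional sketch rather than part of the original project; the class name KafkaUserConsumer and the group id myUserGroupId are made up, and a separate group id is used so this listener gets its own copy of every record alongside the KafkaConsumer above:

package com.example.springbootkafka.service;

import com.example.springbootkafka.entity.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaUserConsumer {

    private final ObjectMapper objectMapper;

    public KafkaUserConsumer(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @KafkaListener(topics = "test", groupId = "myUserGroupId")
    public void consumeUser(String message) {
        try {
            // Deserialize the JSON payload back into the User entity.
            User user = objectMapper.readValue(message, User.class);
            log.info("KafkaUserConsumer received user: {}", user);
        } catch (JsonProcessingException e) {
            // Plain-text messages (e.g. from /send-message) are not JSON; just log them as-is.
            log.info("KafkaUserConsumer received non-JSON message: {}", message);
        }
    }
}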
5. Test producing and sending messages
package com.example.springbootkafka.controller;

import com.example.springbootkafka.entity.User;
import com.example.springbootkafka.service.KafkaProducer;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class MessageController {

    private final KafkaProducer producer;

    @Autowired
    public MessageController(KafkaProducer producer) {
        this.producer = producer;
    }

    @GetMapping("/send-message")
    public String sendMessage() {
        log.info("MessageController sendMessage start!");
        producer.sendMessage("hello, Kafka!");
        log.info("MessageController sendMessage end!");
        return "Message sent successfully.";
    }

    @GetMapping("/send")
    public String sendMessage1() {
        log.info("MessageController sendMessage1 start!");
        User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();
        try {
            producer.sendUser(user);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        log.info("MessageController sendMessage1 end!");
        return "Message sendMessage1 successfully.";
    }
}
6. Check the results:
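With the application and the Kafka broker running, the two endpoints can be called from a browser or with curl (the default server port 8080 is an assumption, since no server.port is set in application.yml):

curl http://localhost:8080/send-message
curl http://localhost:8080/send

For the first call the consumer should print the plain text on the console ("Received message: hello, Kafka!"); for the second it should print the JSON form of the User object.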


The full source code is available at https://gitee.com/dylan_2017/springboot-kafka.git