清华主页 - 清华新闻 - 综合时讯 - 正文

SpringBoot Kafka发送消息与接收消息实例

前言 

Kafka的基本工作原理 

 我们将消息的发布(publish)称作 producer(生产者),将消息的订阅(subscribe)表述为 consumer(消费者),将中间的存储阵列称作 broker(代理),这样就可以大致描绘出这样一个场面:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。 

 1.引入spring-kafka的jar包

在pom.xml里面导入spring-kafka包

    4.0.0            org.springframework.boot        spring-boot-starter-parent        2.7.4                 com.example    SpringBootKafka    0.0.1-SNAPSHOT    SpringBootKafka    SpringBootKafka            1.8                            org.springframework.boot            spring-boot-starter-web                            org.springframework.boot            spring-boot-starter-test            test                                    org.springframework.kafka            spring-kafka                            log4j            log4j            1.2.17                            org.projectlombok            lombok                            org.apache.kafka            kafka-streams                                    central            aliyun maven            https://maven.aliyun.com/repository/public/            default                                        true                                                    false                                                                org.springframework.boot                spring-boot-maven-plugin                        

2.编写配置文件

在src/main/resources/application.yml里面编写kafka的配置,包括生成者和消费者 

spring:  kafka:    bootstrap-servers: 192.168.110.105:9092    #streams:      #application-id: my-streams-app    consumer:      group-id: myGroupId      auto-offset-reset: latest      enable-auto-commit: true      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer      retries: 5

3.编写生产者

使用org.springframework.kafka.core.KafkaTemplate来发送消息,这里采用了异步方式,获取了消息的处理结果

package com.example.springbootkafka.service;import com.example.springbootkafka.entity.User;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Service;import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.CompletableFuture;import java.util.concurrent.Future;@Slf4j@Servicepublic class KafkaProducer {     private final KafkaTemplate kafkaTemplate;    private final ObjectMapper objectMapper;    @Autowired    public KafkaProducer(KafkaTemplate kafkaTemplate, ObjectMapper objectMapper) {         this.kafkaTemplate = kafkaTemplate;        this.objectMapper = objectMapper;    }    public void sendMessage(String message) {         log.info("KafkaProducer message:{ }", message);        //kafkaTemplate.send("test", message).addCallback();        Future> future = kafkaTemplate.send("test", message);        CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {             try {                 future.get(); // 等待原始future完成            } catch (Exception e) {                 throw new RuntimeException(e);            }        });// 使用whenComplete方法        completableFuture.whenComplete((result, ex) -> {             if (ex != null) {                 System.out.println("Error occurred: " + ex.getMessage());                // 成功发送            } else {                 System.out.println("Completed successfully");            }        });        /*future.whenComplete((result, ex) -> {             if (ex == null) {                 // 成功发送                RecordMetadata metadata = result.getRecordMetadata();                System.out.println("Message sent successfully with offset: " + metadata.offset());            } else {                 // 发送失败                System.err.println("Failed to send message due to: " + ex.getMessage());            }        });*/    }    public void sendUser(User user) throws JsonProcessingException {         //final ProducerRecord record = createRecord(data);        //ListenableFuture> future = kafkaTemplate.send("test", message);        //ListenableFuture> future = kafkaTemplate.send("test", user);        String userJson = objectMapper.writeValueAsString(user);        ListenableFuture> future = kafkaTemplate.send("test", userJson);        /*future.addCallback(                success -> System.out.println("Message sent successfully: " + userJson),                failure -> System.err.println("Failed to send message: " + failure.getMessage())        );*/        CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {             try {                 future.get(); // 等待原始future完成            } catch (Exception e) {                 throw new RuntimeException(e);            }        });        completableFuture.whenComplete((result, ex) -> {             if (ex != null) {                 System.out.println("Error occurred: " + ex.getMessage());                // 成功发送            } else {                 System.out.println("Completed successfully");            }        });    }}

 4.编写消费者

通过org.springframework.kafka.annotation.KafkaListener来监听消息

package com.example.springbootkafka.service;import lombok.extern.slf4j.Slf4j;import org.apache.log4j.Logger;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;@Slf4j@Servicepublic class KafkaConsumer {     @KafkaListener(topics = "test", groupId = "myGroupId")    public void consume(String message) {         System.out.println("Received message: " + message);        log.info("KafkaConsumer message:{ }", message);    }}

5.测试消息的生成与发送

package com.example.springbootkafka.controller;import com.example.springbootkafka.entity.User;import com.example.springbootkafka.service.KafkaProducer;import com.fasterxml.jackson.core.JsonProcessingException;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@Slf4j@RestControllerpublic class MessageController {     private final KafkaProducer producer;    @Autowired    public MessageController(KafkaProducer producer) {         this.producer = producer;    }    @GetMapping("/send-message")    public String sendMessage() {         log.info("MessageController sendMessage start!");        producer.sendMessage("hello, Kafka!");        log.info("MessageController sendMessage end!");        return "Message sent successfully.";    }    @GetMapping("/send")    public String sendMessage1() {         log.info("MessageController sendMessage1 start!");        User user = User.builder().name("xuxin").dept("IT/DevlopMent").build();        try {             producer.sendUser(user);        } catch (JsonProcessingException e) {             throw new RuntimeException(e);        }        log.info("MessageController sendMessage1 end!");        return "Message sendMessage1 successfully.";    }}

 6.查看结果:

 

详细代码见https://gitee.com/dylan_2017/springboot-kafka.git

2025-06-24 12:37:55

相关新闻

清华大学新闻中心版权所有,清华大学新闻网编辑部维护,电子信箱: news@tsinghua.edu.cn
Copyright 2001-2020 news.tsinghua.edu.cn. All rights reserved.