본문 바로가기
kafka

spring kafka pub/sub 구현

by 토망이 2023. 7. 22.

본 글에서는 springboot에서 kafka를 사용하여 메시지 pub/sub을 구현해보고자 합니다. 

spring에서 kafka로 pub/sub을 구현후 테스트하기전에, local에서 kafka 서버를 띄워야합니다. 이전 글에서 kafka 로컬에서 띄우는 방법을 작성하였으니, 해당 글을 참고하여 4. Topic 생성까지 완료 후 아래 spring kafka 테스트를 진행해주세요. 주키퍼와 kafka 서버가 각각 띄워져있는 상태여야 spring에서 kafka 로컬 테스트가 가능합니다.

kafka Window 환경에서 로컬 설치 후 pub/sub 테스트

 

kafka Window 환경에서 로컬 설치 후 pub/sub 테스트

본 글에서는 마이크로서비스아키텍쳐에서 주로 사용하는 kafka를 로컬 window PC에 설치하여 간단하게 pub/sub하는 것을 구현해보겠습니다. - kafka 설치 https://kafka.apache.org/downloads 먼저 다운로드 주소

dnl1029.tistory.com

 

-의존성

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

- application.yml

: kafka 위에 있는 설정들은 db 관련 설정들이니 넘어가셔도 됩니다. bootstrap-servers에 kafka 서버 ip, 포트를 입력해야합니다. kafka 관련 서버가 따로 있다고 가정하겠습니다.

spring:
  lifecycle:
    timeout-per-shutdown-phase: 40s #디폴트값은 30초
  data:
    mongodb:
#      uri: mongodb+srv://admin:내암호@cluster0.1p17esk.mongodb.net/sample_airbnb
      uri: ENC(1kiaGE3RJMoGZi9TFdjEj4IxeCJyofdL9i5bpvbN+N5pidF/5WUuNeDhGCLTXgQHUpFxFC2P1uOaEDDWv5OdLwiNVw2FT3mFNr4pB73Nb5E=)
  datasource:
    driver-class-name: org.h2.Driver
    url: jdbc:h2:tcp://localhost/~/test
    username: sa
    password:
  kafka:
    bootstrap-servers:
      localhost:9092
    consumer:
      group-id: groupKyu
      auto-offset-reset : latest #가장 최근에 생산된 메시지로 offeset reset
      enable-auto-commit: true #ack를 자동으로 보내기
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
       value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

정말 단순한 pub/sub구현은 아래와같이 구현할수있습니다. spring에서 bean을 등록하고 의존성주입을 통해 사용하는것이 기본이나, kafka는 의존성 추가만 해도 auto configuration을 통해 별도로 bean등록없이 kafkaTemplate을 의존성 주입후 사용할 수 있습니다. 

-pub

@Slf4j
@RequiredArgsConstructor
@Component
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("kyu",message); //topic, 메시지 보내기
    }

}

 

-sub

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageConsumer {

    @KafkaListener(topics = "kyu", groupId = "groupKyu")
    public void receiveMessage(ConsumerRecord<String, String> record){
        log.info("Consume message : {}", record.value());
    }

}

 

이제 조금더 심화된 내용입니다.

 

-config

: kafka 메시지의 크기를 제한하고 싶을때는, 아래와 같이 Java쪽 config를 추가하여 kafKaTemplate을 수동 Bean 등록해줘야 합니다. 아래는 2MB로 카프카 메시지 크기를 제한한 config 입니다.

@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
    @Bean
    public ProducerFactory<String,String> producerFactory(){
        Map<String,Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 2);
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(producerConfig);
    }
    @Bean
    public KafkaTemplate<String,String> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

 

- Producer 

: sendMessage는 아주 단순하게 메시지를 보내는 것이고, sendMessageV2는 비동기방식으로 메시지를 보내는 방식입니다. 이전에는 ListenableFuture을 사용하여, addCallback 메서드로 kafka 비동기를 구현했는데, spring 3.0 이상 jdk17 기준으로 CompletableFuture에 whenComplete 메서드를 사용하는 방식으로 변경되었습니다.

@Slf4j
@RequiredArgsConstructor
@Component
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(MessageDto messageDto) {
        kafkaTemplate.send("kyu",messageDto.getMessage()); //topic, 메시지 보내기
        log.info("sendMessage : {}",messageDto.getMessage());
    }

    public void sendMessageV2(MessageDto messageDto) {

        /* springboot 3.0 이전
        ListenableFuture<SendResult<String, String>> future_old = kafkaTemplate.send("kyu", message);
        future_old.addCallback(new ListenableFutureCallback<SendResult<String,String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Success");
            }

            @Override
            public void onFailure(Throwable ex) {
                log.info("Failure");
            }
        });
         */

        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("kyu", messageDto.getMessage());
        future.whenComplete(((stringStringSendResult, throwable) -> {
            if(throwable == null) {
                log.info("sendMessageV2 success, message : {}",messageDto.getMessage());
            }
            else {
                log.info("Failure",throwable.getMessage(),throwable);
            }
        }));

    }

}

 

-Consumer

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageConsumer {

    @KafkaListener(topics = "kyu", groupId = "groupKyu")
    public void receiveMessage(ConsumerRecord<String, String> record){
        log.info("Consume message : {}", record.value());
    }

}

 

-Dto 

@Data
public class MessageDto {

    private String message;

}

 

- Controller

: 이제 kafka pub/sub을 테스트 하는 api를 만들어보겠습니다. 하나의 MessageDto를 받아, sendMessage와 sendMessageV2를 둘다 호출하여 두개의 pub을 보내보겠습니다. 

@RestController
@RequestMapping("api")
@RequiredArgsConstructor
@Slf4j
public class ProducerController {

    private final MessageProducer messageProducer;

    @PostMapping("kafka/test")
    public void sendMessage(@RequestBody MessageDto messageDto){
        log.info("controller");
        messageProducer.sendMessage(messageDto);
        messageProducer.sendMessageV2(messageDto);

    }
}

 

- Swagger 테스트

http://localhost:8080/swagger-ui/index.html

 

- 결과 : 하나의 sub으로 두개의 메시지를 받아 테스트 메시지 로그도 두번 호출됨을 확인할수 있습니다.

댓글