본 글에서는 springboot에서 kafka를 사용하여 메시지 pub/sub을 구현해보고자 합니다.
spring에서 kafka로 pub/sub을 구현후 테스트하기전에, local에서 kafka 서버를 띄워야합니다. 이전 글에서 kafka 로컬에서 띄우는 방법을 작성하였으니, 해당 글을 참고하여 4. Topic 생성까지 완료 후 아래 spring kafka 테스트를 진행해주세요. 주키퍼와 kafka 서버가 각각 띄워져있는 상태여야 spring에서 kafka 로컬 테스트가 가능합니다.
kafka Window 환경에서 로컬 설치 후 pub/sub 테스트
kafka Window 환경에서 로컬 설치 후 pub/sub 테스트
본 글에서는 마이크로서비스아키텍쳐에서 주로 사용하는 kafka를 로컬 window PC에 설치하여 간단하게 pub/sub하는 것을 구현해보겠습니다. - kafka 설치 https://kafka.apache.org/downloads 먼저 다운로드 주소
dnl1029.tistory.com
-의존성
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- application.yml
: kafka 위에 있는 설정들은 db 관련 설정들이니 넘어가셔도 됩니다. bootstrap-servers에 kafka 서버 ip, 포트를 입력해야합니다. kafka 관련 서버가 따로 있다고 가정하겠습니다.
spring:
lifecycle:
timeout-per-shutdown-phase: 40s #디폴트값은 30초
data:
mongodb:
# uri: mongodb+srv://admin:내암호@cluster0.1p17esk.mongodb.net/sample_airbnb
uri: ENC(1kiaGE3RJMoGZi9TFdjEj4IxeCJyofdL9i5bpvbN+N5pidF/5WUuNeDhGCLTXgQHUpFxFC2P1uOaEDDWv5OdLwiNVw2FT3mFNr4pB73Nb5E=)
datasource:
driver-class-name: org.h2.Driver
url: jdbc:h2:tcp://localhost/~/test
username: sa
password:
kafka:
bootstrap-servers:
localhost:9092
consumer:
group-id: groupKyu
auto-offset-reset : latest #가장 최근에 생산된 메시지로 offeset reset
enable-auto-commit: true #ack를 자동으로 보내기
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
정말 단순한 pub/sub구현은 아래와같이 구현할수있습니다. spring에서 bean을 등록하고 의존성주입을 통해 사용하는것이 기본이나, kafka는 의존성 추가만 해도 auto configuration을 통해 별도로 bean등록없이 kafkaTemplate을 의존성 주입후 사용할 수 있습니다.
-pub
@Slf4j
@RequiredArgsConstructor
@Component
public class MessageProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("kyu",message); //topic, 메시지 보내기
}
}
-sub
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageConsumer {
@KafkaListener(topics = "kyu", groupId = "groupKyu")
public void receiveMessage(ConsumerRecord<String, String> record){
log.info("Consume message : {}", record.value());
}
}
이제 조금더 심화된 내용입니다.
-config
: kafka 메시지의 크기를 제한하고 싶을때는, 아래와 같이 Java쪽 config를 추가하여 kafKaTemplate을 수동 Bean 등록해줘야 합니다. 아래는 2MB로 카프카 메시지 크기를 제한한 config 입니다.
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {
@Bean
public ProducerFactory<String,String> producerFactory(){
Map<String,Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 2);
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(producerConfig);
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
- Producer
: sendMessage는 아주 단순하게 메시지를 보내는 것이고, sendMessageV2는 비동기방식으로 메시지를 보내는 방식입니다. 이전에는 ListenableFuture을 사용하여, addCallback 메서드로 kafka 비동기를 구현했는데, spring 3.0 이상 jdk17 기준으로 CompletableFuture에 whenComplete 메서드를 사용하는 방식으로 변경되었습니다.
@Slf4j
@RequiredArgsConstructor
@Component
public class MessageProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(MessageDto messageDto) {
kafkaTemplate.send("kyu",messageDto.getMessage()); //topic, 메시지 보내기
log.info("sendMessage : {}",messageDto.getMessage());
}
public void sendMessageV2(MessageDto messageDto) {
/* springboot 3.0 이전
ListenableFuture<SendResult<String, String>> future_old = kafkaTemplate.send("kyu", message);
future_old.addCallback(new ListenableFutureCallback<SendResult<String,String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Success");
}
@Override
public void onFailure(Throwable ex) {
log.info("Failure");
}
});
*/
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("kyu", messageDto.getMessage());
future.whenComplete(((stringStringSendResult, throwable) -> {
if(throwable == null) {
log.info("sendMessageV2 success, message : {}",messageDto.getMessage());
}
else {
log.info("Failure",throwable.getMessage(),throwable);
}
}));
}
}
-Consumer
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageConsumer {
@KafkaListener(topics = "kyu", groupId = "groupKyu")
public void receiveMessage(ConsumerRecord<String, String> record){
log.info("Consume message : {}", record.value());
}
}
-Dto
@Data
public class MessageDto {
private String message;
}
- Controller
: 이제 kafka pub/sub을 테스트 하는 api를 만들어보겠습니다. 하나의 MessageDto를 받아, sendMessage와 sendMessageV2를 둘다 호출하여 두개의 pub을 보내보겠습니다.
@RestController
@RequestMapping("api")
@RequiredArgsConstructor
@Slf4j
public class ProducerController {
private final MessageProducer messageProducer;
@PostMapping("kafka/test")
public void sendMessage(@RequestBody MessageDto messageDto){
log.info("controller");
messageProducer.sendMessage(messageDto);
messageProducer.sendMessageV2(messageDto);
}
}
- Swagger 테스트
http://localhost:8080/swagger-ui/index.html
- 결과 : 하나의 sub으로 두개의 메시지를 받아 테스트 메시지 로그도 두번 호출됨을 확인할수 있습니다.
'kafka' 카테고리의 다른 글
spring kafka HealthIndicator 사용하여 HealthCheck 구현 (0) | 2023.07.31 |
---|---|
spring kafka ConsumerAwareRebalanceListener 구현 (0) | 2023.07.22 |
kafka Window 환경에서 로컬 설치 후 pub/sub 테스트 (0) | 2023.07.21 |
댓글