본문 바로가기
kafka

spring kafka ConsumerAwareRebalanceListener 구현

by 토망이 2023. 7. 22.

본글에서는 kafka consumer 단에서 사용가능한 ConsumerAwareRebalanceListener를 구현해보도록하겠습니다. spring kafka 관련 설정 및 코드는 이전글을 참고해주세요.

spring kafka pub/sub 구현

 

spring kafka pub/sub 구현

본 글에서는 springboot에서 kafka를 사용하여 메시지 pub/sub을 구현해보고자 합니다. spring에서 kafka로 pub/sub을 구현후 테스트하기전에, local에서 kafka 서버를 띄워야합니다. 이전 글에서 kafka 로컬에서

dnl1029.tistory.com

 

ConsumerAwareRebalanceListener의 메서드는 docs.spring에도 잘 나와있습니다.(https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.html)

 

이 중에서 onPartitionsAssigned 메서드만 override하여 구현체를 만들어보고자 합니다. kafka에서는 리밸런싱이 주기적으로 이루어지는데, 컨슈머와 파티션의 매핑을 주기적으로 수행하는 kafka 고유의 기능이며, onPartitionsAssigned 메서드는 로컬에서 서버를 처음 올릴때도 호출이됩니다. 이를 통해 로컬에서 현재 내 파티션의 offset 값을 로깅하는 식으로 활용할수 있습니다.

 

- MyListener

@Component
@Slf4j
public class MyListener implements ConsumerAwareRebalanceListener {

    //파티션 재할당시 호출(서버 처음 구동시에도 호출)
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        log.info("# PartitionsAssigned. metadata: {}, partitions: {}"
                , consumer.groupMetadata()
                , partitions);

        partitions
                .forEach(partition -> {
                    // position에 나온 offset-1 이 현재까지 발행된 offset 수이며, position 값은 이제 들어올 예정인 offset
                    log.info("partition: " + partition.partition() + ", position(readOffset): " + consumer.position(partition) + " , topic: " + partition.topic());
                });
    }
}

 

현재 로컬 구동시, 아래와 같이 groupId, 파티션, offset등의 정보를 불러와서 로그를 초기 1회 남기게 됩니다. position이 22라는 의미는 현재 offset = '21' 까지 발행이 되었으며, offset 22번부터 들어올 예정이라는 뜻입니다.

 

현재 제 서비스는 컨트롤러에서 pub api 호출시, 2개의 메시지가 발행되는 구조로 되어있습니다. 그래서 api를 1회 호출시, offset = '22', offset = '23' 발행, position 24로 출력될겁니다.  

@RestController
@RequestMapping("api")
@RequiredArgsConstructor
@Slf4j
public class ProducerController {

    private final MessageProducer messageProducer;

    @PostMapping("kafka/test")
    public void sendMessage(@RequestBody MessageDto messageDto){
        log.info("controller");
        messageProducer.sendMessage(messageDto);
        messageProducer.sendMessageV2(messageDto);

    }
}

 

아래와 같이 sendMessage api를 1회 실행하여, 메시지를 두개 발행시켜 봅시다. onPartitionsAssigned를 다시 확인하려면, 메시지 두개 발행후, 로컬 서버를 내렸다가 다시 올리면 onPartitionsAssigned 메서드가 다시 호출됩니다.

 

서버 restart

댓글