본문 바로가기
한화시스템 Beyond SW Camp/백엔드

[Spring Boot] Kafka, Zookeeper

by taeh00n 2025. 2. 18.

WebSocket : 실시간 양방향 통신을 가능하게 하는 기술

Kafka : 여러 WebSocket 서버가 주고받는 메시지를 중앙에서 관리하고 메세지를 저장하며 배포하는 메세지 큐 시스템

Zookeeper : Kafka 클러스터 내 여러 서버가 동기화되고 협력할 수 있도록 상태를 관리

 

쉽게 말해서 WebSocket은 사용자가 실시간으로 채팅하는 것, Kafka는 채팅방에서 모든 메시지가 하나의 시스템에 모여서 관리되고 각 사용자에게 메시지를 전달하는 역할, Zookeeper는 채팅방이 원활하게 운영될 수 있도록 메시지 시스템(Kafka)을 관리하고 각 서버들이 잘 협력하도록 돕는 관리자로 정리할 수 있다.

 

따라서 WebSocketKafka를 결합하여 실시간 메시지 시스템을 만들고 그 시스템이 안정적으로 작동하도록 Zookeeper가 돕는 구조이다.

 

Kafka 구성요소

  • 프로듀서 (Producer) : 메세지를 생성해서 전송하는 사람
  • 컨슈머 (Consumer) : 메세지를 받아서 처리하는 사람
  • 브로커 (Broker) : 메세지를 저장하고 관리하는 서버

Kafka 흐름

프로듀서메세지를 Kafka의 Topic에 보낸다.

메세지가 Kafka의 브로커에 저장된다.

컨슈머가 Kafka의 Topic을 구독해 메세지를 읽고 메세지 처리한다.

 

Kafka 구현

https://taeh00n.tistory.com/entry/Spring-Boot-%EC%9B%B9-%EC%86%8C%EC%BC%93Web-Socket

 

[Spring Boot] 웹 소켓(Web Socket)

웹 소켓(Web Socket) : 클라이언트(브라우저)와 서버가 실시간으로 양방향 통신을 할 수 있도록 해주는 프로토콜 기존 HTTP 통신과 달리 한 번 연결을 맺으면 계속 유지되기 때문에 빠르고 효율적인

taeh00n.tistory.com

코드는 위의 내용에서 수정하겠다.

 

build.gradle 추가

implementation 'org.springframework.kafka:spring-kafka'

 

application.yml 추가

spring:
  kafka:
    bootstrap-servers: [Kafka 서버 IP 주소]:9092
    consumer:
      group-id: [그룹ID]
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

MessageHandler 클래스 HandleTextMessage 수정

@RequiredArgsConstructor
@Component
public class MessageHandler extends TextWebSocketHandler {
    private final Set<WebSocketSession> sessions = new HashSet<>();
    private final KafkaTemplate<String, String> kafkaTemplate;


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        System.out.println("클라이언트가 연결했다.");
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        kafkaTemplate.send("chat", message.getPayload());
        System.out.println(message+"메시지를 받았다.");

    }

    @KafkaListener(topics="chat", groupId ="matfia-group")
    public void getMessaseFromKafka(ConsumerRecord<String, String> record) throws IOException {
        for (WebSocketSession session : sessions) {
            if (session.isOpen()) { //  세션 상태 확인
                try {
                    session.sendMessage(new TextMessage(record.value()));
                } catch (IllegalStateException e) {
                    System.err.println("닫힌 세션 전송 실패: " + session.getId());
                    sessions.remove(session);
                }
            } else {
                System.out.println("닫힌 세션 제거: " + session.getId());
                sessions.remove(session);
            }
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        System.out.println("클라이언트가 연결을 종료했다.");
    }
}

기존 코드에서 KafkaTemplate 의존성을 주입받고 getMessageFromKafka 메소드가 추가됐다.

 

handleTextMessage() 메소드는 이 메시지를 받아 Kafka의 chat 토픽으로 보낸다 - producer 역할

@KafkaListenerchat 토픽을 구독해 메시지를 받아 웹소켓 클라이언트들에게 전달한다. - consumer 역할

 

위 코드에서는 Zookeeper를 별도로 설정하지 않아도 된다. Spring Kafka가 Zookeeper의 설정을 관리해주기에 기본적인 Kafka 클러스터를 사용하려면 Zookeeper의 설정은 필요없다.


리눅스 환경에서 구현

https://kafka.apache.org/downloads

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

Kafka 설치

Broker, Producer, Consumer 서버에 모두 Kakfa를 설치한다

wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar zxvf kafka_2.13-3.9.0.tgz

 

Kafka 서버 설정

/kafka_2.13-3.9.0/config 위치에서 server.properties 파일 수정

34번째주석 제거, 38번째주석 제거 후 IP 주소 설정

 

Zookeeper, Kafka 실행 (Broker 서버)

/kafka_2.13-3.9.0/bin 위치에서 ZookeeperKafka 실행

./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties

하나의 서버에서 둘 다 실행해야한다.

 

메세지 보내는 서버 (Producer 서버)

/kafka_2.13-3.9.0/bin 위치에서 명령어 입력

./kafka-console-producer.sh --broker-list [Broker 서버 IP 주소]:9092 --topic [Topic명]

메세지 수신하는 서버 (Consumer 서버)

/kafka_2.13-3.9.0/bin 위치에서 명령어 입력

./kafka-console-consumer.sh --bootstrap-server [Broker 서버 IP 주소]:9092 --topic [Topic명]