WebSocket : 실시간 양방향 통신을 가능하게 하는 기술
Kafka : 여러 WebSocket 서버가 주고받는 메시지를 중앙에서 관리하고 메세지를 저장하며 배포하는 메세지 큐 시스템
Zookeeper : Kafka 클러스터 내 여러 서버가 동기화되고 협력할 수 있도록 상태를 관리
쉽게 말해서 WebSocket은 사용자가 실시간으로 채팅하는 것, Kafka는 채팅방에서 모든 메시지가 하나의 시스템에 모여서 관리되고 각 사용자에게 메시지를 전달하는 역할, Zookeeper는 채팅방이 원활하게 운영될 수 있도록 메시지 시스템(Kafka)을 관리하고 각 서버들이 잘 협력하도록 돕는 관리자로 정리할 수 있다.
따라서 WebSocket과 Kafka를 결합하여 실시간 메시지 시스템을 만들고 그 시스템이 안정적으로 작동하도록 Zookeeper가 돕는 구조이다.
Kafka 구성요소
- 프로듀서 (Producer) : 메세지를 생성해서 전송하는 사람
- 컨슈머 (Consumer) : 메세지를 받아서 처리하는 사람
- 브로커 (Broker) : 메세지를 저장하고 관리하는 서버
Kafka 흐름
프로듀서가 메세지를 Kafka의 Topic에 보낸다.
메세지가 Kafka의 브로커에 저장된다.
컨슈머가 Kafka의 Topic을 구독해 메세지를 읽고 메세지 처리한다.
Kafka 구현
https://taeh00n.tistory.com/entry/Spring-Boot-%EC%9B%B9-%EC%86%8C%EC%BC%93Web-Socket
[Spring Boot] 웹 소켓(Web Socket)
웹 소켓(Web Socket) : 클라이언트(브라우저)와 서버가 실시간으로 양방향 통신을 할 수 있도록 해주는 프로토콜 기존 HTTP 통신과 달리 한 번 연결을 맺으면 계속 유지되기 때문에 빠르고 효율적인
taeh00n.tistory.com
코드는 위의 내용에서 수정하겠다.
build.gradle 추가
implementation 'org.springframework.kafka:spring-kafka'
application.yml 추가
spring:
kafka:
bootstrap-servers: [Kafka 서버 IP 주소]:9092
consumer:
group-id: [그룹ID]
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
MessageHandler 클래스 HandleTextMessage 수정
@RequiredArgsConstructor
@Component
public class MessageHandler extends TextWebSocketHandler {
private final Set<WebSocketSession> sessions = new HashSet<>();
private final KafkaTemplate<String, String> kafkaTemplate;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
System.out.println("클라이언트가 연결했다.");
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
kafkaTemplate.send("chat", message.getPayload());
System.out.println(message+"메시지를 받았다.");
}
@KafkaListener(topics="chat", groupId ="matfia-group")
public void getMessaseFromKafka(ConsumerRecord<String, String> record) throws IOException {
for (WebSocketSession session : sessions) {
if (session.isOpen()) { // 세션 상태 확인
try {
session.sendMessage(new TextMessage(record.value()));
} catch (IllegalStateException e) {
System.err.println("닫힌 세션 전송 실패: " + session.getId());
sessions.remove(session);
}
} else {
System.out.println("닫힌 세션 제거: " + session.getId());
sessions.remove(session);
}
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
System.out.println("클라이언트가 연결을 종료했다.");
}
}
기존 코드에서 KafkaTemplate 의존성을 주입받고 getMessageFromKafka 메소드가 추가됐다.
handleTextMessage() 메소드는 이 메시지를 받아 Kafka의 chat 토픽으로 보낸다 - producer 역할
@KafkaListener가 chat 토픽을 구독해 메시지를 받아 웹소켓 클라이언트들에게 전달한다. - consumer 역할
위 코드에서는 Zookeeper를 별도로 설정하지 않아도 된다. Spring Kafka가 Zookeeper의 설정을 관리해주기에 기본적인 Kafka 클러스터를 사용하려면 Zookeeper의 설정은 필요없다.
리눅스 환경에서 구현
https://kafka.apache.org/downloads
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
kafka.apache.org
Kafka 설치
Broker, Producer, Consumer 서버에 모두 Kakfa를 설치한다
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar zxvf kafka_2.13-3.9.0.tgz
Kafka 서버 설정
/kafka_2.13-3.9.0/config 위치에서 server.properties 파일 수정
34번째 줄 주석 제거, 38번째 줄 주석 제거 후 IP 주소 설정
Zookeeper, Kafka 실행 (Broker 서버)
/kafka_2.13-3.9.0/bin 위치에서 Zookeeper와 Kafka 실행
./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties
하나의 서버에서 둘 다 실행해야한다.
메세지 보내는 서버 (Producer 서버)
/kafka_2.13-3.9.0/bin 위치에서 명령어 입력
./kafka-console-producer.sh --broker-list [Broker 서버 IP 주소]:9092 --topic [Topic명]
메세지 수신하는 서버 (Consumer 서버)
/kafka_2.13-3.9.0/bin 위치에서 명령어 입력
./kafka-console-consumer.sh --bootstrap-server [Broker 서버 IP 주소]:9092 --topic [Topic명]
'한화시스템 Beyond SW Camp > 백엔드' 카테고리의 다른 글
[Spring Boot] 멀티 모듈 프로젝트 (1) | 2025.02.22 |
---|---|
[Spring Boot] Stomp를 이용한 채팅 시스템 구현 (0) | 2025.02.19 |
[Spring Boot] 웹 소켓(Web Socket) (1) | 2025.02.18 |
[리눅스] 리눅스 환경에서 NFS로 파일 공유하기 (0) | 2025.02.16 |
[Spring Boot] Multipart 이미지 업로드 기능 구현 (0) | 2025.02.16 |