[카프카/스프링부트] Kafka On Docker(WLS2/Window) + SpringBoot 샘플코드

    728x90
    반응형

    카프카

    What is Kafka?

    분산 스트리밍 플랫폼으로, 실시간 데이터 스트리밍과 처리에 특화된 오픈소스 소프트웨어다.

     

    카프카의 주요 개념

    1. Producer: 데이터를 생성하여 카프카로 보내는 애플리케이션
    2. Consumer: 카프카에서 데이터를 읽어가는 애플리케이션
    3. Broker: 데이터를 저장하고 관리하는 서버. 카프카 클러스터를 구성하며 분산 처리의 핵심
    4. Topic: 메시지를 분류하여 저장하는 논리적 공간. Producer는 특정 Topic에 데이터를 보낸다.

     

    카프카의 주요 기능

    1. 메시지 큐 역할
      데이터(메시지)를 특정 순서로 안전하게 저장 및 전달한다.
      Producer가 보낸 메시지를 Consumer가 처리한다.
    2. 분산 시스템
      여러 Broker로 구성된 클러스터 형태로 동작해 높은 가용성과 확장성을 제공한다.
      장애 발생 시 데이터를 안전하게 복구한다.
    3. 실시간 스트리밍 처리
      빠르고 안정적으로 대량의 데이터를 실시간으로 처리할 수 있다.
      데이터 파이프라인을 구성하거나 실시간 이벤트 처리가 가능하다.

     

    카프카의 장점

    1. 확장성: 수평적 확장이 가능하다.
    2. 빠른 처리 속도: 고속으로 데이터를 처리할 수 있다.
    3. 유연성: 데이터 소비 패턴에 맞춰 설계가 가능하다.

    카프카 설치

    왜 도커를 사용하는게 좋을까?

    Kafka는 복잡한 설정이 필요한 분산 시스템이다.
    Docker 이미지를 활용하면 Kafka와 Zookeeper를 한 번에 설치하고 실행할 수 있다.

     

    도커로 카프카 설치하기

    #docker-compose.yml 작성
    [root@]$ vi kafka-docker-compose.yml
    
    version: '3.3'  # Docker Compose 파일의 버전. docker-compose의 버전에 맞게 수정이 필요할 수도 있음
    services:
      zookeeper:  # Zookeeper 서비스 정의
        image: wurstmeister/zookeeper:latest  # Zookeeper Docker 이미지. 최신 버전 사용.
        container_name: zookeeper  # 컨테이너 이름을 'zookeeper'로 지정.
        ports:
          - "2181:2181"  # 호스트의 2181 포트를 컨테이너의 2181 포트와 바인딩. Zookeeper 기본 포트.
    
      kafka:  # Kafka 서비스 정의
        image: wurstmeister/kafka:latest  # Kafka Docker 이미지. 최신 버전 사용.
        container_name: kafka  # 컨테이너 이름을 'kafka'로 지정.
        ports:
          - "9092:9092"  # 호스트의 9092 포트를 컨테이너의 9092 포트와 바인딩. Kafka 기본 포트.
        environment:  # Kafka에 전달할 환경 변수 설정.
          KAFKA_BROKER_ID: 1  # Kafka 브로커 ID. 클러스터 내에서 고유해야 함.
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  # Kafka가 연결할 Zookeeper 주소와 포트.
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092  # Kafka가 내부적으로 사용하는 리스너 주소.
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xxx.xxx.xxx.xxx:9092  # 외부 클라이언트가 Kafka에 접근할 때 사용할 주소. 호스트의 IP와 포트 지정.
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT  # Kafka 리스너의 보안 프로토콜 설정.
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock  # Docker 내부 통신을 위한 소켓 파일을 컨테이너에 마운트.

     

    # 커스텀 yml파일을 사용하여 정의된 컨테이너들을 백그라운드에서 실행
    [root@]$ docker-compose -f kafka-docker-compose.yml up -d

     

    # 컨테이너들이 실행되었는지 확인
    [root@]$ docker ps
    CONTAINER ID   IMAGE                           COMMAND                  CREATED      STATUS      PORTS                                                NAMES
    0092aa433c05   wurstmeister/kafka:latest       "start-kafka.sh"         5 days ago   Up 5 days   0.0.0.0:9092->9092/tcp                               kafka
    58dabc5566ba   wurstmeister/zookeeper:latest   "/bin/sh -c '/usr/sb…"   5 days ago   Up 5 days   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper

     

    # kafka 컨테이너에 대화형(-ti)으로 접속하여 Bash 셸을 실행
    [root@]$ docker exec -ti kafka bash
    
    
    # 토픽을 생성
    root@0092aa433c05:/# kafka-topics.sh --create --topic {토픽명} --bootstrap-server xxx.xxx.xxx.xxx:9092
    
    # 생성된 토픽을 조회
    root@0092aa433c05:/# kafka-topics.sh --describe --topic {토픽명} --bootstrap-server xxx.xxx.xxx.xxx:9092
    
    # Kafka 토픽에서 메시지를 읽어들임(소비)
    root@0092aa433c05:/# kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --from-beginning  --topic {토픽명}
    
    
    # Kafka 토픽으로 메시지를 전송
    root@0092aa433c05:/# kafka-console-producer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic {토픽명}

     

    스프링부트 카프카 템플릿

    package com.example.kafka;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    @Component
    public class KafkaLogger {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void logToKafka(String topic, String message) {
            try {
                // 메시지 전송
                kafkaTemplate.send("토픽", "전송할 데이터");
    
                System.out.println("Kafka Logging Successful!");
            } catch (Exception e) {
                System.err.println("Kafka Logging Failed!");
                e.printStackTrace();        // 예외의 스택 트레이스 출력
            }
        }
    }

     

    스프링부트 카프카 템플릿을 호출하는 서비스

    package com.example.service;
    
    import com.example.logging.KafkaLogger;
    import org.springframework.stereotype.Service;
    
    @Service
    public class LogService {
    	@Autowired
        private final KafkaLogger kafkaLogger;
    
        public void logToKafka(String topic, String message) {
            kafkaLogger.log(topic, message);
        }
    }

     

    728x90
    반응형

    댓글