728x90
반응형
카프카
What is Kafka?
분산 스트리밍 플랫폼으로, 실시간 데이터 스트리밍과 처리에 특화된 오픈소스 소프트웨어다.
카프카의 주요 개념
- Producer: 데이터를 생성하여 카프카로 보내는 애플리케이션
- Consumer: 카프카에서 데이터를 읽어가는 애플리케이션
- Broker: 데이터를 저장하고 관리하는 서버. 카프카 클러스터를 구성하며 분산 처리의 핵심
- Topic: 메시지를 분류하여 저장하는 논리적 공간. Producer는 특정 Topic에 데이터를 보낸다.
카프카의 주요 기능
- 메시지 큐 역할
데이터(메시지)를 특정 순서로 안전하게 저장 및 전달한다.
Producer가 보낸 메시지를 Consumer가 처리한다. - 분산 시스템
여러 Broker로 구성된 클러스터 형태로 동작해 높은 가용성과 확장성을 제공한다.
장애 발생 시 데이터를 안전하게 복구한다. - 실시간 스트리밍 처리
빠르고 안정적으로 대량의 데이터를 실시간으로 처리할 수 있다.
데이터 파이프라인을 구성하거나 실시간 이벤트 처리가 가능하다.
카프카의 장점
- 확장성: 수평적 확장이 가능하다.
- 빠른 처리 속도: 고속으로 데이터를 처리할 수 있다.
- 유연성: 데이터 소비 패턴에 맞춰 설계가 가능하다.
카프카 설치
왜 도커를 사용하는게 좋을까?
Kafka는 복잡한 설정이 필요한 분산 시스템이다.
Docker 이미지를 활용하면 Kafka와 Zookeeper를 한 번에 설치하고 실행할 수 있다.
도커로 카프카 설치하기
#docker-compose.yml 작성
[root@]$ vi kafka-docker-compose.yml
version: '3.3' # Docker Compose 파일의 버전. docker-compose의 버전에 맞게 수정이 필요할 수도 있음
services:
zookeeper: # Zookeeper 서비스 정의
image: wurstmeister/zookeeper:latest # Zookeeper Docker 이미지. 최신 버전 사용.
container_name: zookeeper # 컨테이너 이름을 'zookeeper'로 지정.
ports:
- "2181:2181" # 호스트의 2181 포트를 컨테이너의 2181 포트와 바인딩. Zookeeper 기본 포트.
kafka: # Kafka 서비스 정의
image: wurstmeister/kafka:latest # Kafka Docker 이미지. 최신 버전 사용.
container_name: kafka # 컨테이너 이름을 'kafka'로 지정.
ports:
- "9092:9092" # 호스트의 9092 포트를 컨테이너의 9092 포트와 바인딩. Kafka 기본 포트.
environment: # Kafka에 전달할 환경 변수 설정.
KAFKA_BROKER_ID: 1 # Kafka 브로커 ID. 클러스터 내에서 고유해야 함.
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # Kafka가 연결할 Zookeeper 주소와 포트.
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # Kafka가 내부적으로 사용하는 리스너 주소.
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://xxx.xxx.xxx.xxx:9092 # 외부 클라이언트가 Kafka에 접근할 때 사용할 주소. 호스트의 IP와 포트 지정.
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT # Kafka 리스너의 보안 프로토콜 설정.
volumes:
- /var/run/docker.sock:/var/run/docker.sock # Docker 내부 통신을 위한 소켓 파일을 컨테이너에 마운트.
# 커스텀 yml파일을 사용하여 정의된 컨테이너들을 백그라운드에서 실행
[root@]$ docker-compose -f kafka-docker-compose.yml up -d
# 컨테이너들이 실행되었는지 확인
[root@]$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0092aa433c05 wurstmeister/kafka:latest "start-kafka.sh" 5 days ago Up 5 days 0.0.0.0:9092->9092/tcp kafka
58dabc5566ba wurstmeister/zookeeper:latest "/bin/sh -c '/usr/sb…" 5 days ago Up 5 days 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp zookeeper
# kafka 컨테이너에 대화형(-ti)으로 접속하여 Bash 셸을 실행
[root@]$ docker exec -ti kafka bash
# 토픽을 생성
root@0092aa433c05:/# kafka-topics.sh --create --topic {토픽명} --bootstrap-server xxx.xxx.xxx.xxx:9092
# 생성된 토픽을 조회
root@0092aa433c05:/# kafka-topics.sh --describe --topic {토픽명} --bootstrap-server xxx.xxx.xxx.xxx:9092
# Kafka 토픽에서 메시지를 읽어들임(소비)
root@0092aa433c05:/# kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --from-beginning --topic {토픽명}
# Kafka 토픽으로 메시지를 전송
root@0092aa433c05:/# kafka-console-producer.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --topic {토픽명}
스프링부트 카프카 템플릿
package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class KafkaLogger {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void logToKafka(String topic, String message) {
try {
// 메시지 전송
kafkaTemplate.send("토픽", "전송할 데이터");
System.out.println("Kafka Logging Successful!");
} catch (Exception e) {
System.err.println("Kafka Logging Failed!");
e.printStackTrace(); // 예외의 스택 트레이스 출력
}
}
}
스프링부트 카프카 템플릿을 호출하는 서비스
package com.example.service;
import com.example.logging.KafkaLogger;
import org.springframework.stereotype.Service;
@Service
public class LogService {
@Autowired
private final KafkaLogger kafkaLogger;
public void logToKafka(String topic, String message) {
kafkaLogger.log(topic, message);
}
}
728x90
반응형
'오픈소스' 카테고리의 다른 글
[도커] 웹 어플리케이션 도커 환경 구성하기 (0) | 2024.12.26 |
---|---|
[logback/k6] Rolling File Appenders 샘플 및 k6 테스트 (0) | 2024.12.06 |
[오픈소스] Istio (0) | 2024.10.09 |
[OpenAPI] SVNKit 샘플코드 및 사용후기 (0) | 2023.05.10 |
[OpenAPI] KOMORAN 형태소 분석기 샘플코드 및 사용 후기 (0) | 2023.03.19 |
댓글