Kafka 3개의 Broker 환경 구축하기 - Kraft Mode
Docker compose를 통해 Kafka 환경을 구축하려고 합니다.
- Kraft Mode
- Zoopkeeper가 아닌 Kraft 모드로 3개의 Broker 구축
- Network
- Kafka 서비스들이 서로 통신할 수 있도록 독립적인 도커 네트워크를 설정합니다. (kafka_network)
- Volume
- Kafka 서비스의 데이터 저장을 위해 각 Broker 별로 도커 볼륨을 설정합니다. (Kafka00, Kafka01, Kafka02)
Kafka Kraft 모드란
KRaft(Kafka Raft) 모드는 기존의 ZooKeeper에 의존하던 Kafka의 메타데이터 관리를 Kafka 자체에서 처리하도록 하는 새로운 아키텍처입니다.
KRaft 모드는 Kafka 2.8.0부터 도입되었으며, ZooKeeper를 사용하지 않고 Kafka 클러스터를 관리하는 방식입니다.
docker-compose 분석
networks:
kafka_network:
volumes:
Kafka00:
driver: local
Kafka01:
driver: local
Kafka02:
driver: local
services:
Kafka00Service:
image: bitnami/kafka:3.7.0
restart: unless-stopped
container_name: Kafka00Container
ports:
- '9092:9092' # 내부 네트워크 통신을 위한 PLAINTEXT Listener
- '10000:10000' # 외부 접근을 위한 EXTERNAL Listener
environment:
# KRaft settings
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_BROKER_ID=0
- KAFKA_CFG_NODE_ID=0
- KAFKA_KRAFT_CLTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# Listeners
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:10000
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://localhost:10000
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
networks:
- kafka_network
volumes:
- "Kafka00:/bitnami/kafka"
1. Networks & Volumes 정의
networks:
kafka_network:
volumes:
Kafka00:
driver: local
Kafka01:
driver: local
Kafka02:
driver: local
- networks: kafka_network라는 사용자 정의 네트워크가 정의되어 있음
- volumes: Kafka00, Kafka01, Kafka02라는 3 개의 볼륨으로 구성
2. Kafka 브로커 설정
services:
Kafka00Service:
image: bitnami/kafka:3.7.0
restart: unless-stopped
container_name: Kafka00Container
ports:
- '9092:9092'
- '10000:10000'
- image
- bitnami/kafka:3.7.0 이미지 사용
- restart
- unless-stopped 옵션으로 컨테이너가 중단되지 않는 한 다시 시작을 보장
- ports:
- 9092:9092: 내부 네트워크 통신을 위한 PLAINTEXT 리스너.
- Kafka 클라이언트가 브로커와 통신할 때 사용됩니다.
- 10000:10000: 외부 접근을 위한 EXTERNAL 리스너.
- 외부에서 Kafka 브로커에 접근할 때 사용됩니다. (ex: SpringBoot)
- 9092:9092: 내부 네트워크 통신을 위한 PLAINTEXT 리스너.
3. Kafka Environment 설정
services:
Kafka00Service:
image: bitnami/kafka:3.7.0
restart: unless-stopped
container_name: Kafka00Container
ports:
- '9092:9092' # 내부 네트워크 통신을 위한 PLAINTEXT Listener
- '10000:10000' # 외부 접근을 위한 EXTERNAL Listener
environment:
# KRaft settings
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_BROKER_ID=0
- KAFKA_CFG_NODE_ID=0
- KAFKA_KRAFT_CLTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# Listeners
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:10000
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://localhost:10000
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# Clustering
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
networks:
- kafka_network
volumes:
- "Kafka00:/bitnami/kafka"
KRaft 설정:
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- 새로운 토픽을 자동으로 생성하도록 설정합니다.
- KAFKA_CFG_BROKER_ID
- 브로커 ID를 설정합니다.
- 각 Broker별로 다르게 설정합니다.
- KAFKA_CFG_NODE_ID
- 노드 ID를 설정합니다.
- 각 Broker별로 다르게 설정합니다.
- KAFKA_KRAFT_CLUSTER_ID
- 클러스터 ID를 지정합니다.
- 모든 브로커가 동일한 클러스터 ID를 사용하여 같은 클러스터에 속하게 합니다.
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
- 클러스터에서 투표권을 가진 컨트롤러들을 설정합니다.
- 클러스터 내의 컨트롤러들에 대한 정보를 정의합니다.
- 각 컨트롤러는 다른 서비스의 컨트롤러와 통신하여 클러스터를 관리합니다.
- 0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093로 컨트롤러 설정이 되어 있습니다.
- KAFKA_CFG_PROCESS_ROLES
- 해당 브로커가 controller와 broker 역할을 모두 수행하도록 설정합니다.
Listener 설정:
- ALLOW_PLAINTEXT_LISTENER
- plain text 리스너를 허용합니다.
- 암호화되지 않은 평문 통신을 허용합니다.
- KAFKA_CFG_LISTENERS
- Kafka가 수신하는 포트를 정의합니다.
- 리스너를 설정하여 Kafka 클라이언트가 브로커에 연결할 수 있는 방법을 지정합니다.
- KAFKA_CFG_ADVERTISED_LISTENERS
- 외부 클라이언트에게 접속가능한 리스너 주소입니다.
- 리스너를 설정하여 Kafka 클라이언트가 브로커에 연결할 수 있는 방법을 지정합니다.
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
- 각 리스너의 보안 프로토콜을 매핑합니다.
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES
- 컨트롤러 역할을 하는 리스너를 지정합니다.
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME
- 브로커 간 통신에 사용하는 리스너를 설정합니다.
Kafka의 네트워크 설정
- KAFKA_CFG_LISTENERS
- 정의: Kafka 브로커가 수신할 수 있는 네트워크 포트와 주소를 정의합니다.
- 목적: Kafka 브로커는 이 설정을 통해 다양한 프로토콜 및 포트에서 클라이언트의 연결을 수락할 수 있습니다.
- 예시: 이 설정은 브로커가 9092 포트에서 PLAINTEXT 프로토콜을 통해 외부 클라이언트와 통신하며, 9093 포트에서 클러스터 컨트롤러와 통신하고, 10000 포트에서 EXTERNAL 프로토콜을 통해 다른 외부 연결을 수락함을 나타냅니다.
- KAFKA_CFG_ADVERTISED_LISTENERS
- 정의: 브로커가 클라이언트에게 광고할 네트워크 주소와 포트를 정의합니다.
- 목적: 클라이언트는 이 주소를 사용하여 브로커에 연결합니다. ADVERTISED_LISTENERS는 브로커가 클라이언트에게 자신을 어떻게 알릴지를 결정합니다.
- 예시: 이 설정은 내부 클라이언트에게는 Kafka02Service라는 호스트명과 9092 포트를 광고하며, 외부 클라이언트에게는 localhost와 10002 포트를 광고합니다.
연결 과정
- 클라이언트 연결 시도:
- 클라이언트가 Kafka 브로커에 연결을 시도할 때, KAFKA_CFG_LISTENERS에 정의된 주소와 포트를 사용합니다.
- 브로커와의 초기 연결:
- 클라이언트가 브로커와의 연결을 성공적으로 수립하면, 브로커는 KAFKA_CFG_ADVERTISED_LISTENERS에서 정의된 주소와 포트를 클라이언트에게 전달합니다.
- 광고된 주소 사용:
- 클라이언트는 브로커로부터 받은 광고된 주소를 사용하여 향후 통신을 진행합니다. 이는 브로커가 클라이언트에게 어떻게 접근할 수 있는지 알려줍니다.
Clustering 설정:
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
- Offset Topic의 복제본 수를 설정합니다.
- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
- 트랜잭션 상태 로그의 복제본 수를 설정합니다.
- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
- 트랜잭션 로그의 최소 ISR(In-Sync Replica)을 설정합니다.
4. Kafka UI 설정
services:
KafkaWebUiService:
image: provectuslabs/kafka-ui:latest
restart: unless-stopped
container_name: KafkaWebUiContainer
ports:
- '8080:8080'
environment:
- KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
- KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
depends_on:
- Kafka00Service
- Kafka01Service
- Kafka02Service
networks:
- kafka_network
- image
- provectuslabs/kafka-ui를 사용합니다.
- KAFKA_CLUSTERS_0_NAME
- Kafka UI에서 관리할 카프카 클러스터 이름을 설정합니다.
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
- Kafka 클러스터에 연결하기 위한 부트스트랩 서버의 주소를 지정합니다. (Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092)
- DYNAMIC_CONFIG_ENABLED
- 동적 구성에 대한 활성화 여부
- 실행 중인 Kafka 클러스터의 설정을 실시간으로 변경할 수 있게 합니다.
- KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED
- 특정 토픽에 대한 audit 기능을 활성화합니다.
docker kafka 실행하기
실행하기
docker-compose up -d
Kafka 테스트하기
도커 컨테이너: Kafka00Container 접속하여 아래의 실행 파일을 통해 카프카 환경을 테스트합니다.
kafka-topics.sh, kafka-console.producer.sh, kafka-console.consumer.sh
docker exec -it Kafka00Container /bin/bash
- 실행파일이 있는 위치로 이동합니다.
cd /opt/bitnami/kafka/bin
1. Topic Create
./kafka-topics.sh --create --topic tester --bootstrap-server Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092 --partitions 3 --replication-factor 2
Why 9092로 연결하는지,
📌 컨테이너 내부 통신
- Kafka 브로커는 기본적으로 9092번 포트를 통해 외부와 통신하도록 설정되어 있습니다.
- ports: '9093:9092'로 설정된 것은 호스트 머신의 포트 번호와 컨테이너 내부의 포트 번호를 포워딩한 것입니다.
- 컨테이너 내부에서 각 카프카 브로커와 연결을 하는 것이므로, 9092 포트로 연결합니다.
2. Console Producer Test
./kafka-console-producer.sh --topic tester --bootstrap-server Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
3. Console Consumer Test
./kafka-console-consumer.sh --topic tester --from-beginning --bootstrap-server Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
Spring application.yml 설정하기
server:
port: 8088
spring:
kafka:
bootstrap-servers: localhost:10000,localhost:10001,localhost:10002
consumer:
group-id: myGroup
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
//value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: '*'
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
//value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
📌 Spring application kafka 연결 이슈
스프링 application.yml 파일에서 아래와 같이 각 consumer, producer에 bootstrap-servers를 설정하니 연결이 안 된다는 오류가 있었습니다.
Docker Desktop을 통해 컨테이너와 볼륨이 정상적으로 생성되고 실행되는 것을 확인하였습니다. 또한, 도커 컨테이너에 접속한 후 topic 생성, producer, consumer.sh를 테스트했을 때 정상적으로 동작해 연결이 되지 않는 이유를 파악하기 어려웠습니다.
spring:
kafka:
consumer:
group-id: myGroup
auto-offset-reset: earliest
bootstrap-servers: localhost:10000,localhost:10001,localhost:10002
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:10000,localhost:10001,localhost:10002
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
이를 Docker compose 파일에서 Kafka 내부 포트와 외부 포트 포워딩을 설정하고, bootstrap-servers 설정을 kafka: 하위에 설정함으로써 연결 이슈를 해결하였습니다.
Kafka 내부 포트의 역할
- 내부 포트(9092, 9093, 9094)는 Kafka 브로커 간 통신을 위해 사용되며, 메타데이터 동기화와 데이터 복제를 담당합니다.
- 보안과 성능을 최적화하기 위해 클러스터 내부에서만 사용합니다.
내부 포트가 필요한 이유
1. Broker 간 메타데이터 동기화
- Kafka 클러스터는 여러 브로커로 구성되며, 각 브로커는 클러스터의 상태와 메타데이터(토픽, 파티션, 리더 브로커 정보 등)를 유지해야 합니다.
2. 파티션 리더와 팔로워 간의 데이터 복제
- Kafka는 고가용성과 내구성을 위해 파티션 리더가 데이터를 수신하면 팔로워 브로커에게 데이터를 복제합니다.
- 복제 과정도 내부 네트워크에서 발생하며, 내부 포트를 통해 브로커 간 통신이 이루어진다.
3. 보안
- 내부 포트를 사용하면 Kafka 클러스터 내의 통신이 외부로부터 격리됩니다.
- PLAINTEXT 리스너를 내부 네트워크 통신에만 사용함으로써 클러스터 외부에서의 무단 접근을 방지할 수 있습니다.
Kafka 외부 포트의 역할
- 외부 포트(10000, 10001, 10002)는 외부 애플리케이션이 Kafka 브로커에 연결하여 데이터를 송수신할 수 있도록 합니다.
외부 포트가 필요한 이유
외부 클라이언트 연결
- Kafka 클러스터는 다양한 외부 애플리케이션(예: Spring Boot 애플리케이션, 데이터 처리 파이프라인, 분석 도구 등)에서 데이터를 수집하고 분산 처리하기 위해 사용됩니다.
- 외부 포트(10000, 10001, 10002)는 이러한 애플리케이션들이 Kafka 브로커에 연결하여 데이터를 송수신할 수 있도록 합니다.
Bootstrap 서버로 사용
- 외부 애플리케이션은 Kafka 클러스터와의 초기 연결을 설정하기 위해 bootstrap-servers를 사용합니다.
- 예를 들어, Spring 애플리케이션의 application.yml에서 bootstrap-servers 설정을 통해 Kafka 브로커에 연결을 설정할 수 있습니다.
'Infra > Kafka' 카테고리의 다른 글
[Kafka] Kafka 개념 이해하기, Producer, Consumer, Broker (1) | 2024.08.30 |
---|