-
kafka 기본 사용법kafka 2022. 1. 20. 09:36728x90
kafka 스터디 내용 정리
kafka 설치
이 글 작성 시점엔 2.13 버전, stable버전 설치 하였음
https://kafka.apache.org/downloads
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz tar -xvf kafka_2.13-3.0.0.tgz
kafka broker 실행 옵션 설정
추후 운영중 옵션을 바꿀경우에 브로커를 재시작 해야하므로 주의해야 한다.
vi config/server/properties ... #이후 아래와 같이 설정 후 저장 #ec2등에서 실행할 경우 혹은 다른 경우에 ip를 입력. 나의 경우는 로컬 맥북에서 테스트 하여 127.0.0.1 입력 advertised.listeners=PLAINTEXT://127.0.0.1:9092
zookeeper 시작
# -daemon , 백그라운드 모드로 실행한다. bin/zookeeper-server-start.sh -daemon config/zookeeper.properties #jps -vm 으로 동작 확인, jps 는 JVM 프로세스 상태를 보는 도구 # -m (main 메서드에 전달된 인자 확인) , -v (jvm전달된 인자 확인) jps -vm 12180 QuorumPeerMain config/zookeeper.properties -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/Users/diaz/utils/kafka_2.13-3.0.0/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/Users/diaz/utils/kafka_2.13-3.0.0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties
kafka 브로커 실행 & 로그 확인
bin/kafka-server-start.sh -daemon config/server.properties jps -m 13828 Jps -m 13300 RemoteMavenServer36 12180 QuorumPeerMain config/zookeeper.properties 13301 RemoteMavenServer36 13240 13819 Kafka config/server.properties #로그 확인 diaz ~/utils/kafka_2.13-3.0.0 ls -la total 88 drwxr-xr-x 10 diaz staff 320 1 22 20:42 . drwxr-xr-x 11 diaz staff 352 1 22 20:35 .. -rw-r--r-- 1 diaz staff 14521 9 9 06:21 LICENSE -rw-r--r-- 1 diaz staff 28184 9 9 06:21 NOTICE drwxr-xr-x 40 diaz staff 1280 9 9 06:26 bin drwxr-xr-x 18 diaz staff 576 1 22 20:41 config drwxr-xr-x 100 diaz staff 3200 9 9 06:26 libs drwxr-xr-x 12 diaz staff 384 9 9 06:26 licenses drwxr-xr-x 16 diaz staff 512 1 22 21:27 logs drwxr-xr-x 3 diaz staff 96 9 9 06:26 site-docs diaz ~/utils/kafka_2.13-3.0.0 tail -f logs/server.log [2022-01-22 21:27:24,493] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
kafka topic 생성
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test-1 --partitions 3 --replication-factor 1
kafka topic list, 토픽 리스트 보기
#일반 조회 bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list test-1 #상세조회 ✘ diaz ~/utils/kafka_2.13-3.0.0 bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe -topic test-1 Topic: test-1 TopicId: tndv4GaSQUu2zA_4JCQyPg PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: test-1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test-1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: test-1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
생성한 토픽에 데이터 넣기
Record ( key, value ) 형태로 토픽에 삽입 된다.
아래의 경우는 메시지를 보낼때 key 값만 보내는 경우
diaz ~/utils/kafka_2.13-3.0.0 bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test-1 >hello >world
key:value 를 함께 보내려면 아래와 같이 한다.
✘ diaz ~/utils/kafka_2.13-3.0.0 bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test-1 --property "parse.key=true" --property "key.separator=:" hi:hello
토픽의 값 확인하기
✘ diaz ~/utils/kafka_2.13-3.0.0 bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-1 --from-beginning hello world hi:hello # key:value모두 프린트 하기 bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-3 --property print.key=true --from-beginning
토픽의 레코드 삭제하기
# 먼저 삭제 대상의 정보가 담긴 json 파일을 작성한다. vi delete-topic.json {"partitions": [{"topic":"test-1", "partition":0, "offset": 2}], "version":1} # 이후 삭제한다. bin/kafka-delete-records.sh --bootstrap-server 127.0.0.1:9092 --offset-json-file delete-topic.json
브로커 실행 중단하기
bin/kafka-server-stop.sh
728x90'kafka' 카테고리의 다른 글
kafka could not be established. Broker may not be available (0) 2022.01.24