ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • kafka 기본 사용법
    kafka 2022. 1. 20. 09:36
    728x90

    kafka 스터디 내용 정리


    kafka 설치

    이 글 작성 시점엔 2.13 버전, stable버전 설치 하였음

    https://kafka.apache.org/downloads

     

    Apache Kafka

    Apache Kafka: A Distributed Streaming Platform.

    kafka.apache.org

    wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
    
    tar -xvf kafka_2.13-3.0.0.tgz

     

    kafka broker 실행 옵션 설정

    추후 운영중 옵션을 바꿀경우에 브로커를 재시작 해야하므로 주의해야 한다.

    vi config/server/properties
    
    ...
    
    #이후 아래와 같이 설정 후 저장
    #ec2등에서 실행할 경우 혹은 다른 경우에 ip를 입력. 나의 경우는 로컬 맥북에서 테스트 하여 127.0.0.1 입력
    advertised.listeners=PLAINTEXT://127.0.0.1:9092

     

    zookeeper 시작

    # -daemon , 백그라운드 모드로 실행한다.
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    
    #jps -vm 으로 동작 확인, jps 는 JVM 프로세스 상태를 보는 도구
    # -m (main 메서드에 전달된 인자 확인) , -v (jvm전달된 인자 확인)
    jps -vm
    12180 QuorumPeerMain config/zookeeper.properties -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/Users/diaz/utils/kafka_2.13-3.0.0/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/Users/diaz/utils/kafka_2.13-3.0.0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties

     

    kafka 브로커 실행 & 로그 확인

    bin/kafka-server-start.sh -daemon config/server.properties
    jps -m
    
    13828 Jps -m
    13300 RemoteMavenServer36
    12180 QuorumPeerMain config/zookeeper.properties
    13301 RemoteMavenServer36
    13240
    13819 Kafka config/server.properties
    
    #로그 확인
    diaz ~/utils/kafka_2.13-3.0.0 ls -la
    total 88
    drwxr-xr-x   10 diaz  staff    320  1 22 20:42 .
    drwxr-xr-x   11 diaz  staff    352  1 22 20:35 ..
    -rw-r--r--    1 diaz  staff  14521  9  9 06:21 LICENSE
    -rw-r--r--    1 diaz  staff  28184  9  9 06:21 NOTICE
    drwxr-xr-x   40 diaz  staff   1280  9  9 06:26 bin
    drwxr-xr-x   18 diaz  staff    576  1 22 20:41 config
    drwxr-xr-x  100 diaz  staff   3200  9  9 06:26 libs
    drwxr-xr-x   12 diaz  staff    384  9  9 06:26 licenses
    drwxr-xr-x   16 diaz  staff    512  1 22 21:27 logs
    drwxr-xr-x    3 diaz  staff     96  9  9 06:26 site-docs
    diaz ~/utils/kafka_2.13-3.0.0 tail -f logs/server.log
    [2022-01-22 21:27:24,493] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)

     

    kafka topic 생성

    bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test-1 --partitions 3 --replication-factor 1

     

    kafka topic list, 토픽 리스트 보기

    #일반 조회
    bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
    test-1
    
    #상세조회
     ✘ diaz ~/utils/kafka_2.13-3.0.0  bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe -topic test-1
    Topic: test-1	TopicId: tndv4GaSQUu2zA_4JCQyPg	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824
    	Topic: test-1	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    	Topic: test-1	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
    	Topic: test-1	Partition: 2	Leader: 0	Replicas: 0	Isr: 0

     

    생성한 토픽에 데이터 넣기

    Record ( key, value ) 형태로 토픽에 삽입 된다.

    아래의 경우는 메시지를 보낼때 key 값만 보내는 경우

    diaz ~/utils/kafka_2.13-3.0.0  bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test-1
    >hello
    >world

     

    key:value 를 함께 보내려면 아래와 같이 한다.

     ✘ diaz ~/utils/kafka_2.13-3.0.0  bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test-1 --property "parse.key=true" --property "key.separator=:"
    hi:hello

     

    토픽의 값 확인하기

     ✘ diaz ~/utils/kafka_2.13-3.0.0  bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-1 --from-beginning
    hello
    world
    hi:hello
    
    # key:value모두 프린트 하기
    bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-3 --property print.key=true --from-beginning

     

    토픽의 레코드 삭제하기

    # 먼저 삭제 대상의 정보가 담긴 json 파일을 작성한다.
    vi delete-topic.json
    {"partitions": [{"topic":"test-1", "partition":0, "offset": 2}], "version":1}
    
    # 이후 삭제한다.
    bin/kafka-delete-records.sh --bootstrap-server 127.0.0.1:9092 --offset-json-file delete-topic.json

     

    브로커 실행 중단하기

    bin/kafka-server-stop.sh
    728x90

    'kafka' 카테고리의 다른 글

    kafka could not be established. Broker may not be available  (0) 2022.01.24
Designed by Tistory.