카프카 사용 해 보기
kafka 설치하기
설치 환경
- with vmbox, 2 core,
- ram: 4 GB (ref. 1 에서 최소 4GB가 없으면 kafka 설치가 fail 난다고 한다.)
- Ubuntu: ubuntu-20.04.1-desktop-amd64
- OpenJDK 11
절차
- kafka 용 계정 생성
- kafka 계정생성
- sudo group 에 추가: kafka dependency 들을 설치할 때 root 권한 필요, 설치가 끝난 이후에는 지우자.
- 만든 kafka 계정으로 login
- kafka binary 다운로드 및 압축풀기
- kafka server 설정하기(configurate)
- systemd unit file 들을 생성
- zoopkeeper를 위한 unit file 생성(kafka는 cluster state 와 cluster cofiguration을 관리하기 위해 zookeeper 를 사용한다.)
- kafka 를 위한 systemd service file 생성
- kafka 실행
- kafka 는 기본적으로 9092 port 를 사용한다.
- 설치가 잘됐는지 테스트
- 보안을 위한 계정 권한 설정
sudo deluser kafka sudo
sudo passwd kafka -l
sudo su - kafka
설치
1. kafka 용 계정 생성
sudo adduser kafka
sudo adduser kafka sudo
su -l kafka
2. download
kafka.*.tgz
는 한 71MB 정도 된다.
mkdir ~/downloads
curl "https://downloads.apache.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz" -o ~/downloads/kafka.tgz
# mirror site 는 아래와 같다.
curl "https://mirror.navercorp.com/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz" -o ~/downloads/kafka.tgz
3. 압축풀기
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/downloads/kafka.tgz --strip 1
kafka topic 으로 message 를 발행하게 된다.(publish) 이 topic 은 기본적으로 지울 수 없게 되어있다.
~/kafka/config/server.properties
이 kafka server 설정
파일이다.
############################# Mine #############################
# topic 을 삭제할 수 있게 한다.
delete.topic.enable = true
# log path 변경
log.dirs=/home/kafka/logs
4. systemd 등록
Zookeeper 와 kafka 에 대해서 .service 를 만든다. zookeeper 는 위에서 download 한 kafaka 안에 같이 있다.
/etc/systemd/system/
에 zookeeper.service
와 kafka.service
를 만들자. .service 의 내용을 보면
대략알겠지만, kafka 를 실행하기 전에 zookeeper 를 실행해야 한다. 그래서
kafka.service
를 실행할때 zookeeper.service 가 같이
실행되게 설정을 해놓는다.
sudo vi /etc/systemd/system/zookeeper.service
sudo vi /etc/systemd/system/kafka.service
zookeeper.service
# /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
kafka.service
# /etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
5. 실행
실행하면 kafka 는 9092 port 를 사용한다.
sudo systemctl start kafka
sudo systemctl status kafka
kafka@myuser-VirtualBox:~$ sudo systemctl status kafka
● kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
Active: active (running) since Sat 2021-06-05 21:14:25 KST; 3min 20s ago
Main PID: 1992 (sh)
Tasks: 70 (limit: 4653)
Memory: 339.9M
CGroup: /system.slice/kafka.service
├─1992 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config>
└─1994 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingH>
6월 05 21:14:25 myuser-VirtualBox systemd[1]: Started kafka.service.
booting 시 자동실행
sudo systemctl enable zookeeper
sudo systemctl enable kafka
6. 테스트
MyTestTopic
이라는 topic 을 만들기
kafka@myuser-VirtualBox:~$ ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic MyTestTopic
Created topic MyTestTopic.
publish a message
message 를 topic 으로 날려보자.
bin/kafka-console-producer.sh
를 이용하면 된다. 이 script
는 kafka의 kafka.tools.ConsoleProducer
class 를 실행시켜
준다.
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MyTestTopic > /dev/null
echo MYHEADER:CREATE,mytest:otherinfo$24${"test":"ttt"} | kafka-console-producer.sh --broker-list localhost:9092 --topic local.procurement.receiving --property "parse.key=true" --property "parse.headers=true" --property "key.separator=$" --property "headers.delimiter=$"
consume a message
topic 에서 message 가져와 보자. 아래처럼 명령어를 넣으면 된다. 이
script 는 kafka.tools.ConsoleConsumer
class를 실행하게
된다.
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyTestTopic --from-beginning
실행하면 계속 대기하게 된다. 이 상태에서 계속 새로운 message 를 publish 하면 그 message 를 볼 수 있을 것이다.
kafka@myuser-VirtualBox:~$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyTestTopic --from-beginning
Hello, World
kafka topic 삭제
~/kafka/bin/windows/kafka-topics.sh –bootstrap-server localhost:9092 –delete –topic mytopic
7. kafka 계정의 직접 로그인 막기
kafka 계정으로 sudo 를 사용할 수 없게 하고, 직접 로그인이 되게 막는다.
sudo deluser kafka sudo
sudo passwd kafka -l
sudo su - kafka
직접로그인을 풀려면 아래처럼 해주면 된다.
sudo passwd kafka -u
8. 기타
ref. 1 에서 kafkaT 를 추천한다.
Kafka Administration and Monitoring UI Tools - DZone Big Data:
- Kafdrop 을 괜찮다고 하는 듯 하다. : 개인적으로 docker run으로 사용 해봤지만 node assignment 이 안돼서 결국 사용하지 못했다.(Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.)
- CMAK 은 관리용 도구라
한다. CMAK 으로는 kafka message 를 확인할 수는 없다고 한다.
- windows 에서 개발용으로는 Offset Explorer 이 간편하다. 다만 string 이 그냥 hex 값으로 보인다.
- https://github.com/provectus/kafka-ui
Download Magic :: Kafka Magic Tool: 무료버전도 쓸만하다.
sudo apt install ruby ruby-dev build-essential
sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
vi ~/.kafkatcfg
kafkat partitions
~/.kafkatcfg
{
"kafka_path": "~/kafka",
"log_path": "/home/kafka/logs",
"zk_path": "localhost:2181"
}
댓글 없음:
댓글 쓰기