Kafka

Kafka was originally developed at LinkedIn. It is a distributed, partitioned, replicated publish-subscribe messaging system that relies on ZooKeeper for coordination.

Both ZooKeeper and Kafka require a JDK.

Kafka features
  • High throughput and low latency
  • Scalability
  • Durability and reliability
  • Fault tolerance
  • High concurrency
Kafka use cases
  • Log aggregation
  • Messaging
  • User activity tracking
  • Operational metrics
  • Stream processing
  • Event sourcing
ZooKeeper installation

Download and install

cd /usr/local/src/

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

tar zxf zookeeper-3.4.14.tar.gz

mv zookeeper-3.4.14 /usr/local/zookeeper

Modify the configuration and start

cd /usr/local/zookeeper/

cp conf/zoo_sample.cfg conf/zoo.cfg

./bin/zkServer.sh start
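
To confirm that ZooKeeper is running, you can send it the built-in "ruok" four-letter command. Below is a minimal sketch that does this from Python using only the standard library, assuming ZooKeeper listens on the default port 2181:

# ZooKeeper answers b'imok' to the 'ruok' health-check command
import socket

s = socket.create_connection(('127.0.0.1', 2181), timeout=5)
s.sendall(b'ruok')
print(s.recv(4))  # expect b'imok'
s.close()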

Kafka installation (single node, multiple brokers)

Download Kafka

cd /usr/local/src/

wget http://archive.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

tar zxf kafka_2.11-2.2.0.tgz

mv kafka_2.11-2.2.0 /usr/local/kafka

Modify the configuration and start

cd /usr/local/kafka/

vim config/server.properties

The resulting configuration is shown below; the lines with trailing # comments are the ones to change. In particular, advertised.listeners must be an address that clients can actually reach (here, the server's public IP):

egrep -v "^#|^$" config/server.properties


broker.id=0
listeners=PLAINTEXT://:9092 # listening port
advertised.listeners=PLAINTEXT://193.112.46.206:9092 # address advertised to clients
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/log/kafka # data directory
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

Make a copy of the configuration file:

cp config/server.properties config/server-1.properties

Change broker.id, the listener port, the advertised address, and the data directory:

egrep -v "^#|^$" config/server-1.properties

broker.id=1
listeners=PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://193.112.46.206:9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/log/kafka1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

Start Kafka

./bin/kafka-server-start.sh ./config/server.properties

./bin/kafka-server-start.sh ./config/server-1.properties
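
Once both brokers are up, you can check that they are reachable over the network. Below is a minimal sketch using kafka-python (installed in the Python section later in this post); the IP is this post's example server address:

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['193.112.46.206:9092', '193.112.46.206:9093'])
print(consumer.topics())  # set of topic names known to the cluster
consumer.close()
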
Common Kafka commands

Create a test topic. --replication-factor sets the number of replicas per partition (typically at least 2, and at most the number of brokers); --partitions sets the number of partitions.

./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 2 --topic test
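
The same operation can also be done programmatically. Below is a sketch using kafka-python's admin client (an optional alternative to the shell script, not part of the original walkthrough); the IP is the example server address:

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers='193.112.46.206:9092')
admin.create_topics([NewTopic(name='test', num_partitions=2, replication_factor=2)])
admin.close()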

List all topics

./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

Describe a topic

./bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test

Delete a topic

./bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test
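
The list, describe, and delete operations have kafka-python equivalents as well. A sketch, reusing the admin client from the create-topic example above:

print(admin.list_topics())              # all topic names
print(admin.describe_topics(['test']))  # partition and replica layout
admin.delete_topics(['test'])           # topic deletion must be enabled on the broker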

Open a terminal window and start a console producer, then type some messages. Note that --broker-list takes a broker address (port 9092), not the ZooKeeper port:

./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

Open another terminal window and start a console consumer; you should see the messages you typed. As of Kafka 2.x the console consumer connects to a broker via --bootstrap-server rather than to ZooKeeper:

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
Using Kafka from Python

Install kafka-python

pip3 install kafka-python
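
A quick way to confirm the installation (a minimal check):

import kafka
print(kafka.__version__)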

Producer

# -*- coding: UTF-8 -*-

import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['193.112.46.206:9092', '193.112.46.206:9093'])

# topic to produce to
topic = 'test'

def main():
    print('begin')
    for n in range(100):
        message = "I am No.{}".format(n)
        # send() is asynchronous: the record is batched and sent in the background
        producer.send(topic, message.encode('utf-8'))
        print(message)
        time.sleep(0.5)
    # block until all buffered records have been delivered
    producer.flush()
    print('done')

if __name__ == '__main__':
    main()
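
send() returns a future, so delivery can also be confirmed synchronously. A short sketch, continuing with the producer object defined above:

# block on a single send to get delivery metadata (raises KafkaError on failure)
future = producer.send(topic, b'hello')
record_metadata = future.get(timeout=10)
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)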

Consumer

# -*- coding: UTF-8 -*-

import time
from kafka import KafkaConsumer

# auto_offset_reset: 'earliest' starts from the oldest offset, 'latest' from the newest
consumer = KafkaConsumer('test',
                         group_id='group2',
                         bootstrap_servers=['193.112.46.206:9092', '193.112.46.206:9093'],
                         auto_offset_reset='latest',
                         enable_auto_commit=False)

for msg in consumer:
    print(msg)
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg.timestamp / 1000)))
    # commit_async() commits asynchronously; commit() commits synchronously
    consumer.commit_async()

In practice, consumers more often use the poll model, combining synchronous and asynchronous offset commits:

"""
同步和异步组合的方式提交偏移量
"""

import pickle
import uuid
import time
from kafka import KafkaConsumer

class KafkaManage(object):
def __init__(self,kafka_hosts=["localhost:9092"],auto_offset_reset='latest',enable_auto_commit=False):
self.kafka_hosts = kafka_hosts
self.auto_offset_reset = auto_offset_reset
self.enable_auto_commit = enable_auto_commit

def kafka_consumer(self,group_name):
consumer = KafkaConsumer(
bootstrap_servers = self.kafka_hosts,
group_id = group_name,
client_id="{}".format(str(uuid.uuid4())),
auto_offset_reset = self.auto_offset_reset,
enable_auto_commit = self.enable_auto_commit
)
return consu
def kafka_consumer_topic(self,topic,group_name):
kafka_consumer = self.kafka_consumer(group_name)
kafka_consumer.subscribe(topics=(topic,))
return kafka_consu
def _on_send_response(self,*args, **kwargs):
"""
提交偏移量涉及的回调函数
:param args:
:param kwargs:
:return:
"""
if isinstance(args[1], Exception):
print('偏移量提交异常. {}'.format(args[1]))
else:
print('偏移量提交成功. {}'.format(args[1]))

def get_kafka_messages(self,topic,group_name):
consumer = self.kafka_consumer_topic(topic, group_name)
return consumer

def get_kafka_messages_poll(self,topic,group_name,timeout_ms=1000):
consumer = self.kafka_consumer_topic(topic,group_name)
try:
while True:
consumer_records = consumer.poll(timeout_ms)
for k, records in consumer_records.items():
for record in records:
print("topic = {},partition = {},offset = {},key = {},value = {}".format(
record.topic, record.partition, record.offset, record.key, record.value))

message = {
"topic": record.topic,
"partition": record.partition,
"key": record.key,
"value": record.value
}

try:
# 轮询一个batch 手动提交一次
consumer.commit_async(callback=self._on_send_response)
except Exception as e:
print('commit failed', str(e))
except Exception as e:
print(str(e))
finally:
try:
# 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
consumer.commit()
print("同步补救提交成功")
except Exception as e:
consumer.close()

if __name__ == '__main__':
k = KafkaManage(['193.112.46.206:9092', '193.112.46.206:9093'])
data = k.get_kafka_messages("test1","gourp")
for msg in data:
print(msg)