网站首页 > 技术文章 正文
刚接触kafka时,对于kafka创建topic命令甚是疑惑,有的资料创建topic命令中有的用的是bootstrap.server 有的用的是zookeeper 。也没有说是什么原因,k刚开始搞不懂zookeeper 和bootstrap.server 有什么不同。其实很简单,只是kafka不同版本使用方式不同而已。
这里对于kafka版本用新旧来区分。
旧版本(< v2.2)Kafka的参数
kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic topic_name
新版本(>= v2.2)Kafka的参数
kafka-topics.sh --bootstrap-server node1:9092 --create --topic topic_name
其中,2181是zookeeper 的监听端口,9092是kafka的监听端口。
旧版本用--zookeeper参数,主机名(或主机IP)和端口用ZooKeeper的,就是server.properties文件中zookeeper.connect属性的配置值
新版本用--bootstrap-server参数,主机名(或主机IP)和端口用某个节点的即可,即主机名(或主机IP)9092。
查看kafka版本
kafka并没有直接提供查看version的命令。但也没关系,我们可以通过进入kafka/libs文件夹,libs下的文件名称中就包含kafka版本信息,蓝色中的2.2.1就是我们安装的kafka的版本。
除了这种方式可以查看kafka版本外,我们还可以通过下面这种方式查看,
进入kafka目录,执行下面这个命令。
find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
其中2.2.1就是我们要找的kafka版本。
认真踏实的你,相信你对前面的2.11是什么很好奇。2.11是Scala版本,2.2.1是Kafka版本。
创建topic命令
知道kafka版本号后,按照前面提供的创建命令,我们对号入座。
./kafka-topics.sh --create --topic topic_name --replication-factor 3 --partitions 3 --bootstrap-server node1:9092,node2:9092,node3:9092
- -topic 指定topic
- -partitions指定分区数
需要注意的是分区数越多,在一定程度上会提高消息处理的吞吐量,因为kafka是基于文件进行读写,因此也需要打开更多的文件句柄,也会增加一定的性能开销。
但分区数也不宜过得,如果分区数过多,日志分段也会因此多很多,写的时候对I/O对性能影响很大。
–replication-factor
用来设置主题的副本数。
每个主题可以有多个副本,副本位于集群中不同的broker上,也可以说副本的数量不能超过broker的数量。
查看topic命令
想确认下topic创建是否成功,我们可以查看topic列表,看看刚才创建的topic是否在这个列表中,有的话就表示创建成功了。
./kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
也可以通过查看topic内容来确认。
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topic_name
删除topic命令
跟mysql数据库一样,topic可以创建,理所当然可以删除。
./kafka-topics.sh --delete --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topic_name
查看消费组消费进度
./kafka-consumer-groups.sh --describe --bootstrap-server node1:9092,node2:9092,node3:9092 --group consumer_group_name
重置消费组位移
./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --group consumer_group_name --all-topics --execute --reset-offsets --to-datetime 2022-08-07T16:00:00.000
需要注意的是时区问题。
由于笔者水平有限,文中纰漏之处在所难免,欢迎批评指正。
- 上一篇: Linux 如何查看应用程序进程号、端口等等
- 下一篇: 将kafka集群端口映射到公网访问
猜你喜欢
- 2024-12-12 Kafka监控与指标解析-UnderReplicatedPartitions
- 2024-12-12 聊聊 Kafka: Consumer 源码解析之 ConsumerNetworkClient
- 2024-12-12 为什么Kafka依赖ZooKeeper?
- 2024-12-12 一款Kafka可视化Web界面管理工具:CMAK
- 2024-12-12 MongoDB 数据同步kafka
- 2024-12-12 kafka快速入门到精通
- 2024-12-12 SpringBoot集成Kafka+Kafka优化问题
- 2024-12-12 kafka consumer 配置详解
- 2024-12-12 kafka生产者配置详解
- 2024-12-12 Kafka两种集群详解和搭建教程
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- c语言min函数头文件 (77)
- asynccallback (87)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 无效的列索引 (74)