Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

news/2024/7/5 5:39:04

目录

  • Kafka的安装
  • 文件与配置
    • 目录
      • bin
      • config
    • 配置文件
      • server.properties
      • producer.properties
      • consumer.properties
  • 命令行简单使用
    • kafka-topics.sh
      • 新增
      • 查看列表
      • 查看详情
      • 修改
      • 删除
    • kafka-console-producer.sh
    • kafka-console-consumer.sh
  • 概念
    • 集群
    • 代理broker
    • 主题topic
    • 分区partition
    • 偏移量offset
    • 生产者producer
    • 消费者组consumer group
    • 消费者consumer
  • FAQ
    • 如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?
    • 如何设置分区和消费者数?
  • 参考


Kafka的安装

前提,已安装docker和docker-compose。
拉取镜像

docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka:latest

docker-compose.yaml如下

version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENER=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

启动命令

docker-compose up -d

截图
在这里插入图片描述
之后的相关命令若涉及容器id,请自行更换

文件与配置

目录

docker exec -it a0 ls /opt/bitnami/kafka

查看目录命令

截图
在这里插入图片描述
重要目录解释如下:

  • bin: 脚本目录
  • config:配置目录
  • libs:第三方依赖库目录
  • logs:日志

bin

重要的shell脚本加粗了,之后会用

connect-distributed.sh kafka-dump-log.sh kafka-storage.sh
connect-mirror-maker.sh kafka-features.sh kafka-streams-application-reset.sh
connect-standalone.sh kafka-get-offsets.sh kafka-topics.sh
kafka-acls.sh kafka-leader-election.sh kafka-transactions.sh
kafka-broker-api-versions.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh
kafka-cluster.sh kafka-metadata-shell.sh kafka-verifiable-producer.sh
kafka-configs.sh kafka-mirror-maker.sh trogdor.sh
kafka-console-consumer.sh kafka-producer-perf-test.sh windows
kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-security-migration.sh
kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-start.sh
kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-server-stop.sh
kafka-delegation-tokens.sh kafka-server-start.sh zookeeper-shell.sh
kafka-delete-records.sh kafka-server-stop.sh

config

connect-console-sink.properties connect-mirror-maker.properties server.properties
connect-console-source.properties connect-standalone.properties tools-log4j.properties
connect-distributed.properties consumer.properties trogdor.conf
connect-file-sink.properties kraft zookeeper.properties
connect-file-source.properties log4j.properties
connect-log4j.properties producer.properties

配置文件

server.properties

  • broker.id: 唯一id值,通过环境变量设置为了1
  • log.dirs: kafka集群日志目录,默认是log.dirs=/bitnami/kafka/data
  • zookeeper.connect:zookeeper地址端口,格式域名/ip:port,这块是zookeeper:2181,在docker的网络中可以解析为另一容器的ip

更多配置可以查看参考中Dockerhub链接的Configuration部分
在这里插入图片描述

producer.properties

  • bootstrap.servers:kafka的ip:port,这里是localhost:9092
  • compression.type:压缩类型,默认是none, 一共有四种,none, gzip, snappy, lz4, zstd,推荐排序LZ4 > GZIP > Snappy,详见腾讯云压缩算法对比

consumer.properties

  • group.id:消费者组id,默认为test-consumer-group
  • auto.offset.reset:offset设置,三种latest, earliest, none,看情况设置

命令行简单使用

kafka-topics.sh

对主题topic进行增删改查的工具
在这里插入图片描述

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --create:创建主题
  • --delete:删除主题
  • --describe:描述主题
  • --list:查看主题列表
  • --alter:修改主题的 partitions等
  • --topic <String: topic>:主题名
  • --topic-id <String: topic-id>:主题id
  • --partitions <Integer: # of partitions>:主题的partition

新增

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic lady_killer9

截图
在这里插入图片描述

查看列表

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --list

截图
在这里插入图片描述

查看详情

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --describe --topic lady_killer9

截图
在这里插入图片描述

修改

命令
以修改主题partiion数量为例

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --alter --topic lady_killer9 --partitions 3

截图
在这里插入图片描述

删除

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --delete --topic lady_killer9

截图
在这里插入图片描述

kafka-console-producer.sh

标准输入读数据,发送到Kafka的工具
在这里插入图片描述
常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic <String: topic> :Kafka主题,必须的
  • --sync:同步发送
  • --compression-codec [String: compression-codec] :压缩方式,‘none’,‘gzip’, ‘snappy’, ‘lz4’, , ‘zstd’,默认gzip.

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic demo
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-producer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图
在这里插入图片描述

kafka-console-consumer.sh

在这里插入图片描述
常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic <String: topic> :Kafka主题,必须的
  • --group <String: consumer group id>:消费者组id
  • --key-deserializer <String: deserializer for key>:key反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --value-deserializer <String: deserializer for values>:value反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --offset <String: consume offset>:消费的offset
  • --partition <Integer: partition>:消费的分区

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图
在这里插入图片描述
命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo --partition 0 --offset 2

截图
在这里插入图片描述
上手之后我们再来了解一些概念。

概念

集群

已发布的消息保存在一组服务器中,称为Kafka集群。

代理broker

集群中的每一个服务器都是一个代理。

主题topic

每条发布到kafka集群的消息都有一个主题,这个主题被称为topic。每个topic都由一个或者多个分区构成。

分区partition

topic的partition数量可以在创建时配置,partition数量决定了每个消费者组中并发消费者的最大数量

分区的原则:

  • 生产者指定了partition,则直接使用
  • 未指定partition但指定了key,通过对key的value进行hash出一个partition
  • partition和key都未指定,使用轮询选出一个partition

偏移量offset

任何发布到partition的消息都会被直接追加到partition尾部,每条消息的位置称为offset,offset是一个long型数字,它唯一标记一条消息。消费者可以通过(topic、partition、offset)跟踪记录。

生产者producer

push消息到topc的叫生产者,push后可以获得offset。生产者可以指定partition,但不建议这么做。

消费者组consumer group

包含多个消费者,有一个 group id,可以订阅topic进行消费。消费偏移以消费者组为单位。

消费者consumer

从topic中pull数据,可以指定partition和offset。

FAQ

如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?

Kafka每个partition中的消息在写入是都是有序的,消费时,每个partition只能被每一个group中的消费者消费,因此,topic下只有一个partition时一定有序。

如何设置分区和消费者数?

建议分区数与消费者数一致,防止消费不过来。

参考

dockerhub-bitnami/kafka
腾讯云CKafka 压缩算法对比
python-kafka客户端封装


http://lihuaxi.xjx100.cn/news/1533418.html

相关文章

日志平台搭建第二章:Linux使用docker安装elasticsearch-head

一、elasticsearch-head的安装启动 #下载镜像 docker pull alivv/elasticsearch-head #启动 docker run -d --name eshead -p 9100:9100 alivv/elasticsearch-head 查看日志 docker logs -f eshead 出现如下证明启动成功 浏览器访问9100端口&#xff0c;出现以下页面也说明…

嵌入式系统设计与应用---嵌入式系统概述(学习笔记)

目录​​​​​​​ 嵌入式系统 概念 组成 嵌入式常用的操作系统 与PC机的区别 开发 软件开发 硬件开发 嵌入式处理器 分类 嵌入式系统 概念 以应用为中心&#xff0c;以计算机技术为基础&#xff0c;软硬件可载剪&#xff0c;适应对功能、可靠性、成本&#xff0c…

【干货】风控建模中把原始变量转成WOE实现(Python)

很多刚开始建模的同学,对原始变量转WOE都是一知半解,弄不清楚为什么要转WOE,也不清楚要怎么把变量转成WOE。对于WOE原理不清楚的小伙伴,可以先看下本公众号之前的文章:风控建模中的IV和WOE。本文重点讲解用Python中的toad库实现变量的WOE转换。 文章目录 一、WOE的定义二、…

三、C#—变量,表达式,运算符(3)

&#x1f33b;&#x1f33b; 目录 一、变量1.1 变量1.2 使用变量的步骤1.3 变量的声明1.4 变量的命名规则1.5 变量的初始化1.6 变量初始化的三种方法1.7 变量的作用域1.8 变量使用实例1.9 变量常见错误 二、C#数据类型2.1 数据类型2.2 值类型2.2.1 值类型直接存储值2.2.2 简单类…

AJAX学习笔记9 搜索联想自动补全

AJAX学习笔记8 跨域问题及解决方案_biubiubiu0706的博客-CSDN博客 其实就一个功能 搜索联想 自动补全 键盘按下事件keydown 键盘弹起事件keyup 做模糊查询 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><t…

LVGL在单色屏上运行

LVGL在单色屏上运行 一、环境准备二、实现步骤 本文主要讲述LVGL在单色屏应用中的实现思路&#xff0c;并已验证通过。网上关于lvgl移植的文章有很多了&#xff0c;本文不加以赘述。 一、环境准备 版本&#xff1a;LVGL 8.2屏幕&#xff1a;中景园ZJY213S0700屏幕驱动芯…

MySQL触发器使用指南大全

一、介绍 触发器是与表有关的数据库对象&#xff0c;指在insert/update/delete之前或之后&#xff0c;触发并执行触发器中定义的SQL语句集合。触发器的这种特性可以协助应用在数据库端确保数据的完整性&#xff0c;日志记录&#xff0c;数据校验等操作。 使用别名OLD和NEW来引…

DockerFile构建过程 实践

Dockerfile 1 DockerFile 构建过程 dockerfile是用来构建docker镜像文件&#xff01;命令参数脚本 构建步骤 dockerfiledocker build 构建一个镜像docker run 运行镜像docker push 发布一个镜像 dockerhub官方的镜像&#xff0c;会对应到github上一个仓库中的Dockerfile …