三、Kafka生产者

news/2024/7/5 6:05:08

目录

    • 3.1 生产者消息发送流程
      • 3.1.1 发送原理
    • 3.2 异步发送 API
    • 3.3 同步发送数据
    • 3.4 生产者分区
      • 3.4.1 kafka分区的好处
      • 3.4.2 生产者发送消息的分区策略
      • 3.4.3 自定义分区器
    • 3.5 生产者如何提高吞吐量
    • 3.6 数据可靠性

3.1 生产者消息发送流程

3.1.1 发送原理

3.2 异步发送 API

3.3 同步发送数据

3.4 生产者分区

3.4.1 kafka分区的好处

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

3.4.2 生产者发送消息的分区策略

在这里插入图片描述

3.4.3 自定义分区器

1、需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区

2、定义类实现 Partitioner 接口,重写 partition()方法。

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取数据 atguigu  hello
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("atguigu")){
            partition = 0;
        }else {
            partition = 1;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3、使用分区器的方法,在生产者的配置中添加分区器参数

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 关联自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null) {
                        System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

在这里插入图片描述



3.5 生产者如何提高吞吐量

  • 分批次发送消息
  • 对生产者消息采用压缩

四个重要参数:
在这里插入图片描述

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        // 1 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

3.6 数据可靠性


http://lihuaxi.xjx100.cn/news/1465266.html

相关文章

Python学习笔记第六十六天(Matplotlib 散点图)

Python学习笔记第六十六天 Matplotlib 散点图散点图 scatter()颜色条 Colormap 后记 Matplotlib 散点图 我们可以使用 pyplot 中的 scatter() 方法来绘制散点图。 散点图 scatter() scatter() 方法语法格式如下&#xff1a; matplotlib.pyplot.scatter(x, y, sNone, cNone,…

C++ 加号运算符重载

通过重载运算符&#xff0c;实现用运算符操作自定义的数据类型。 语法&#xff1a;一个正常的成员函数 把函数名改为operator加上一个操作符。 情况一&#xff08;用成员函数重载运算符&#xff09;&#xff1a; #include<iostream> using namespace std; class Perso…

设计模式——依赖倒转原则

文章目录 基本介绍应用实例依赖关系传递的三种方式和应用案例1, 接口传递&#xff0c;应用案例代码2, 构造方法传递&#xff0c;应用案例代码3, setter 方式传递&#xff0c;应用案例代码 依赖倒转原则的注意事项和细节 基本介绍 依赖倒转原则(Dependence Inversion Principle…

stm32单片机/51单片机蜂鸣器不响(proteus模拟)

蜂鸣器不发生原因就1个&#xff1a;电压不够 所以需要提高蜂鸣器2端的电压&#xff1a;可以采用的方法有&#xff1a; 1提高蜂鸣器电阻&#xff0c;这样根据分压原理&#xff0c;可以提升蜂鸣器2段电压 2更改蜂鸣器的工作电压为更小的值&#xff0c;这个可以通过在proteus内…

Gin+微服务实现抖音视频上传到七牛云

文章目录 安装获取凭证Gin处理微服务处理 如果你对Gin和微服务有一定了解&#xff0c;看本文较容易。 安装 执行命令&#xff1a; go get github.com/qiniu/go-sdk/v7获取凭证 Go SDK 的所有的功能&#xff0c;都需要合法的授权。授权凭证的签算需要七牛账号下的一对有效的A…

2023前端面试笔记 —— HTML5(持续收集更新)

系列文章目录 内容链接2023前端面试笔记HTML5 文章目录 系列文章目录前言一、HTML 文件中的 DOCTYPE 是什么作用二、HTML、XML、XHTML 之间有什么区别三、前缀为 data- 开头的元素属性是什么四、谈谈你对 HTML 语义化的理解五、HTML5 对比 HTML4 有哪些不同之处六、meta 标签有…

【C++随笔01】联合体union —— 一种节省空间的类

【C随笔01】联合体union —— 一种节省空间的类 一、联合体&#xff08;union)二、定义三、用法1、定义union、访问union成员2、匿名union3、使用类管理union成员4、管理并销毁string 一、联合体&#xff08;union) 联合体是一种特殊的类。一个union可以有多个数据成员&#x…

茂名 湛江阳江某学校 ibm x3850服务器维修经历

简介&#xff1a;中国广东省阳江市某中学联想 IBM System x3850 x6服务器维修 io板故障处理经历分享&#xff1a; 这一天一位阳江的老师经其他学校老师介绍推荐对接我&#xff0c;说他们学校有一台ibm服务器出问题了&#xff0c;老师大致跟我描述了一下这台服务器发生故障的前…