使用Canal实现mysql binlog增量订阅数据

news/2024/7/5 4:35:18

目录

前言

简单原理

1.mysql数据库开启Binlog模式

1.docker 安装 canal 服务端

3.实现canal客户端


前言

是由公司业务改造搜索功能,使用ES搜索引擎中间件,那么我们需要将mysql中的数据同步至ES服务中,最总选择使用alibaba的canal增量订阅和解析工具。

简单原理

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

1.mysql数据库开启Binlog模式

注:如未安装mysql可参考《docker环境中安装mysql》进行安装。

针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限,无需任何操作,可直接进行下一步

docker进入已安装的mysql容器,找到my.cnf文件编辑

docker exec -it 容器id bash

ls
cd etc
ls

 编辑my.cnf文件

vim my.cnf

文件内容追加以下配置并保存,最后重启mysql容器

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

知识点补充:

mysql binlog的格式有三种:STATEMENT、ROW、MIXED

  1. ROW 模式(一般就用它)
    日志会记录每一行数据被修改的形式,不会记录执行 SQL 语句的上下文相关信息,只记录要修改的数据,哪条数据被修改了,修改成了什么样子,只有 value,不会有 SQL 多表关联的情况。
    优点:它仅仅只需要记录哪条数据被修改了,修改成什么样子了,所以它的日志内容会非常清楚地记录下每一行数据修改的细节,非常容易理解。
    缺点:ROW 模式下,特别是数据添加的情况下,所有执行的语句都会记录到日志中,都将以每行记录的修改来记录,这样会产生大量的日志内容。
  2. STATEMENT 模式
    每条会修改数据的 SQL 语句都会被记录下来。
    缺点:由于它是记录的执行语句,所以,为了让这些语句在 slave 端也能正确执行,那他还必须记录每条语句在执行过程中的一些相关信息,也就是上下文信息,以保证所有语句在 slave 端被执行的时候能够得到和在 master 端执行时候相同的结果。
    但目前例如 step()函数在有些版本中就不能被正确复制,在存储过程中使用了 last-insert-id()函数,可能会使 slave 和 master 上得到不一致的 id,就是会出现数据不一致的情况,ROW 模式下就没有。
  3. MIXED 模式
    以上两种模式都使用。

引用至:Canal:同步mysql增量数据工具,一篇详解核心知识点 - 知乎

新建一个mysql账号用于canal连接(不建议使用root账号),并具有作为 MySQL slave 的权限

# 新增账号
CREATE USER canal IDENTIFIED BY 'canal';  
#设置账号操作权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
#刷新权限配置
FLUSH PRIVILEGES;

1.docker 安装 canal 服务端

拉取最新版本canal-server镜像

docker pull canal/canal-server:latest

构建canal-server容器

docker run --name canal -p 11111:11111 -d canal/canal-server:latest

查看安装情况,并进入容器

docker ps
docker exec -it 容器id bash

 找到文件instance.properties

 编辑instance.properties

vim instance.properties

具体修改配置

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

主要关注属性:

canal.instance.master.address = 数据库ip地址:3306   

canal.instance.dbUsername = 数据库账号  

canal.instance.dbPassword = 数据库密码

canal.instance.filter.regex = .\*\\\\..\*  关注的表(默认全部表数据)配置的过滤正则,更多过滤格式可参考:Canal配置connector.subscribe和canal.instance.filter.regex遇到的坑_Alice_qixin的博客-CSDN博客_canalconnector

 最后保存配置文件退出,并重启canal-server容器

3.实现canal客户端

核心代码实现

public static void main(String[] args) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("服务器ip地址",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 1000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

                System.out.println("message:" + message);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("entry======:" + entry);
            CanalEntry.EventType eventType = rowChage.getEventType();

            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

运行main方法,并查看控制台日志

 在这期间,操作关注的数据库表数据

 查看控制台日志,显示为update操作,获取binlog成功

注:在实际生产业务中可把canal逻辑放在定时任务中,定时获取mysql binlog 数据同步

 


http://lihuaxi.xjx100.cn/news/836916.html

相关文章

PT100温度采集电路设计

PT100是正温度系数的热敏电阻&#xff0c;顾名思义&#xff0c;随着温度的升高&#xff0c;电阻的阻值变大&#xff1b;相反&#xff0c;如果随着温度的升高&#xff0c;电阻的阻值变小&#xff0c;就是负温度系数的热敏电阻。之所以叫做PT100&#xff0c;是因为在0度时其阻值为…

【JavaEE】Java中复杂的Synchronized关键字

目录 一、synchronized的特性 &#xff08;1&#xff09;互斥 &#xff08;2&#xff09;刷新内存 &#xff08;3&#xff09;可重入 二、synchronized的使用 &#xff08;1&#xff09;修饰普通方法 &#xff08;2&#xff09;修饰静态方法 &#xff08;3&#xff09;修…

nohup后台启动程序jar包的时候进行定时按时间日期分割日志

在springboot应用开发中&#xff0c;常用jar方式进行部署&#xff0c;用nohup后台启动&#xff0c;这样生成的日志文件会越来越大&#xff0c;导致日志文件打开很慢&#xff0c;不方便后续问题的定位和解决。所以需要对日志进行分割&#xff0c;下面主要介绍按日期分割日志。话…

Python:每日一题之观光公交(前缀和)

题目描述 风景迷人的小城 Y 市&#xff0c;拥有 n 个美丽的景点。由于慕名而来的游客越来越多&#xff0c;Y 市特意安排了一辆观光公交车&#xff0c;为游客提供更便捷的交通服务。观光公交车在第 0 分钟出现在 1 号景点&#xff0c;随后依次前往 2、3、4……n 号景点。从第 i…

Linux设置开机自启keepalived+nginx服务

目录&#xff1a; 目录 背景&#xff1a; 分析过程&#xff1a; 解决方案&#xff1a; 解决方案一&#xff1a; 解决方案二&#xff1a; 背景&#xff1a; 在工作突发遇见了Linux虚拟机所在的宿主机重启了&#xff0c;虚拟机上部署nginxkeepalived服务&#xff0c;但是…

Hudi(21):Hudi集成Flink之核心原理分析

目录 0. 相关文章链接 1. 数据去重原理 1.1. 消息版本新旧 1.2. 攒消息阶段的去重 1.3. 写 parquet 增量消息的去重 1.4. 跨 partition 的消息去重 2. 表写入原理 2.1. 数据写入分析 2.2. 数据压缩 2.3. 数据清理 2.4. Job图 3. 表读取原理 0. 相关文章链接 Hudi文…

Hadoop三大框架

一、Hadoop是什么Hadoop是一个由apache开发的分布式系统基础架构。主要解决海量数据的存储和海量数据的分析计算问题。广义上来说&#xff0c;Hadoop通胀指一个更宽泛的概念——Hadoop生态圈1、Hadoop优势高可靠性&#xff1a;Hadoop底层维护多个数据副本&#xff0c;即使Hadoo…

es-Mapping

文章目录es-Mapping概念查看mappingES数据类型两种映射类型映射参数es-Mapping 概念&#xff1a;映射是定义文档及其包含的字段的存储和索引方式的过程两种映射方式 dynamic mapping&#xff08;动态映射或自动映射&#xff09;expllcit mapping &#xff08;静态映射或手动映…