ES同步-《GO开发知识笔记》

admin 2025-11-04 01:33:45 编程 来源:ZONE.CI 全球网 0 阅读模式
  • Canal服务端安装">Canal服务端安装
    • 1、打开MySQL的binlog日志">1、打开MySQL的binlog日志
    • 2、设置MySQL的配置">2、设置MySQL的配置
    • 3、设置RabbitMQ的配置">3、设置RabbitMQ的配置
    • 4、RabbitMQ新建exchange和Queue">4、RabbitMQ新建exchange和Queue
    • 5、启动服务端">5、启动服务端
    • 6、测试">6、测试
  • Canal客户端搭建">Canal客户端搭建
    • 1、创建消息实体类">1、创建消息实体类
    • 2、MQ消息监听业务">2、MQ消息监听业务
    • 3、测试">3、测试

    image.png

    Canal服务端安装

    服务端需要下载压缩包,下载地址:https://github.com/alibaba/canal/releases目前最新的是v1.1.5 ,点击下载:ES同步 - 图2下载完成解压,目录如下:ES同步 - 图3本文使用Canal+RabbitMQ 进行数据的同步,因此下面步骤完全按照这个base进行。

    1、打开MySQL的binlog日志

    修改MySQL的日志文件,my.cnf 配置如下:

    1. [mysqld]
    2. log-bin=mysql-bin # 开启 binlog
    3. binlog-format=ROW # 选择 ROW 模式
    4. server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

    2、设置MySQL的配置

    需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

    1. # url
    2. canal.instance.master.address=127.0.0.1:3306
    3. # username/password
    4. canal.instance.dbUsername=root
    5. canal.instance.dbPassword=root
    6. # 监听的数据库
    7. canal.instance.defaultDatabaseName=test
    8. # 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
    9. canal.instance.filter.regex=.*\\..*

    3、设置RabbitMQ的配置

    服务端默认的传输方式是tcp ,需要在配置文件中设置MQ 的相关信息。这里需要修改两处配置文件,如下;1、canal.deployer-1.1.5\conf\canal.properties这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码…

    1. # 传输方式:tcp, kafka, rocketMQ, rabbitMQ
    2. canal.serverMode = rabbitMQ
    3. ##################################################
    4. ######## RabbitMQ #############
    5. ##################################################
    6. rabbitmq.host = 127.0.0.1
    7. rabbitmq.virtual.host =/
    8. # exchange
    9. rabbitmq.exchange =canal.exchange
    10. # 用户名、密码
    11. rabbitmq.username =guest
    12. rabbitmq.password =guest
    13. # 是否持久化
    14. rabbitmq.deliveryMode = 2

    2、canal.deployer-1.1.5\conf\example\instance.properties这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:

    1. canal.mq.topic=canal.routing.key

    4、RabbitMQ新建exchange和Queue

    在RabbitMQ中需要新建一个canal.exchange (必须和配置中的相同)的exchange和一个名称为 canal.queue (名称随意)的队列。其中绑定的路由KEY为:canal.routing.key (必须和配置中的相同),如下图:ES同步 - 图4

    5、启动服务端

    点击bin目录下的脚本,windows直接双击startup.bat ,启动成功如下:ES同步 - 图5

    6、测试

    在本地数据库test 中的oauth_client_details 插入一条数据,如下:

    1. INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');

    此时查看MQ中的canal.queue 已经有了数据,如下:ES同步 - 图6其实就是一串JSON数据,这个JSON如下:

    1. {
    2. "data": [{
    3. "client_id": "myjszl",
    4. "resource_ids": "res1",
    5. "client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
    6. "scope": "all",
    7. "authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
    8. "web_server_redirect_uri": "http://www.baidu.com",
    9. "authorities": null,
    10. "access_token_validity": "1000",
    11. "refresh_token_validity": "1000",
    12. "additional_information": null,
    13. "autoapprove": "false"
    14. }],
    15. "database": "test",
    16. "es": 1640337532000,
    17. "id": 7,
    18. "isDdl": false,
    19. "mysqlType": {
    20. "client_id": "varchar(48)",
    21. "resource_ids": "varchar(256)",
    22. "client_secret": "varchar(256)",
    23. "scope": "varchar(256)",
    24. "authorized_grant_types": "varchar(256)",
    25. "web_server_redirect_uri": "varchar(256)",
    26. "authorities": "varchar(256)",
    27. "access_token_validity": "int(11)",
    28. "refresh_token_validity": "int(11)",
    29. "additional_information": "varchar(4096)",
    30. "autoapprove": "varchar(256)"
    31. },
    32. "old": null,
    33. "pkNames": ["client_id"],
    34. "sql": "",
    35. "sqlType": {
    36. "client_id": 12,
    37. "resource_ids": 12,
    38. "client_secret": 12,
    39. "scope": 12,
    40. "authorized_grant_types": 12,
    41. "web_server_redirect_uri": 12,
    42. "authorities": 12,
    43. "access_token_validity": 4,
    44. "refresh_token_validity": 4,
    45. "additional_information": 12,
    46. "autoapprove": 12
    47. },
    48. "table": "oauth_client_details",
    49. "ts": 1640337532520,
    50. "type": "INSERT"
    51. }

    每个字段的意思已经很清楚了,有表名称、方法、参数、参数类型、参数值…..客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。

    Canal客户端搭建

    客户端很简单实现,要做的就是消费Canal服务端传递过来的消息,监听canal.queue 这个队列。

    1、创建消息实体类

    MQ传递过来的是JSON数据,当然要创建个实体类接收数据,如下:

    1. /**
    2. * Canal消息接收实体类
    3. */
    4. @NoArgsConstructor
    5. @Data
    6. public class CanalMessage<T> {
    7. @JsonProperty("type")
    8. private String type;
    9. @JsonProperty("table")
    10. private String table;
    11. @JsonProperty("data")
    12. private List<T> data;
    13. @JsonProperty("database")
    14. private String database;
    15. @JsonProperty("es")
    16. private Long es;
    17. @JsonProperty("id")
    18. private Integer id;
    19. @JsonProperty("isDdl")
    20. private Boolean isDdl;
    21. @JsonProperty("old")
    22. private List<T> old;
    23. @JsonProperty("pkNames")
    24. private List<String> pkNames;
    25. @JsonProperty("sql")
    26. private String sql;
    27. @JsonProperty("ts")
    28. private Long ts;
    29. }

    2、MQ消息监听业务

    接下来就是监听队列,一旦有Canal服务端有数据推送能够及时的消费。代码很简单,只是给出个接收的案例,具体的业务逻辑可以根据业务实现,如下:

    1. import cn.hutool.json.JSONUtil;
    2. import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
    3. import lombok.RequiredArgsConstructor;
    4. import lombok.extern.slf4j.Slf4j;
    5. import org.springframework.amqp.rabbit.annotation.Exchange;
    6. import org.springframework.amqp.rabbit.annotation.Queue;
    7. import org.springframework.amqp.rabbit.annotation.QueueBinding;
    8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    9. import org.springframework.stereotype.Component;
    10. /**
    11. * 监听MQ获取Canal增量的数据消息
    12. */
    13. @Component
    14. @Slf4j
    15. @RequiredArgsConstructor
    16. public class CanalRabbitMQListener {
    17. @RabbitListener(bindings = {
    18. @QueueBinding(
    19. value = @Queue(value = "canal.queue", durable = "true"),
    20. exchange = @Exchange(value = "canal.exchange"),
    21. key = "canal.routing.key"
    22. )
    23. })
    24. public void handleDataChange(String message) {
    25. //将message转换为CanalMessage
    26. CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
    27. String tableName = canalMessage.getTable();
    28. log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
    29. //TODO 业务逻辑自己完善...............
    30. }
    31. }

    3、测试

    下面向表中插入数据,看下接收的消息是什么样的,SQL如下:

    1. INSERT INTO `oauth_client_details`
    2. VALUES
    3. ( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );

    客户端转换后的消息如下图:ES同步 - 图7上图可以看出所有的数据都已经成功接收到,只需要根据数据完善自己的业务逻辑即可。

    以太坊cppgolang区别 编程

    以太坊cppgolang区别

    以太坊是一种去中心化的开源平台,它采用智能合约技术,旨在构建和运行不受干扰的分布式应用程序。作为目前最受欢迎的区块链平台之一,以太坊提供了多种编程语言的支持,其
    progolang 编程

    progolang

    Go语言(Golang)是由Google开发的一门静态类型编程语言。作为一名专业的Golang开发者,我深知这门语言的优势和特点。在本文中,我将介绍Golang
    golangn个发送者 编程

    golangn个发送者

    Golang是一种开源的编程语言,由Google团队开发,旨在提高程序的并发性和简化软件开发过程。在Go语言中,有时需要向多个接收者发送信息。本文将介绍如何在G
    golang技能图谱 编程

    golang技能图谱

    从互联网行业的快速发展到人工智能技术的日益成熟,各种编程语言也应运而生。而在这众多的编程语言中,Golang(即Go)作为一门强大且高效的开发语言备受关注。Go
    评论:0   参与:  15