Incrementally Syncing MySQL to ES with Canal (Part 2)

admin

November 28, 2021

Original article. When reposting, please credit the source: https://www.cnblogs.com/agilestyle/p/15077622.html

Canal Configuration

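MySQL Grants

Grant the account that Canal uses to connect to MySQL the privileges it needs to act as a MySQL slave. If the account already exists, skip CREATE USER and run the GRANT directly.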

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

Note: for a self-hosted MySQL, first enable binlog writing and set binlog-format to ROW.
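The two prerequisites in the note can be checked before starting Canal. The sketch below is only an illustration: it assumes the values of the MySQL system variables log_bin and binlog_format have already been fetched (for example with SHOW VARIABLES) into a dict, and the function name check_binlog_settings is made up for this example.

```python
# Sketch: validate the MySQL settings that Canal relies on.
# `variables` is assumed to map variable names to their values,
# e.g. collected from SHOW VARIABLES.

def check_binlog_settings(variables):
    """Return a list of problems; an empty list means the settings look fine."""
    problems = []
    if str(variables.get("log_bin", "")).upper() != "ON":
        problems.append("log_bin is not ON: enable binary logging")
    if str(variables.get("binlog_format", "")).upper() != "ROW":
        problems.append("binlog_format is not ROW")
    return problems


if __name__ == "__main__":
    # Hypothetical values, as they might come back from
    # SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format');
    sample = {"log_bin": "ON", "binlog_format": "MIXED"}
    for problem in check_binlog_settings(sample):
        print(problem)
```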

Create the database and table in MySQL

CREATE DATABASE test;

USE test;

CREATE TABLE `tb_user` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(30) DEFAULT NULL,
  `password` varchar(30) DEFAULT NULL,
  `age` int(10) unsigned DEFAULT NULL,
  `create_by` varchar(30) NOT NULL,
  `update_by` varchar(30) NOT NULL,
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Create the index in ES

PUT /test_user
{
  "mappings": {
    "properties": {
      "username": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "age": {
        "type": "long"
      },
      "create_time": {
        "type": "date"
      },
      "update_time": {
        "type": "date"
      }
    }
  }
}

Canal Deployer Configuration

Download the package

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

Create a directory

mkdir -p ~/app/canal-deployer

Extract the package into the directory

tar zxvf canal.deployer-1.1.5.tar.gz -C ~/app/canal-deployer

Edit instance.properties

vi ~/app/canal-deployer/conf/example/instance.properties

Change these 4 lines

canal.instance.mysql.slaveId=0
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
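If the same edit has to be repeated on several machines, it can be scripted instead of done in vi. The following is a minimal sketch, not part of Canal: set_properties is a hypothetical helper that rewrites key=value lines in the text of a properties file and leaves comment lines alone. It only rewrites the first occurrence of each key.

```python
def set_properties(text, updates):
    """Return `text` with each key in `updates` set to its new value.

    The first existing `key=value` line for a key is rewritten in place;
    keys that are not present are appended at the end. Lines starting
    with '#' are left untouched.
    """
    remaining = dict(updates)
    out = []
    for line in text.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#") and "=" in stripped:
            key = stripped.split("=", 1)[0].strip()
            if key in remaining:
                line = f"{key}={remaining.pop(key)}"
        out.append(line)
    # Append any keys that did not exist in the file.
    out.extend(f"{key}={value}" for key, value in remaining.items())
    return "\n".join(out) + "\n"


if __name__ == "__main__":
    from pathlib import Path

    # Path and values as used in this article.
    path = Path.home() / "app/canal-deployer/conf/example/instance.properties"
    new_text = set_properties(path.read_text(), {
        "canal.instance.mysql.slaveId": "0",
        "canal.instance.master.address": "127.0.0.1:3306",
        "canal.instance.dbUsername": "root",
        "canal.instance.dbPassword": "123456",
    })
    path.write_text(new_text)
```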

The complete instance.properties is as follows

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\..*
# table black regex
canal.instance.filter.black.regex=mysql\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
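The two filter settings above decide which schema.table names are captured: canal.instance.filter.regex selects tables and canal.instance.filter.black.regex excludes them. The sketch below approximates that behaviour with Python's re.fullmatch so the default patterns can be tried out. Canal's own matcher is Java-based, so edge cases such as case sensitivity may differ; is_captured is a hypothetical name for this illustration.

```python
import re


def is_captured(full_name, include=r".*\..*", exclude=r"mysql\.slave_.*"):
    """Return True if `schema.table` passes the include/exclude patterns.

    Each argument may hold several comma-separated patterns, mirroring
    the format used in the properties file. The defaults are the values
    from the instance.properties shown above.
    """
    def matches(patterns):
        return any(
            re.fullmatch(pattern.strip(), full_name)
            for pattern in patterns.split(",")
            if pattern.strip()
        )

    return matches(include) and not matches(exclude)


if __name__ == "__main__":
    for name in ("test.tb_user", "mysql.slave_master_info"):
        print(name, is_captured(name))
```

With the defaults, every table in every schema is captured except the mysql.slave_* tables. Narrowing the include pattern to test\.tb_user would restrict capture to the table used in this article.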

Start

cd ~/app/canal-deployer/bin
./startup.sh

Canal Adapter Configuration

Download the package

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz

Create a directory

mkdir -p ~/app/canal-adapter

Extract the package into the directory

tar zxvf canal.adapter-1.1.5.tar.gz -C ~/app/canal-adapter

Edit application.yml

vi ~/app/canal-adapter/conf/application.yml

The complete application.yml is as follows

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7
        key: es7key
        hosts: http://127.0.0.1:9200
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: docker-cluster
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

Create a new file test_user.yml under ~/app/canal-adapter/conf/es7/

vi ~/app/canal-adapter/conf/es7/test_user.yml

Add the following

dataSourceKey: defaultDS
outerAdapterKey: es7key
destination: example
groupId: g1
esMapping:
  _index: test_user
  _id: _id
  upsert: true
  sql: "select a.id as _id, a.username, a.age, a.create_time, a.update_time from tb_user a"
  commitBatch: 3000
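In this mapping, the sql statement aliases a.id as _id, and the `_id: _id` entry names that column as the document id. The remaining selected columns become document fields, so password, create_by and update_by are not synced because they are not selected. The sketch below only illustrates that split for a single row. It is not the adapter's implementation, and row_to_document is a hypothetical name.

```python
def row_to_document(row, id_column="_id"):
    """Split one SQL result row into (document id, document source).

    `row` is a dict keyed by the column aliases of the mapping's sql
    statement; `id_column` is the alias named by the `_id` setting.
    """
    if id_column not in row:
        raise KeyError(f"row has no {id_column!r} column")
    source = {key: value for key, value in row.items() if key != id_column}
    return str(row[id_column]), source


if __name__ == "__main__":
    # A row shaped like the result of the sql in test_user.yml.
    row = {
        "_id": 1,
        "username": "canal1",
        "age": 28,
        "create_time": "2021-07-01 00:00:00.000",
        "update_time": "2021-07-01 00:00:00.000",
    }
    doc_id, source = row_to_document(row)
    print(doc_id, source)
```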

Start

cd ~/app/canal-adapter/bin
./startup.sh

Startup runs into a major pitfall

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
    at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]

For the fix, see the article "Canal Adapter com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource"
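A ClassCastException between two identically named classes generally means the class was loaded by two different class loaders, which fits a jar conflict. One way to see which jars bundle the class is to scan the adapter directory. The helper below is a diagnostic sketch only (jars_containing is a hypothetical name) and it does not fix the conflict.

```python
import zipfile
from pathlib import Path


def jars_containing(directory, class_name):
    """Return the names of .jar files under `directory` that bundle `class_name`.

    `class_name` is a fully qualified Java class name. Results follow
    the sorted order of the jar paths.
    """
    entry = class_name.replace(".", "/") + ".class"
    found = []
    for jar in sorted(Path(directory).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    found.append(jar.name)
        except zipfile.BadZipFile:
            continue  # skip files that are not valid archives
    return found


if __name__ == "__main__":
    # Install directory as used in this article.
    root = Path.home() / "app/canal-adapter"
    for name in jars_containing(root, "com.alibaba.druid.pool.DruidDataSource"):
        print(name)
```

If more than one jar is listed, the class is available from more than one place, which is the situation the exception points to.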

Test

After resolving the jar conflict, restart canal adapter and verify that incremental MySQL changes are synced.

Before inserting data

GET /test_user/_search

Insert data

INSERT INTO `tb_user` (`id`, `username`, `password`, `age`, `create_by`, `update_by`, `create_time`, `update_time`)
VALUES
(1, 'canal1', 'canal1', 28, 'admin', 'admin', '2021-07-01 00:00:00.000', '2021-07-01 00:00:00.000');

Run the search again

A single document can also be fetched by id

GET /test_user/_doc/1

Update data

update tb_user set age = 38 where id = 1; 

Query again

Delete data

delete from tb_user where id = 1;

Query again
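The repeated queries can also be scripted. The sketch below assumes ES is reachable at the host configured in application.yml and uses only the Python standard library. total_hits and count_documents are hypothetical helper names. If the sync works, one would expect the count to go from 0 to 1 after the INSERT and back to 0 after the DELETE, allowing a short delay because ES search is near real time.

```python
import json
import urllib.request

ES_URL = "http://127.0.0.1:9200"  # host taken from application.yml in this article


def total_hits(search_response):
    """Extract the hit count from an Elasticsearch _search response body."""
    total = search_response["hits"]["total"]
    # ES 7 returns {"value": N, "relation": "eq"}; older versions return a bare integer.
    return total["value"] if isinstance(total, dict) else total


def count_documents(index="test_user"):
    """Run GET /<index>/_search against ES_URL and return the hit count."""
    with urllib.request.urlopen(f"{ES_URL}/{index}/_search") as response:
        return total_hits(json.load(response))


if __name__ == "__main__":
    print("documents in test_user:", count_documents())
```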

Reference

https://github.com/alibaba/canal/wiki/QuickStart

https://github.com/alibaba/canal/releases

