alibaba canal1.1.5环境搭建-CSDN博客文章

canal介绍
copygit的文档 地址如下:
https://github.com/alibaba/canal
https://github.com/alibaba/canal/wiki/AdminGuide
https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
https://github.com/alibaba/canal/wiki/DevGuide
https://github.com/alibaba/canal/wiki/TableMetaTSDB
https://github.com/alibaba/canal/wiki/ClientExample
https://github.com/alibaba/canal/wiki/ClientAPI
https://github.com/alibaba/canal/wiki/%E5%B8%B8%E8%A7%81%E9%97%AE%E9%A2%98%E8%A7%A3%E7%AD%94

al [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)

注意一般地,canal的文档比较少,很多时候遇到问题,需要看下源码; 所以最好clone下源码下来看下;

环境可以完全参照官方文档:

https://github.com/alibaba/canal/wiki/QuickStart
copy from canal

准备
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

启动
下载安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz

解压缩

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal

解压完成后,进入 /tmp/canal 目录,可以看到如下结构
drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs

配置修改
vi conf/example/instance.properties

此配置可以暂时启动参照下,试下能否成功监控到binlog日志的变更,
一般生产环境的部署,会使用zk来搭建集群,管理canal-server;
在canal-admin上进行canal-instance管理(增加、删除、编辑)
使用zk+mysql db来管理和保存binlog位置;

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

## 启动
sh bin/startup.sh
## 关闭
sh bin/stop.sh

canal admin管理安装
准备
canal-admin的限定依赖
MySQL,用于存储配置和节点等相关数据
canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)

部署
下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

解压缩
mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz -C /tmp/canal-admin
解压完成后,进入 /tmp/canal 目录,可以看到如下结构
drwxr-xr-x 6 agapple staff 204B 8 31 15:37 bin
drwxr-xr-x 8 agapple staff 272B 8 31 15:37 conf
drwxr-xr-x 90 agapple staff 3.0K 8 31 15:37 lib
drwxr-xr-x 2 agapple staff 68B 8 31 15:26 logs
配置修改
vi conf/application.yml

server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8

spring.datasource:
# mysql地址用于保存canal管理配置信息
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin

初始化元数据库
mysql -h127.1 -uroot -p
导入初始化SQL

’ >source conf/canal_manager.sql ’
a. 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql
启动
sh bin/startup.sh

vi logs/admin.log
此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

canal-server端配置
使用canal_local.properties的配置覆盖canal.properties

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

启动admin-server即可。

或在启动命令中使用参数:sh bin/startup.sh local 指定配置

基本的demo就可以正常操作了,未搭建集群,直接创建单机版就可以了; 启动instance后注意在admin上看下日志;

搭建zk集群下供canal进行注册和位点管理

https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
copy from canal
准备
首先确保你先完成了canal-admin的基本部署,请参考: Canal Admin QuickStart

设计理念
canal-admin的核心模型主要有:

instance,对应canal-server里的instance,一个最小的订阅mysql的队列
server,对应canal-server,一个server里可以包含多个instance
集群,对应一组canal-server,组合在一起面向高可用HA的运维

简单解释:

instance因为是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上,有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务动态加载的过程,有点类似于之前的autoScan机制,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件;
将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可)

理解了这一层基本概念之后,就开始WebUI的操作介绍.
集群运维
创建集群(前提是自己已搭建好zk集群)

集群变更

配置项:
修改集群/删除集群,属于基本的集群信息维护和删除
主配置,主要是指集群对应的canal.properties配置,设计上一个集群的所有server会共享一份全局canal.properties配置 (如果有个性化的配置需求,可以创建多个集群)
查看server,主要是指查看挂载在这个集群下的所有server列表。

Server运维

配置项:
所属集群,可以选择为单机 或者 集群。一般单机Server的模式主要用于一次性的任务或者测试任务
Server名称,唯一即可,方便自己记忆
Server Ip,机器ip
admin端口,canal 1.1.4版本新增的能力,会在canal-server上提供远程管理操作,默认值11110
tcp端口,canal提供netty数据订阅服务的端口
metric端口, promethues的exporter监控数据端口 (未来会对接监控)

Server变更

配置项:

配置,主要是维护单机模式的canal.properties配置,注意:挂载到集群模式的server,不允许单独编辑server的canal.properties配置,需要保持集群配置统一
修改/删除,主要是维护server的基本属性,比如名字和ip、port
启动/停止,主要是提供动态启停server的能力,比如集群内这个机器打算下线了,可以先通过停止释放instance的运行,集群中的其他机器通过HA就会开始接管任务
日志,查看server的根日志,主要是canal/canal.log的最后100行日志
详情,主要提供查询在当前这个server上运行的instance列表,以server维度方便快速做instance的启动、停止操作. 比如针对集群模式,如果server之间任务运行负载不均衡,可以通过对高负载Server执行部分Instance的停止操作来达到均衡的目的

Instance运维
创建Instance

instance配置比较简单,主要关注:
资源关联,比如挂载到具体的单机 或 集群
instance.properties配置维护,可以载入默认模板进行修改
Instance变更

配置项:

修改,主要就是维护instance.properties配置,做了修改之后会触发对应单机或集群server上的instance做动态reload
删除,相当于直接执行instance stop,并执行配置删除
启动/停止,对instance进行状态变更,做了修改会触发对应单机或集群server上的instance做启动/停止操作
日志,主要针对instance运行状态时,获取对应instance的最后100行日志,比如example/example.log;

配置文件参考
集群主配置: canal-cluster.tmplate

#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
#canal.admin.port = 11110
#canal.admin.user = admin
#canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers = canal-zookeeper:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
#canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
#canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
#canal.instance.tsdb.dbUsername = canal
#canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
## 使用mysql来保存tsdb信息
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
##默认
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
## 使用zk来对cluster的管理,binlog日志位点的保存
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### Kafka #############
##################################################
#kafka.bootstrap.servers = 127.0.0.1:6667
#kafka.acks = all
#kafka.compression.type = none
#kafka.batch.size = 16384
#kafka.linger.ms = 1
#kafka.max.request.size = 1048576
#kafka.buffer.memory = 33554432
#kafka.max.in.flight.requests.per.connection = 1
#kafka.retries = 0

#kafka.kerberos.enable = false
#kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
#kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### RocketMQ #############
##################################################
#rocketmq.producer.group = test
#rocketmq.enable.message.trace = false
#rocketmq.customized.trace.topic =
#rocketmq.namespace =
#rocketmq.namesrv.addr = 127.0.0.1:9876
#rocketmq.retry.times.when.send.failed = 0
#rocketmq.vip.channel.enabled = false
#rocketmq.tag =

##################################################
######### RabbitMQ #############
##################################################
#rabbitmq.host =
#rabbitmq.virtual.host =
#rabbitmq.exchange =
#rabbitmq.username =
#rabbitmq.password =
#rabbitmq.deliveryMode =

instance配置:canal-instance.tmplate

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=localhost:3306
canal.instance.master.journal.name=
canal.instance.master.position=
##可以指定时间位点进行消费
##canal.instance.master.timestamp=1636712400000
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
## 手动创建db canal_tsdb 表会自动创建
canal.instance.tsdb.url=jdbc:mysql://localhost:3306/canal_tsdb
canal.instance.tsdb.dbUsername=root
canal.instance.tsdb.dbPassword=*****

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#canal.instance.filter.regex=.*\\..*

#canal.instance.filter.regex=dev\\..*
## 只监控一张t_user表
canal.instance.filter.regex=dev.t_user
# table black regex
canal.instance.filter.black.regex=

## 忽略ddl的变更,默认是不忽略 true
##canal.instance.filter.query.ddl = false

# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

alibaba canal1.1.5环境搭建-CSDN博客文章

分类: mysql

0 条评论

发表回复

Avatar placeholder

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据

蜀ICP备16001794号
© 2014 - 2024 linpxing.cn All right reserved.