alibaba canal1.1.5环境搭建-CSDN博客文章
canal介绍
copygit的文档 地址如下:
https://github.com/alibaba/canal
https://github.com/alibaba/canal/wiki/AdminGuide
https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
https://github.com/alibaba/canal/wiki/DevGuide
https://github.com/alibaba/canal/wiki/TableMetaTSDB
https://github.com/alibaba/canal/wiki/ClientExample
https://github.com/alibaba/canal/wiki/ClientAPI
https://github.com/alibaba/canal/wiki/%E5%B8%B8%E8%A7%81%E9%97%AE%E9%A2%98%E8%A7%A3%E7%AD%94
al [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
注意一般地,canal的文档比较少,很多时候遇到问题,需要看下源码; 所以最好clone下源码下来看下;
环境可以完全参照官方文档:
https://github.com/alibaba/canal/wiki/QuickStart
copy from canal
准备
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
启动
下载安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
解压缩
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
解压完成后,进入 /tmp/canal 目录,可以看到如下结构
drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs
配置修改
vi conf/example/instance.properties
此配置可以暂时启动参照下,试下能否成功监控到binlog日志的变更,
一般生产环境的部署,会使用zk来搭建集群,管理canal-server;
在canal-admin上进行canal-instance管理(增加、删除、编辑)
使用zk+mysql db来管理和保存binlog位置;
## mysql serverId canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的数据库信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 #table regex canal.instance.filter.regex = .\*\\\\..\*
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
## 启动
sh bin/startup.sh
## 关闭
sh bin/stop.sh
canal admin管理安装
准备
canal-admin的限定依赖
MySQL,用于存储配置和节点等相关数据
canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)
部署
下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
解压缩
mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz -C /tmp/canal-admin
解压完成后,进入 /tmp/canal 目录,可以看到如下结构
drwxr-xr-x 6 agapple staff 204B 8 31 15:37 bin
drwxr-xr-x 8 agapple staff 272B 8 31 15:37 conf
drwxr-xr-x 90 agapple staff 3.0K 8 31 15:37 lib
drwxr-xr-x 2 agapple staff 68B 8 31 15:26 logs
配置修改
vi conf/application.yml
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: # mysql地址用于保存canal管理配置信息 address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin
初始化元数据库
mysql -h127.1 -uroot -p
导入初始化SQL
’ >source conf/canal_manager.sql ’
a. 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql
启动
sh bin/startup.sh
vi logs/admin.log
此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456
canal-server端配置
使用canal_local.properties的配置覆盖canal.properties
# register ip canal.register.ip = # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster =
启动admin-server即可。
或在启动命令中使用参数:sh bin/startup.sh local 指定配置
基本的demo就可以正常操作了,未搭建集群,直接创建单机版就可以了; 启动instance后注意在admin上看下日志;
搭建zk集群下供canal进行注册和位点管理
https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
copy from canal
准备
首先确保你先完成了canal-admin的基本部署,请参考: Canal Admin QuickStart
设计理念
canal-admin的核心模型主要有:
instance,对应canal-server里的instance,一个最小的订阅mysql的队列
server,对应canal-server,一个server里可以包含多个instance
集群,对应一组canal-server,组合在一起面向高可用HA的运维
简单解释:
instance因为是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上,有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务动态加载的过程,有点类似于之前的autoScan机制,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件;
将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可)
理解了这一层基本概念之后,就开始WebUI的操作介绍.
集群运维
创建集群(前提是自己已搭建好zk集群)
集群变更
配置项:
修改集群/删除集群,属于基本的集群信息维护和删除
主配置,主要是指集群对应的canal.properties配置,设计上一个集群的所有server会共享一份全局canal.properties配置 (如果有个性化的配置需求,可以创建多个集群)
查看server,主要是指查看挂载在这个集群下的所有server列表。
Server运维
配置项:
所属集群,可以选择为单机 或者 集群。一般单机Server的模式主要用于一次性的任务或者测试任务
Server名称,唯一即可,方便自己记忆
Server Ip,机器ip
admin端口,canal 1.1.4版本新增的能力,会在canal-server上提供远程管理操作,默认值11110
tcp端口,canal提供netty数据订阅服务的端口
metric端口, promethues的exporter监控数据端口 (未来会对接监控)
Server变更
配置项:
配置,主要是维护单机模式的canal.properties配置,注意:挂载到集群模式的server,不允许单独编辑server的canal.properties配置,需要保持集群配置统一
修改/删除,主要是维护server的基本属性,比如名字和ip、port
启动/停止,主要是提供动态启停server的能力,比如集群内这个机器打算下线了,可以先通过停止释放instance的运行,集群中的其他机器通过HA就会开始接管任务
日志,查看server的根日志,主要是canal/canal.log的最后100行日志
详情,主要提供查询在当前这个server上运行的instance列表,以server维度方便快速做instance的启动、停止操作. 比如针对集群模式,如果server之间任务运行负载不均衡,可以通过对高负载Server执行部分Instance的停止操作来达到均衡的目的
Instance运维
创建Instance
instance配置比较简单,主要关注:
资源关联,比如挂载到具体的单机 或 集群
instance.properties配置维护,可以载入默认模板进行修改
Instance变更
配置项:
修改,主要就是维护instance.properties配置,做了修改之后会触发对应单机或集群server上的instance做动态reload
删除,相当于直接执行instance stop,并执行配置删除
启动/停止,对instance进行状态变更,做了修改会触发对应单机或集群server上的instance做启动/停止操作
日志,主要针对instance运行状态时,获取对应instance的最后100行日志,比如example/example.log;
配置文件参考
集群主配置: canal-cluster.tmplate
################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 #canal.admin.port = 11110 #canal.admin.user = admin #canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register #canal.admin.register.auto = true #canal.admin.register.cluster = #canal.admin.register.name = canal.zkServers = canal-zookeeper:2181 # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = tcp # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false canal.instance.filter.dml.insert = false canal.instance.filter.dml.update = false canal.instance.filter.dml.delete = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true #canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} #canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; #canal.instance.tsdb.dbUsername = canal #canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 ################################################# ######### destinations ############# ################################################# canal.destinations = # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 # set this value to 'true' means that when binlog pos not found, skip to latest. # WARN: pls keep 'false' in production env, or if you know what you want. canal.auto.reset.latest.pos.mode = false #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml ## 使用mysql来保存tsdb信息 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = manager canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml ##默认 #canal.instance.global.spring.xml = classpath:spring/file-instance.xml ## 使用zk来对cluster的管理,binlog日志位点的保存 canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ Properties ############# ################################################## # aliyun ak/sk , support rds/mq canal.aliyun.accessKey = canal.aliyun.secretKey = canal.aliyun.uid= canal.mq.flatMessage = true canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = local canal.mq.database.hash = true canal.mq.send.thread.size = 30 canal.mq.build.thread.size = 8 ################################################## ######### Kafka ############# ################################################## #kafka.bootstrap.servers = 127.0.0.1:6667 #kafka.acks = all #kafka.compression.type = none #kafka.batch.size = 16384 #kafka.linger.ms = 1 #kafka.max.request.size = 1048576 #kafka.buffer.memory = 33554432 #kafka.max.in.flight.requests.per.connection = 1 #kafka.retries = 0 #kafka.kerberos.enable = false #kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf" #kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf" ################################################## ######### RocketMQ ############# ################################################## #rocketmq.producer.group = test #rocketmq.enable.message.trace = false #rocketmq.customized.trace.topic = #rocketmq.namespace = #rocketmq.namesrv.addr = 127.0.0.1:9876 #rocketmq.retry.times.when.send.failed = 0 #rocketmq.vip.channel.enabled = false #rocketmq.tag = ################################################## ######### RabbitMQ ############# ################################################## #rabbitmq.host = #rabbitmq.virtual.host = #rabbitmq.exchange = #rabbitmq.username = #rabbitmq.password = #rabbitmq.deliveryMode =
instance配置:canal-instance.tmplate
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=localhost:3306 canal.instance.master.journal.name= canal.instance.master.position= ##可以指定时间位点进行消费 ##canal.instance.master.timestamp=1636712400000 canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true ## 手动创建db canal_tsdb 表会自动创建 canal.instance.tsdb.url=jdbc:mysql://localhost:3306/canal_tsdb canal.instance.tsdb.dbUsername=root canal.instance.tsdb.dbPassword=***** #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex #canal.instance.filter.regex=.*\\..* #canal.instance.filter.regex=dev\\..* ## 只监控一张t_user表 canal.instance.filter.regex=dev.t_user # table black regex canal.instance.filter.black.regex= ## 忽略ddl的变更,默认是不忽略 true ##canal.instance.filter.query.ddl = false # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
0 条评论