canal git源码地址:
https://github.com/alibaba/canal
场景:当一个mysql表的字段有增减时,canal-server集群下的对应监听的canal-instance则会触发此错误;
2021-11-23 09:29:18.950 [destination = test_instance_lw , address = svc.cluster/10.32.3.2:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler – destination:test_instance_lw[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:test.t_shop,14 vs 13
解决方案(一):
1,配置canal-instance文件直接移动位点至最新的timestamp,但是会丢弃掉之前的binlog更新,所以需要做一次全量的同步更新;
canal.instance.master.timestamp=1636712400000
2,注意要先停止instance服务,canal-client最好也停下,删除zk上记录binlog位点的/otter/canal/destinations/test_instance_lw/1001/cursor
解决方案(二):
配置忽略ddl更新,但是与方案相似,仍然需要再做一次全量;
canal.instance.filter.query.ddl = false
canal源码:
protected EntryPosition findStartPositionInternal(ErosaConnection connection) { MysqlConnection mysqlConnection = (MysqlConnection) connection; LogPosition logPosition = logPositionManager.getLatestIndexBy(destination); if (logPosition == null) {// 找不到历史成功记录 EntryPosition entryPosition = null; if (masterInfo != null && mysqlConnection.getConnector().getAddress().equals(masterInfo.getAddress())) { entryPosition = masterPosition; } else if (standbyInfo != null && mysqlConnection.getConnector().getAddress().equals(standbyInfo.getAddress())) { entryPosition = standbyPosition; } if (entryPosition == null) { entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费 } // 判断一下是否需要按时间订阅 if (StringUtils.isEmpty(entryPosition.getJournalName())) { // 如果没有指定binlogName,尝试按照timestamp进行查找 if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) { logger.warn("prepare to find start position {}:{}:{}", new Object[] { "", "", entryPosition.getTimestamp() }); return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp()); } else { logger.warn("prepare to find start position just show master status"); return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费 } } else { if (entryPosition.getPosition() != null && entryPosition.getPosition() > 0L) { // 如果指定binlogName + offest,直接返回 entryPosition = findPositionWithMasterIdAndTimestamp(mysqlConnection, entryPosition); logger.warn("prepare to find start position {}:{}:{}", new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(), entryPosition.getTimestamp() }); return entryPosition; } else { EntryPosition specificLogFilePosition = null; if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) { // 如果指定binlogName + // timestamp,但没有指定对应的offest,尝试根据时间找一下offest EntryPosition endPosition = findEndPosition(mysqlConnection); if (endPosition != null) { logger.warn("prepare to find start position {}:{}:{}", new Object[] { entryPosition.getJournalName(), "", entryPosition.getTimestamp() }); specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection, entryPosition.getTimestamp(), endPosition, entryPosition.getJournalName(), true); } } if (specificLogFilePosition == null) { if (isRdsOssMode()) { // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理 return null; } // position不存在,从文件头开始 entryPosition.setPosition(BINLOG_START_OFFEST); return entryPosition; } else { return specificLogFilePosition; } } } } else { if (logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) { if (dumpErrorCountThreshold >= 0 && dumpErrorCount > dumpErrorCountThreshold) { // binlog定位位点失败,可能有两个原因: // 1. binlog位点被删除 // 2.vip模式的mysql,发生了主备切换,判断一下serverId是否变化,针对这种模式可以发起一次基于时间戳查找合适的binlog位点 boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null) && logPosition.getPostion().getServerId() != null && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection)); if (case2) { EntryPosition findPosition = fallbackFindByStartTimestamp(logPosition, mysqlConnection); dumpErrorCount = 0; return findPosition; } // 处理 binlog 位点被删除的情况,提供自动重置到当前位点的功能 // 应用场景: 测试环境不稳定,位点经常被删。强烈不建议在正式环境中开启此控制参数,因为binlog // 丢失调到最新位点也即意味着数据丢失 if (isAutoResetLatestPosMode()) { dumpErrorCount = 0; return findEndPosition(mysqlConnection); } Long timestamp = logPosition.getPostion().getTimestamp(); if (isRdsOssMode() && (timestamp != null && timestamp > 0)) { // 如果binlog位点不存在,并且属于timestamp不为空,可以返回null走到oss binlog处理 return null; } } else if (StringUtils.isBlank(logPosition.getPostion().getJournalName()) && logPosition.getPostion().getPosition() <= 0 && logPosition.getPostion().getTimestamp() > 0) { return fallbackFindByStartTimestamp(logPosition, mysqlConnection); } // 其余情况 logger.warn("prepare to find start position just last position\n {}", JsonUtils.marshalToString(logPosition)); return logPosition.getPostion(); } else { // 针对切换的情况,考虑回退时间 long newStartTimestamp = logPosition.getPostion().getTimestamp() - fallbackIntervalInSeconds * 1000; logger.warn("prepare to find start position by switch {}:{}:{}", new Object[] { "", "", logPosition.getPostion().getTimestamp() }); return findByStartTimeStamp(mysqlConnection, newStartTimestamp); } } }
0 条评论