前言 如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis、消息队列、大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将mysql的数据与中间件的数据进行同步,既能确保数据的一致性、及时性,也能做到代码无侵入的方式呢?如果有这样的一个需求,数据修改后,需要及时的将mysql中的数据更新到elasticsearch,我们会怎么进行实现呢? 数据同步方案选择针对上文的需求,经过思考,初步有如下的一些方案: 代码实现
针对代码中进行数据库的增删改操作时,同时进行elasticsearch的增删改操作。 mybatis实现
通过mybatis plugin进行实现,截取sql语句进行分析, 针对insert、update、delete的语句进行处理。显然,这些操作如果都是单条数据的操作,是很容易处理的。但是,实际开发中,总是会有一些批量的更新或者删除操作,这时候,就很难进行处理了。 Aop实现
不管是通过哪种Aop方式,根据制定的规则,如规范方法名,注解等进行切面处理,但依然还是会出现无法处理批量操作数据的问题。 logstash
logstash类似的同步组件提供的文件和数据同步的功能,可以进行数据的同步,只需要简单的配置就能将mysql数据同步到elasticsearch,但是logstash的原理是每秒进行一次增量数据查询,将结果同步到elasticsearch,实时性要求特别高的,可能无法满足要求。且此方案的性能不是很好,造成资源的浪费。
实现方式优缺点
代码实现技术难度低,侵入性强,实时性高
基于mybatis有一定的技术难度,但是无法覆盖所有的场景
Aop实现技术难度低,半侵入性,需要规范代码,依然无法覆盖所有的场景
logstash技术难度低,无侵入性,无需开发,但会造成资源浪费。那么是否有什么更好的方式进行处理吗?mysql binlog同步,实时性强,对于应用无任何侵入性,且性能更好,不会造成资源浪费,那么就有了我今天的主角——canal canal介绍canal 是阿里巴巴的一个开源项目,基于java实现,整体已经在很多大型的互联网项目生产环境中使用,包括阿里、美团等都有广泛的应用,是一个非常成熟的数据库同步方案,基础的使用只需要进行简单的配置即可。
canal是通过模拟成为mysql 的slave的方式,监听mysql 的binlog日志来获取数据,binlog设置为row模式以后,不仅能获取到执行的每一个增删改的脚本,同时还能获取到修改前和修改后的数据,基于这个特性,canal就能高性能的获取到mysql数据数据的变更。
使用canal Adapter为了便于大家的使用,官方做了一个独立的组件Adapter,Adapter是可以将canal server端获取的数据转换成几个常用的中间件数据源,现在支持kafka、rocketmq、hbase、elasticsearch,针对这几个中间件的支持,直接配置即可,无需开发。上文中,如果需要将mysql的数据同步到elasticsearch,直接运行 canal Adapter,修改相关的配置即可。 常见问题canal Adapter elasticsearch 改造因为有了canal和canal Adapter这个神器,同步到elasticsearch、hbase等问题都解决了,但是自己的开发的过程中发现,Adapter使用还是有些问题,因为先使用的是elasticsearch同步功能,所以对elasticsearch进行了一些改造: elasticsearch初始化一个全新的elasticsearch无法使用,因为没有创建elasticsearch index和mapping,增加了对应的功能。
elasticsearch配置文件mapping节点增加两个参数: Copy
[color=#f74449 !important] enablefieldmap: [color=#ae81ff !important]true[color=#f74449 !important] fieldmap:[color=#f74449 !important] id: [color=#e6db74 !important]"text"[color=#f74449 !important] name: [color=#e6db74 !important]"text"[color=#f74449 !important] c_time: [color=#e6db74 !important]"text" enablefieldmap 是否需要自动生成fieldmap,默认为false,如果需要启动的时候就生成这设置为true,并且设置
fieldmap,类似elasticsearch mapping中每个字段的类型。 esconfig bug处理代码中获取binlog的日志处理时,必须要获取数据库名,但是当获取binlog为type query时,是无法获取
数据库名的,此处有bug,导致出现 "Outer adapter write failed" ,且未输出错误日志,修复此bug.
|