Mysql将数据同步到hive - binlog模式。
这种方法有两个优点:
也有不好的地方:
这一步最重要的细节是将mysql库的所有binlog数据输入到一个kafka主题中,格式为json。格式如下:
这一步的主要细节是写入hdfs的结构以及为什么不直接写入hive。
不写入hive表的原因是binlog的数据结构不固定,而hive的结构相对固定。如果要写入hive,需要将不同表的binlog写入不同的hive表,维护成本太高。而且spark其实可以直接读取hdfs的json文件,所以直接放hdfs就好了。
如果写入hdfs,考虑到这个数据的后续读取是根据表读取增量数据,那么写入的目录必须有日期和表名。我在这里使用的目录结构如下:
也就是说,数据要按照数据所属的db、table_name、date写入flink中的不同目录。
在这一步的过程中,会遇到一些重要的参数问题。
2.如上所述的检查点的时间间隔。它不仅会影响检查点的频率,还会影响hdfs文件的大小,这可能会对hdfs的性能产生很大的影响。如果这个值太大,会造成数据延迟过高,如果太小,会造成小文件过多。我这里的设置是5分钟。
仔细阅读,这个时候会问,既然你的目录是分表的,那么每个表每5分钟的binlog数据量是不一样的。对于一些大型的mysql表,我们每5分钟生成一个文件是可以接受的。对于一些小表,每五分钟生成一个文件,所以文件会很小。所以我在这里又做了一层筛选。我筛选掉了mysql的大表,只把大表同步到hdfs进行binlog的数据同步。因为binlog本身同步mysql数据的方式是为了节省mysql的读取压力,而小表不会有太大的压力,这些表可以直接通过jdbc同步。
这是整个环节中最复杂的部分,涉及的细节很多。
首先要明确大致思路。总体思路是读取hdfs上的旧历史数据,然后与新的binlog数据合并,生成新的快照。
其实还涉及到一些其他的细节,比如mysql表结构的变化,或者mysql和hive的数据结构不一致。
另外,将多个db的同一个表导入到hive的一个表中还有其他问题,就不赘述了。