mysql2ch-0.5.3-clickhouse从MySQL增量时时同步数据


介绍

mysql2ch 是一个用于同步 MySQL 到 ClickHouse 的工具,支持全量同步与增量同步。

特性

支持全量同步与增量同步。
支持 DDL 与 DML,当前支持 DDL 字段新增与删除,支持所有的 DML。
丰富的配置项。
依赖软件版本


kafka-2.1.0,用户缓冲 MySQL binlog 的消息队列。需要先安装好
redis-5.0.6,缓存 MySQL binlog position 与 file。需要先安装好
Python 3.6.8 运行环境
安装


pip3 install mysql2ch
yum install postgresql-devel 




[root@k8smaster ~]# mysql2ch --version
mysql2ch version, 0.5.3


这个软件有以下BUG,需要修改后才能使用:
BUG1:
pyproject.toml报错
pyproject.toml报错,提示找不到文件
解决方法: 修改成绝对路径


[root@k8smaster mysql2ch-master]# vim /usr/local/lib/python3.6/site-packages/mysql2ch/cli.py
38 def version():
39     with open("/usr/local/lib/python3.6/site-packages/mysql2ch/pyproject.toml") as f:
40         ret = re.findall(r'version = "(\d+\.\d+\.\d+)"', f.read())
41         return ret[0]




BUG2:
cat  /usr/local/lib/python3.6/site-packages/mysql2ch/convert.py 
 37                 token_list.tokens.append(SQLToken(Keyword, "column"))
38                 token_list.tokens.append(SQLToken(Whitespace, " "))
39                # tokens = parsed.token_next(i)[1].tokens        #这段代码注释
41                 token_list.tokens.append(tokens[0])        
  
配置文件:


cat /etc/mysql2ch.ini


 [root@k8smaster ~]# cat /etc/mysql2ch.ini


[core]
# when set True, will display sql information.
debug = True
# current support redis and kafka
broker_type = redis
mysql_server_id = 33
# optional, read from `show master status` result if empty
init_binlog_file =mysql-bin.000980
# optional, read from `show master status` result if empty
init_binlog_pos =3492738
# these tables skip delete, multiple separated with comma, format with schema.table
skip_delete_tables = mysql.user
# these tables skip update, multiple separated with comma, format with schema.table
skip_update_tables = mysql.user
# skip delete or update dmls, multiple separated with comma, example: delete,update
skip_dmls =
# how many num to submit,recommend set 20000 when production
insert_num = 1
# how many seconds to submit,recommend set 60 when production
insert_interval = 1
# auto do full etl at first when table not exists
auto_full_etl = True


[sentry]
# sentry environment
environment = development
sentry dsn
dsn = https://xxxxxxxx@sentry.test.com/1


[redis]
host = 127.0.0.1
port = 6379
password =
db = 0
prefix = mysql2ch
# enable redis sentinel
sentinel = false
# redis sentinel hosts,multiple separated with comma
sentinel_hosts = 127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002
sentinel_master = master
# stream max len, will delete redundant ones with FIFO
queue_max_len = 200000


[mysql]
host = 192.168.66.33
port = 3306
user = root
password = Hexin2007


# sync schema, format with mysql.schema, each schema for one section.
[mysql.test]
# multiple separated with comma
tables = rsc
# kafka partition, need when broker_type=kafka
kafka_partition = 0


[clickhouse]
host = localhost
port = 9000
user = default
password =


# need when broker_type=kafka
[kafka]
# kafka servers,multiple separated with comma
servers = 127.0.0.1:9092
topic = mysql2ch
全量同步


你可能需要在开始增量同步之前进行一次全量导入,或者使用--renew重新全量导入。


注意:


如果字段decimal字段为空时,会报错,可以通过以下方式手动进行全量同步:




+++++++开始+++++++++++
单次单表同步
方法一:
先在clickhouse创建表结构
CREATE TABLE hexin.hexin_erp_order_goods(`id` Int32, `order_id` Int32, `product_id` String, `variant_id` String, `product_name` String, `product_count` Int32, `product_price` Decimal(10, 2), `payment` Decimal(10, 2), `currency` String, `exchange_rate` Decimal(12, 4), `product_img_url` String, `product_unit` String, `product_attributes` String, `store_sku` String, `asin` String, `apply_cn` String, `apply_en` String, `apply_price` Decimal(10, 2), `apply_code` String, `discount` Decimal(10, 2), `apply_weight` Decimal(10, 2), `humanity_created` Int8, `storage_id` Nullable(Int32), `order_time` Int32, `is_less` Nullable(Int8), `less_num` Nullable(Int32))ENGINE = MergeTree ORDER BY id SETTINGS index_granularity = 8192


再通过以下语句将mysql的数据导入到clickhouse中
INSERT INTO hexin.hexin_erp_order_goods select * from mysql('192.168.66.33:3306','hexin','hexin_erp_order_goods','root','Hexin2007')
++++++++++++结束+++++++++++++++++++++


[root@k8smaster mysql2ch-master]# mysql2ch --config /etc/mysql2ch.ini etl --schema test --renew
synch --config /etc/synch.ini etl --schema test --renew
2020-06-24 11:07:06 - mysql2ch.reader:45 - DEBUG - select COLUMN_NAME from information_schema.COLUMNS where TABLE_SCHEMA='test' and TABLE_NAME='rsc' and COLUMN_KEY='PRI'
2020-06-24 11:07:06 - mysql2ch.writer:28 - DEBUG - drop table test.rsc
2020-06-24 11:07:06 - mysql2ch.replication:37 - INFO - drop table success:test.rsc
2020-06-24 11:07:06 - mysql2ch.writer:28 - DEBUG - select count(*)from system.tables where database = 'test' and name = 'rsc'
2020-06-24 11:07:06 - mysql2ch.writer:28 - DEBUG - CREATE TABLE test.rsc ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('192.168.66.33:3306', 'test', 'rsc', 'root', 'Hexin2007')
2020-06-24 11:07:07 - mysql2ch.writer:39 - DEBUG - select COLUMN_NAME, COLUMN_TYPE from information_schema.COLUMNS where TABLE_NAME = 'rsc' and COLUMN_TYPE like '%decimal%'and TABLE_SCHEMA = 'test'
2020-06-24 11:07:07 - mysql2ch.replication:44 - INFO - etl success:test.rsc


 $ mysql2ch etl -h
    usage: mysql2ch etl [-h] --schema SCHEMA [--tables TABLES] [--renew]


    optional arguments:


      -h, --help       show this help message and exit


      --schema SCHEMA  Schema to full etl.


      --tables TABLES  Tables to full etl,multiple tables split with comma.


      --renew          Etl after try to drop the target tables.




生产者


监听 MySQL binlog 并生产至 kafka。 (这里有BUG,请查看BUG列表)


 mysql2ch --config /etc/mysql2ch.ini produce


/usr/local/bin/synch --config /etc/synch.ini  produce




[root@k8smaster mysql2ch-master]# mysql2ch --config /etc/mysql2ch.ini produce
2020-06-24 11:44:18 - mysql2ch.producer:39 - INFO - start producer success
2020-06-24 11:44:18 - mysql2ch.reader:45 - DEBUG - select COLUMN_NAME from information_schema.COLUMNS where TABLE_SCHEMA='test' and TABLE_NAME='rsc' and COLUMN_KEY='PRI'
2020-06-24 11:44:18 - mysql2ch.reader:88 - INFO - start sync at 2020-06-24 11:44:18
2020-06-24 11:44:18 - mysql2ch.reader:89 - INFO - mysql binlog: mysql-bin.000980:3492738
2020-06-24 11:47:35 - mysql2ch.producer:67 - DEBUG - send to queue success: key:test,event:{'table': 'rsc', 'schema': 'test', 'action': 'insert', 'values': {'id': 7, 'name': '111'}, 'event_unixtime': 1592970455335448, 'action_core': '2'}
2020-06-24 11:47:35 - mysql2ch.producer:68 - DEBUG - success set binlog pos:mysql-bin.000980:5034453
2020-06-24 11:48:39 - mysql2ch.producer:67 - DEBUG - send to queue success: key:test,event:{'table': 'rsc', 'schema': 'test', 'action': 'delete', 'values': {'id': 7, 'name': '111'}, 'event_unixtime': 1592970519132017, 'action_core': '1'}
2020-06-24 11:48:39 - mysql2ch.producer:68 - DEBUG - success set binlog pos:mysql-bin.000980:5054489
2020-06-24 11:48:39 - mysql2ch.producer:76 - INFO - success send 2 events in 1 seconds
2020-06-24 11:48:39 - mysql2ch.producer:67 - DEBUG - send to queue success: key:test,event:{'table': 'rsc', 'schema': 'test', 'action': 'insert', 'values': {'id': 7, 'name': '3333'}, 'event_unixtime': 1592970519132849, 'action_core': '2'}
2020-06-24 11:48:39 - mysql2ch.producer:68 - DEBUG - success set binlog pos:mysql-bin.000980:5054489
2020-06-24 11:49:08 - mysql2ch.producer:67 - DEBUG - send to queue success: key:test,event:{'table': 'rsc', 'schema': 'test', 'action': 'delete', 'values': {'id': 7, 'name': '3333'}, 'event_unixtime': 1592970548923095, 'action_core': '1'}
2020-06-24 11:49:08 - mysql2ch.producer:68 - DEBUG - success set binlog pos:mysql-bin.000980:5087311
2020-06-24 11:49:08 - mysql2ch.producer:76 - INFO - success send 2 events in 1 seconds


消费者


从kafka 消费并插入 ClickHouse,使用--skip-error跳过错误行。


mysql2ch --config /etc/mysql2ch.ini consume --schema test


/usr/local/bin/synch --config /etc/synch.ini  consume --schema hexin




on': 'delete', 'values': {'id': 7, 'name': '3333'}, 'event_unixtime': 1592970548923095, 'action_core': '1'}
2020-06-24 11:49:08 - mysql2ch.writer:28 - DEBUG - select count(*) from system.mutations where is_done=0 and database = 'test'
2020-06-24 11:49:09 - mysql2ch.writer:28 - DEBUG - alter table test.rsc delete where id in (7)
2020-06-24 11:49:09 - mysql2ch.writer:194 - INFO - test.rsc:success delete 1 rows
2020-06-24 11:49:09 - mysql2ch.consumer:121 - INFO - success commit 1 events
2020-06-24 11:49:10 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:11 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:12 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:13 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:14 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:15 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
    $ mysql2ch consume -h


    usage: mysql2ch consume [-h] --schema SCHEMA [--skip-error] [--auto-offset-reset AUTO_OFFSET_RESET]


    optional arguments:


      -h, --help            show this help message and exit


      --schema SCHEMA       Schema to consume.


     --skip-error          Skip error rows.


      --auto-offset-reset AUTO_OFFSET_RESET


                            Kafka auto offset reset,default earliest.


 






未测试:


使用 docker-compose(推荐)


version: "3"


services:


  producer:


    env_file:


      - .env


    depends_on:


      - redis


    image: long2ice/mysql2ch:latest


    command: mysql2ch produce


  # add more service if you need.


  consumer.test:


    env_file:


      - .env


    depends_on:


      - redis


      - producer


    image: long2ice/mysql2ch:latest


    # consume binlog of test


    command: mysql2ch consume --schema test


  redis:


    hostname: redis


    image: redis:latest


    volumes:


      - redis:/data


  ui:


    env_file:


      - .env


    ports:


      - 5000:5000


    depends_on:


      - redis


      - producer


      - consumer


    image: long2ice/mysql2ch


    command: mysql2ch ui


volumes:


  redis:


可选


Sentry,错误报告,在.env配置 SENTRY_DSN后开启。


开源许可


本项目遵从 MIT开源许可。


 
分割线
感谢打赏
江西数库信息技术有限公司
YWSOS.COM 平台代运维解决方案
 评论
 发表评论
姓   名:

Powered by AKCMS