Using Flink SQL to replicate MySQL data in real time to another database via the binlog

Environment setup:
CentOS:
[root@node212 ~]# cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
[root@node212 ~]# uname -a
Linux node212 4.4.219-1.el7.elrepo.x86_64 #1 SMP Sun Apr 12 16:13:06 EDT 2020 x86_64 x86_64 x86_64 GNU/Linux
[root@node212 ~]#


flink-1.13.2 (this release does not need to be compiled; just extract it and it is ready to use)
        flink-1.13.2-bin-scala_2.12.tgz
        Download URL: https://repo.huaweicloud.com/apache/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz
After downloading, extract it into /usr/local:
        tar xf /root/flink-1.13.2-bin-scala_2.12.tgz -C /usr/local/
The extracted directory looks like this:
[root@node212 flink-1.13.2]# pwd
/usr/local/flink-1.13.2
[root@node212 flink-1.13.2]# ls
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt


Start the Flink cluster:
[root@node212 bin]# /usr/local/flink-1.13.2/bin/start-cluster.sh 
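
If the cluster starts correctly, a standalone JobManager and a TaskManager should be running. A quick way to check (assuming the default standalone setup) is jps; the process names below are what a Flink 1.13 standalone cluster normally shows:
[root@node212 bin]# jps
<pid> StandaloneSessionClusterEntrypoint
<pid> TaskManagerRunner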


Install the Java environment:
yum install java-1.8.0-openjdk* -y
Verify:
[root@node212 lib]# java -version
openjdk version "1.8.0_332"
OpenJDK Runtime Environment (build 1.8.0_332-b09)
OpenJDK 64-Bit Server VM (build 25.332-b09, mixed mode)


Remove the Java SSL/TLS restriction:
    vim /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.x86_64/jre/lib/security/java.security
        Go to the jre/lib/security directory under the Java installation, open the java.security file in an editor, and search for jdk.tls.disabledAlgorithms.
        Delete the TLSv1 and TLSv1.1 entries from that setting. These two entries were added in newer Java releases and do not exist in older ones. The original setting looks like this:
jdk.tls.disabledAlgorithms=SSLv3, TLSv1, TLSv1.1, RC4, DES, MD5withRSA, \
    DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL, \
    include jdk.disabled.namedCurves  
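
After deleting the two entries, the setting should read roughly as follows (the rest of the line stays unchanged):
jdk.tls.disabledAlgorithms=SSLv3, RC4, DES, MD5withRSA, \
    DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL, \
    include jdk.disabled.namedCurves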


Install the Maven environment:
wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
yum -y install apache-maven


Verify:
[root@node212 lib]# mvn -version
Apache Maven 3.5.2 (138edd61fd100ec658bfa2d307c43b76940a5d7d; 2017-10-18T15:58:13+08:00)
Maven home: /usr/share/apache-maven
Java version: 1.8.0_332, vendor: Red Hat, Inc.
Java home: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.x86_64/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "4.4.219-1.el7.elrepo.x86_64", arch: "amd64", family: "unix"


To connect to MySQL as a CDC source (mysql-cdc), download the following jar:
https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar

Note: this jar is only for the mysql-cdc connector, i.e. it can only read data from the source; it cannot be used to push data to a target.
To push data into the target database, use the JDBC connector instead.
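
Because mysql-cdc reads the source instance's binlog, binary logging must be enabled in ROW format on the source MySQL server. A minimal sketch of the relevant my.cnf settings (values are illustrative; adjust server-id and log file names to your environment):
[mysqld]
server-id        = 1            # must be unique among replicas/CDC clients
log-bin          = mysql-bin    # enable binary logging
binlog_format    = ROW          # mysql-cdc requires row-based binlog
binlog_row_image = FULL         # recommended so change events carry all columns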


To connect to the target database, download the JDBC connector jar:
flink-connector-jdbc_2.11-1.13.2.jar


Place both of the jars above directly in the /usr/local/flink-1.13.2/lib directory:
[root@node212 lib]# pwd
/usr/local/flink-1.13.2/lib
[root@node212 lib]# ls
flink-connector-jdbc_2.11-1.13.2.jar  flink-sql-connector-mysql-cdc-2.2.0.jar  log4j-api-2.12.1.jar
flink-csv-1.13.2.jar                  flink-sql-connector-tidb-cdc-2.2.1.jar   log4j-core-2.12.1.jar
flink-dist_2.12-1.13.2.jar            flink-table_2.12-1.13.2.jar              log4j-slf4j-impl-2.12.1.jar
flink-json-1.13.2.jar                 flink-table-blink_2.12-1.13.2.jar
flink-shaded-zookeeper-3.4.14.jar     log4j-1.2-api-2.12.1.jar


After the jars have been placed in this directory, the cluster must be restarted for them to take effect:
/usr/local/flink-1.13.2/bin/stop-cluster.sh 
/usr/local/flink-1.13.2/bin/start-cluster.sh 


That completes the environment setup.


Now use Flink SQL to replicate the data between the two MySQL databases in real time.
Run the following command to open the Flink SQL command-line client:


/usr/local/flink-1.13.2/bin/sql-client.sh
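
The Flink tables defined below map onto physical MySQL tables. A minimal sketch of the assumed underlying schema, executed on the MySQL instances themselves (not in the Flink SQL client); the column list is an illustrative assumption:
-- on the source instance, database "source"
CREATE TABLE source.source (
    id   INT PRIMARY KEY,
    name VARCHAR(255)
);
-- on the target instance, database "target"
CREATE TABLE target.target (
    id   INT PRIMARY KEY,
    name VARCHAR(255)
);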


Enter the following statements into the SQL client:
Source table:
CREATE TABLE source (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.1.221',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'source',
'table-name' = 'source'
);
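
By default the mysql-cdc connector first takes a full snapshot of the table and then switches to reading the binlog, so existing rows are synchronized as well as new changes. If only changes from the current binlog position onward are wanted, the 2.x connector accepts a startup-mode option that can be added to the WITH clause above (verify the exact option name against the documentation for your connector version):
'scan.startup.mode' = 'latest-offset'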


Target table:
CREATE TABLE target (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.1.221:3306/target',
'username' = 'root',
'password' = '123456',
'table-name' = 'target'
);


Link the source to the target:
insert into target(id,name) select id,name from source;
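
This INSERT statement submits a continuous streaming job: every insert, update, and delete on the source table is applied to the target table. A quick way to verify, run directly against MySQL and assuming the illustrative schema above:
-- on the source instance
INSERT INTO source.source (id, name) VALUES (1, 'test');
-- shortly afterwards, on the target instance
SELECT * FROM target.target;   -- the row (1, 'test') should appear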


Once the job has been submitted successfully, you can view it in the Flink web UI:
http://172.16.1.212:8081/
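
The same job information is also exposed by the Flink REST API on the same port as the web UI, for example:
curl http://172.16.1.212:8081/jobs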









