Flink-SQL网页管理工具-flink-streaming-platform-web配置


1.简介

flink-streaming-platform-web系统是基于Apache Flink 封装的一个可视化的、轻量级的flink web客户端系统,用户只需在web 界面进行sql配置就能完成流计算任务。

主要功能:包含任务配置、启/停任务、告警、日志等功能,支持sql语法提示,格式化、sql语句校验。

目的:减少开发、降低成本 完全实现sql化 流计算任务。  

1、主要功能

[1] 任务支持单流 、双流、 单流与维表等。
[2] 支持本地模式、yarn-per模式、STANDALONE模式。
[3] 支持catalog、hive。
[4] 支持自定义udf、连接器等,完全兼容官方连接器。
[5] 支持sql的在线开发,语法提示,格式化。
[6] 支持钉钉告警、自定义回调告警、自动拉起任务。
[7] 支持自定义Jar提交任务。
[8] 支持多版本flink版本(需要用户编译对应flink版本)。
[9] 支持自动、手动savepoint备份,并且从savepoint恢复任务。
[10] 支持批任务如:hive。
目前flink版本已经升级到1.13

2、流程说明

2.1 软件版本列表
  flink-streaming-platform-web.tar.gz
flink-streaming-platform-webtagV20220120(flink1.13.2) (下载地址)

 flink-1.13.2
 下载地址:https://repo.huaweicloud.com/apache/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz

依赖的jar包
操作mysql需要的jar包(flink jdbc connector下载地址
flink-connector-jdbc_2.11-1.13.2.jar
mysql-connector-java-8.0.30.jar(根据使用的mysql版本来选择,各版本下载地址

操作kafka需要的jar包(flink kafka connector下载地址

这里只需要下载这一个jar包即可:flink-sql-connector-kafka_2.11-1.13.6.jar

将上述3个jar包,拷贝到flink的lib目录下,如下图所示:

flink-connector-clickhouse-1.13.2-SNAPSHOT.jar  flink-sql-connector-kafka_2.11-1.13.6.jar  log4j-api-2.12.1.jar
flink-connector-jdbc_2.11-1.13.2.jar            flink-sql-connector-mysql-cdc-2.2.0.jar    log4j-core-2.12.1.jar
flink-csv-1.13.2.jar                            flink-sql-connector-tidb-cdc-2.2.1.jar     log4j-slf4j-impl-2.12.1.jar
flink-dist_2.12-1.13.2.jar                      flink-table_2.12-1.13.2.jar                mysql-connector-java-8.0.30.jar
flink-json-1.13.2.jar                           flink-table-blink_2.12-1.13.2.jar
flink-shaded-zookeeper-3.4.14.jar               log4j-1.2-api-2.12.1.jar
2.2 添加第3放依赖jar包的方法
第一种方法:下载连接器Jar包后后直接放到 flink/lib/目录下就可以使用了,其缺点是:
1、该方案存在jar冲突可能,特别是连接器多了以后
2、在非yarn模式下每次新增jar需要重启flink集群服务器

第二种方法:配置每一个flink任务时,放到http的服务下填写到三方地址,例如设置内部的http repo下载源(公司内部建议放到内网的某个http服务)

http://abc.com/jars/flink-connector-jdbc_2.11-1.12.0.jar
http://abc.com/jars/flink-sql-connector-kafka_2.11-1.12.0.jar
http://abc.com/jars/flink-streaming-udf.jar

http://ccblog.cn/jars/mysql-connector-java-5.1.25.jar

如下图所示:


3. 启动
3.1 启动flink
 cd /work/flink/flink-1.12.0/bin
./start-cluster.sh

3.2 启动flink-streaming-platform-web
从git上下载完,进行解压,记住,要和之前的Flink在一个目录
[root@node212 local]# pwd
/usr/local
[root@node212 local]# ls
bin  etc  flink-1.13.2  flink-streaming-platform-web-1.13.2  games  include  lib  lib64  libexec  sbin  share  src

安装mysql 5.7.36环境(略):

修改配置文件,主要修改数据库地址,在config的application.properties里
####jdbc信息
server.port=9084
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
useSSL=false 注意这个,如果你的MySQL支持ssl那就不用加,不支持,记得加上,防止报错

创建数据表脚本,依然是在flink_web下,脚本在 https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql/flink_web.sql,请自行下载和创建。

修改flink-streaming-platform-web占用的内存:
[root@node212 bin]# vim /usr/local/flink-streaming-platform-web-1.13.2/bin/deploy.sh 
##JAVA_OPTS设置
JAVA_OPTS="-Xmx512M -Xms512M -Xmn128M -XX:MaxMetaspaceSize=512M -XX:MetaspaceSize=512M -XX:+UseConcMarkSweepGC -Xdebug -Xrunjdwp:transport=dt_socket,address=9901,server=y,suspend=n  -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -Dcom.sun.management.jmxremote.port=8999 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses -XX:+CMSClassUnloadingEnabled -XX:+ParallelRefProcEnabled -XX:+CMSScavengeBeforeRemark -XX:ErrorFile=../logs/hs_err_pid%p.log  -XX:HeapDumpPath=../logs -XX:+HeapDumpOnOutOfMemoryError"

启动进程:
cd /usr/local/flink-streaming-platform-web-1.13.2/bin
sh deploy.sh start

//停止命令
sh deploy.sh stop

解密,为啥在进入bin里面,不是在其他目录启动呢,看启动脚本,你就明白了,主要是目录层级的问题,仔细看看吧。

4. 配置flink-streaming-platform-web

4.1 登录页面
启动后,访问web页面
打开页面查看
http://172.16.1.212:9084

登录号:admin 密码 123456。

4.2 系统配置
如下图,在下拉框中选择参数,在变量值文本框中填入对应的键值,点击保存即可。配置好的参数会出现在下面的列表中

4.3 任务配置
·        新增配置
在配置管理下,新增一个配置,即新建一个flink sql任务,并提交保存。
CREATE TABLE ck_rsc (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://172.16.1.220:8123',
    'database-name' = 'aaaaa',
    'table-name' = 'rsc',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3',
        'username'='default',
        'password'=''
);

CREATE TABLE source (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.1.221',
'port' = '3306',
'username' = 'root',
'password' = 'Hexin123.',
'database-name' = 'source',
'table-name' = 'source'
);

insert into ck_rsc(id,name) select id,name from source;


 数据库建表语句
mysql:
mysql> use source;
Database changed
mysql> show create table source;
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table  | Create Table                                                                                                                                                                  |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| source | CREATE TABLE `source` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)


clickhouse:

node220 :) use aaaaa;

USE aaaaa
Query id: ea03f620-d45b-4b0b-b649-250e929396fc
Ok.
0 rows in set. Elapsed: 0.003 sec.

node220 :) desc rsc;

DESCRIBE TABLE rsc

Query id: 9f85e006-4787-4644-90b4-31885bea372b


┌─name─┬─type─────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ id   │ Int32            │              │                    │         │                  │                │
│ name │ Nullable(String) │              │                    │         │                  │                │
└──────┴──────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘


2 rows in set. Elapsed: 0.021 sec. 

·        依次点击 开启配置 -> 提交任务


5. 观察运行结果

5.1 观察任务提交结果


点解日志详情,观察已提交任务的运行状况,如下图则表示任务提交成功,并且任务正在成功运行。


如果提交失败,请排查flink的log/下日志文件,和flink-streaming-platform-web的日志文件


5.2 观察数据变化

5.3 在flink的ui上观察任务

访问flink的UI地址 http://172.16.1.212:8081/#/overview
观察正在运行的任务





结束
整体就是这个样子,其实这些使用sql-client也可以实现,不过有了可视化页面,更方便人们操作,并且web也提供了很多其他的功能,并且也在长期迭代中,希望大家多多支持(还有我)
另外如果有问题,多看看日志。

下面给几个主要查资料的网站

Flink-streaming-platform-web
https://github.com/zhp8341/flink-streaming-platform-web

Flink相关jar的repo
https://repo1.maven.org/maven2/org/apache/flink/

Flink官网
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/

分割线
感谢打赏
江西数库信息技术有限公司
YWSOS.COM 平台代运维解决方案
 评论
 发表评论
姓   名:

Powered by AKCMS