一、需求说明
登录日志放在mysql里 ==> 现在需要将 mysql中的数据 转换格式以后写入kafka ==> 数据中心 从 kafka里取数据进行 日志分析和监管审查;
首先考虑有flume读取数据,但是在做格式转换时,需要重写 source 和 interceptor,得不偿失,还不如用flink。正好 阿里发布了全新的 flink 1.9,尝鲜使用;
二、环境配置
- flink-1.9.0(The only requirements are working Maven 3.0.4 (or higher) and Java 8.x installations.)
- mysql-5.6
- kafka_2.11-0.11
1、安装 jdk 1.8+
2、下载启动flink-1.9
下载地址: https://flink.apache.org/zh/downloads.html
1 | wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz |
验证启动成功:
jps -l
netstat -nltp | grep 8081
访问管理页面: http://172.18.1.51:8081
三、创建 Flink工程
1 | curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0 |
改造后工程结构如下:
1、定义 Mysql Datasource
1.1、在上述工程的 pom.xml 中添加 mysql 依赖
1 | <dependency> |
1.2、定义 CCustLoginHisSource 读取 mysql
1 | package com.jzsec.tzdslog; |
1.3、写入kafka
1 | import org.apache.flink.configuration.Configuration; |
1.4、写入文本文件
1 | import org.apache.flink.configuration.Configuration; |
1.5、定义Job
1 | import com.alibaba.fastjson.JSON; |
2、编译
mvn clean package