Flink: extracting data from MySQL and writing it to Kafka and text files



一、Requirements

The login logs are stored in MySQL ==> the MySQL data needs to be converted to a new format and written to Kafka ==> the data center consumes it from Kafka for log analysis and regulatory auditing.

Our first thought was to read the data with Flume, but the format conversion would have required rewriting the source and an interceptor, which is more trouble than it is worth, so Flink is the better fit. Conveniently, the brand-new Flink 1.9, which merges in Alibaba's contributions, had just been released, so we took the chance to try it.


二、Environment

  • flink-1.9.0 (the only requirements are a working Maven 3.0.4 or higher and a Java 8.x installation)
  • mysql-5.6
  • kafka_2.11-0.11

1、Install JDK 1.8+

2、Download and start Flink 1.9

Download: https://flink.apache.org/zh/downloads.html

Documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html

wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
tar zxf flink-1.9.1-bin-scala_2.11.tgz
cd flink-1.9.1
# Start Flink (standalone mode)
/data/flink/flink-1.9.1/bin/start-cluster.sh
# Stop Flink
/data/flink/flink-1.9.1/bin/stop-cluster.sh
# Open the web UI port in the firewall
firewall-cmd --permanent --add-port=8081/tcp
firewall-cmd --reload
firewall-cmd --query-port=8081/tcp

Verify that the cluster started:

jps -l

netstat -nltp | grep 8081

Open the web UI: http://172.18.1.51:8081


三、Creating the Flink project

Official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/projectsetup/java_api_quickstart.html#requirements

curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.0
mv quickstart/ flink-log-process

After reworking the quickstart skeleton, the project contains the classes described below.

1、Define the MySQL DataSource

1.1、Add the MySQL dependency to the project's pom.xml

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>
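
The sink and job classes shown below also call the Kafka producer client and fastjson directly. If the quickstart pom does not already pull these in transitively, dependencies along these lines are needed as well (the versions here are assumptions; pick a 0.11.x client to match the kafka_2.11-0.11 broker):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.3</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>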

1.2、Define CCustLoginHisSource to read from MySQL

package com.jzsec.tzdslog;

import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.List;

public class CCustLoginHisSource extends RichSourceFunction<CCustLoginHisEntity> implements CheckpointedFunction {

    private PreparedStatement ps;
    private Connection connection;
    private String url, username, password;
    private int year;
    private Date beginDate, endDate;

    // id of the last record emitted, kept in operator state across checkpoints
    private volatile long lastIndex = 0L;
    private transient ListState<Long> lastIndexState;

    public CCustLoginHisSource(String url, String username, String password, LocalDate localDate) {
        this.url = url;
        this.username = username;
        this.password = password;

        // extract one full day: [localDate 00:00, localDate + 1 day 00:00]
        year = localDate.getYear();
        beginDate = new java.sql.Date(localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
        endDate = new java.sql.Date(localDate.plusDays(1).atStartOfDay().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
    }

    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            System.out.println("-----------mysql getConnection() has exception , msg = " + e.getMessage());
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // incremental extraction: the login table is split by year and filtered by day
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from db.tablename_" + year + " where create_time between ? and ?";
        ps = this.connection.prepareStatement(sql);
        ps.setDate(1, beginDate);
        ps.setDate(2, endDate);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (null != ps) {
            ps.close();
        }
        if (null != connection) {
            connection.close();
        }
    }

    @Override
    public void cancel() {}

    @Override
    public void run(SourceContext<CCustLoginHisEntity> sourceContext) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            CCustLoginHisEntity entity = new CCustLoginHisEntity();
            entity.setId(resultSet.getInt("id"));
            entity.setEnv_type(resultSet.getString("env_type"));
            entity.setApp_type(resultSet.getString("app_type"));
            entity.setUser_ip(resultSet.getString("user_ip"));
            entity.setUser_agent(resultSet.getString("user_agent"));
            entity.setIp_addr(resultSet.getString("ip_addr"));
            entity.setMac(resultSet.getString("mac"));
            entity.setPhone(resultSet.getString("phone"));
            entity.setImei(resultSet.getString("imei"));
            entity.setRequest_time(resultSet.getDate("request_time"));
            entity.setCreate_time(resultSet.getDate("create_time"));

            lastIndex = entity.getId();
            sourceContext.collect(entity);
        }
        resultSet.close();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // record the id of the last emitted row in operator state
        lastIndexState.clear();
        lastIndexState.add(lastIndex);
    }

    @Override
    @SuppressWarnings("unchecked")
    public void initializeState(FunctionInitializationContext context) throws Exception {
        lastIndexState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("lastIndex", TypeInformation.of(Long.class)));
        List<Long> restored = IteratorUtils.toList(lastIndexState.get().iterator());
        if (!restored.isEmpty()) {
            lastIndex = restored.get(0);
        }
    }
}
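
The CCustLoginHisEntity POJO itself is not shown in the original post; a minimal sketch of what the source and job assume, derived from the column reads in run() above (this version assumes Lombok is on the classpath; plain getters/setters work just as well):

package com.jzsec.tzdslog;

import lombok.Data;

import java.util.Date;

// Fields mirror the columns read in CCustLoginHisSource.run();
// @Data generates the getters/setters used by the source and the job.
@Data
public class CCustLoginHisEntity {
    private int id;
    private String env_type;
    private String app_type;
    private String user_ip;
    private String user_agent;
    private String ip_addr;
    private String mac;
    private String phone;
    private String imei;
    private Date request_time;
    private Date create_time;
}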

1.3、Write to Kafka

package com.jzsec.tzdslog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class SinkToKafka011 extends RichSinkFunction<String> {

    private Properties props;
    private String topic;
    private transient KafkaProducer<Integer, byte[]> producerB;
    private AtomicInteger messageNo = new AtomicInteger(0);

    public SinkToKafka011(Properties props, String topic) {
        this.props = props;
        this.topic = topic;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // one producer per sink subtask, created when the task starts
        producerB = new KafkaProducer<>(props);
    }

    @Override
    public void invoke(String value, Context context) {
        try {
            // key = running message number, value = bytes of the JSON line;
            // get() blocks until the broker acknowledges the record
            producerB.send(new ProducerRecord<>(topic, messageNo.getAndIncrement(), value.getBytes())).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (null != producerB) {
            producerB.close();
        }
        super.close();
    }
}
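
As an aside, instead of wrapping the raw KafkaProducer by hand, Flink also ships a Kafka 0.11 connector. A minimal sketch, assuming the flink-connector-kafka-0.11_2.11 dependency is added; the helper class and its parameters are hypothetical and this is an alternative, not what the job below uses:

package com.jzsec.tzdslog;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class KafkaSinkAlternative {

    // Attach Flink's built-in Kafka 0.11 producer to a stream of JSON lines.
    // brokerList and topic would come from the same conf.properties keys the job reads.
    public static void addKafkaSink(DataStream<String> stream, String brokerList, String topic) {
        FlinkKafkaProducer011<String> kafkaSink = new FlinkKafkaProducer011<>(
                brokerList,                   // e.g. the kafka.bootstrap.servers value
                topic,                        // e.g. the kafka.topic.login-log value
                new SimpleStringSchema());    // write each String record as UTF-8 bytes
        stream.addSink(kafkaSink).name("sink-to-kafka-011");
    }
}

The built-in connector can also be configured for exactly-once delivery via Kafka transactions, which the hand-rolled sink above does not attempt.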

1.4、Write to a text file

package com.jzsec.tzdslog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;

public class SinkToTextFile extends RichSinkFunction<String> {

    private String filepath;
    private String filename;
    private transient FileOutputStream fos;
    private transient OutputStreamWriter osw;
    private transient BufferedWriter bufferedWriter;

    public SinkToTextFile(String filepath, String filename) {
        this.filepath = filepath;
        this.filename = filename;
        // create the target directory on the client side if it does not exist yet
        File dir = new File(filepath);
        if (!dir.isDirectory()) {
            dir.mkdirs();
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // the sink task may run on another machine, so make sure the directory exists there too
        File file = new File(filepath);
        if (!file.isDirectory()) {
            file.mkdirs();
        }
        // append to the target file instead of overwriting it
        fos = new FileOutputStream(new File(filepath + "/" + filename), true);
        osw = new OutputStreamWriter(fos);
        bufferedWriter = new BufferedWriter(osw);
    }

    @Override
    public void close() throws Exception {
        // closing the writer flushes any buffered records
        if (bufferedWriter != null) bufferedWriter.close();
        if (osw != null) osw.close();
        if (fos != null) fos.close();
        super.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // each value already ends with '\n' (appended in the job), so just write it out
        bufferedWriter.write(value);
    }
}
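
For comparison, Flink's built-in StreamingFileSink could write these lines as well; a minimal sketch (the helper class is hypothetical; note that StreamingFileSink needs checkpointing enabled to finalize part files, and it writes bucketed part files under the base path rather than a single named file):

package com.jzsec.tzdslog;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class TextFileSinkAlternative {

    // Attach Flink's built-in row-format file sink instead of the hand-rolled one.
    public static void addFileSink(DataStream<String> stream, String outputPath) {
        StreamingFileSink<String> fileSink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .build();
        stream.addSink(fileSink).name("sink-to-file");
    }
}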

1.5、Define the job

package com.jzsec.tzdslog;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// LogModel, entity2Map, initLoginLog and procLoginLog are project-specific helpers that are omitted here.
public class TzdsLoginLogProcessJob {

    private static ObjectMapper mapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {

        // command-line arguments, e.g. --logdate 2019-10-15 --conf conf.properties
        final ParameterTool argsParamTool = ParameterTool.fromArgs(args);
        String logdate = argsParamTool.get("logdate", "2019-10-15");
        final String propertiesFile = argsParamTool.get("conf", "conf.properties");
        final ParameterTool propParamTool = ParameterTool.fromPropertiesFile(propertiesFile);

        // create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(propParamTool);

        // source: one day of login records from MySQL
        DataStreamSource<CCustLoginHisEntity> dataStreamSource = env.addSource(
                new CCustLoginHisSource(
                        propParamTool.get("mysql.jdbc.url"),
                        propParamTool.get("mysql.jdbc.username"),
                        propParamTool.get("mysql.jdbc.password"),
                        LocalDate.parse(logdate))
        );

        // access-log format, one JSON line per record
        SingleOutputStreamOperator<String> logStream = dataStreamSource.map(entity -> {
            try {
                LogModel log = new LogModel();
                log.setAppType("accessLog");
                log.setCid("0x000000000001");
                log.setMid("0x000000000002");
                log.setName("tagname");
                log.setIp(entity.getUser_ip());
                log.setIndicator(entity2Map(entity));
                log.setTime_stamp(Long.toString(entity.getRequest_time().getTime()));
                return JSONObject.toJSONString(log) + "\n";
            } catch (Exception e) {
                return "";
            }
        });

        // unified log format for the data center
        SingleOutputStreamOperator<String> unifyLogStream = dataStreamSource.map(entity -> {
            try {
                Map unifyLog = new HashMap();
                initLoginLog(unifyLog);
                procLoginLog(unifyLog, entity2Map(entity), Long.toString(entity.getRequest_time().getTime()));
                return JSONObject.toJSONString(unifyLog) + "\n";
            } catch (Exception e) {
                return "";
            }
        });

        // sink 1: write the access log to a text file
        logStream.addSink(new SinkToTextFile(
                propParamTool.get("log.filepath") + "/" + logdate,
                "login_log_" + logdate.replace("-", "") + ".log"
        ));

        // sink 2: write the unified log to Kafka
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, propParamTool.get("kafka.bootstrap.servers"));
        kafkaProps.put("client.id", propParamTool.get("kafka.client.id"));
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        String topic = propParamTool.get("kafka.topic.login-log");
        unifyLogStream.addSink(new SinkToKafka011(kafkaProps, topic)).name("sink-to-kafka").setParallelism(1);

        env.execute();
    }
}
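
For reference, a minimal sketch of the conf.properties file the job expects; the keys are exactly the ones read above, the values are placeholders:

# MySQL source
mysql.jdbc.url=jdbc:mysql://172.18.1.52:3306/db?useUnicode=true&characterEncoding=utf8
mysql.jdbc.username=flink_reader
mysql.jdbc.password=******
# Kafka sink
kafka.bootstrap.servers=172.18.1.53:9092
kafka.client.id=flink-login-log-producer
kafka.topic.login-log=login-log
# Text-file sink: a per-day sub-directory is created under this path
log.filepath=/data/logs/tzds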

2、Build

mvn clean package
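
After the build, the job can be submitted to the standalone cluster started earlier with flink run; a sketch, where the jar name is an assumption (use whatever mvn clean package produces under target/):

/data/flink/flink-1.9.1/bin/flink run \
    -c com.jzsec.tzdslog.TzdsLoginLogProcessJob \
    target/flink-log-process-1.0-SNAPSHOT.jar \
    --logdate 2019-10-15 \
    --conf /path/to/conf.properties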