Flume custom interceptor + MySQL incremental sync to Kafka



I. Set up a runnable Flume environment

0. Download apache-flume-1.9.0-bin


1. Download dependency jars

1) Download mysql-connector-java-5.1.18.jar and place it under apache-flume-1.9.0-bin\lib

2) Download flume-ng-sql-source

https://github.com/keedio/flume-ng-sql-source/releases

mvn package

Place the generated flume-ng-sql-source-1.5.2.jar under apache-flume-1.9.0-bin\lib


2. Create the Kafka topic

./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic tzds-login

3. Configure Flume

Create conf/mysql2kafka.conf:

a1.channels=ch-1
a1.sources=src-1
a1.sinks=k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type=org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url=jdbc:mysql://10.10.209.27:3306/crm

# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user=crm
a1.sources.src-1.hibernate.connection.password=crm@2015
a1.sources.src-1.hibernate.connection.autocommit=true
a1.sources.src-1.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class=com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path=./
a1.sources.src-1.status.file.name=sqlsource.status
# sqlsource.status records the last value of the incremental column; reference it in the query as $@$

# Custom query (replace with your own SQL statement)
a1.sources.src-1.start.from=0
a1.sources.src-1.custom.query=select * from crm.c_cust_login_his_2019 where id > $@$ order by id asc
a1.sources.src-1.batch.size=1000
a1.sources.src-1.max.rows=1000

a1.sources.src-1.hibernate.connection.provider_class=org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

a1.sources.src-1.interceptors=i1
a1.sources.src-1.interceptors.i1.type=com.jzsec.flume.interceptor.TzdsLoginBuilder

##############################
a1.channels.ch-1.type=memory
a1.channels.ch-1.capacity=10000
a1.channels.ch-1.transactionCapacity=10000
a1.channels.ch-1.byteCapacityBufferPercentage=20
a1.channels.ch-1.byteCapacity=800000

# Kafka sink configuration; set a1.sinks.k1.topic to your topic name
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic=tzds-login
a1.sinks.k1.brokerList=172.18.1.51:9092
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20

# Wiring: note the trailing "s" in a1.sources.src-1.channels
a1.sinks.k1.channel=ch-1
a1.sources.src-1.channels=ch-1
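The incremental logic above hinges on the $@$ placeholder: on each poll, flume-ng-sql-source substitutes the last value recorded in sqlsource.status (or start.from on the very first run) into custom.query. A minimal sketch of that substitution, assuming a simple string replacement (the method name here is illustrative, not the library's actual internals):

```java
public class QueryBuilderSketch {
    // Substitute the last committed incremental value into the $@$ placeholder,
    // mirroring how the effective query is built on each poll.
    static String buildQuery(String customQuery, String lastIndex) {
        return customQuery.replace("$@$", lastIndex);
    }

    public static void main(String[] args) {
        String template = "select * from crm.c_cust_login_his_2019 where id > $@$ order by id asc";
        // First run uses start.from=0; later runs use the value saved in sqlsource.status
        System.out.println(buildQuery(template, "0"));
        System.out.println(buildQuery(template, "184233"));
    }
}
```

Because the query orders by id ascending and only selects rows beyond the stored value, each poll picks up exactly the rows inserted since the previous one.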

4. Start the Flume agent

Linux:

./bin/flume-ng agent -n a1 -c conf -f conf/mysql2kafka.conf -Dflume.root.logger=INFO,console

Windows:

bin\flume-ng agent -n a1 -c conf -f conf\mysql2kafka.conf

5. Check the Kafka topic

docker exec -it b6276c6648c2 /bin/bash
cd /opt/kafka/bin/
./kafka-console-consumer.sh --bootstrap-server 172.18.1.51:9092 --topic tzds-login --from-beginning
./kafka-console-consumer.sh --bootstrap-server 172.18.1.51:9092 --topic tzds-login

II. Custom Interceptor workflow

  1. Set up a Flume development environment (without the Flume dependencies on your classpath, the compiler has no idea what your classes refer to)
  2. Create a class that implements the Interceptor interface and overrides intercept(Event event)
  3. Create a class that implements the Interceptor.Builder interface and overrides configure(Context context) and build()
  4. Package the classes into a jar and drop it into Flume's lib directory
  5. In the corresponding flume.conf, set the interceptor's type to the fully qualified name of the Builder class; if the interceptor defines custom properties, configure them as well

III. Set up the development environment


1. Create a Maven project and add the following dependencies to pom.xml:

<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.jzsec</groupId>
    <artifactId>flume-custom-interceptor</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
            </plugin>
        </plugins>
    </build>

</project>

2. Implement the custom Interceptor and Builder

package com.jzsec.flume.interceptor;

import java.text.ParseException;
import java.util.List;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;

public class TzdsLoginInterceptor implements Interceptor {

    public void initialize() {}

    public Event intercept(Event event) {
        // Read the event headers and body
        Map<String, String> eventHeader = event.getHeaders();
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        String fmt = "%s, %s";
        try {
            eventBody = procLog(eventBody);
        } catch (ParseException e) {
            eventBody = "";
            e.printStackTrace();
        }
        // Rewrite the body as "<headers as JSON>, <processed body>"
        event.setBody(String.format(fmt, JSONObject.toJSONString(eventHeader), eventBody).getBytes(Charsets.UTF_8));
        return event;
    }

    // Placeholder for the project-specific row parsing (original implementation not shown)
    private String procLog(String rawBody) throws ParseException {
        return rawBody;
    }

    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    public void close() {}
}
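To see what downstream Kafka consumers receive, the sketch below reproduces the interceptor's "%s, %s" body format with plain JDK code (a hand-rolled JSON printer stands in for fastjson's JSONObject.toJSONString, and the sample headers and body are made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BodyFormatDemo {
    // Stand-in for JSONObject.toJSONString on a flat String map (no escaping; demo only)
    static String toJson(Map<String, String> headers) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (!first) sb.append(',');
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }

    // Mirrors the interceptor's String.format("%s, %s", headerJson, processedBody)
    static String formatBody(Map<String, String> headers, String processedBody) {
        return String.format("%s, %s", toJson(headers), processedBody);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("timestamp", "1560000000000");
        System.out.println(formatBody(headers, "\"1\",\"user01\",\"2019-06-08 10:00:00\""));
        // → {"timestamp":"1560000000000"}, "1","user01","2019-06-08 10:00:00"
    }
}
```

Each Kafka record thus carries the Flume headers as a JSON object, followed by the body produced by procLog.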
package com.jzsec.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;

public class TzdsLoginBuilder implements Interceptor.Builder {

    public void configure(Context context) {}

    public Interceptor build() {
        return new TzdsLoginInterceptor();
    }
}

3. Build and deploy

mvn clean package

Deploy flume-custom-interceptor-1.0.jar to apache-flume-1.9.0-bin\lib


4. Configure the interceptor in mysql2kafka.conf

# type must be the fully qualified name of the Builder class, not the Interceptor itself
a1.sources.src-1.interceptors=i1
a1.sources.src-1.interceptors.i1.type=com.jzsec.flume.interceptor.TzdsLoginBuilder

5. Start the Flume agent

Linux:

./bin/flume-ng agent -n a1 -c conf -f conf/mysql2kafka.conf -Dflume.root.logger=INFO,console

Windows:

bin\flume-ng agent -n a1 -c conf -f conf\mysql2kafka.conf

6. Check the Kafka topic

Run the same console-consumer commands as in step 5 above to verify that the intercepted records arrive on the topic.