一、搭建可运行的flume环境
0、下载 apache-flume-1.9.0-bin
1、下载 mysql 依赖包
1、下载 mysql-connector-java-5.1.18.jar 放在 apache-flume-1.9.0-bin\lib 下
2、下载 flume-ng-sql-source
https://github.com/keedio/flume-ng-sql-source/releases
mvn package
把生成的 flume-ng-sql-source-1.5.2.jar 放在 apache-flume-1.9.0-bin\lib 下
2、创建kafka topic
1 | ./kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic tzds-login |
3、配置 flume
新建 conf/mysql2kafka.conf
1 | a1.channels=ch-1 |
4、启动flume agent
linux:
1 | ./bin/flume-ng agent -n a1 -c conf -f conf/mysql2kafka.conf -Dflume.root.logger=INFO,console |
windows:
1 | bin\flume-ng agent -n a1 -c conf -f conf\mysql2kafka.conf |
5、查看 kafka topic
1 | docker exec -it b6276c6648c2 /bin/bash |
二、自定义Interceptor流程
- 搭建flume开发环境(巧妇难为无米之炊,你没开发环境怎么玩,程序都不知道你写的类是个啥)
- 新建一个类,实现Interceptor接口,重写intercept(Event event)方法
- 新建一个类,实现Interceptor.Builder接口,重写configure(Context context)和build()方法
- 打成jar包放到flume的lib目录下
- 编写相应的flume.conf文件,将type值使用类的全限定名指定我们的拦截器。如果有自定义属性,需要配置该自定义属性。
三、搭建开发环境
1、新建一个maven工程,在 pom.xml 中引入如下依赖:
1 | <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
2、自定义 Interceptor 和 Builder
1 | public class TzdsLoginInterceptor implements Interceptor { |
1 | import org.apache.flume.Context; |
3、打包部署
mvn clean package
将 flume-custom-interceptor-1.0.jar 部署到 apache-flume-1.9.0-bin\lib 下
4、在 mysql2kafka.conf 中配置 interceptors
1 | a1.sources.src-1.interceptors=i1 |
5、启动flume agent
linux:
1 | ./bin/flume-ng agent -n a1 -c conf -f conf/mysql2kafka.conf -Dflume.root.logger=INFO,console |
windows:
1 | bin\flume-ng agent -n a1 -c conf -f conf\mysql2kafka.conf |