11. Elasticsearch for Hadoop


11.1 Installing Single-Node Hadoop

11.2 Adding the ES-Hadoop Dependencies

11.3 From HDFS to Elasticsearch

11.4 From Elasticsearch to HDFS

------------------------------------------------------------

11.1 Installing Single-Node Hadoop

Hadoop can run in three modes: standalone (local), pseudo-distributed, and fully distributed.

1. Passwordless SSH login

vi /etc/ssh/ssh_config

Append the following at the end of the file:

StrictHostKeyChecking no
UserKnownHostsFile /dev/null
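
Note that this ssh_config change only suppresses the host-key prompt; actual passwordless login still relies on key-based authentication. A minimal sketch, assuming root logs in to itself on this single node:

ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa    # generate a key pair with an empty passphrase
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
ssh localhost                               # should now log in without a password prompt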

2. Download and install Hadoop

[root@localhost data]# mkdir -p /data/hadoop

[root@localhost data]# cd /data/hadoop

[root@localhost hadoop]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz

3. Hadoop standalone mode

[root@localhost hadoop]# tar -zxf hadoop-2.7.7.tar.gz

[root@localhost hadoop]# mkdir input

[root@localhost hadoop]# echo "hello world" > input/file1.txt

[root@localhost hadoop]# echo "hello hadoop" > input/file2.txt

[root@localhost hadoop]# ./hadoop-2.7.7/bin/hadoop jar ./hadoop-2.7.7/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar wordcount /data/hadoop/input/ /data/hadoop/output

[root@localhost hadoop]# cat output/part-r-00000

4. Hadoop pseudo-distributed mode

[root@localhost hadoop]# vi hadoop-2.7.7/etc/hadoop/hadoop-env.sh

Change JAVA_HOME to:
export JAVA_HOME=/data/tools/jdk1.8.0_65

and change
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"
to:
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc="

[root@localhost hadoop]# mkdir -p /data/hadoop/hdfs/tmp

[root@localhost hadoop]# vi hadoop-2.7.7/etc/hadoop/core-site.xml

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/data/hadoop/hdfs/tmp</value>
        <description>A base for other temporary directories</description>
    </property>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

hadoop.tmp.dir can be any directory you choose.

fs.default.name stores the NameNode address; both the HDFS and MapReduce components need it. (In Hadoop 2.x this property name is deprecated in favor of fs.defaultFS, but the old name still works.)

Hadoop only reads mapred-site.xml, so copy the template first and then edit the copy:

[root@localhost hadoop]# cp hadoop-2.7.7/etc/hadoop/mapred-site.xml.template hadoop-2.7.7/etc/hadoop/mapred-site.xml

[root@localhost hadoop]# vi hadoop-2.7.7/etc/hadoop/mapred-site.xml

<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9010</value>
    </property>
</configuration>

mapred.job.tracker stores the JobTracker address; only MapReduce needs to know it. (This is a legacy MRv1 property; YARN-based deployments use mapreduce.framework.name instead.)

[root@localhost hadoop]# vi hadoop-2.7.7/etc/hadoop/hdfs-site.xml

Configure the HDFS block replication factor:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

That completes the configuration. Next, format the NameNode:

[root@localhost hadoop]# ./hadoop-2.7.7/bin/hadoop namenode -format

Start Hadoop:

[root@localhost hadoop]# ./hadoop-2.7.7/sbin/start-all.sh

Startup succeeded.
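
A quick way to confirm the daemons are up is jps; on a pseudo-distributed node the listing should normally include NameNode, DataNode, SecondaryNameNode, ResourceManager and NodeManager (PIDs will differ):

[root@localhost hadoop]# jps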

Add Hadoop to the environment variables:

[root@localhost hadoop-2.7.7]# vi /etc/profile

export HADOOP_HOME=/data/hadoop/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

[root@localhost hadoop-2.7.7]# source /etc/profile

5. Common HDFS operations

Open port 50070 in the firewall:

[root@localhost hadoop-2.7.7]# firewall-cmd --permanent --add-port=50070/tcp

success

[root@localhost hadoop-2.7.7]# firewall-cmd --reload

success

[root@localhost hadoop-2.7.7]# firewall-cmd --query-port=50070/tcp

yes

Access the HDFS web UI at http://172.18.1.51:50070/

[root@localhost hadoop]# hadoop fs -ls /

[root@localhost hadoop]# hadoop fs -mkdir /work

[root@localhost hadoop]# touch aa.txt

[root@localhost hadoop]# hadoop fs -put aa.txt /work

[root@localhost hadoop]# hadoop fs -test -e /work/aa.txt

[root@localhost hadoop]# echo $?

0

[root@localhost hadoop]# echo "hello hdfs" > aa.txt

[root@localhost hadoop]# hadoop fs -appendToFile aa.txt /work/aa.txt

[root@localhost hadoop]# hadoop fs -cat /work/aa.txt

hello hdfs

[root@localhost hadoop]# hadoop fs -rm /work/aa.txt

19/06/10 05:04:30 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.

Deleted /work/aa.txt

[root@localhost hadoop]# hadoop fs -rmr /work/a

rmr: DEPRECATED: Please use 'rm -r' instead.

rmr: `/work/a': No such file or directory

[root@localhost hadoop]# hadoop dfsadmin -report

[root@localhost hadoop]#

11.2 Adding the ES-Hadoop Dependencies

<dependencies>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>7.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>7.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-common</artifactId>
        <version>2.7.7</version>
    </dependency>
</dependencies>

11.3 From HDFS to Elasticsearch

Prepare a JSON file and upload it to HDFS:

vi blog.json

{"id":5,"title":"JavaScript高级程序设计","language":"javascript","author":"NicholasC.Zakas","price":66.4,"publish_time":"2012-03-02","desc":"JavaScript技术经典名著。"}
{"id":3,"title":"Python科学计算","language":"python","author":"张若愚","price":81.4,"publish_time":"2014-01-02","desc":"零基础学python,光盘中坐着度假开发winPython运行环境,涵盖了Python各个扩展库。"}
{"id":4,"title":"Python基础教程","language":"python","author":"Helant","price":54.5,"publish_time":"2014-03-02","desc":"经典的python入门教程,层次鲜明,结构严谨,内容详实。"}

[root@localhost hadoop]# hadoop fs -put blog.json /work
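
Optionally, confirm the upload before running the job:

[root@localhost hadoop]# hadoop fs -ls /work

[root@localhost hadoop]# hadoop fs -cat /work/blog.json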

Write code that reads the data from HDFS and writes it to Elasticsearch:

package com.learn.eshdoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

import java.io.IOException;

/**
 * Reads JSON lines from HDFS and writes them to Elasticsearch.
 */
public class HdfsToEs {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Speculative execution could index the same document twice, so disable it.
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "10.10.139.42");
        conf.set("es.port", "9200");
        conf.set("es.nodes.wan.only", "true");
        // Target index/type, document id field, and input format (the values are already JSON).
        conf.set("es.resource", "blog/_doc");
        conf.set("es.mapping.id", "id");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf, "EmrToES");
        job.setJarByClass(HdfsToEs.class);
        job.setMapperClass(MyMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MyMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Text line = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Forward each non-empty JSON line unchanged; EsOutputFormat indexes it.
            if (value.getLength() > 0) {
                line.set(value);
                context.write(NullWritable.get(), line);
            }
        }
    }
}

Specify the main class in the pom:

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.learn.eshdoop.HdfsToEs</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.learn.eshdoop.HdfsToEs</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Run mvn clean package to generate the jar:

[root@localhost hadoop]# mv es-hadoop-learn-1.0-SNAPSHOT-jar-with-dependencies.jar HdfsToEs.jar

[root@localhost hadoop]# hadoop jar HdfsToEs.jar /work/blog.json

1566720615487

View the result in Kibana:

GET blog/_mapping

GET blog/_search
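
To spot-check the imported documents, an ordinary search on one of the fields from blog.json works as well, for example:

GET blog/_search
{
  "query": {
    "match": {
      "language": "python"
    }
  }
}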

11.4 From Elasticsearch to HDFS

1. Save the blog index to HDFS

Add a Java class:

package com.learn.eshdoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsInputFormat;

import java.io.IOException;

/**
 * Reads an Elasticsearch index and writes the documents to HDFS.
 */
public class EsToHdfs {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        conf.set("es.nodes", "10.10.139.42");
        conf.set("es.port", "9200");
        conf.set("es.resource", "blog/_doc");
        // Emit each document as a JSON string instead of a MapWritable.
        conf.set("es.output.json", "true");

        Job job = Job.getInstance(conf, "hadoop es write test");
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(1);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(EsInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<Writable, Writable, NullWritable, Text> {
        @Override
        protected void map(Writable key, Writable value, Context context) throws IOException, InterruptedException {
            // The value is the document JSON; write it out as a line of text.
            Text text = new Text();
            text.set(value.toString());
            context.write(NullWritable.get(), text);
        }
    }
}

Change the main class in pom.xml to com.learn.eshdoop.EsToHdfs.

Rebuild with mvn clean package.

After uploading the jar, run:

[root@localhost hadoop]# mv es-hadoop-learn-1.0-SNAPSHOT-jar-with-dependencies.jar EsIndexToHdfs.jar

[root@localhost hadoop]# hadoop jar EsIndexToHdfs.jar /work/blog_mapping
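
The job writes an _SUCCESS marker plus part-r-* files into the output directory, so listing it first shows which part files to read:

[root@localhost hadoop]# hadoop fs -ls /work/blog_mapping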

[root@localhost hadoop]# hadoop fs -cat /work/blog_mapping/part-r-00000

2. Save the results of a conditional query on blog to HDFS

package com.learn.eshdoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsInputFormat;

import java.io.IOException;

/**
 * Runs a query against Elasticsearch and writes the matching documents to HDFS.
 */
public class EsQueryToHdfs {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        conf.set("es.nodes", "10.10.139.42");
        conf.set("es.port", "9200");
        conf.set("es.resource", "blog/_doc");
        conf.set("es.output.json", "true");
        // Only documents matching this URI query are read from the index.
        conf.set("es.query", "?q=title:python");

        Job job = Job.getInstance(conf, "query es to HDFS");
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(1);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(EsInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<Writable, Writable, NullWritable, Text> {
        @Override
        protected void map(Writable key, Writable value, Context context) throws IOException, InterruptedException {
            // The value is the matching document's JSON; write it out as a line of text.
            Text text = new Text();
            text.set(value.toString());
            context.write(NullWritable.get(), text);
        }
    }
}

Change the main class name in pom.xml to com.learn.eshdoop.EsQueryToHdfs.

Rebuild with mvn clean package and upload the jar.

[root@localhost hadoop]# mv es-hadoop-learn-1.0-SNAPSHOT-jar-with-dependencies.jar EsQueryToHdfs.jar

[root@localhost hadoop]# hadoop jar EsQueryToHdfs.jar /work/EsQueryToHdfs

[root@localhost hadoop]# hadoop fs -cat /work/EsQueryToHdfs/part-r-00000
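
Here es.query uses the URI (?q=...) syntax. ES-Hadoop also accepts a full query DSL body in es.query; a minimal sketch of the equivalent setting (the same title:python condition expressed as a match query):

conf.set("es.query", "{\"query\":{\"match\":{\"title\":\"python\"}}}");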