10. ES News Search in Practice


10.0 Prerequisites

10.1 Collect news from Sina and store it in MySQL

10.2 Sync the MySQL data to Elasticsearch

10.3 Search data in Elasticsearch and return it to the front end

10.4 ES News Search in Practice: the query page

10.5 ES News Search in Practice: displaying results

---

Project repository: https://gitee.com/carloz/elastic-learn.git

Module path: https://gitee.com/carloz/elastic-learn/tree/master/elasticsearch-news

---

10.0 Prerequisites

An ELK stack (Elasticsearch, Logstash, Kibana) is already deployed.

10.1 Collect news from Sina and store it in MySQL

http://finance.sina.com.cn/7x24/

# Sina news collection script

### 1. Use the Python 2.7 interpreter

Add the interpreter to the PATH environment variable:
D:\python\tools\Python27;D:\python\tools\Python27\Scripts;

Install the dependencies:
pip install requests
MySQL-python: https://sourceforge.net/projects/mysql-python/
64-bit MySQLdb build: http://www.codegood.com/download/11/



### 2. Create the database

```sql
CREATE DATABASE `sina_news`;
CREATE TABLE `news` (
    `id` int(11) NOT NULL,
    `news_type` enum('其他','央行','观点','市场','数据','公司','行业','宏观','A股') NOT NULL,
    `create_time` datetime DEFAULT NULL,
    `rich_text` text,
    PRIMARY KEY (`id`)
);
```
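The collection script below writes rows into this table via `INSERT ... ON DUPLICATE KEY UPDATE`, with values spliced into the SQL by string interpolation. For anything beyond a throwaway demo, a parameterized upsert is safer (no quoting or injection issues). A sketch of that statement with MySQLdb-style `%s` placeholders (the helper name is mine, not part of the project):

```python
# Parameterized version of the upsert the crawler issues per row.
UPSERT_SQL = (
    "insert into sina_news.news (id, news_type, create_time, rich_text) "
    "values (%s, %s, %s, %s) "
    "on duplicate key update news_type=%s, create_time=%s, rich_text=%s"
)

def upsert_params(row):
    """Build the parameter tuple for UPSERT_SQL from one feed item.
    With MySQLdb you would run: cursor.execute(UPSERT_SQL, upsert_params(row))"""
    return (row["id"], row["news_type"], row["create_time"], row["rich_text"],
            row["news_type"], row["create_time"], row["rich_text"])
```

The driver then handles all escaping, so a quote inside `rich_text` can no longer break the statement.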



### 3. Run the script; the data then appears in the database

The Python collection script:

```python
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import string
import json
import time
import requests
import MySQLdb
import random
import re

conn = MySQLdb.connect("10.10.87.38", "crm", "crm@2015", "sina_news", charset='utf8')
cursor = conn.cursor()

# JSONP endpoint of the Sina 7x24 live news feed
template_url = string.Template(
    'http://zhibo.sina.com.cn/api/zhibo/feed?callback=jQuery$jQueryId&page=1&page_size=$page_size&zhibo_id=152&tag_id=$tag_id&dire=f&dpc=1&pagesize=$page_size&_=$datetime')
tag_ids = {u'A股': 10, u'宏观': 1, u'行业': 2, u'公司': 3, u'数据': 4, u'市场': 5, u'观点': 6, u'央行': 7, u'其他': 8}
headers = {
    'Accept-Encoding': 'gzip, deflate',
    'Accept-Language': 'zh-CN,zh;q=0.9',
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36',
    'Accept': '*/*',
    # 'Referer': 'http://finance.sina.com.cn/7x24/?tag=10',
    'Connection': 'keep-alive',
    'Cookie': 'U_TRS1=000000d5.5a546170.5cb7de02.83b7c4e0; U_TRS2=000000d5.5a616170.5cb7de02.256bb0da; UOR=www.baidu.com,blog.sina.com.cn,; SINAGLOBAL=114.114.114.114_1555553794.454804; Apache=114.114.114.213_1555553794.454805; ULV=1555553794485:1:1:1:114.114.114.114_1555553794.454805:; SCF=AhOLahPmRlTviyZ4YQHaxRNdunCqZL3kO2SBnELkwjeVg8ZMdSXgud0IsBd4CaJIt5s-9YmaaRxgNVK4w6koPXE.; ULOGIN_IMG=gz-d89f6db983d2c25da42c59504991a4867f53; sso_info=v02m6alo5qztLSNk4S5jJOQs46TnKadlqWkj5OEuI6DnLCOg4y1jbOMwA==; SUBP=0033WrSXqPxfM725Ws9jqgMF55529P9D9WhM3uQ2UWBVDQgNwIoS4aG35NHD95Qp1hnNehn0SKM0Ws4Dqcjci--Xi-zRiKn7i--fiKysi-8Wi--fi-2Xi-2Ni--RiK.7iKyhi--fiKnfiK.Xi--fi-82iK.7; _s_upa=5; lxlrttp=1556243090; NEWSCENTER=522b9f1b6a2f61766931ac50242bed94; SUB=_2A25x5Y0UDeRhGedG41UR-C3JzD-IHXVSkvncrDV_PUNbm9BeLWitkW9NUOtWwD6pEwnHVFGGf0Y42aAcKr49dHwM; ALF=1589850308',
}

for news_type, tag_id in tag_ids.items():
    # rotate the Referer so the requests look less uniform
    headers['Referer'] = 'http://finance.sina.com.cn/7x24/?tag=%s' % random.choice(tag_ids.values())
    datetime = int(1000 * time.time())
    crawlurl = template_url.substitute(datetime=datetime, jQueryId="111207214587420816325_%s" % datetime,
                                       tag_id=tag_id, page_size=20)
    try:
        text = requests.get(crawlurl, timeout=2, headers=headers).text
        # strip the JSONP try/catch wrapper, then parse the JSON payload
        news = json.loads(re.sub('^try[^\(]*\(|\);}catch\(e\){};$', '', text))['result']['data']['feed']['list']
    except Exception, e:
        print str(e)
        continue

    for data in news:
        unique_id = data['id']
        rich_text = data['rich_text']
        create_time = data['create_time']
        try:
            # upsert: insert the row, or update it if the id already exists
            mysql_command = u"insert into sina_news.news (id,news_type,create_time,rich_text) values ('%s','%s','%s','%s')" % (unique_id, news_type, create_time, rich_text)
            mysql_command += u" on duplicate key update news_type='%s', create_time='%s',rich_text='%s';" % (news_type, create_time, rich_text)
            cursor.execute(mysql_command)
            conn.commit()
        except Exception, e:
            print mysql_command
            print str(e)
```
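The feed comes back wrapped in a JSONP `try{jQueryXXX(...)}catch(e){};` shell, which the `re.sub` call above strips before `json.loads`. The same step in isolation, as a Python 3 sketch (the sample payload is invented, in the same shape as the real feed):

```python
import json
import re

def strip_jsonp(text):
    """Remove Sina's try{jQueryXXX(...)}catch(e){}; wrapper, keeping the JSON body."""
    return re.sub(r'^try[^(]*\(|\);}catch\(e\){};$', '', text)

# a hypothetical response body, shaped like the real feed
raw = 'try{jQuery111_1(' + json.dumps(
    {"result": {"data": {"feed": {"list": [{"id": 1, "rich_text": "demo"}]}}}}
) + ');}catch(e){};'

feed = json.loads(strip_jsonp(raw))['result']['data']['feed']['list']
# feed is now the list of news items
```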

The collected data:

10.2 Sync the MySQL data to Elasticsearch

Create the index in Elasticsearch (the `rich_text` field uses `ik_smart`, the Chinese analyzer provided by the IK plugin):

```
PUT sina_news
{
  "mappings": {
    "properties": {
      "id":          { "type": "integer" },
      "news_type":   { "type": "keyword" },
      "create_time": { "type": "date" },
      "rich_text":   { "type": "text", "analyzer": "ik_smart" }
    }
  }
}
```
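The same index can also be created programmatically over the REST API. A minimal Python sketch, assuming Elasticsearch is reachable at `localhost:9200` (adjust the URL to your deployment; stdlib `urllib` only, no client library required):

```python
import json
import urllib.request

# Mirror of the mapping created in the Kibana console above
MAPPING = {
    "mappings": {
        "properties": {
            "id": {"type": "integer"},
            "news_type": {"type": "keyword"},
            "create_time": {"type": "date"},
            "rich_text": {"type": "text", "analyzer": "ik_smart"},
        }
    }
}

def create_index(name, mapping, es_url="http://localhost:9200"):
    """PUT the mapping to Elasticsearch and return the parsed JSON response."""
    req = urllib.request.Request(
        "%s/%s" % (es_url, name),
        data=json.dumps(mapping).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))

# create_index("sina_news", MAPPING)  # run against a live cluster
```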

Candidate technologies for the sync:

This is only a demo, and the requirements do not call for propagating deletes in real time, so we use the official plugin logstash-input-jdbc:

https://github.com/logstash-plugins/logstash-input-jdbc

https://github.com/logstash-plugins/logstash-input-jdbc/releases

Install the jdbc input plugin for Logstash:

cd /data/carloz/tools/logstash-7.0.0/

./bin/logstash-plugin install logstash-input-jdbc

Upload the JDBC driver jar:

[root@10-10-139-42 logstash-7.0.0]# mkdir -p mylib

[root@10-10-139-42 logstash-7.0.0]# cd /data/carloz/tools/logstash-7.0.0/mylib

Upload mysql-connector-java-8.0.15.jar into that directory.

Create the config file:

[root@10-10-139-42 myconf]# vi sina_news.conf

```conf
input {
  jdbc {
    jdbc_driver_library => "/data/carloz/tools/logstash-7.0.0/mylib/mysql-connector-java-8.0.15.jar"
    # Connector/J 8.x uses com.mysql.cj.jdbc.Driver (com.mysql.jdbc.Driver is the legacy 5.x name)
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.10.87.38:3306/sina_news"
    jdbc_user => "crm"
    jdbc_password => "crm@2015"
    # run the statement once a minute
    schedule => "* * * * *"
    # incremental pull: only rows with id greater than the last value seen
    statement => "select * from news where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
  }
}
output {
  elasticsearch {
    index => "sina_news"
    hosts => "localhost:9200"
  }
}
```
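The `tracking_column` / `:sql_last_value` pair is what makes the pull incremental: Logstash persists the largest `id` it has seen and substitutes it into the statement on each scheduled run, so only new rows are fetched. The bookkeeping can be sketched in Python (an in-memory list stands in for the `news` table):

```python
def incremental_pull(rows, last_value):
    """Return rows with id > last_value plus the new high-water mark,
    mimicking Logstash's :sql_last_value tracking."""
    batch = [r for r in rows if r["id"] > last_value]
    new_last = max((r["id"] for r in batch), default=last_value)
    return batch, new_last

table = [{"id": 1}, {"id": 2}, {"id": 3}]
batch, last = incremental_pull(table, 0)       # first run: everything
table.append({"id": 4})                        # a new row arrives
batch2, last = incremental_pull(table, last)   # next run: only the new row
```

Note this scheme only sees inserts with growing ids; updates to existing rows are not picked up, which is acceptable for this append-only news feed.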

Start the sync:

[root@10-10-139-42 logstash-7.0.0]# ./bin/logstash -f myconf/sina_news.conf &

GET sina_news/_search

At this point the data has been loaded, and Logstash keeps polling for new rows.

Inspect the data:

```
GET sina_news/_search
{
  "size": 0,
  "aggs": {
    "news_stats": {
      "stats": { "field": "id" }
    }
  }
}

GET sina_news/_search
{
  "sort": [
    { "id": { "order": "desc" } }
  ],
  "from": 0,
  "size": 20
}
```

10.3 Search data in Elasticsearch and return it to the front end

The project structure is shown in the figure.

The backend is built with Spring Boot; add the dependencies to pom.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.learn</groupId>
    <artifactId>elasticsearch-news</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>elasticsearch-news</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
```

The search entry point (controller):

```java
/**
 * @param keywords the search terms
 * @return the search results page
 */
@RequestMapping("/search")
public ModelAndView searchFile(String keywords, @Nullable Integer from, @Nullable Integer size) {
    // initialize to an empty list so a failed search cannot cause an NPE below
    ArrayList<NewsModel> hitsList = new ArrayList<>();
    try {
        hitsList = mySearchService.searchSinaNews(keywords, from, size);
    } catch (Exception e) {
        e.printStackTrace();
    }
    log.info(keywords + ": " + hitsList.size() + " hits");

    ModelAndView mv = new ModelAndView("result.html");
    mv.addObject("keywords", keywords);
    mv.addObject("resultList", hitsList);
    return mv;
}
```

The core search code:

```java
package com.learn.elasticsearchnews.service;

import com.learn.elasticsearchnews.model.NewsModel;
import com.learn.elasticsearchnews.utils.EsUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Service
public class MySearchService {

    public ArrayList<NewsModel> searchSinaNews(String keywords, Integer from, Integer size) throws Exception {
        if (null == from) from = 0;
        if (null == size) size = 10;
        RestHighLevelClient client = EsUtils.getClient();

        String index = "sina_news";
        String field1 = "news_type";
        String field2 = "rich_text";
        // match against both fields, tolerating small typos via auto fuzziness
        MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(keywords, field1, field2)
                .fuzziness(Fuzziness.AUTO);
        // wrap matched terms in a red span for the result page
        HighlightBuilder highlightBuilder = new HighlightBuilder()
                .preTags("<span style=\"color:red\">")
                .postTags("</span>")
                .field(field1)
                .field(field2);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(multiMatchQueryBuilder);
        searchSourceBuilder.sort(new FieldSortBuilder("id").order(SortOrder.DESC));
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(size);
        searchSourceBuilder.timeout(new TimeValue(5, TimeUnit.SECONDS));
        searchSourceBuilder.highlighter(highlightBuilder);

        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(index);
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        ArrayList<NewsModel> resultList = new ArrayList<>();
        if (searchResponse.status() == RestStatus.OK) {
            SearchHits searchHits = searchResponse.getHits();
            for (SearchHit hit : searchHits) {
                Map<String, Object> resMap = hit.getSourceAsMap();
                NewsModel news = new NewsModel(
                        Integer.valueOf(resMap.get("id").toString()),
                        resMap.get("news_type").toString(),
                        resMap.get("rich_text").toString(),
                        resMap.get("create_time").toString());
                resultList.add(news);
            }
        }
        return resultList;
    }
}
```

10.4 ES News Search in Practice: the query page

Open http://localhost:18080/ in a browser.

10.5 ES News Search in Practice: displaying results

Search for "美国" in the search box. Since this is only a demo, pagination is left out; add it yourself if you need it.
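Adding pagination mostly amounts to mapping a page number onto the `from`/`size` parameters the service already accepts. A minimal sketch of that arithmetic, clamped to Elasticsearch's default `from + size <= 10000` result window (names are mine, for illustration):

```python
DEFAULT_PAGE_SIZE = 10
MAX_RESULT_WINDOW = 10000  # Elasticsearch default index.max_result_window

def page_to_from_size(page, size=DEFAULT_PAGE_SIZE):
    """Translate a 1-based page number into Elasticsearch from/size values."""
    page = max(1, page)                 # treat page 0 or negatives as page 1
    start = (page - 1) * size
    if start + size > MAX_RESULT_WINDOW:
        raise ValueError("deep paging past %d results needs search_after" % MAX_RESULT_WINDOW)
    return start, size
```

The front end would then pass the computed values as the `from` and `size` query parameters of `/search`; for result sets deeper than the window, `search_after` is the usual alternative.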