Python基於Elasticsearch實現搜尋引擎
阿新 • • 發佈:2019-01-03
        ElasticSearch是一個基於Lucene的搜尋伺服器。它提供了一個分散式多使用者能力的全文搜尋引擎,基於RESTful Web介面。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放原始碼釋出,是當前流行的企業級搜尋引擎,設計用於雲端計算中,能夠達到實時搜尋,穩定,可靠,快速,安裝使用方便。
1.準備工作
2.填充資料
要想製作一款搜尋引擎,首先資料庫裡面得有大量的資料,如果資料庫裡面都沒有資料,那這個搜尋引擎還能叫搜尋引擎嗎?所以我們先來爬取大量的資料,這裡寫了一個小說網的爬蟲,以搜尋小說為例。
編寫model.py檔案,編寫完畢呼叫init函式,建立es索引的mapping
# coding:utf-8
from elasticsearch_dsl import DocType, Completion, Text, Boolean, Integer, Date
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl.analysis import CustomAnalyzer

# 1. Open an ES connection (registered globally by elasticsearch_dsl).
connections.create_connection(hosts=['127.0.0.1'])


# 3. Custom analyzer wrapper.
class MyAnalyzer(CustomAnalyzer):
    def get_analysis_definition(self):
        # Empty definition: the named analyzer ('ik_max_word') already exists
        # on the cluster, so nothing extra is declared in the mapping.
        return {}


# Analyzer object; the 'lowercase' filter makes suggestions case-insensitive.
ik_analyzer = MyAnalyzer('ik_max_word', filter=['lowercase'])


# 2. Document model — one record per novel.
class NovelModel(DocType):
    # 2.1 Plain fields.
    title = Text(analyzer='ik_max_word')
    author = Text(analyzer='ik_max_word')
    classify = Text()
    rate = Text()
    collect = Integer()
    number = Text()
    time = Text()
    click_week = Integer()
    click_month = Integer()
    click_all = Integer()
    collect_week = Integer()
    collect_month = Integer()
    collect_all = Integer()
    abstract = Text()
    picture = Text()
    download_url = Text()

    # 2.2 Search-suggestion field (completion suggester).
    suggest = Completion(analyzer=ik_analyzer)

    # 2.3 Mapping metadata.
    class Meta:
        # index: index name (the "database").
        index = 'alldata'
        # doc_type: type name (the "table").
        doc_type = 'novel'


if __name__ == '__main__':
    # Create the index mapping on the cluster.
    NovelModel.init()
寫一個Pipeline來儲存資料
因為考慮到一個爬蟲專案可能不止一個爬蟲,每個爬蟲的Item又不一樣,所以在每一個Item類中來進行寫入儲存操作,然後每次當Item交給Pipeline來處理的時候,會根據不同的Item來進行不同的處理操作。
class ToEsPipeline(object):
    """Scrapy pipeline that persists every crawled item to Elasticsearch.

    Each Item subclass knows how to serialize itself, so the pipeline simply
    delegates to the item's own ``save_to_es`` method.
    """

    def process_item(self, item, spider):
        item.save_to_es()
        # Hand the item on to any later pipeline stage.
        return item
編寫Item
import scrapy
from elasticsearch_dsl.connections import connections
from .es_model import NovelModel

# 1. Create a connection; the handle is reused by the analyze API below.
es = connections.create_connection(hosts=['http://39.107.255.196'])


# 3. Build completion-suggester input from raw text.
def process_suggest(index, *args):
    """Tokenize each (text, weight) pair through the ES analyze API.

    :param index: ES index (database) whose analyzer settings are used
    :param args: (text, weight) tuples to tokenize
    :return: a list of {'input': [...], 'weight': w} dicts, with no token
             repeated across consecutive entries
    """
    seen_tokens = set()
    suggest = []
    for text, weight in args:
        # text: content to tokenize; weight: suggestion weight.
        # Call the ES analyzer endpoint to split the text into tokens.
        analyzed = es.indices.analyze(
            # ES index (database).
            index=index,
            analyzer='ik_max_word',
            # Extra parameters — token filters ('lowercase' normalizes case).
            params={
                'filter': ['lowercase'],
            },
            body={
                'text': text
            }
        )
        # Collect tokens into a set to drop duplicates.
        tokens = {entry['token'] for entry in analyzed['tokens']}
        fresh = tokens - seen_tokens
        # Append only tokens not produced by the previous text.
        suggest.append({'input': list(fresh), 'weight': weight})
        seen_tokens = tokens
    return suggest


# 2. Item definition plus its ES persistence helper.
class MyItem(scrapy.Item):
    novel_classify = scrapy.Field()
    novel_title = scrapy.Field()
    novel_author = scrapy.Field()
    novel_rate = scrapy.Field()
    novel_collect = scrapy.Field()
    novel_number = scrapy.Field()
    novel_time = scrapy.Field()
    click_all = scrapy.Field()
    click_month = scrapy.Field()
    click_week = scrapy.Field()
    collect_all = scrapy.Field()
    collect_month = scrapy.Field()
    collect_week = scrapy.Field()
    novel_abstract = scrapy.Field()
    novel_picture = scrapy.Field()
    novel_download = scrapy.Field()

    # 2. Persist this item as a NovelModel document.
    def save_to_es(self):
        # 2.1 Build the document object.
        novel = NovelModel()
        # 2.2 Copy plain fields.
        novel.title = self['novel_title']
        novel.author = self['novel_author']
        novel.classify = self['novel_classify']
        novel.rate = self['novel_rate']
        novel.collect = self['novel_collect']
        novel.number = self['novel_number']
        novel.time = self['novel_time']
        novel.click_week = self['click_week']
        novel.click_month = self['click_month']
        novel.click_all = self['click_all']
        novel.collect_week = self['collect_week']
        novel.collect_month = self['collect_month']
        novel.collect_all = self['collect_all']
        # NOTE(review): 'bstract' drops the leading 'a' — the model declares
        # 'abstract', but the search view also reads 'bstract', so this typo
        # is load-bearing; rename both sides together if it is ever fixed.
        novel.bstract = self['novel_abstract']
        novel.picture = self['novel_picture']
        novel.download_url = self['novel_download']
        # 2.3 Search suggestions: title weighted above author.
        novel.suggest = process_suggest(
            NovelModel._doc_type.index,
            (novel.title, 10),
            (novel.author, 8),
        )
        # 2.4 Save the document.
        novel.save()
3.Django專案
由於在Django專案中也會用到我們在scrapy爬蟲專案中的model.py檔案,所以複製一份到django專案中
import math
from redis import Redis
from urllib import parse
from datetime import datetime
from django.shortcuts import render, redirect
from django.http import JsonResponse
from elasticsearch_dsl.connections import connections
from .es_models.es_types import NovelModel
# Redis client — backs the 'hotkey' sorted set used for hot-search ranking.
rds = Redis(host='127.0.0.1',port=6379)
# Shared Elasticsearch connection used by the views below.
es = connections.create_connection(hosts=['127.0.0.1'])
def index(request):
    """Render the search home page with the selectable search categories.

    :param request: Django HttpRequest
    :return: rendered ``index.html`` response
    """
    # Categories shown in the navigation bar.
    navs = [
        {'type': 'novel', 'title': '小說'},
        {'type': 'movie', 'title': '電影'},
        {'type': 'job', 'title': '職位'},
        {'type': 'news', 'title': '新聞'},
    ]
    content = {
        'navs': navs,
        'search_type': 'novel',  # default category
    }
    # Fix: the original only returned a response when request.method == 'GET'
    # and implicitly returned None otherwise, which Django rejects with
    # "view didn't return an HttpResponse". Always render the page.
    return render(request, 'index.html', content)
def result(request):
    """Search view: query Elasticsearch for the keyword and render results.

    GET parameters: ``kw`` (keyword, required), ``s_type`` (category),
    ``pn`` (page number, default 1).
    Side effects: increments the keyword's score in the Redis 'hotkey'
    zset and keeps the last five searches in a 'history' cookie.

    :param request: Django HttpRequest
    :return: rendered ``result.html``, or a redirect to the index page
             when no keyword was supplied
    """
    if request.method == 'GET':
        # Pull keyword, category and page number out of the query string.
        keyword = request.GET.get('kw')
        s_type = request.GET.get('s_type')
        # Default to page 1 when no page parameter is given.
        page_num = request.GET.get('pn', 1)
        # Without a keyword there is nothing to search — go back home.
        if not keyword:
            return redirect('index')
        # Popularity counter feeding the "hot searches" top-5 box.
        # NOTE(review): redis-py >= 3.0 changed the signature to
        # zincrby(name, amount, value) — confirm the installed version.
        rds.zincrby('hotkey', keyword)
        hot_top5 = rds.zrevrange('hotkey', 0, 5)
        # Per-browser search history: URL-quoted keywords, comma-joined,
        # most recent first, capped at five entries.
        history = request.COOKIES.get('history', None)
        cookie_str = ''
        if history:
            cookies = history.split(',')
            # Move a repeated keyword to the front instead of duplicating it.
            if parse.quote(keyword) in cookies:
                cookies.remove(parse.quote(keyword))
            cookies.insert(0, parse.quote(keyword))
            if len(cookies) > 5:
                cookies.pop()
            cookie_str = ','.join(cookies)
        else:
            cookies = []
            cookie_str = parse.quote(keyword)
        # Only the novel category is implemented so far.
        if s_type == 'novel':
            # 1. ES index (database) to search.
            index = 'alldata'
            # 2. Doc type (table) name.
            doc_type = 'novel'
            # 3. Fields searched and highlighted. 'bstract' (sic) matches
            # the field name the crawler actually writes — do not rename it
            # here without migrating the stored documents.
            fields = ['title', 'bstract']
            start_time = datetime.now()
            rs = es.search(
                index=index,
                doc_type=doc_type,
                body={
                    "query": {
                        "multi_match": {
                            "query": keyword,
                            "fields": fields
                        }
                    },
                    # Ten results per page.
                    "from": (int(page_num) - 1) * 10,
                    "size": 10,
                    'highlight': {
                        'pre_tags': ['<span class="keyWord">'],
                        "post_tags": ['</span>'],
                        "fields": {
                            "title": {},
                            "bstract": {}
                        }
                    }
                }
            )
            # Wall-clock time spent on the ES round trip, shown to the user.
            use_time = (datetime.now() - start_time).total_seconds()
            hits_list = []
            for hit in rs['hits']['hits']:
                # Fix: a hit may carry no 'highlight' section at all; the
                # original indexed hit['highlight'] unconditionally and
                # raised KeyError on such hits.
                highlight = hit.get('highlight', {})
                h_dic = {}
                if 'title' in highlight:
                    h_dic['title'] = highlight['title'][0]
                else:
                    h_dic['title'] = hit['_source']['title']
                if 'bstract' in highlight:
                    h_dic['abstract'] = highlight['bstract']
                else:
                    h_dic['abstract'] = hit['_source']['bstract']
                h_dic['detail_url'] = hit['_source']['download_url'][0]
                hits_list.append(h_dic)
            navs = [
                {'type': 'novel', 'title': '部落格'},
                {'type': 'job', 'title': '職位'},
                {'type': 'movie', 'title': '電影'},
                {'type': 'news', 'title': '新聞'},
            ]
            # Total number of matching documents.
            # NOTE(review): on Elasticsearch >= 7 this is a dict
            # ({'value': n, ...}), not an int — confirm the cluster version.
            total = rs['hits']['total']
            # Number of pages, rounded up.
            page_nums = math.ceil(total / 10)
            page_num = int(page_num)
            # Ten-page window around the current page, clamped to valid
            # pages. Fix: the original produced negative/overshooting page
            # numbers whenever fewer than ten pages existed.
            if page_nums <= 10:
                pages = range(1, page_nums + 1)
            elif page_num - 4 <= 0:
                pages = range(1, 11)
            elif page_num + 5 >= page_nums:
                pages = range(page_nums - 9, page_nums + 1)
            else:
                pages = range(page_num - 4, page_num + 6)
            content = {
                'hits': hits_list,
                'kw': keyword,
                'use_time': use_time,
                'total': total,
                'page_nums': page_nums,
                'navs': navs,
                'search_type': s_type,
                'pages': pages,
                'history': [his for his in parse.unquote(cookie_str).split(',')],
                'hot_top5': hot_top5
            }
            response = render(request, 'result.html', content)
            # Persist the updated search history in the browser.
            response.set_cookie('history', cookie_str)
            return response
def suggest(request):
    """AJAX endpoint: return search suggestions for a partial keyword as JSON.

    GET parameters: ``s`` (text typed so far), ``s_type`` (category).
    Response JSON: ``status`` (0 = ok, -1 = nothing found / empty input),
    plus ``datas`` and ``s_type`` when a lookup was performed.
    """
    if request.method == 'GET':
        # Partial input typed so far, plus the active search category.
        term = request.GET.get('s', None)
        s_type = request.GET.get('s_type')
        content = {}
        if not term:
            # Nothing typed — signal "no suggestions".
            content['status'] = -1
        else:
            # Ask ES for suggestions matching this term and category.
            matches = get_suggest(term, s_type)
            content['status'] = -1 if len(matches) == 0 else 0
            content['datas'] = matches
            content['s_type'] = s_type
        return JsonResponse(content)
# 在es中搜索資料
def get_suggest(keyword, s_type):
    """Fetch completion suggestions from Elasticsearch.

    :param keyword: partial search keyword typed by the user
    :param s_type: search category ('novel' is the only one implemented)
    :return: list of suggestion strings (empty for unsupported categories)
    """
    # Map each supported category to its document model and to the field
    # whose value is displayed as the suggestion text.
    models = {'novel': NovelModel}
    fields = {'novel': 'title'}
    model = models.get(s_type)
    # Fix: the original left 'search' unassigned for any other category
    # (e.g. 'job' hit a bare `pass`) and crashed with UnboundLocalError;
    # return no suggestions instead.
    if model is None:
        return []
    # Search object used to issue the suggest request.
    search = model.search()
    # suggest() arguments:
    # 1. name of the result bucket in the response ('r_suggest')
    # 2. the user's text
    # 3. a fuzzy completion query on the 'suggest' field, max 5 options.
    result = search.suggest(
        'r_suggest',
        keyword,
        completion={
            'field': 'suggest',
            'fuzzy': {
                'fuzziness': 2
            },
            'size': 5
        }
    )
    # Returns a dict-like suggest response.
    # NOTE(review): execute_suggest() was removed in newer elasticsearch_dsl
    # releases (use .execute().suggest) — confirm the installed version.
    s = result.execute_suggest()
    # Collect the suggested field values.
    datas = []
    for option in s['r_suggest'][0]['options']:
        datas.append(option._source[fields[s_type]])
    # Return the search suggestions.
    return datas