1. 程式人生 > >python的強大果然是名副其實,製作一個大資料搜尋引擎跟玩一樣!

python的強大果然是名副其實,製作一個大資料搜尋引擎跟玩一樣!

 

搜尋是大資料領域裡常見的需求。Splunk和ELK分別是該領域在非開源和開源領域裡的領導者。本文利用很少的Python程式碼實現了一個基本的資料搜尋功能,試圖讓大家理解大資料搜尋的基本原理。

學習Python中有不明白推薦加入交流群
                號:960410445
                群裡有志同道合的小夥伴,互幫互助,
                群裡有不錯的視訊學習教程和PDF!

布隆過濾器 (Bloom Filter)

第一步我們先要實現一個布隆過濾器。

布隆過濾器是大資料領域的一個常見演算法,它的目的是過濾掉那些不是目標的元素。也就是說如果一個要搜尋的詞並不存在與我的資料中,那麼它可以以很快的速度返回目標不存在。

讓我們看看以下布隆過濾器的程式碼:

 
class Bloomfilter(object): 2 """ 3 A Bloom filter is a probabilistic data-structure that trades space for accuracy 4 when determining if a value is in a set. It can tell you if a value was possibly 5 added, or if it was definitely not added, but it can't tell you for certain that 6 it was added. 7 """ 8 def __init__(self, size): 9 """Setup the BF with the appropriate size""" 10 self.values = [False] * size 11 self.size = size 12 13 def hash_value(self, value): 14 """Hash the value provided and scale it to fit the BF size""" 15 return hash(value) % self.size 16 17 def add_value(self, value): 18 """Add a value to the BF""" 19 h = self.hash_value(value) 20 self.values[h] = True 21 22 def might_contain(self, value): 23 """Check if the value might be in the BF""" 24 h = self.hash_value(value) 25 return self.values[h] 26 27 def print_contents(self): 28 """Dump the contents of the BF for debugging purposes""" 29 print self.values 

1基本的資料結構是個陣列(實際上是個點陣圖,用1/0來記錄資料是否存在),初始化是沒有任何內容,所以全部置False。實際的使用當中,該陣列的長度是非常大的,以保證效率。

2利用雜湊演算法來決定資料應該存在哪一位,也就是陣列的索引

3當一個數據被加入到布隆過濾器的時候,計算它的雜湊值然後把相應的位置為True

4當檢查一個數據是否已經存在或者說被索引過的時候,只要檢查對應的雜湊值所在的位的True/Fasle

看到這裡,大家應該可以看出,如果布隆過濾器返回False,那麼資料一定是沒有索引過的,然而如果返回True,那也不能說資料一定就已經被索引過。在搜尋過程中使用布隆過濾器可以使得很多沒有命中的搜尋提前返回來提高效率。

我們看看這段 code是如何執行的:

 
1bf = Bloomfilter(10) 2bf.add_value('dog') 3bf.add_value('fish') 4bf.add_value('cat') 5bf.print_contents() 6bf.add_value('bird') 7bf.print_contents() 8# Note: contents are unchanged after adding bird - it collides 9for term in ['dog', 'fish', 'cat', 'bird', 'duck', 'emu']: 10 print '{}: {} {}'.format(term, bf.hash_value(term), bf.might_contain(term)) 11 12結果: 13 14[False, False, False, False, True, True, False, False, False, True] 15[False, False, False, False, True, True, False, False, False, True] 16dog: 5 True 17fish: 4 True 18cat: 9 True 19bird: 9 True 20duck: 5 True 21emu: 8 False 

首先建立了一個容量為10的的布隆過濾器

 

然後分別加入 ‘dog’,‘fish’,‘cat’三個物件,這時的布隆過濾器的內容如下:

 

然後加入‘bird’物件,布隆過濾器的內容並沒有改變,因為‘bird’和‘fish’恰好擁有相同的雜湊。

 

最後我們檢查一堆物件('dog', 'fish', 'cat', 'bird', 'duck', 'emu')是不是已經被索引了。結果發現‘duck’返回True,2而‘emu’返回False。因為‘duck’的雜湊恰好和‘dog’是一樣的。

 

分詞

下面一步我們要實現分詞。 分詞的目的是要把我們的文字資料分割成可搜尋的最小單元,也就是詞。這裡我們主要針對英語,因為中文的分詞涉及到自然語言處理,比較複雜,而英文基本只要用標點符號就好了。

下面我們看看分詞的程式碼:

 
1def major_segments(s): 2 """ 3 Perform major segmenting on a string. Split the string by all of the major 4 breaks, and return the set of everything found. The breaks in this implementation 5 are single characters, but in Splunk proper they can be multiple characters. 6 A set is used because ordering doesn't matter, and duplicates are bad. 7 """ 8 major_breaks = ' ' 9 last = -1 10 results = set() 11 12 # enumerate() will give us (0, s[0]), (1, s[1]), ... 13 for idx, ch in enumerate(s): 14 if ch in major_breaks: 15 segment = s[last+1:idx] 16 results.add(segment) 17 18 last = idx 19 20 # The last character may not be a break so always capture 21 # the last segment (which may end up being "", but yolo)  22 segment = s[last+1:] 23 results.add(segment) 24 25 return results 

主要分割

主要分割使用空格來分詞,實際的分詞邏輯中,還會有其它的分隔符。例如Splunk的預設分割符包括以下這些,使用者也可以定義自己的分割符。

 
1] < > ( ) { } | ! ; , ' " * \n \r \s \t & ? + %21 %26 %2526 %3B %7C %20 %2B %3D -- %2520 %5D %5B %3A %0A %2C %28 %29 2 3def minor_segments(s): 4 """ 5 Perform minor segmenting on a string. This is like major 6 segmenting, except it also captures from the start of the 7 input to each break. 8 """ 9 minor_breaks = '_.' 10 last = -1 11 results = set() 12 13 for idx, ch in enumerate(s): 14 if ch in minor_breaks: 15 segment = s[last+1:idx] 16 results.add(segment) 17 18 segment = s[:idx] 19 results.add(segment) 20 21 last = idx 22 23 segment = s[last+1:] 24 results.add(segment) 25 results.add(s) 26 27 return results 28 

次要分割

次要分割和主要分割的邏輯類似,只是還會把從開始部分到當前分割的結果加入。例如“1.2.3.4”的次要分割會有1,2,3,4,1.2,1.2.3

 
1def segments(event): 2 """Simple wrapper around major_segments / minor_segments""" 3 results = set() 4 for major in major_segments(event): 5 for minor in minor_segments(major): 6 results.add(minor) 7 return results 

分詞的邏輯就是對文字先進行主要分割,對每一個主要分割在進行次要分割。然後把所有分出來的詞返回。

我們看看這段 code是如何執行的:

 
1for term in segments('src_ip = 1.2.3.4'): 2 print term 3 4src 51.2 61.2.3.4 7src_ip 83 91 101.2.3 11ip 122 13= 144 

搜尋

好了,有個分詞和布隆過濾器這兩個利器的支撐後,我們就可以來實現搜尋的功能了。

上程式碼:

 
1class Splunk(object): 2 def __init__(self): 3 self.bf = Bloomfilter(64) 4 self.terms = {} # Dictionary of term to set of events 5 self.events = [] 6 7 def add_event(self, event): 8 """Adds an event to this object""" 9 10 # Generate a unique ID for the event, and save it 11 event_id = len(self.events) 12 self.events.append(event) 13 14 # Add each term to the bloomfilter, and track the event by each term 15 for term in segments(event): 16 self.bf.add_value(term) 17 18 if term not in self.terms: 19 self.terms[term] = set() 20 self.terms[term].add(event_id) 21 22 def search(self, term): 23 """Search for a single term, and yield all the events that contain it""" 24 25 # In Splunk this runs in O(1), and is likely to be in filesystem cache (memory) 26 if not self.bf.might_contain(term): 27 return 28 29 # In Splunk this probably runs in O(log N) where N is the number of terms in the tsidx 30 if term not in self.terms: 31 return 32 33 for event_id in sorted(self.terms[term]): 34 yield self.events[event_id] 1Splunk代表一個擁有搜尋功能的索引集合 2每一個集合中包含一個布隆過濾器,一個倒排詞表(字典),和一個儲存所有事件的陣列 3當一個事件被加入到索引的時候,會做以下的邏輯 4 為每一個事件生成一個unqie id,這裡就是序號 5 對事件進行分詞,把每一個詞加入到倒排詞表,也就是每一個詞對應的事件的id的對映結構,注意,一個詞可能對應多個事件,所以倒排表的的值是一個Set。倒排表是絕大部分搜尋引擎的核心功能。 6當一個詞被搜尋的時候,會做以下的邏輯 7 檢查布隆過濾器,如果為假,直接返回 8 檢查詞表,如果被搜尋單詞不在詞表中,直接返回 9 在倒排表中找到所有對應的事件id,然後返回事件的內容 

我們執行下看看把:

 
1s = Splunk() 2s.add_event('src_ip = 1.2.3.4') 3s.add_event('src_ip = 5.6.7.8') 4s.add_event('dst_ip = 1.2.3.4') 5 6for event in s.search('1.2.3.4'): 7 print event 8print '-' 9for event in s.search('src_ip'): 10 print event 11print '-' 12for event in s.search('ip'): 13 print event 14 15src_ip = 1.2.3.4 16dst_ip = 1.2.3.4 17- 18src_ip = 1.2.3.4 19src_ip = 5.6.7.8 20- 21src_ip = 1.2.3.4 22src_ip = 5.6.7.8 23dst_ip = 1.2.3.4 24 25是不是很贊! 26更復雜的搜尋 27 28更進一步,在搜尋過程中,我們想用And和Or來實現更復雜的搜尋邏輯。 29 30上程式碼: 31 32class SplunkM(object): 33 def __init__(self): 34 self.bf = Bloomfilter(64) 35 self.terms = {} # Dictionary of term to set of events 36 self.events = [] 37 38 def add_event(self, event): 39 """Adds an event to this object""" 40 41 # Generate a unique ID for the event, and save it 42 event_id = len(self.events) 43 self.events.append(event) 44 45 # Add each term to the bloomfilter, and track the event by each term 46 for term in segments(event): 47 self.bf.add_value(term) 48 if term not in self.terms: 49 self.terms[term] = set() 50 51 self.terms[term].add(event_id) 52 53 def search_all(self, terms): 54 """Search for an AND of all terms""" 55 56 # Start with the universe of all events... 57 results = set(range(len(self.events))) 58 59 for term in terms: 60 # If a term isn't present at all then we can stop looking 61 if not self.bf.might_contain(term): 62 return 63 if term not in self.terms: 64 return 65 66 # Drop events that don't match from our results 67 results = results.intersection(self.terms[term]) 68 69 for event_id in sorted(results): 70 yield self.events[event_id] 71 72 73 def search_any(self, terms): 74 """Search for an OR of all terms""" 75 results = set() 76 77 for term in terms: 78 # If a term isn't present, we skip it, but don't stop 79 if not self.bf.might_contain(term): 80 continue 81 if term not in self.terms: 82 continue 83 84 # Add these events to our results 85 results = results.union(self.terms[term]) 86 87 for event_id in sorted(results): 88 yield self.events[event_id] 

利用Python集合的intersection和union操作,可以很方便的支援And(求交集)和Or(求合集)的操作。

執行結果如下:

 
1s = SplunkM() 2s.add_event('src_ip = 1.2.3.4') 3s.add_event('src_ip = 5.6.7.8') 4s.add_event('dst_ip = 1.2.3.4') 5 6for event in s.search_all(['src_ip', '5.6']): 7 print event 8print '-' 9for event in s.search_any(['src_ip', 'dst_ip']): 10 print event 11 12src_ip = 5.6.7.8 13- 14src_ip = 1.2.3.4 15src_ip = 5.6.7.8 16dst_ip = 1.2.3.4 

總結

以上的程式碼只是為了說明大資料搜尋的基本原理,包括布隆過濾器,分詞和倒排表。如果大家真的想要利用這程式碼來實現真正的搜尋功能,還差的太遠。