1. 程式人生 > >[python] 詞雲:wordcloud包的安裝、使用、原理(源碼分析)、中文詞雲生成、代碼重寫

[python] 詞雲:wordcloud包的安裝、使用、原理(源碼分析)、中文詞雲生成、代碼重寫

possible 渲染 alias com 表達 問題 compute ural pty

詞雲,又稱文字雲、標簽雲,是對文本數據中出現頻率較高的“關鍵詞”在視覺上的突出呈現,形成關鍵詞的渲染形成類似雲一樣的彩色圖片,從而一眼就可以領略文本數據的主要表達意思。常見於博客、微博、文章分析等。

除了網上現成的Wordle、Tagxedo、Tagul、Tagcrowd等詞雲制作工具,在python中也可以用wordcloud包比較輕松地實現(官網、github項目):

from wordcloud import WordCloud
import matplotlib.pyplot as plt

# Read the whole text.
text = open(
constitution.txt).read() # Generate a word cloud image wordcloud = WordCloud().generate(text) # Display the generated image: # the matplotlib way: plt.imshow(wordcloud, interpolation=bilinear) plt.axis("off")

生成的詞雲如下:

技術分享圖片

還可以設置圖片作為mask:

alice_mask = np.array(Image.open(path.join(d, "
alice_mask.png"))) wc = WordCloud(background_color="white", max_words=2000, mask=alice_mask, stopwords=stopwords, contour_width=3, contour_color=steelblue) wc.generate(text)

技術分享圖片

1. 安裝

pip install wordcloud

詞雲:解決pip install wordcloud安裝過程中報錯“error: command ‘x86_64-linux-gnu-gcc‘ failed with exit status 1”問題

2. 根據源碼分析wordcloud的實現原理

總的來說,wordcloud做的是三件事:

(1) 文本預處理

(2) 詞頻統計

(3) 將高頻詞以圖片形式進行彩色渲染

從上面的代碼可以看到,用 wordcloud.generate(text) 就完成了這三項工作。

源碼:

技術分享圖片
def generate(self, text):
    """Generate wordcloud from text.

    The input "text" is expected to be a natural text. If you pass a sorted
    list of words, words will appear in your output twice. To remove this
    duplication, set ``collocations=False``.

    Alias to generate_from_text.

    Calls process_text and generate_from_frequencies.

    Returns
    -------
    self
    """
    return self.generate_from_text(text)

def generate_from_text(self, text):
    """Generate wordcloud from text.

    The input "text" is expected to be a natural text. If you pass a sorted
    list of words, words will appear in your output twice. To remove this
    duplication, set ``collocations=False``.

    Calls process_text and generate_from_frequencies.

    ..versionchanged:: 1.2.2
        Argument of generate_from_frequencies() is not return of
        process_text() any more.

    Returns
    -------
    self
    """
    words = self.process_text(text)
    self.generate_from_frequencies(words)
    return self
generate()和generate_from_text()

它的調用順序是:

generate(self, text)
=>
self.generate_from_text(text)
=>
words = self.process_text(text)
self.generate_from_frequencies(words)

其中 process_text(text) 對應的是文本預處理和詞頻統計,而 generate_from_frequencies(words) 對應的是根據詞頻中生成詞雲

(1) process_text(text) 主要是進行分詞和去噪。

具體地,它做了以下操作:

  • 檢測文本編碼
  • 分詞(根據規則進行tokenize)、保留單詞字符(A-Za-z0-9_)和單引號(‘)、去除單字符
  • 去除停用詞
  • 去除後綴(‘s) -- 針對英文
  • 去除純數字
  • 統計一元和二元詞頻計數(unigrams_and_bigrams) -- 可選

返回的結果是一個字典 dict(string, int) ,表示的是分詞後的token以及對應出現的次數

這裏有一些需要註意的地方,文章後面會再提到。

源碼如下:

技術分享圖片
def process_text(self, text):
    """Splits a long text into words, eliminates the stopwords.

    Parameters
    ----------
    text : string
        The text to be processed.

    Returns
    -------
    words : dict (string, int)
        Word tokens with associated frequency.

    ..versionchanged:: 1.2.2
        Changed return type from list of tuples to dict.

    Notes
    -----
    There are better ways to do word tokenization, but I don‘t want to
    include all those things.
    """

    stopwords = set([i.lower() for i in self.stopwords])

    flags = (re.UNICODE if sys.version < 3 and type(text) is unicode
             else 0)
    regexp = self.regexp if self.regexp is not None else r"\w[\w‘]+"

    words = re.findall(regexp, text, flags)
    # remove stopwords
    words = [word for word in words if word.lower() not in stopwords]
    # remove ‘s
    words = [word[:-2] if word.lower().endswith("‘s") else word
             for word in words]
    # remove numbers
    words = [word for word in words if not word.isdigit()]

    if self.collocations:
        word_counts = unigrams_and_bigrams(words, self.normalize_plurals)
    else:
        word_counts, _ = process_tokens(words, self.normalize_plurals)

    return word_counts
def process_text(self, text)

(2) generate_from_frequencies(words) 主要是根據上一步的結果生成詞雲分布。

具體地,它做了以下操作:

  • 對詞計數結果進行排序,並歸一化(normalized)到0~1之間,得到詞頻
  • 創建圖像並確定font_size初始值
  • 給self.words_賦值,記錄的是出現頻率最高的前max_words個詞,以及對應的歸一化後的詞頻,即dict(token, normalized_frequency)
  • 畫出灰度圖:詞頻越大,font_size越大;根據生成的隨機數來決定字的水平/垂直方向
    • 若隨機數小於self.prefer_horizontal則為水平方向,否則為垂直方向;
    • 如果空間不足,優先考慮旋轉方向,其次考慮將字體變小
  • 給self.layout_賦值,記錄的是詞和詞頻、字體大小、位置、方向、以及顏色,即list(zip(frequencies, font_sizes, positions, orientations, colors))

可以看到,這個函數的主要目的在於得到self.layout_的值,記錄了要生成詞雲分布圖所需要的信息。

後面wordcloud.to_file(filename)或者plt.imshow(wordcloud)會把結果以圖像的形式呈現出來。其中to_file()函數就會先檢測是否已經給self.layout_賦值,如果沒有的話會報錯。

源碼如下:

技術分享圖片
def generate_from_frequencies(self, frequencies, max_font_size=None):
    """Create a word_cloud from words and frequencies.

    Parameters
    ----------
    frequencies : dict from string to float
        A contains words and associated frequency.

    max_font_size : int
        Use this font-size instead of self.max_font_size

    Returns
    -------
    self

    """
    # make sure frequencies are sorted and normalized
    frequencies = sorted(frequencies.items(), key=itemgetter(1), reverse=True)
    if len(frequencies) <= 0:
        raise ValueError("We need at least 1 word to plot a word cloud, "
                         "got %d." % len(frequencies))
    frequencies = frequencies[:self.max_words]

    # largest entry will be 1
    max_frequency = float(frequencies[0][1])

    frequencies = [(word, freq / max_frequency)
                   for word, freq in frequencies]

    if self.random_state is not None:
        random_state = self.random_state
    else:
        random_state = Random()

    if self.mask is not None:
        mask = self.mask
        width = mask.shape[1]
        height = mask.shape[0]
        if mask.dtype.kind == f:
            warnings.warn("mask image should be unsigned byte between 0"
                          " and 255. Got a float array")
        if mask.ndim == 2:
            boolean_mask = mask == 255
        elif mask.ndim == 3:
            # if all channels are white, mask out
            boolean_mask = np.all(mask[:, :, :3] == 255, axis=-1)
        else:
            raise ValueError("Got mask of invalid shape: %s"
                             % str(mask.shape))
    else:
        boolean_mask = None
        height, width = self.height, self.width
    occupancy = IntegralOccupancyMap(height, width, boolean_mask)

    # create image
    img_grey = Image.new("L", (width, height))
    draw = ImageDraw.Draw(img_grey)
    img_array = np.asarray(img_grey)
    font_sizes, positions, orientations, colors = [], [], [], []

    last_freq = 1.

    if max_font_size is None:
        # if not provided use default font_size
        max_font_size = self.max_font_size

    if max_font_size is None:
        # figure out a good font size by trying to draw with
        # just the first two words
        if len(frequencies) == 1:
            # we only have one word. We make it big!
            font_size = self.height
        else:
            self.generate_from_frequencies(dict(frequencies[:2]),
                                           max_font_size=self.height)
            # find font sizes
            sizes = [x[1] for x in self.layout_]
            try:
                font_size = int(2 * sizes[0] * sizes[1] 
                                / (sizes[0] + sizes[1]))
            # quick fix for if self.layout_ contains less than 2 values
            # on very small images it can be empty
            except IndexError:
                try:
                    font_size = sizes[0]
                except IndexError:
                    raise ValueError(canvas size is too small)
    else:
        font_size = max_font_size

    # we set self.words_ here because we called generate_from_frequencies
    # above... hurray for good design?
    self.words_ = dict(frequencies)

    # start drawing grey image
    for word, freq in frequencies:
        # select the font size
        rs = self.relative_scaling
        if rs != 0:
            font_size = int(round((rs * (freq / float(last_freq))
                                   + (1 - rs)) * font_size))
        if random_state.random() < self.prefer_horizontal:
            orientation = None
        else:
            orientation = Image.ROTATE_90
        tried_other_orientation = False
        while True:
            # try to find a position
            font = ImageFont.truetype(self.font_path, font_size)
            # transpose font optionally
            transposed_font = ImageFont.TransposedFont(
                font, orientation=orientation)
            # get size of resulting text
            box_size = draw.textsize(word, font=transposed_font)
            # find possible places using integral image:
            result = occupancy.sample_position(box_size[1] + self.margin,
                                               box_size[0] + self.margin,
                                               random_state)
            if result is not None or font_size < self.min_font_size:
                # either we found a place or font-size went too small
                break
            # if we didn‘t find a place, make font smaller
            # but first try to rotate!
            if not tried_other_orientation and self.prefer_horizontal < 1:
                orientation = (Image.ROTATE_90 if orientation is None else
                               Image.ROTATE_90)
                tried_other_orientation = True
            else:
                font_size -= self.font_step
                orientation = None

        if font_size < self.min_font_size:
            # we were unable to draw any more
            break

        x, y = np.array(result) + self.margin // 2
        # actually draw the text
        draw.text((y, x), word, fill="white", font=transposed_font)
        positions.append((x, y))
        orientations.append(orientation)
        font_sizes.append(font_size)
        colors.append(self.color_func(word, font_size=font_size,
                                      position=(x, y),
                                      orientation=orientation,
                                      random_state=random_state,
                                      font_path=self.font_path))
        # recompute integral image
        if self.mask is None:
            img_array = np.asarray(img_grey)
        else:
            img_array = np.asarray(img_grey) + boolean_mask
        # recompute bottom right
        # the order of the cumsum‘s is important for speed ?!
        occupancy.update(img_array, x, y)
        last_freq = freq

    self.layout_ = list(zip(frequencies, font_sizes, positions,
                            orientations, colors))
    return self       
def generate_from_frequencies(self, frequencies, max_font_size=None)

3. 應用到中文語料應該要註意的點

wordcloud包是由Andreas Mueller在2015-03-20發布1.0.0版本,現在最新的是2018-03-13發布的1.4.1版本。

英文語料可以直接輸入到wordcloud中,但是對於中文語料,僅僅用wordcloud不能直接生成中文詞雲圖。

原因:

英文單詞以空格分隔,而我們從前面process_text(text)看到源碼中是直接用正則表達式(默認為r"\w[\w‘]+")進行處理:

In  : re.findall(r"\w[\w‘]+", "It‘s Monday today.")
Out: ["It‘s", Monday, today]

但是中文裏面詞與詞之間一般不用字符分隔:

In : re.findall(r"\w[\w‘]+", "今天天氣不錯,藍天白雲,還有溫暖的陽光 哈 哈哈")
Out: [今天天氣不錯, 藍天白雲, 還有溫暖的陽光, 哈哈]

可以看出,原生的wordcloud是為英文服務的,去除標點符號(單符號‘除外)並分割成token;

而應用到中文語料上的時候,註意要先分好詞,再用空格分隔連接成字符串,最後輸入到wordcloud。

另外要註意的是,無論是對英文還是中文,默認是把單字符剔除掉(因為 regexp = self.regexp if self.regexp is not None else r"\w[\w‘]+" ),如果想要保留單字符,將regexp參數講表達式設置為 r"\w[\w‘]*" 即可。

from wordcloud import WordCloud
from scipy.misc import imread

def generate_wordcloud(text, max_words=200, pic_path=None):
    """
    生成詞雲
    :param text: 一段以空格為間斷的字符串
    :param max_words: 詞數目上限
    :param pic_path: 輸出圖片路徑
    :return:
    """
    mk = imread("tuoyuan.jpg")
    wc = WordCloud(font_path="/usr/share/fonts/myfonts/msyh.ttf", background_color="white", max_words=max_words,
                   mask=mk, width=1000, height=500, max_font_size=100, prefer_horizontal=0.95, collocations=False)
    wc.generate(text=text)
    if pic_path:
        wc.to_file(pic_path)
    else:
        plt.imshow(wc)
        plt.axis("off")
        plt.show()
    return wc.words_

def run_wordcloud(corpus, max_words, pic_path=None):
    text = " ".join([" ".join(line) for line in corpus])   # 將分詞後的結果用空格連接
    word2weight = generate_wordcloud(text=text, max_words=max_words, pic_path=pic_path)
    word2weight_sorted = sorted(word2weight.items(), key=lambda x: x[1], reverse=True)
    logging.info([(k, float("%.5f" % v)) for k, v in word2weight_sorted]) 

更多參考:word_cloud/examples/wordcloud_cn.py

4. 重寫代碼

用詞雲是為了直觀地看語料的關鍵信息,在本人的實際工作應用中,主要目的在於獲取關鍵信息,而不太關註界面的呈現方式。

所以在了解wordcloud源碼實現原理之後,決定自己用代碼實現。

一方面,使得代碼的實現更公開透明,在效率相當的情況下盡量避免使用第三方庫,效果可控,甚至還可以提升效率;

另一方面,能結合實際情況更靈活地處理問題。

針對中文的預處理,可以和分詞結合一起完成。這裏主要進行:分詞和詞性標註、小寫化、去停用詞、去數字、去單字符、以及保留指定詞性

import jieba
import jieba.posseg as pseg

class Utils(object):
    def __init__(self, utils_data=None):
        self.stopwords = self.init_utils(utils_data)
        self.pos_save = {
            "n", "an", "Ng", "nr", "ns", "nt", "nz", "vn", "un",  #
            "v", "vg", "vd",  #
            "a", "ag", "ad",  #
            "j", "l", "i", "z", "b", "g", "s", "h",  # j簡稱略語、l習用語、i成語、z狀態詞、b區別詞、g語素、s處所詞、h前接成分
            "zg", "eng",
            "x"}  # 未知(自定義詞)

    def _init_utils(self, utils_data):
        for wd in utils_data["user_dict"]:
            jieba.add_word(wd)
        return set(utils_data["stopwords"])

    def _token_filter(self, token):  # 去停用詞; 去數字; 去單字
        return token not in self.stopwords and not token.isdigit() and len(token) >= 2

    def _token_filter_with_flag(self, pair_word_flag):  # 保留指定詞性
        return self.token_filter(pair_word_flag.word) and pair_word_flag.flag in self.pos_save

    def cut(self, text):
        return list(filter(self._token_filter, list(jieba.cut(text.lower()))))  # 分詞; 小寫化;

    def cut_with_flag(self, text):
        pairs = list(filter(self._token_filter_with_flag,  list(pseg.cut(text.lower()))))  # 分詞和詞性標註; 小寫化;
        return [p.word for p in pairs]

做完文本分詞和其它預處理之後,直接統計詞及對應的出現次數即可。為了更直觀,這裏輸出的是詞計數,而不是歸一化後的詞頻。排序結果與wordcloud等同。

    def word_count(corpus, n_gram=1, n=None):
        counter = Counter()
        if n_gram == 1:
            for line in corpus:
                counter.update(line)
        elif n_gram == 2:
            for line in corpus:
                size = len(line)
                counter.update(["%s_%s" % (line[idx], line[idx + 1]) for idx in range(size) if idx + 1 < size])  # 有序
        else:
            logging.info("[Error] Invalid value of param n_gram: %s (only 1 or 2 accepted)" % n_gram)
        return counter.most_common(n=n)

另外還可以統計高頻詞的共現情況、把高頻詞/詞共現反向映射到對應的句子等等,便於從高頻詞層面到高頻句子類型層面的歸納。

參考:

https://pypi.org/project/wordcloud/

https://github.com/amueller/word_cloud

http://python.jobbole.com/87496/

https://www.jianshu.com/p/ead991a08563

https://blog.csdn.net/qq_34739497/article/details/78285972

https://www.cnblogs.com/sunnyeveryday/p/7043399.html

https://www.cnblogs.com/naraka/p/8992058.html

https://www.cnblogs.com/franklv/p/6995150.html

https://blog.csdn.net/Tang_Chuanlin/article/details/79862505

[python] 詞雲:wordcloud包的安裝、使用、原理(源碼分析)、中文詞雲生成、代碼重寫