1. 程式人生 > >python3+PyQt5 實現支援多執行緒的頁面索引器應用程式

python3+PyQt5 實現支援多執行緒的頁面索引器應用程式

本文使用 Python3 + PyQt5 實現了《Python Qt GUI 快速程式設計》第 19 章中的頁面索引器應用程式範例。

/home/yrd/eric_workspace/chap19/walker_ans.py

#!/usr/bin/env python3

import codecs
import html.entities
import re
import sys
from PyQt5.QtCore import (QMutex, QThread,pyqtSignal,Qt)

class Walker(QThread):
    """Worker thread that indexes the words found in a batch of HTML files.

    Every file in ``files`` is read, stripped of tags, has its entities
    decoded, is lower-cased and split into words.  Accepted words are
    recorded in the shared ``filenamesForWords`` mapping (word -> set of
    file names); a word whose file set has grown beyond
    ``COMMON_WORDS_THRESHOLD`` is moved to the shared ``commonWords`` set
    instead.  Access to both shared containers is guarded by ``lock``.

    Signals:
        finished(bool, int): emitted at the end of ``run()`` with the
            ``completed`` flag and this walker's ``index``.
        indexed(str, int): emitted after each file has been processed,
            with the file name and this walker's ``index``.
    """

    # NOTE(review): this attribute has the same name as QThread's own
    # finished() signal, so on Walker instances ``finished`` refers to this
    # two-argument signal.
    finished = pyqtSignal(bool,int)
    indexed = pyqtSignal(str,int)
    # A word found in more than this many files is treated as "common".
    COMMON_WORDS_THRESHOLD = 250
    MIN_WORD_LEN = 3
    MAX_WORD_LEN = 25
    # Words starting or ending with one of these characters are skipped.
    INVALID_FIRST_OR_LAST = frozenset("0123456789_")
    STRIPHTML_RE = re.compile(r"<[^>]*?>", re.IGNORECASE|re.MULTILINE)
    ENTITY_RE = re.compile(r"&(\w+?);|&#(\d+?);")
    SPLIT_RE = re.compile(r"\W+", re.IGNORECASE|re.MULTILINE)

    def __init__(self, index, lock, files, filenamesForWords, commonWords,
                 parent=None):
        """Store the batch to process and the shared index structures.

        Args:
            index: number identifying this walker; echoed in both signals.
            lock: lock object providing lockForRead(), lockForWrite() and
                unlock(); guards the two shared containers.
            files: iterable of file names to index.
            filenamesForWords: shared mapping of word -> set of file names.
            commonWords: shared set of words considered too common.
            parent: optional parent object passed to QThread.
        """
        super(Walker, self).__init__(parent)
        self.index = index
        self.lock = lock
        self.files = files
        self.filenamesForWords = filenamesForWords
        self.commonWords = commonWords
        self.stopped = False
        self.mutex = QMutex()  # guards self.stopped
        self.completed = False

    def stop(self):
        """Ask the walker to stop; checked between processing stages."""
        self.mutex.lock()
        try:
            self.stopped = True
        finally:
            self.mutex.unlock()

    def isStopped(self):
        """Return True once stop() has been called."""
        self.mutex.lock()
        try:
            return self.stopped
        finally:
            self.mutex.unlock()

    def run(self):
        """Thread entry point: index the files, then emit ``finished``."""
        self.processFiles()
        self.stop()
        self.finished.emit(self.completed,self.index)

    def processFiles(self):
        """Index every file in ``self.files``.

        Returns early (leaving ``self.completed`` False) as soon as a stop
        request is noticed.  Files that cannot be read are reported on
        stderr and skipped.
        """
        def unichrFromEntity(match):
            # Turn one matched entity into its character; entities that
            # cannot be decoded are replaced by the empty string.
            text = match.group(match.lastindex)
            if text.isdigit():
                try:
                    return chr(int(text))
                except (ValueError, OverflowError):
                    # Code point outside the Unicode range, or a "digit"
                    # character that int() does not accept.
                    return ""
            u = html.entities.name2codepoint.get(text)
            return chr(u) if u is not None else ""

        for fname in self.files:
            if self.isStopped():
                return
            words = set()
            try:
                with codecs.open(fname, "r", "UTF8", "ignore") as fh:
                    text = fh.read()
            except EnvironmentError as e:
                sys.stderr.write("Error: {0}\n".format(e))
                continue
            if self.isStopped():
                return
            text = self.STRIPHTML_RE.sub("", text)
            text = self.ENTITY_RE.sub(unichrFromEntity, text)
            text = text.lower()
            for word in self.SPLIT_RE.split(text):
                if (self.MIN_WORD_LEN <= len(word) <= self.MAX_WORD_LEN and
                        word[0] not in self.INVALID_FIRST_OR_LAST and
                        word[-1] not in self.INVALID_FIRST_OR_LAST):
                    self.lock.lockForRead()
                    try:
                        new = word not in self.commonWords
                    finally:
                        self.lock.unlock()
                    if new:
                        words.add(word)
            if self.isStopped():
                return
            for word in words:
                self.lock.lockForWrite()
                try:
                    files = self.filenamesForWords[word]
                    if len(files) > self.COMMON_WORDS_THRESHOLD:
                        # Too widespread: stop tracking its files and
                        # remember it as a common word instead.
                        del self.filenamesForWords[word]
                        self.commonWords.add(word)
                    else:
                        files.add(str(fname))
                finally:
                    self.lock.unlock()
            self.indexed.emit(fname,self.index)
        self.completed = True


# File: /home/yrd/eric_workspace/chap19/pageindexer_ans.pyw

#!/usr/bin/env python3

import collections
import os
import sys
from PyQt5.QtCore import (QDir, QReadWriteLock, QMutex,Qt)
from PyQt5.QtWidgets import (QApplication, QDialog, QFileDialog, QFrame,
                             QHBoxLayout, QLCDNumber, QLabel, QLineEdit,
                             QListWidget, QPushButton, QVBoxLayout)
import walker_ans as walker


def isAlive(qobj):
    """Return False if sip reports that the wrapped object no longer exists.

    sip.unwrapinstance() raising RuntimeError is taken to mean the object
    behind ``qobj`` is gone.
    """
    # PyQt5 >= 5.11 ships sip as PyQt5.sip; older installations expose a
    # top-level sip module.
    try:
        from PyQt5 import sip
    except ImportError:
        import sip
    try:
        sip.unwrapinstance(qobj)
    except RuntimeError:
        return False
    return True


class Form(QDialog):
    """Page Indexer dialog.

    Lets the user choose a directory, indexes the ``.htm``/``.html`` files
    below it with Walker threads (one per batch of up to 1000 files) and
    looks up which indexed files contain a given word.
    """

    def __init__(self, parent=None):
        """Build the widgets and layouts and connect the signals."""
        super(Form, self).__init__(parent)

        self.mutex = QMutex()
        self.fileCount = 0
        self.filenamesForWords = collections.defaultdict(set)
        self.commonWords = set()
        # Guards filenamesForWords and commonWords, shared with the walkers.
        self.lock = QReadWriteLock()
        self.path = QDir.homePath()
        pathLabel = QLabel("Indexing path:")
        self.pathLabel = QLabel()
        self.pathLabel.setFrameStyle(QFrame.StyledPanel|QFrame.Sunken)
        self.pathButton = QPushButton("Set &Path...")
        self.pathButton.setAutoDefault(False)
        findLabel = QLabel("&Find word:")
        self.findEdit = QLineEdit()
        findLabel.setBuddy(self.findEdit)
        commonWordsLabel = QLabel("&Common words:")
        self.commonWordsListWidget = QListWidget()
        commonWordsLabel.setBuddy(self.commonWordsListWidget)
        filesLabel = QLabel("Files containing the &word:")
        self.filesListWidget = QListWidget()
        filesLabel.setBuddy(self.filesListWidget)
        filesIndexedLabel = QLabel("Files indexed")
        self.filesIndexedLCD = QLCDNumber()
        self.filesIndexedLCD.setSegmentStyle(QLCDNumber.Flat)
        wordsIndexedLabel = QLabel("Words indexed")
        self.wordsIndexedLCD = QLCDNumber()
        self.wordsIndexedLCD.setSegmentStyle(QLCDNumber.Flat)
        commonWordsLCDLabel = QLabel("Common words")
        self.commonWordsLCD = QLCDNumber()
        self.commonWordsLCD.setSegmentStyle(QLCDNumber.Flat)
        self.statusLabel = QLabel("Click the 'Set Path' "
                                  "button to start indexing")
        self.statusLabel.setFrameStyle(QFrame.StyledPanel|QFrame.Sunken)

        topLayout = QHBoxLayout()
        topLayout.addWidget(pathLabel)
        topLayout.addWidget(self.pathLabel, 1)
        topLayout.addWidget(self.pathButton)
        topLayout.addWidget(findLabel)
        topLayout.addWidget(self.findEdit, 1)
        leftLayout = QVBoxLayout()
        leftLayout.addWidget(filesLabel)
        leftLayout.addWidget(self.filesListWidget)
        rightLayout = QVBoxLayout()
        rightLayout.addWidget(commonWordsLabel)
        rightLayout.addWidget(self.commonWordsListWidget)
        middleLayout = QHBoxLayout()
        middleLayout.addLayout(leftLayout, 1)
        middleLayout.addLayout(rightLayout)
        bottomLayout = QHBoxLayout()
        bottomLayout.addWidget(filesIndexedLabel)
        bottomLayout.addWidget(self.filesIndexedLCD)
        bottomLayout.addWidget(wordsIndexedLabel)
        bottomLayout.addWidget(self.wordsIndexedLCD)
        bottomLayout.addWidget(commonWordsLCDLabel)
        bottomLayout.addWidget(self.commonWordsLCD)
        bottomLayout.addStretch()
        layout = QVBoxLayout()
        layout.addLayout(topLayout)
        layout.addLayout(middleLayout)
        layout.addLayout(bottomLayout)
        layout.addWidget(self.statusLabel)
        self.setLayout(layout)

        self.walkers = []
        # One flag per walker, in the same order as self.walkers.
        self.completed = []
        self.pathButton.clicked.connect(self.setPath)
        self.findEdit.returnPressed.connect(self.find)
        self.setWindowTitle("Page Indexer")

    def stopWalkers(self):
        """Stop every running walker, wait for them, and forget them all."""
        # Signal all walkers first so that they wind down concurrently,
        # then wait for each one.
        for thread in self.walkers:
            if isAlive(thread) and thread.isRunning():
                thread.stop()
        for thread in self.walkers:
            if isAlive(thread) and thread.isRunning():
                thread.wait()
        self.walkers = []
        self.completed = []

    def setPath(self):
        """Ask for a directory and start indexing the HTML files below it."""
        self.stopWalkers()
        self.pathButton.setEnabled(False)
        path = QFileDialog.getExistingDirectory(
            self, "Choose a Path to Index", self.path)
        if not path:
            self.statusLabel.setText("Click the 'Set Path' "
                                     "button to start indexing")
            self.pathButton.setEnabled(True)
            return
        self.statusLabel.setText("Scanning directories...")
        QApplication.processEvents() # Needed for Windows
        self.path = QDir.toNativeSeparators(path)
        self.findEdit.setFocus()
        self.pathLabel.setText(self.path)
        self.statusLabel.clear()
        self.filesListWidget.clear()
        self.fileCount = 0
        self.filenamesForWords = collections.defaultdict(set)
        self.commonWords = set()
        nofilesfound = True
        files = []
        index = 0
        for root, _dirs, fnames in os.walk(str(self.path)):
            for name in fnames:
                if not name.endswith((".htm", ".html")):
                    continue
                files.append(os.path.join(root, name))
                if len(files) == 1000:
                    # Hand a full batch to its own walker thread.
                    self.processFiles(index, files[:])
                    files = []
                    index += 1
                    nofilesfound = False
        if files:
            self.processFiles(index, files[:])
            nofilesfound = False
        if nofilesfound:
            self.finishedIndexing()
            self.statusLabel.setText(
                "No HTML files found in the given path")

    def processFiles(self, index, files):
        """Create, register and start a Walker for one batch of files."""
        thread = walker.Walker(index, self.lock, files,
                               self.filenamesForWords, self.commonWords,
                               self)
        thread.indexed[str,int].connect(self.indexed)
        thread.finished[bool,int].connect(self.finished)
        thread.finished.connect(thread.deleteLater)
        self.walkers.append(thread)
        self.completed.append(False)
        thread.start()
        thread.wait(300) # Needed for Windows

    def find(self):
        """Show the indexed files containing the word in the find editor.

        Only the first space-separated word of the (lower-cased) input is
        looked up.  Empty or whitespace-only input just shows a hint.
        """
        # Strip first so that whitespace-only input is treated as empty
        # instead of failing on word.split()[0] below.
        word = str(self.findEdit.text()).strip()
        if not word:
            self.mutex.lock()
            try:
                self.statusLabel.setText("Enter a word to find in files")
            finally:
                self.mutex.unlock()
            return
        self.mutex.lock()
        try:
            self.statusLabel.clear()
            self.filesListWidget.clear()
        finally:
            self.mutex.unlock()
        word = word.lower()
        if " " in word:
            word = word.split()[0]
        self.lock.lockForRead()
        try:
            found = word in self.commonWords
        finally:
            self.lock.unlock()
        if found:
            self.mutex.lock()
            try:
                self.statusLabel.setText("Common words like '{0}' "
                                         "are not indexed".format(word))
            finally:
                self.mutex.unlock()
            return
        self.lock.lockForRead()
        try:
            # .get() avoids creating an entry in the defaultdict; copy so
            # the set can be used after the lock is released.
            files = self.filenamesForWords.get(word, set()).copy()
        finally:
            self.lock.unlock()
        if not files:
            self.mutex.lock()
            try:
                self.statusLabel.setText("No indexed file contains "
                                         "the word '{0}'".format(word))
            finally:
                self.mutex.unlock()
            return
        files = [QDir.toNativeSeparators(name) for name in
                 sorted(files, key=str.lower)]
        self.mutex.lock()
        try:
            self.filesListWidget.addItems(files)
            self.statusLabel.setText(
                "{0} indexed files contain the word '{1}'".format(
                    len(files), word))
        finally:
            self.mutex.unlock()

    def indexed(self, fname, index):
        """Slot for Walker.indexed: count the file and refresh the display.

        The LCD counters are refreshed on every 25th file; otherwise the
        common-words list is refreshed on every 101st file.
        """
        self.mutex.lock()
        try:
            self.statusLabel.setText(fname)
            self.fileCount += 1
            count = self.fileCount
        finally:
            self.mutex.unlock()
        if count % 25 == 0:
            self.lock.lockForRead()
            try:
                indexedWordCount = len(self.filenamesForWords)
                commonWordCount = len(self.commonWords)
            finally:
                self.lock.unlock()
            self.mutex.lock()
            try:
                self.filesIndexedLCD.display(count)
                self.wordsIndexedLCD.display(indexedWordCount)
                self.commonWordsLCD.display(commonWordCount)
            finally:
                self.mutex.unlock()
        elif count % 101 == 0:
            self.lock.lockForRead()
            try:
                words = self.commonWords.copy()
            finally:
                self.lock.unlock()
            self.mutex.lock()
            try:
                self.commonWordsListWidget.clear()
                self.commonWordsListWidget.addItems(sorted(words))
            finally:
                self.mutex.unlock()

    def finished(self, completed, index):
        """Slot for Walker.finished: mark walker ``index`` as done.

        When every registered walker is done (or none is registered) the
        status shows "Finished" and the final counts are displayed.  The
        ``completed`` argument is not consulted.
        """
        done = False
        if self.walkers:
            self.completed[index] = True
            if all(self.completed):
                self.mutex.lock()
                try:
                    self.statusLabel.setText("Finished")
                    done = True
                finally:
                    self.mutex.unlock()
        else:
            self.mutex.lock()
            try:
                self.statusLabel.setText("Finished")
                done = True
            finally:
                self.mutex.unlock()
        if done:
            self.finishedIndexing()

    def reject(self):
        """Cancel indexing if it is still running, else close the dialog."""
        if not all(self.completed):
            self.stopWalkers()
            self.finishedIndexing()
        else:
            self.accept()

    def closeEvent(self, event=None):
        """Stop all walkers before the dialog closes."""
        self.stopWalkers()

    def finishedIndexing(self):
        """Show the final counts and re-enable the path button."""
        self.filesIndexedLCD.display(self.fileCount)
        self.wordsIndexedLCD.display(len(self.filenamesForWords))
        self.commonWordsLCD.display(len(self.commonWords))
        self.pathButton.setEnabled(True)
        QApplication.processEvents() # Needed for Windows


app = QApplication(sys.argv)
form = Form()
form.show()
app.exec_()

執行結果:

這裡寫圖片描述