爬蟲實戰 - Using Python3 Part6

Thread 爬蟲應用
前面章節介紹的做法都是一步一步慢慢抓,以Level 2 為例

我們的做法是抓好第n 頁後再往第n+1 頁抓,但這樣的速度真的太慢了,一個時間點內只有送出一個http request 查詢,有沒有辦法是同個時間內送出多個http request?

當然是可以的,這就是我們這一章節要介紹的 Thread

Thread 是什麼?

Thread 是一個跑在 行程(process) 內的執行流程,傳統process 內只會有一個 Thread,也就是同時間內process 只會做一件事,windows 模式下的 cmd 文件介面就是單執行緒,當我輸入指令後必須等該指令執行完成才可以輸入下一個指令。
作業系統上還有另一種多執行緒的模式,也就是一個process 內有多個不同的Thread,multi thread 並不限定只能跑在單核 或者 多核 的CPU上,透過作業系統的 program counter 可以迅速的在不同thread 之間做切換,所以Thread 之間並非同步執行,而是交錯執行,只是交錯的時間非常短暫。

透過Thread 在寫爬蟲程式的時候,我會先規劃出一個目標類別,這個類別本身就是一個Thread,以Level 2 為例,我會把每個page 網頁當成一個Thread,透過 for 迴圈走訪每頁的網址,在迴圈內觸發Thread 後便可以讓它在背景自己執行,過程如下:

1
2
3
for i in range(1, 10):
t = new thread()
t.start()

接下來我們就可以來寫我們的目標類別:

1
2
3
4
5
class axePage(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run():
pass

而主程式的流程如下:

1
2
3
4
5
6
def main():
for i in range(1, 76):
worker = axePage()
worker.start()
if __name__ == "__main__":
main()

當我們宣告worker 的時候,worker 本身並不會開始執行,必須透過 worker.start() 來啟動thread。

以上是我們程式的雛型,接下來我們要開始實作axePage 內的動作。

1
2
3
4
5
6
7
8
9
10
11
12
13
class axePage(threading.Thread):
def __init__(self, url):
self.url = url
threading.Thread.__init__(self)
def run(self):
result = requests.get(self.url)
result.encoding = 'utf-8'
self.jsondata = parseUrlToJson(result.text)

def main():
for i in range(1, 13):
worker = axePage("http://axe-level-1.herokuapp.com/lv2/?page=%d" % i)
worker.start()

把要處理的網址傳給thread,在讓thread下載並解析網頁內容。

解析的方法可以參考前面的章節

1
2
3
4
5
6
7
8
def parseUrlToJson(content):
root = etree.fromstring(content, etree.HTMLParser())
jsondata = ""
rows = root.xpath("//table[@class='table']/tr[position()>1]")
for row in rows:
column = row.xpath("./td/text()")
jsondata += '{"town": "%s", "village": "%s", "name": "%s"},' % (column[0], column[1], column[2])
return jsondata

如此一來就能透過類別內的 jsondata 變數取得網頁資料了。

1
2
3
4
5
6
def main():
jsondata = ""
for i in range(1, 13):
worker = axePage("http://axe-level-1.herokuapp.com/lv2/?page=%d" % i)
worker.start()
jsondata += worker.jsondata

如果你接資料的方式這樣寫的話,跟你保證百分百一定拿不到資料。
這是因為當這該程式執行時,內部已經有一個main thread 開始跑,當執行到這兩行時,會額外產生新的thread。

1
2
worker = axePage(url)
worker.start()

但是main thread 與 新產生的thread 是獨立的,也因此當你想取得 worker.jsondata 時,worker還沒有下載解析完網頁內容,所以我們必須更改成這樣:

1
2
3
4
5
6
7
def main():
jsondata = ""
for i in range(1, 13):
worker = axePage("http://axe-level-1.herokuapp.com/lv2/?page=%d" % i)
worker.start()
worker.join()
jsondata += worker.jsondata

join() 可以讓目標thread 阻塞,待該thread 執行完後程式才會繼續執行,這樣的寫法雖然是多執行緒,但也是等一個執行完才會執行下一個,我們想讓全部thread 一起跑,最後在一統把資料整合起來,所以main function 要做點修改:

1
2
3
4
5
6
7
8
9
10
11
12
def main():
workerList = []
for i in range(1, 13):
worker = axePage("http://axe-level-1.herokuapp.com/lv2/?page=%d" % i)
worker.start()
workerList.append(worker)

jsondata = "["
for i in range(len(workerList)):
workerList[i].join()
jsondata += workerList[i].jsondata
print(jsondata [0: -1] + "]")

把每個執行的thread透過list 存起來,最後再跑for 迴圈走訪list 等待thread 處理完資料。

完整的程式碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# -*- coding: utf8 -*-
from lxml import etree, html
import requests, threading

class axePage(threading.Thread):
def __init__(self, url):
self.url = url
threading.Thread.__init__(self)
def run(self):
result = requests.get(self.url)
result.encoding = 'utf-8'
self.jsondata = parseUrlToJson(result.text)

def parseUrlToJson(content):
root = etree.fromstring(content, etree.HTMLParser())
jsondata = ""
rows = root.xpath("//table[@class='table']/tr[position()>1]")
for row in rows:
column = row.xpath("./td/text()")
jsondata += '{"town": "%s", "village": "%s", "name": "%s"},' % (column[0], column[1], column[2])
return jsondata

def main():
workerList = []
for i in range(1, 13):
worker = axePage("http://axe-level-1.herokuapp.com/lv2/?page=%d" % i)
worker.start()
workerList.append(worker)

jsondata = "["
for i in range(len(workerList)):
workerList[i].join()
jsondata += workerList[i].jsondata
print(jsondata [0: -1] + "]")

if __name__ == "__main__":
main()