本文实例讲述了php与python实现的线程池多线程爬虫功能。分享给大家供大家参考,具体如下:
多线程爬虫可以并发抓取内容,从而提升抓取性能。这里我们来看 PHP 与 Python 线程池多线程爬虫的例子,代码如下:
PHP 例子:
<?php

/**
 * Pool worker that owns one reusable cURL handle.
 *
 * Every Query submitted to the pool runs on one of these workers and
 * borrows the worker's handle through getConnection().
 */
class Connect extends Worker // worker mode
{
    public function __construct()
    {
    }

    /**
     * Return this worker's cURL handle, creating and configuring it on
     * first use.
     *
     * @return resource the shared cURL handle
     */
    public function getConnection()
    {
        if (!self::$ch) {
            self::$ch = curl_init();
            curl_setopt(self::$ch, CURLOPT_TIMEOUT, 2);
            curl_setopt(self::$ch, CURLOPT_RETURNTRANSFER, 1);
            curl_setopt(self::$ch, CURLOPT_HEADER, 0);
            curl_setopt(self::$ch, CURLOPT_NOSIGNAL, true);
            curl_setopt(self::$ch, CURLOPT_USERAGENT, "Firefox");
            curl_setopt(self::$ch, CURLOPT_FOLLOWLOCATION, 1);
        }

        /* do some exception/error stuff here maybe */
        return self::$ch;
    }

    /**
     * Close the cURL handle if one was opened.
     */
    public function closeConnection()
    {
        // Guard: the handle is created lazily, so it may not exist yet.
        if (self::$ch) {
            curl_close(self::$ch);
            self::$ch = null;
        }
    }

    /**
     * Note that the link is stored statically, which for pthreads, means thread local
     */
    protected static $ch;
}

/**
 * One unit of work: fetch a single URL and report its status.
 */
class Query extends Threaded
{
    /**
     * @param string $url the URL to fetch
     */
    public function __construct($url)
    {
        $this->url = $url;
    }

    /**
     * Fetch the URL with the worker's cURL handle, print the outcome and
     * keep the response body for getResult().
     */
    public function run()
    {
        $ch = $this->worker->getConnection();
        curl_setopt($ch, CURLOPT_URL, $this->url);
        $page = curl_exec($ch);
        $info = curl_getinfo($ch);
        $error = curl_error($ch);
        $this->deal_data($this->url, $page, $info, $error);
        $this->result = $page;
    }

    /**
     * Print "OK" for an HTTP 200 response, otherwise the cURL error text.
     *
     * @param string       $url   the URL that was fetched
     * @param string|false $page  the response body (unused here)
     * @param array        $info  result of curl_getinfo()
     * @param string       $error result of curl_error()
     */
    function deal_data($url, $page, $info, $error)
    {
        $parts = explode(".", $url);
        // The second dot-separated piece is used as the display id; fall
        // back to the whole URL when it contains no dot.
        $id = isset($parts[1]) ? $parts[1] : $url;
        if ($info['http_code'] != 200) {
            $this->show_msg($id, $error);
        } else {
            $this->show_msg($id, "OK");
        }
    }

    /**
     * Print one tab-separated status line.
     */
    function show_msg($id, $msg)
    {
        echo $id . "\t$msg\n";
    }

    /**
     * @return string|false the response body stored by run()
     */
    public function getResult()
    {
        return $this->result;
    }

    protected $url;
    protected $result;
}

/**
 * Check every URL in $check_urls concurrently using a pthreads Pool.
 */
function check_urls_multi_pthreads()
{
    global $check_urls; // URLs to check, mapped to a display name
    $check_urls = array('http://xxx.com' => "xx网",);
    $pool = new Pool(10, "Connect", array()); // pool of 10 worker threads
    foreach ($check_urls as $url => $name) {
        $pool->submit(new Query($url));
    }
    $pool->shutdown();
}

check_urls_multi_pthreads();
?>

python 多线程

from threading import Thread


def handle(sid):
    # Crawl and process the data for this id inside this function.
    pass


class MyThread(Thread):
    """Thread that runs handle() for a single id."""

    def __init__(self, sid):
        Thread.__init__(self)
        self.sid = sid

    def run(self):
        handle(self.sid)


threads = []
for i in range(1, 11):
    t = MyThread(i)
    threads.append(t)
    t.start()

# Wait for every worker thread to finish.
for t in threads:
    t.join()
python 线程池爬虫:
from queue import Queue
from threading import Thread, Lock
import urllib.parse
import socket
import re
import time

# Address of the site being crawled.
HOST = 'localhost'
PORT = 3000

# Paths already queued or fetched; guarded by ``lock``.
seen_urls = {'/'}
lock = Lock()


class Fetcher(Thread):
    """Daemon worker that fetches paths from a shared queue.

    Each worker takes a path from ``tasks``, downloads it from HOST:PORT
    with a raw HTTP/1.0 request, extracts same-host links and queues the
    ones that have not been seen yet.
    """

    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Process queued paths forever (the thread is a daemon)."""
        while True:
            url = self.tasks.get()
            try:
                print(url)
                response = self._fetch(url)
                links = self.parse_links(url, response)
                with lock:
                    for link in links.difference(seen_urls):
                        self.tasks.put(link)
                    seen_urls.update(links)
            finally:
                # Always mark the task finished; otherwise Queue.join()
                # would wait forever after a failed fetch.
                self.tasks.task_done()

    def _fetch(self, url):
        """Return the raw HTTP response for *url*, or b'' on failure."""
        chunks = []
        try:
            request = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(url, HOST)
            payload = request.encode('ascii')
            with socket.socket() as sock:
                sock.connect((HOST, PORT))
                sock.sendall(payload)
                # HTTP/1.0: the server closes the connection when done.
                while True:
                    chunk = sock.recv(4096)
                    if not chunk:
                        break
                    chunks.append(chunk)
        except (OSError, UnicodeError):
            # parse_links() reports the empty response as an error.
            return b''
        return b''.join(chunks)

    def parse_links(self, fetched_url, response):
        """Return the set of same-host paths linked from *response*.

        An empty response is reported as an error; non-HTML responses
        yield an empty set. Fragments and query strings are dropped.
        """
        if not response:
            print('error: {}'.format(fetched_url))
            return set()
        if not self._is_html(response):
            return set()

        urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''',
                              self.body(response)))
        links = set()
        for url in urls:
            normalized = urllib.parse.urljoin(fetched_url, url)
            parts = urllib.parse.urlparse(normalized)
            if parts.scheme not in ('', 'http', 'https'):
                continue
            # ``hostname`` is already lower-cased and has the port removed.
            host = parts.hostname
            if host and host != HOST.lower():
                continue
            # An absolute link to the site root has an empty path.
            links.add(parts.path or '/')
        return links

    def body(self, response):
        """Return the decoded body of *response* ('' if there is none)."""
        _, _, raw_body = response.partition(b'\r\n\r\n')
        return raw_body.decode('utf-8', errors='replace')

    def _is_html(self, response):
        """Return True when the Content-Type header starts with text/html."""
        head, _, _ = response.partition(b'\r\n\r\n')
        headers = {}
        # Skip the status line; header names are case-insensitive.
        for line in head.decode('iso-8859-1').split('\r\n')[1:]:
            name, sep, value = line.partition(':')
            if sep:
                headers[name.strip().lower()] = value.strip()
        content_type = headers.get('content-type', '')
        return content_type.lower().startswith('text/html')


class ThreadPool:
    """Fixed-size pool of Fetcher threads sharing one task queue."""

    def __init__(self, num_threads):
        self.tasks = Queue()
        for _ in range(num_threads):
            Fetcher(self.tasks)

    def add_task(self, url):
        """Queue *url* for fetching."""
        self.tasks.put(url)

    def wait_completion(self):
        """Block until every queued task has been processed."""
        self.tasks.join()


if __name__ == '__main__':
    start = time.time()
    pool = ThreadPool(4)
    pool.add_task("/")
    pool.wait_completion()
    print('{} URLs fetched in {:.1f} seconds'.format(
        len(seen_urls), time.time() - start))
希望本文所述对大家的 PHP 与 Python 程序设计有所帮助。