本文实例为大家分享了Python编写下载器的具体代码,供大家参考,具体内容如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
#!/bin/python3 # author: lidawei # create: 2016-07-11 # version: 1.0 # 功能说明: # 从指定的URL将文件取回本地 ##################################################### import http.client import os import threading import time import logging import unittest from queue import Queue from urllib.parse import urlparse logging.basicConfig(level = logging.DEBUG, format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s' , datefmt = '%a, %d %b %Y %H:%M:%S' , filename = 'Downloader_%s.log' % (time.strftime( '%Y-%m-%d' )), filemode = 'a' ) class Downloader( object ): '''''文件下载器''' url = '' filename = '' def __init__( self , full_url_str, filename): '''''初始化''' self .url = urlparse(full_url_str) self .filename = filename def download( self ): '''''执行下载,返回True或False''' if self .url = = ' ' or self.url == None or self.filename == ' ' or self .filename = = None : logging.error( 'Invalid parameter for Downloader' ) return False successed = False conn = None if self .url.scheme = = 'https' : conn = http.client.HTTPSConnection( self .url.netloc) else : conn = http.client.HTTPConnection( self .url.netloc) conn.request( 'GET' , self .url.path) response = conn.getresponse() if response.status = = 200 : total_size = response.getheader( 'Content-Length' ) total_size = ( int )(total_size) if total_size > 0 : finished_size = 0 file = open ( self .filename, 'wb' ) if file : progress = Progress() progress.start() while not response.closed: buffers = response.read( 1024 ) file .write(buffers) finished_size + = len (buffers) progress.update(finished_size, total_size) if finished_size > = total_size: break # ... end while statment file .close() progress.stop() progress.join() else : logging.error( 'Create local file %s failed' % ( self .filename)) # ... end if statment else : logging.error( 'Request file %s size failed' % ( self .filename)) # ... end if statment else : logging.error( 'HTTP/HTTPS request failed, status code:%d' % (response.status)) # ... end if statment conn.close() return successed # ... end download() method # ... end Downloader class class DataWriter(threading.Thread): filename = '' data_dict = { 'offset' : 0 , 'buffers_byte' : b''} queue = Queue( 128 ) __stop = False def __init__( self , filename): self .filename = filename threading.Thread.__init__( self ) #Override def run( self ): while not self .__stop: self .queue.get( True , 1 ) def put_data(data_dict): '''''将data_dict的数据放入队列,data_dict是一个字典,有两个元素:offset是偏移量,buffers_byte是二进制字节串''' self .queue.put(data_dict) def stop( self ): self .__stop = True class Progress(threading.Thread): interval = 1 total_size = 0 finished_size = 0 old_size = 0 __stop = False def __init__( self , interval = 0.5 ): self .interval = interval threading.Thread.__init__( self ) #Override def run( self ): # logging.info(' Total Finished Percent Speed') print ( ' Total Finished Percent Speed' ) while not self .__stop: time.sleep( self .interval) if self .total_size > 0 : percent = self .finished_size / self .total_size * 100 speed = ( self .finished_size - self .old_size) / self .interval msg = '%12d %12d %10.2f%% %12d' % ( self .total_size, self .finished_size, percent, speed) # logging.info(msg) print (msg) self .old_size = self .finished_size else : logging.error( 'Total size is zero' ) # ... end while statment # ... end run() method def stop( self ): self .__stop = True def update( self , finished_size, total_size): self .finished_size = finished_size self .total_size = total_size class TestDownloaderFunctions(unittest.TestCase): def setUp( self ): print ( 'setUp' ) def test_download( self ): url = 'http://dldir1.qq.com/qqfile/qq/QQ8.4/18376/QQ8.4.exe' filename = 'QQ8.4.exe' dl = Downloader(url, filename) dl.download() def tearDown( self ): print ( 'tearDown' ) if __name__ = = '__main__' : unittest.main() |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/ccpw_cn/article/details/51880860