需要 Python 3.4+,一个参数用来选择测试搜索服务还是 GAE 服务。测试 GAE 服务的话需要先修改开头的两个变量。从标准输入读取 IP 地址或者 IP 段(形如 192.168.0.0/16)列表,每行一个。可用 IP 输出到标准输出。实时测试结果输出到标准错误。50 线程并发。
checkgoogleip
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
#!/usr/bin/env python3
"""Check which IP addresses serve Google Search or a GAE app over HTTPS.

Usage: pass ``search`` or ``gae`` as the single argument. IP addresses or
IPv4 networks in CIDR form (e.g. ``192.168.0.0/16``) are read from standard
input, one per line. Usable IPs are printed to standard output; live
per-IP results go to standard error. Checks run on 50 worker threads.
"""

import sys
from ipaddress import IPv4Network
import http.client as client
from concurrent.futures import ThreadPoolExecutor
import argparse
import ssl
import socket

# Adjust the following lines to match your own setup first.
APP_ID = 'your_id_here'
APP_PATH = '/fetch.py'

# CA bundle used to validate server certificates (Debian/Ubuntu location).
CA_BUNDLE = '/etc/ssl/certs/ca-certificates.crt'

if hasattr(ssl, 'PROTOCOL_TLS_CLIENT'):
    # Negotiates the highest TLS version both sides support.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
else:
    # Older interpreters: version-negotiating protocol, with the legacy
    # SSL versions switched off explicitly.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
context.verify_mode = ssl.CERT_REQUIRED
# Let the ssl module itself match the certificate against the expected
# host name during the handshake (must be set after verify_mode).
context.check_hostname = True
try:
    context.load_verify_locations(CA_BUNDLE)
except OSError:
    # Bundle missing or unreadable at that path: use the system defaults.
    context.set_default_verify_paths()


class HTTPSConnection(client.HTTPSConnection):
    """HTTPS connection to an IP that presents/validates another host name.

    The TCP connection goes to the address given as the first positional
    argument, while ``hostname`` (keyword-only) is the name sent via SNI
    and the name the server certificate is verified against.
    """

    def __init__(self, *args, hostname=None, **kwargs):
        self._hostname = hostname
        super().__init__(*args, **kwargs)

    def connect(self):
        """Open the TCP connection and wrap it in TLS for ``hostname``.

        Certificate and host name verification are performed by the SSL
        context during the handshake, so the context passed in should have
        ``check_hostname`` enabled.
        """
        # Deliberately skip client.HTTPSConnection.connect(): only the plain
        # TCP (and tunnel) setup of HTTPConnection is wanted here, the TLS
        # wrapping below uses our own server name.
        super(client.HTTPSConnection, self).connect()

        if self._tunnel_host:
            server_hostname = self._tunnel_host
        else:
            server_hostname = self._hostname or self.host

        self.sock = self._context.wrap_socket(
            self.sock, server_hostname=server_hostname)


def check_ip_p(ip, func):
    """Print ``ip`` to standard output if ``func(ip)`` is truthy."""
    if func(ip):
        print(ip, flush=True)


def check_for_gae(ip):
    """Return True if ``ip`` serves the configured GAE app."""
    return _check(APP_ID + '.appspot.com', APP_PATH, ip)


def check_for_search(ip):
    """Return True if ``ip`` serves www.google.com."""
    return _check('www.google.com', '/', ip)


def _check(host, path, ip):
    """Request ``path`` for ``host`` from ``ip`` over HTTPS.

    The request is attempted at most twice. A certificate error is treated
    as final and is not retried. Progress is reported on standard error.

    Returns:
        True if a response with status below 400 was received, else False.
    """
    for chance in range(1, -1, -1):
        conn = None
        try:
            conn = HTTPSConnection(
                ip, timeout=5, context=context, hostname=host,
            )
            conn.request('GET', path, headers={
                'Host': host,
            })
            response = conn.getresponse()
            if response.status >= 400:
                raise Exception('HTTP Error %s %s' % (
                    response.status, response.reason))
            print('GOOD:', ip, file=sys.stderr)
            return True
        except Exception as e:
            bad_cert = isinstance(e, ssl.CertificateError)
            if bad_cert:
                print('WARN: %s is not Google\'s!' % ip, file=sys.stderr)
            if bad_cert or chance == 0:
                print('BAD :', ip, e, file=sys.stderr)
                return False
            print('RE :', ip, e, file=sys.stderr)
        finally:
            # Always release the socket; many thousands of IPs may be probed.
            if conn is not None:
                conn.close()


def main():
    """Parse the service argument and check every IP read from stdin."""
    parser = argparse.ArgumentParser(description='Check Google IPs')
    parser.add_argument('service', choices=['search', 'gae'],
                        help='service to check')
    args = parser.parse_args()

    checkers = {
        'search': check_for_search,
        'gae': check_for_gae,
    }
    func = checkers[args.service]

    count = 0
    with ThreadPoolExecutor(max_workers=50) as executor:
        for line in sys.stdin:
            line = line.strip()
            if not line:
                # Blank lines are not addresses; do not probe or count them.
                continue
            if '/' in line:
                try:
                    network = IPv4Network(line)
                except ValueError as e:
                    # One malformed network must not abort the whole run.
                    print('SKIP:', line, e, file=sys.stderr)
                    continue
                for ip in network.hosts():
                    executor.submit(check_ip_p, str(ip), func)
                    count += 1
            else:
                executor.submit(check_ip_p, line, func)
                count += 1
    print('%d IP checked.' % count)


if __name__ == '__main__':
    main()