threading模塊

局域網(wǎng)IP掃描實(shí)例
# 單線程:
import subprocess,time,threading
a = time.clock()
with open("check_ping.txt",'w') as f:
for i in range(1,20):
my_ip = ".".join(["192.163.37",str(i)])
try:
subprocess.check_call('ping -n 1 -w 1 %s'%(my_ip),shell=True)
except subprocess.CalledProcessError:
pass
else:
f.write("%s 可以ping通\n"%my_ip)
b = time.clock()
print(b)
# 總共花費(fèi)8s多# 多線程(一)創(chuàng)建 Thread 的實(shí)例,傳給它一個(gè)函數(shù)
a = time.clock()
def check_ping(IP,obj):
try:
subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
except subprocess.CalledProcessError:
pass
else:
obj.write("%s 可以ping通\n" % IP)
def main():
threads = []
with open("check_ping_1.txt", 'w') as f:
for i in range(1, 20):
my_ip = ".".join(["192.163.37", str(i)])
t = threading.Thread(target=check_ping,args=(my_ip,f)) #Thread方法:實(shí)例化一個(gè)線程對(duì)象,把函數(shù)(target)和參數(shù)(args)傳進(jìn)去,然后返回Thread實(shí)例,這里并沒有執(zhí)行。
threads.append(t)
num = range(len(threads))
for i in num:
threads[i].start() #執(zhí)行線程的start方法,線程開始執(zhí)行
for i in num:
threads[i].join() #這行線程的join方法,等待線程結(jié)束,如果主進(jìn)程不需要等待線程結(jié)束,可以不需要調(diào)用join方法。
if __name__ == '__main__':
main()
b = time.clock()
print(b)
# 總共花費(fèi)0.9s# 多線程(二)創(chuàng)建 Thread 的實(shí)例,傳給它一個(gè)可調(diào)用的類實(shí)例
a = time.clock()
class Thread_func:
def __init__(self,func,args):
self.func = func
self.args = args
def __call__(self): #__call__方法:將類模擬成函數(shù),實(shí)例化后的類再次實(shí)例化相當(dāng)于執(zhí)行了__call__方法。
self.func(*self.args)
def check_ping(IP,obj):
try:
subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
except subprocess.CalledProcessError:
pass
else:
obj.write("%s 可以ping通\n" % IP)
def main():
threads = []
with open("check_ping_2.txt", 'w') as f:
for i in range(1, 20):
my_ip = ".".join(["192.163.37", str(i)])
t = threading.Thread(target=Thread_func(check_ping,(my_ip,f))) #Thread_func實(shí)例化時(shí)已經(jīng)傳入了參數(shù),所以Thread方法中就不用args來傳參了。
threads.append(t)
num = range(len(threads))
for i in num:
threads[i].start()
for i in num:
threads[i].join()
if __name__ == '__main__':
main()
b = time.clock()
print(b)
# 總共花費(fèi)1# 多線程(三)派生 Thread 的子類,并創(chuàng)建子類的實(shí)例
a = time.clock()
class Thread_func(threading.Thread): #繼承Thread,調(diào)用更靈活。
def __init__(self,func,args):
threading.Thread.__init__(self)
self.func = func
self.args = args
def run(self): #這里必須使用run方法,當(dāng)線程開啟后執(zhí)行。
self.func(*self.args)
def check_ping(IP,obj):
try:
subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True)
except subprocess.CalledProcessError:
pass
else:
obj.write("%s 可以ping通\n" % IP)
def main():
threads = []
with open("check_ping_3.txt", 'w') as f:
for i in range(1, 20):
my_ip = ".".join(["192.163.37", str(i)])
t = Thread_func(check_ping,(my_ip,f))
threads.append(t)
num = range(len(threads))
for i in num:
threads[i].start()
for i in num:
threads[i].join()
if __name__ == '__main__':
main()
b = time.clock()
print(b)
# 花費(fèi)0.7另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
分享標(biāo)題:python多線程--threading簡(jiǎn)單實(shí)現(xiàn)-創(chuàng)新互聯(lián)
分享網(wǎng)址:http://chinadenli.net/article32/eospc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計(jì)、用戶體驗(yàn)、外貿(mào)建站、面包屑導(dǎo)航、域名注冊(cè)、動(dòng)態(tài)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容