欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

python轉化異步函數(shù) python 異步

python2.7怎么實現(xiàn)異步

改進之前

在思明等地區(qū),都構建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務理念,為客戶提供做網(wǎng)站、成都網(wǎng)站建設 網(wǎng)站設計制作按需求定制開發(fā),公司網(wǎng)站建設,企業(yè)網(wǎng)站建設,成都品牌網(wǎng)站建設,全網(wǎng)營銷推廣,外貿(mào)網(wǎng)站制作,思明網(wǎng)站建設費用合理。

之前,我的查詢步驟很簡單,就是:

前端提交查詢請求 -- 建立數(shù)據(jù)庫連接 -- 新建游標 -- 執(zhí)行命令 -- 接受結果 -- 關閉游標、連接

這幾大步驟的順序執(zhí)行。

這里面當然問題很大:

建立數(shù)據(jù)庫連接實際上就是新建一個套接字。這是進程間通信的幾種方法里,開銷最大的了。

在“執(zhí)行命令”和“接受結果”兩個步驟中,線程在阻塞在數(shù)據(jù)庫內(nèi)部的運行過程中,數(shù)據(jù)庫連接和游標都處于閑置狀態(tài)。

這樣一來,每一次查詢都要順序的新建數(shù)據(jù)庫連接,都要阻塞在數(shù)據(jù)庫返回結果的過程中。當前端提交大量查詢請求時,查詢效率肯定是很低的。

第一次改進

之前的模塊里,問題最大的就是第一步——建立數(shù)據(jù)庫連接套接字了。如果能夠一次性建立連接,之后查詢能夠反復服用這個連接就好了。

所以,首先應該把數(shù)據(jù)庫查詢模塊作為一個單獨的守護進程去執(zhí)行,而前端app作為主進程響應用戶的點擊操作。那么兩條進程怎么傳遞消息呢?翻了幾天Python文檔,終于構思出來:用隊列queue作為生產(chǎn)者(web前端)向消費者(數(shù)據(jù)庫后端)傳遞任務的渠道。生產(chǎn)者,會與SQL命令一起,同時傳遞一個管道pipe的連接對象,作為任務完成后,回傳結果的渠道。確保,任務的接收方與發(fā)送方保持一致。

作為第二個問題的解決方法,可以使用線程池來并發(fā)獲取任務隊列中的task,然后執(zhí)行命令并回傳結果。

第二次改進

第一次改進的效果還是很明顯的,不用任何測試手段。直接點擊頁面鏈接,可以很直觀地感覺到反應速度有很明顯的加快。

但是對于第二個問題,使用線程池還是有些欠妥當。因為,CPython解釋器存在GIL問題,所有線程實際上都在一個解釋器進程里調(diào)度。線程稍微開多一點,解釋器進程就會頻繁的切換線程,而線程切換的開銷也不小。線程多一點,甚至會出現(xiàn)“抖動”問題(也就是剛剛喚醒一個線程,就進入掛起狀態(tài),剛剛換到棧幀或內(nèi)存的上下文,又被換回內(nèi)存或者磁盤),效率大大降低。也就是說,線程池的并發(fā)量很有限。

試過了多進程、多線程,只能在單個線程里做文章了。

Python中的asyncio庫

Python里有大量的協(xié)程庫可以實現(xiàn)單線程內(nèi)的并發(fā)操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫同樣可以實現(xiàn)協(xié)程并發(fā)。asyncio庫大大降低了Python中協(xié)程的實現(xiàn)難度,就像定義普通函數(shù)那樣就可以了,只是要在def前面多加一個async關鍵詞。async def函數(shù)中,需要阻塞在其他async def函數(shù)的位置前面可以加上await關鍵詞。

import asyncio

async def wait():

await asyncio.sleep(2)

async def execute(task):

process_task(task)

await wait()

continue_job()

async def函數(shù)的執(zhí)行稍微麻煩點。需要首先獲取一個loop對象,然后由這個對象代為執(zhí)行async def函數(shù)。

loop = asyncio.get_event_loop()

loop.run_until_complete(execute(task))

loop.close()

loop在執(zhí)行execute(task)函數(shù)時,如果遇到await關鍵字,就會暫時掛起當前協(xié)程,轉而去執(zhí)行其他阻塞在await關鍵詞的協(xié)程,從而實現(xiàn)協(xié)程并發(fā)。

不過需要注意的是,run_until_complete()函數(shù)本身是一個阻塞函數(shù)。也就是說,當前線程會等候一個run_until_complete()函數(shù)執(zhí)行完畢之后,才會繼續(xù)執(zhí)行下一部函數(shù)。所以下面這段代碼并不能并發(fā)執(zhí)行。

for task in task_list:

loop.run_until_complete(task)

對與這個問題,asyncio庫也有相應的解決方案:gather函數(shù)。

loop = asyncio.get_event_loop()

tasks = [asyncio.ensure_future(execute(task))

for task in task_list]

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

當然了,async def函數(shù)的執(zhí)行并不只有這兩種解決方案,還有call_soon與run_forever的配合執(zhí)行等等,更多內(nèi)容還請參考官方文檔。

Python下的I/O多路復用

協(xié)程,實際上,也存在上下文切換,只不過開銷很輕微。而I/O多路復用則完全不存在這個問題。

目前,Linux上比較火的I/O多路復用API要算epoll了。Tornado,就是通過調(diào)用C語言封裝的epoll庫,成功解決了C10K問題(當然還有Pypy的功勞)。

在Linux里查文檔,可以看到epoll只有三類函數(shù),調(diào)用起來比較方便易懂。

創(chuàng)建epoll對象,并返回其對應的文件描述符(file descriptor)。

int epoll_create(int size);

int epoll_create1(int flags);

控制監(jiān)聽事件。第一個參數(shù)epfd就對應于前面命令創(chuàng)建的epoll對象的文件描述符;第二個參數(shù)表示該命令要執(zhí)行的動作:監(jiān)聽事件的新增、修改或者刪除;第三個參數(shù),是要監(jiān)聽的文件對應的描述符;第四個,代表要監(jiān)聽的事件。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

等候。這是一個阻塞函數(shù),調(diào)用者會等候內(nèi)核通知所注冊的事件被觸發(fā)。

int epoll_wait(int epfd, struct epoll_event *events,

int maxevents, int timeout);

int epoll_pwait(int epfd, struct epoll_event *events,

int maxevents, int timeout,

const sigset_t *sigmask);

在Python的select庫里:

select.epoll()對應于第一類創(chuàng)建函數(shù);

epoll.register(),epoll.unregister(),epoll.modify()均是對控制函數(shù)epoll_ctl的封裝;

epoll.poll()則是對等候函數(shù)epoll_wait的封裝。

Python里epoll相關API的最大問題應該是在epoll.poll()。相比于其所封裝的epoll_wait,用戶無法手動指定要等候的事件,也就是后者的第二個參數(shù)struct epoll_event *events。沒法實現(xiàn)精確控制。因此只能使用替代方案:select.select()函數(shù)。

根據(jù)Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對Unix系統(tǒng)中select函數(shù)的直接調(diào)用,與C語言API的傳參很接近。前三個參數(shù)都是列表,其中的元素都是要注冊到內(nèi)核的文件描述符。如果想用自定義類,就要確保實現(xiàn)了fileno()方法。

其分別對應于:

rlist: 等候直到可讀

wlist: 等候直到可寫

xlist: 等候直到異常。這個異常的定義,要查看系統(tǒng)文檔。

select.select(),類似于epoll.poll(),先注冊文件和事件,然后保持等候內(nèi)核通知,是阻塞函數(shù)。

實際應用

Psycopg2庫支持對異步和協(xié)程,但和一般情況下的用法略有區(qū)別。普通數(shù)據(jù)庫連接支持不同線程中的不同游標并發(fā)查詢;而異步連接則不支持不同游標的同時查詢。所以異步連接的不同游標之間必須使用I/O復用方法來協(xié)調(diào)調(diào)度。

所以,我的大致實現(xiàn)思路是這樣的:首先并發(fā)執(zhí)行大量協(xié)程,從任務隊列中提取任務,再向連接池請求連接,創(chuàng)建游標,然后執(zhí)行命令,并返回結果。在獲取游標和接受查詢結果之前,均要阻塞等候內(nèi)核通知連接可用。

其中,連接池返回連接時,會根據(jù)引用連接的協(xié)程數(shù)量,返回負載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。

我的代碼位于:bottle-blog/dbservice.py

存在問題

當然了,這個流程目前還一些問題。

首先就是每次輪詢拿到任務之后,都會走這么一個流程。

獲取連接 -- 新建游標 -- 執(zhí)行任務 -- 關閉游標 -- 取消連接引用

本來,最好的情況應該是:在輪詢之前,就建好游標;在輪詢時,直接等候內(nèi)核通知,執(zhí)行相應任務。這樣可以減少輪詢時的任務量。但是如果協(xié)程提前對應好連接,那就不能保證在獲取任務時,保持各連接負載均衡了。

所以這一塊,還有工作要做。

還有就是epoll沒能用上,有些遺憾。

以后打算寫點C語言的內(nèi)容,或者用Python/C API,或者用Ctypes包裝共享庫,來實現(xiàn)epoll的調(diào)用。

最后,請允許我吐槽一下Python的epoll相關文檔:簡直太弱了!!!必須看源碼才能弄清楚功能。

python異步有哪些方式

yield相當于return,他將相應的值返回給調(diào)用next()或者send()的調(diào)用者,從而交出了CPU使用權,而當調(diào)用者再次調(diào)用next()或者send()的時候,又會返回到y(tǒng)ield中斷的地方,如果send有參數(shù),還會將參數(shù)返回給yield賦值的變量,如果沒有就和next()一樣賦值為None。但是這里會遇到一個問題,就是嵌套使用generator時外層的generator需要寫大量代碼,看如下示例:?

注意以下代碼均在Python3.6上運行調(diào)試

#!/usr/bin/env python# encoding:utf-8def inner_generator():

i = 0

while True:

i = yield i ? ? ? ?if i 10: ? ? ? ? ? ?raise StopIterationdef outer_generator():

print("do something before yield")

from_inner = 0

from_outer = 1

g = inner_generator()

g.send(None) ? ?while 1: ? ? ? ?try:

from_inner = g.send(from_outer)

from_outer = yield from_inner ? ? ? ?except StopIteration: ? ? ? ? ? ?breakdef main():

g = outer_generator()

g.send(None)

i = 0

while 1: ? ? ? ?try:

i = g.send(i + 1)

print(i) ? ? ? ?except StopIteration: ? ? ? ? ? ?breakif __name__ == '__main__':

main()1234567891011121314151617181920212223242526272829303132333435363738394041

為了簡化,在Python3.3中引入了yield from

yield from

使用yield from有兩個好處,

1、可以將main中send的參數(shù)一直返回給最里層的generator,?

2、同時我們也不需要再使用while循環(huán)和send (), next()來進行迭代。

我們可以將上邊的代碼修改如下:

def inner_generator():

i = 0

while True:

i = yield i ? ? ? ?if i 10: ? ? ? ? ? ?raise StopIterationdef outer_generator():

print("do something before coroutine start") ? ?yield from inner_generator()def main():

g = outer_generator()

g.send(None)

i = 0

while 1: ? ? ? ?try:

i = g.send(i + 1)

print(i) ? ? ? ?except StopIteration: ? ? ? ? ? ?breakif __name__ == '__main__':

main()1234567891011121314151617181920212223242526

執(zhí)行結果如下:

do something before coroutine start123456789101234567891011

這里inner_generator()中執(zhí)行的代碼片段我們實際就可以認為是協(xié)程,所以總的來說邏輯圖如下:?

接下來我們就看下究竟協(xié)程是啥樣子

協(xié)程coroutine

協(xié)程的概念應該是從進程和線程演變而來的,他們都是獨立的執(zhí)行一段代碼,但是不同是線程比進程要輕量級,協(xié)程比線程還要輕量級。多線程在同一個進程中執(zhí)行,而協(xié)程通常也是在一個線程當中執(zhí)行。它們的關系圖如下:

我們都知道Python由于GIL(Global Interpreter Lock)原因,其線程效率并不高,并且在*nix系統(tǒng)中,創(chuàng)建線程的開銷并不比進程小,因此在并發(fā)操作時,多線程的效率還是受到了很大制約的。所以后來人們發(fā)現(xiàn)通過yield來中斷代碼片段的執(zhí)行,同時交出了cpu的使用權,于是協(xié)程的概念產(chǎn)生了。在Python3.4正式引入了協(xié)程的概念,代碼示例如下:

import asyncio# Borrowed from countdown(number, n):

while n 0:

print('T-minus', n, '({})'.format(number)) ? ? ? ?yield from asyncio.sleep(1)

n -= 1loop = asyncio.get_event_loop()

tasks = [

asyncio.ensure_future(countdown("A", 2)),

asyncio.ensure_future(countdown("B", 3))]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()12345678910111213141516

示例顯示了在Python3.4引入兩個重要概念協(xié)程和事件循環(huán),?

通過修飾符@asyncio.coroutine定義了一個協(xié)程,而通過event loop來執(zhí)行tasks中所有的協(xié)程任務。之后在Python3.5引入了新的async await語法,從而有了原生協(xié)程的概念。

async await

在Python3.5中,引入了ayncawait 語法結構,通過”aync def”可以定義一個協(xié)程代碼片段,作用類似于Python3.4中的@asyncio.coroutine修飾符,而await則相當于”yield from”。

先來看一段代碼,這個是我剛開始使用asyncawait語法時,寫的一段小程序。

#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time

async def wait_download(url):

response = await requets.get(url)

print("get {} response complete.".format(url))

async def main():

start = time.time()

await asyncio.wait([

wait_download(""),

wait_download(""),

wait_download("")])

end = time.time()

print("Complete in {} seconds".format(end - start))

loop = asyncio.get_event_loop()

loop.run_until_complete(main())12345678910111213141516171819202122232425

這里會收到這樣的報錯:

Task exception was never retrieved

future: Task finished coro=wait_download() done, defined at asynctest.py:9 exception=TypeError("object Response can't be used in 'await' expression",)

Traceback (most recent call last):

File "asynctest.py", line 10, in wait_download

data = await requests.get(url)

TypeError: object Response can't be used in 'await' expression123456

這是由于requests.get()函數(shù)返回的Response對象不能用于await表達式,可是如果不能用于await,還怎么樣來實現(xiàn)異步呢??

原來Python的await表達式是類似于”yield from”的東西,但是await會去做參數(shù)檢查,它要求await表達式中的對象必須是awaitable的,那啥是awaitable呢? awaitable對象必須滿足如下條件中其中之一:

1、A native coroutine object returned from a native coroutine function .

原生協(xié)程對象

2、A generator-based coroutine object returned from a function decorated with types.coroutine() .

types.coroutine()修飾的基于生成器的協(xié)程對象,注意不是Python3.4中asyncio.coroutine

3、An object with an await method returning an iterator.

實現(xiàn)了await method,并在其中返回了iterator的對象

根據(jù)這些條件定義,我們可以修改代碼如下:

#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time

async def download(url): # 通過async def定義的函數(shù)是原生的協(xié)程對象

response = requests.get(url)

print(response.text)

async def wait_download(url):

await download(url) # 這里download(url)就是一個原生的協(xié)程對象

print("get {} data complete.".format(url))

async def main():

start = time.time()

await asyncio.wait([

wait_download(""),

wait_download(""),

wait_download("")])

end = time.time()

print("Complete in {} seconds".format(end - start))

loop = asyncio.get_event_loop()

loop.run_until_complete(main())123456789101112131415161718192021222324252627282930

好了現(xiàn)在一個真正的實現(xiàn)了異步編程的小程序終于誕生了。?

而目前更牛逼的異步是使用uvloop或者pyuv,這兩個最新的Python庫都是libuv實現(xiàn)的,可以提供更加高效的event loop。

uvloop和pyuv

pyuv實現(xiàn)了Python2.x和3.x,但是該項目在github上已經(jīng)許久沒有更新了,不知道是否還有人在維護。?

uvloop只實現(xiàn)了3.x, 但是該項目在github上始終活躍。

它們的使用也非常簡單,以uvloop為例,只需要添加以下代碼就可以了

import asyncioimport uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())123

理解python異步機制

最重要的是生成器函數(shù)碰到y(tǒng)ield停止執(zhí)行,收到next或send才會繼續(xù)執(zhí)行的機制。

而且send方法令我們可以傳遞值到生成器暫停的地方。

生成器執(zhí)行結束拋出 StopIteration 異常。

yield from用于把其他生成器當做子例程調(diào)用。

然后它可以被其他用 async def 定義的的協(xié)程函數(shù)B和C await ,只有當 await 返回時,B和C才繼續(xù)執(zhí)行。

這樣我們就可以有效地控制B和C的執(zhí)行順序。

然后我們創(chuàng)建了一個調(diào)度器,它對列表進行了兩次深拷貝以避免問題。它循環(huán)協(xié)程隊列,使用 send 方法對每個協(xié)程依次遞進,如果有協(xié)程已經(jīng)完成則將其移出隊列,當列表中的協(xié)程全部完成時結束。

然后通過 args=coro.send(None) 與該函數(shù)碰撞,得到含有 delay 參數(shù)的字典作為 send 的返回值。便可以判斷出是否調(diào)用調(diào)度器的睡眠機制。

最后在調(diào)度器中實現(xiàn)每一次協(xié)程列表循環(huán)結束后判斷在睡眠列表中的協(xié)程是否有到時間的,到時間或時間超出則添加到運行協(xié)程列表中進入循環(huán)執(zhí)行。如果運行列表中的協(xié)程都執(zhí)行完了,則查看睡眠列表中的協(xié)程中還需睡眠的最少時間,線程睡眠,睡眠完成再將其添加到運行隊列。

該裝飾器能將一個比較耗時的計算函數(shù)封裝為一個協(xié)程,使其可以被其他協(xié)程 await 。在調(diào)度器中利用 send 函數(shù)的返回值可以獲取它的類型為 background 、函數(shù)入口地址以及函數(shù)的傳參,然后在調(diào)度器中按相應機制執(zhí)行。

第二部分 是在調(diào)度器中的修改:我們讓調(diào)度器類擁有了一個私有的 concurrent.futures.ThreadPoolExecutor() 對象。并在運行協(xié)程隊列的循環(huán)判斷中將 background 類型的操作提交給線程池對象,并將當前的協(xié)程移出運行隊列,添加到 futures 隊列中。然后在每次運行隊列循環(huán)后判斷 futures 中的任務是否有完成的(使用的參數(shù)為一旦有任一任務完成或被取消都返回),如果主線程此時處于將要睡眠的狀態(tài),就等待相應的時間,沒有的話則立刻返回,下次再查詢,完成的任務將其所在協(xié)程帶入運行隊列,任務結果通過調(diào)度器 send 傳回該協(xié)程。

Python異步編程4:協(xié)程函數(shù),協(xié)程對象,await關鍵字

協(xié)程函數(shù):async def?函數(shù)名。3.5+

協(xié)程對象:執(zhí)行協(xié)程函數(shù)()得到的協(xié)程對象。

3.5之后的寫法:

3.7之后的寫法:更簡便

await后面?跟?可等待的對象。(協(xié)程對象,F(xiàn)uture,Task對象?約等于IO等待)

await實例2:串行執(zhí)行。 一個協(xié)程函數(shù)里面可以支持多個await ,雖然會串行,但是如果有其他協(xié)程函數(shù),任務列表也在執(zhí)行,依然會切換。只是案例中的main對應執(zhí)行的others1和others2串行 。 await會等待對象的值得到之后才繼續(xù)往下走。

當前標題:python轉化異步函數(shù) python 異步
轉載源于:http://chinadenli.net/article42/dogceec.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供用戶體驗品牌網(wǎng)站設計云服務器外貿(mào)網(wǎng)站建設ChatGPT網(wǎng)站排名

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

h5響應式網(wǎng)站建設