1. 初探
在了解異步協程之前,我們首先得了解一些基礎概念,如阻塞和非阻塞、同步和異步、多進程和協程。
1.1 阻塞
阻塞狀態指程序未得到所需計算資源時被掛起的狀態。程序在等待某個操作完成期間,自身無法繼續處理其他的事情,則稱該程序在該操作上是阻塞的。
常見的阻塞形式有:
- 網絡 I/O 阻塞
- 磁盤 I/O 阻塞
- 用戶輸入阻塞等。
阻塞是無處不在的,包括 CPU 切換上下文時,所有的進程都無法真正處理事情,它們也會被阻塞。如果是多核 CPU 則正在執行上下文切換操作的核不可被利用。
1.2 非阻塞
程序在等待某操作過程中,自身不被阻塞,可以繼續處理其他的事情,則稱該程序在該操作上是非阻塞的。
非阻塞并不是在任何程序級別、任何情況下都可以存在的。僅當程序封裝的級別可以囊括獨立的子程序單元時,它才可能存在非阻塞狀態。
非阻塞的存在是因為阻塞存在,正因為某個操作阻塞導致的耗時與效率低下,我們才要把它變成非阻塞的。
1.3 同步
不同程序單元為了完成某個任務,在執行過程中需靠某種通信方式以協調一致,我們稱這些程序單元是同步執行的。
例如購物系統中更新商品庫存,需要用“行鎖”作為通信信號,讓不同的更新請求強制排隊順序執行,那更新庫存的操作是同步的。
簡言之,同步意味著有序。
1.4 異步
為完成某個任務,不同程序單元之間過程中無需通信協調,也能完成任務的方式,不相關的程序單元之間可以是異步的。
例如,爬蟲下載網頁。調度程序調用下載程序后,即可調度其他任務,而無需與該下載任務保持通信以協調行為。不同網頁的下載、保存等操作都是無關的,也無需相互通知協調。這些異步操作的完成時刻并不確定。
簡言之,異步意味著無序。
1.5 多進程
多進程就是利用 CPU 的多核優勢,在同一時間并行地執行多個任務,可以大大提高執行效率。
1.6 協程
協程,英文叫作 Coroutine,又稱微線程、纖程,協程是一種用戶態的輕量級線程。
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此協程能保留上一次調用時的狀態,即所有局部狀態的一個特定組合,每次過程重入時,就相當于進入上一次調用的狀態。
協程本質上是個單進程,協程相對于多進程來說,無需線程上下文切換的開銷,無需原子操作鎖定及同步的開銷,編程模型也非常簡單。
我們可以使用協程來實現異步操作,比如在網絡爬蟲場景下,我們發出一個請求之后,需要等待一定的時間才能得到響應,但其實在這個等待過程中,程序可以干許多其他的事情,等到響應得到之后才切換回來繼續處理,這樣可以充分利用 CPU 和其他資源,這就是協程的優勢。
1.7 協程相對于多線程的優點
多線程編程是比較困難的, 因為調度程序任何時候都能中斷線程, 必須記住保留鎖, 去保護程序中重要部分, 防止多線程在執行的過程中斷。
而協程默認會做好全方位保護, 以防止中斷。我們必須顯示產出才能讓程序的余下部分運行。對協程來說, 無需保留鎖, 而在多個線程之間同步操作, 協程自身就會同步, 因為在任意時刻, 只有一個協程運行。總結下大概下面幾點:
- 無需系統內核的上下文切換,減小開銷;
- 無需原子操作鎖定及同步的開銷,不用擔心資源共享的問題;
- 單線程即可實現高并發,單核 CPU 即便支持上萬的協程都不是問題,所以很適合用于高并發處理,尤其是在應用在網絡爬蟲中。
2. 協程用法
接下來,我們來了解下協程的實現,從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程還是以生成器對象為基礎的,在 Python 3.5 則增加了 async/await,使得協程的實現更加方便。
Python 中使用協程最常用的庫莫過于 asyncio,所以本文會以 asyncio 為基礎來介紹協程的使用。
首先我們需要了解下面幾個概念。
- event_loop:事件循環,相當于一個無限循環,我們可以把一些函數注冊到這個事件循環上,當滿足條件發生的時候,就會調用對應的處理方法。
- coroutine:中文翻譯叫協程,在 Python 中常指代為協程對象類型,我們可以將協程對象注冊到事件循環中,它會被事件循環調用。我們可以使用 async 關鍵字來定義一個方法,這個方法在調用時不會立即被執行,而是返回一個協程對象。
- task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。
- future:代表將來執行或沒有執行的任務的結果,實際上和 task 沒有本質區別。
另外我們還需要了解 async/await 關鍵字,它是從 Python 3.5 才出現的,專門用于定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。
2.1 定義協程
協程就是一個函數,只是它滿足以下幾個特征:
- 依賴 I/O 操作(有 I/O 依賴的操作)
- 可以在進行 I/O 操作時暫停
- 無法直接運行
它的作用就是對有大量 I/O 操作的程序進行加速。
Python 協程屬于可等待對象,因此可以在其他協程中被等待。
什么叫可等待對象?——await,如果前面被標記 await 就表明他是個協程,我們需要等待它返回一個數據。
舉個例子,我從網絡上下載某個數據文件下載到我的本地電腦上,這很顯然是一個 I/O 操作。比方這個文件較大(2GB),可能需要耗時 30min 才能下載成功。而在這 30min 里面,它會卡在 await 后面。這個 await 標記了協程,那就意味著它可以被暫停,那既然該任務可以被暫停,我們就把它分離出去。我這個線程繼續執行其它任務,它這個 30min 分出去慢慢的傳輸,我這個程序再運行其他操作。
上面的代碼,Python 3.6 會給你報錯。報錯信息如下:
為什么會出現這樣的報錯呢?
因為從 Python 3.7+ 之后 Python 已經完全支持異步了,Python 3.6 之前只是支持部分異步,許多的方法是非常冗長的。
一個異步函數調用另一個異步函數:
tips:
異步主要做得是 I/O 類型,CPU 密集型就不需要使用異步。
一個異步調用另一個異步函數,不能直接被調用,必須添加 await
我們使用代碼驗證一下,不加 await 調用試一試:
輸出結果:
我們添加上 await 即可正常運行:
運行結果:
運行成功并沒有報錯,接下來我們要輸出得到的結果該怎么編寫代碼呢?直接賦值即可:
Ps:async 標記異步,await 標記等待。
如果我們不想使用 await 來運行異步函數,那這個時候我們就可以按如下方法來運行代碼:
首先我們來定義一個協程,體驗一下它和普通進程在實現上的不同之處,代碼如下:
代碼示例二中,我們首先引入了 asyncio這個包,這樣我們才可以使用 async和 await,然后我們使用 async定義了一個 execute方法,方法接收一個數字參數,方法執行之后會打印這個數字。
隨后我們直接調用了這個方法,然而這個方法并沒有執行,而是返回了一個 coroutine協程對象。隨后我們使用 get_event_loop方法創建了一個事件循環 loop,并調用了 loop對象的 run_until_complete方法將協程注冊到事件循環 loop中,然后啟動。最后我們才看到了 execute方法打印了輸出結果。
可見,async定義的方法就會變成一個無法直接執行的 coroutine對象,必須將其注冊到事件循環中才可以執行。
上面我們還提到了 task,它是對 coroutine 對象的進一步封裝,它里面相比 coroutine 對象多了運行狀態,比如 running、finished 等,我們可以用這些狀態來獲取協程對象的執行情況。
在上面的例子中,當我們將 coroutine 對象傳遞給 run_until_complete 方法的時候,實際上它進行了一個操作就是將 coroutine 封裝成了 task 對象,我們也可以顯式地進行聲明,如下所示:
運行結果:
這里我們定義了 loop 對象之后,接著調用了它的 create_task 方法將 coroutine 對象轉化為了 task 對象,隨后我們打印輸出一下,發現它是 pending 狀態。接著我們將 task 對象添加到事件循環中得到執行,隨后我們再打印輸出一下 task 對象,發現它的狀態就變成了 finished,同時還可以看到其 result 變成了 1,也就是我們定義的 execute 方法的返回結果。
另外定義 task 對象還有一種方式,就是直接通過 asyncio 的 ensure_future 方法,返回結果也是 task 對象,這樣的話我們就可以不借助于 loop 來定義,即使我們還沒有聲明 loop 也可以提前定義好 task 對象,寫法如下:
運行結果:
發現其運行效果都是一樣的。
2.2 創建 task 總結
- 以下代碼都是異步函數
- loop = asyncio.get_event_loop()task = loop.create_task(coroutine) # 需要提前聲明 loop
- task = asyncio.create_task.NET())
- task = asyncio.ensure_future(coroutine) # 不需要提前聲明
2.3 綁定回調
另外我們也可以為某個 task 綁定一個回調方法,比如我們來看下面的例子:
在這里我們定義了一個 request 方法,請求了百度,獲取其狀態碼,但是這個方法里面我們沒有任何 print 語句。隨后我們定義了一個 callback 方法,這個方法接收一個參數,是 task 對象,然后調用 print 方法打印了 task 對象的結果。這樣我們就定義好了一個 coroutine 對象和一個回調方法,我們現在希望的效果是,當 coroutine 對象執行完畢之后,就去執行聲明的 callback 方法。
那么它們二者怎樣關聯起來呢?
很簡單,只需要調用 add_done_callback方法即可,我們將 callback 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢之后就可以調用 callback 方法了,同時 task 對象還會作為參數傳遞給 callback 方法,調用 task 對象的 result 方法就可以獲取返回結果了。
運行結果:
實際上不用回調方法,直接在 task 運行完畢之后也可以直接調用 result 方法獲取結果,如下所示:
運行結果是一樣的:
2.4 多任務協程
上面的例子我們只執行了一次請求,如果我們想執行多次請求應該怎么辦呢?我們可以定義一個 task 列表,然后使用 asyncio 的 wait 方法即可執行,看下面的例子:
這里我們使用一個 for 循環創建了五個 task,組成了一個列表,然后把這個列表首先傳遞給了 asyncio 的 wait() 方法,然后再將其注冊到時間循環中,就可以發起五個任務了。最后我們再將任務的運行結果輸出出來,運行結果如下:
可以看到五個任務被順次執行了,并得到了運行結果。
2.5 協程之間的鏈式調用
我們可以通過使用 await 關鍵字,在一個協程中調用一個協程。一個協程可以啟動另一個協程,從而可以使任務根據工作內容,封裝到不同的協程中。我們可以在協程中使用 await 關鍵字,鏈式地調度協程,來形成一個協程任務流。像下面的例子一樣:
輸出:
在上面,我們知道調用協程需要通過創建一個事件循環然后再去運行。這里我們需要了解的是如果在協程里想調用一個協程我們需要使用 await 關鍵字,就拿上面的例子來說在 main 函數里調用協程 result1 和 result2。那么問題來了:await 干了什么呢?
2.6 await 的作用
我們前面使用到了許多次 await 那它的作用到底是什么呢?
await 的作用就是等待當前的協程運行結束之后再繼續進行下面代碼。因為我們執行 result1 的時間很短,所以在表面上看 result1 和 result2 是一起執行的。這就是 await 的作用。等待一個協程的執行完畢,如果有返回結果,那么就會接收到協程的返回結果,通過使用 return 可以返回協程的一個結果,這個和同步函數的 return 使用方法一樣。
2.7 并發的執行任務
一系列的協程可以通過 await 鏈式調用,但是有的時候我們需要在一個協程里等待多個協程,比如我們在一個協程里等待 1000 個異步網絡請求,對于訪問次序沒有要求的時候,就可以使用關鍵字 wait 來解決了。wait 可以暫停一個協程,直到后臺操作完成。
Task 的使用
輸出:
如果運行的話會發現首先會打印 10 次數字,但是并不是順序執行的,這也說明 asyncio.wait 并發執行的時候是亂序的。如果想保證順序只要使用 gather 把 task 寫成解包的形式就行了,也就是上面的注釋部分的代碼。
2.8 如何在協程中使用普通的函數呢?
我們知道在普通函數中調用普通函數之間,函數名加括號即可,像下面這樣:
那么在協程中如何使用一個普通函數呢?在協程中可以通過一些方法去調用普通的函數。可以使用的關鍵字有 call_soon 等。
2.9 call_soon
可以通過字面意思理解調用立即返回。下面來看一下具體的使用例子:
輸出結果:
通過輸出結果我們可以發現我們在協程中成功調用了一個普通函數,順序地打印了 1 和 2。
看過這些例子之后,也許你就有疑問了,協程沒有缺點的么?
3. 協程的缺點
同樣的總結下大概以下 2 點。
3.1 無法使用 CPU 的多核
協程的本質是個單線程,它不能同時用上單個 CPU 的多個核,協程需要和進程配合才能運行在多 CPU 上。當然我們日常所編寫的絕大部分應用都沒有這個必要,就比如網絡爬蟲來說,限制爬蟲的速度還有其他的因素,比如網站并發量、網速等問題都會是爬蟲速度限制的因素。除非做一些密集型應用,這個時候才可能會用到多進程和協程。
3.2 處處都要使用非阻塞代碼
寫協程就意味著你要一值寫一些非阻塞的代碼,使用各種異步版本的庫,比如后面的異步爬蟲教程中用的 aiohttp 就是一個異步版本的request庫等。不過這些缺點并不能影響到使用協程的優勢。
4. 協程與異步
上面想必你已經完全掌握了,接下來,我們用睡眠來模仿一下耗時的 IO 操作。
輸出結果:
tips:
注意區別 time.sleep() 這個是不能使用到異步里面的 sleep,如果你直接用 time 模塊里面的 說了 sleep 那代碼是真正睡眠了,不會執行其他任務了。所以需要使用 asyncio.sleep() 的睡眠才可以。requests 包也是同理,所以接下來我會給大家講解一個新的包(aiohttp),我們將用 aiohttp 來代替 requests。
接下來我們來分析一下輸出結果:
這時候細心的小伙伴有可能會說,我們添加任務進去的時候是 0、1、2、3,可是在執行的時候卻是 3、1、0、2這就是我上面說的異步是不可控,隨機的。
小結:
我在使用異步的時候,上面一共說到了三種:
執行單個任務:
- await 執行異步
- asyncio.create_task(function)
執行多個任務:
- 獲取事件循環:loop = asyncio.get_event_loop()、loop.run_until_complete(asyncio.wait(list))
5. 異步爬蟲實戰
抓取目標網站:百思不得姐
輸出結果:
補充:
那到這里,同學們已經掌握了:多線程、多進程、線程池、進程池、異步。那有同學可能會問:可不可以把這幾個方法結合起來呢?
那我告訴你們的是,異步只能用異步的方法執行,不過大家是否用過 concurrent.future 模塊呢?這個模塊是底層是 異步,所以這也是我接下來所要說的。
6. 異步使用線程池與進程池
Concurrent.futures 這個模塊可以和異步連接,具有線程池和進程池。管理并發編程,處理非確定性的執行流程,同步功能。
使用 requests 的異步
代碼如下:
編寫程序測試時間,建議不要同時運行,注釋掉其他運行方法再運行:
輸出結果:
我們來分析一下輸出結果,我們會分析進程池花費的時間會比線程池更多,這是為什么呢?
- 多線程非常適合 I/O 密集型,不適合 CPU 密集型;
- 進程池創建銷毀的資源開銷大,創建一個進程所耗費的資源要比創建一個線程耗費的時間大很多,銷毀它也需要很長的時間。(準備工作非常多)
7. 小結
對于協程的入門來說,這些知識已經夠用了。當然協程涉及到的知識不止這些,這里只是為了大家提前對協程有一定的了解,后面將繼續講解協程的其他知識,一切的協程知識基礎都是為后面的異步爬蟲教程做準備,只有熟悉了使用協程才能在后面教程中快速上手操作。接下來將進一步提到本文沒有提及的事件循環、Task、Future、Awaitable 等一系列知識點,以及協程的高層 API 知識。敬請期待!