python 비동기(2/2) - asyncio
asyncio(Asynchronous I/O)는 비동기 프로그래밍을 위한 모듈이며 CPU 작업과 블로킹 I/O를 병렬로 처리하게 해준다. 즉, 비동기 프로그래밍이 만능이 아니며 멀티 스레드와 차이를 이해해야 한다.(간혹 초보 개발자 중에 비동기 프로그래밍이 동기 처리보다 무조건적으로 좋다고 이해하시는 분들이 있다)
asyncio 사용법
코루틴 함수를 호출하여 생성한 코루틴 객체는 그 스스로 실행되지 않는다. 따라서 객체를 만든 후 따로 실행을 해줘야하는데 asyncio
에서 코루틴을 실행하는 방법은 대략 3가지이다.
- 함수 외부
asyncio.run()
함수 -> 일반적인 비동기 프로그래밍의 시작점, 3.7 버전부터 사용 가능
- 함수 내부
await
키워드 -> 코루틴 내에서만 사용, 맨 처음 코루틴 실행하는 용도로는 사용할 수 없음asyncio.create_task()
함수 -> task를 동시적으로 실행하고 싶은 경우에 사용
위 실행 방법을 알아보기 앞서 기본 개념부터 정리해보자.
async def
async def
키워드는 파이썬 3.5 이상부터 사용 가능하다. 함수 앞에 아래와 같이 async
를 붙여줘서 네이티브 코루틴을 만들어준다.
import asyncio
async def hello(): # async def로 네이티브 코루틴을 만듦
print('Hello, world!')
asyncio.run(hello())
Hello, world!
async
키워드만 붙여준다고 비동기를 사용할 수 있는건 아니다. asyncio.get_event_loop
함수로 이벤트 루프를 얻고 run_until_complete
에 네이티브 코루틴 객체를 넣어준다.
run_until_complete
는 네이티브 코루틴이 이벤트 루프에서 실행되도록 예약하고, 해당 네이티브 코루틴이 끝날 때까지 기다린다. 이렇게 하면 이벤트 루프를 통해서 hello 코루틴이 실행되고, 할 일이 끝났으면 loop.close()
로 이벤트 루프를 닫아준다.
이벤트 루프(event loop)란?
무한 루프를 돌며 매 루프마다 작업(= task)을 하나씩 실행시키는 로직을 의미
await
주의할 점은 위에서도 말했지만 await
는 네이티브 코루틴 안에서만 사용할 수 있다. 즉, 맨 처음 코루틴 실행에서는 사용할 수 없다.
그래서 코루틴을 맨처음 실행하는 함수로 asyncio.run()
를 일반적으로 사용하게 되고, await
를 통해 코루틴 함수 내부에서 코루틴을 실행하거나 코루틴 체인으로 들어가는 엔트리 포인트를 만들어 줄 때 사용한다.
await
는 다음과 같이 변수에 awaitable(대기가능) 객체를 바인딩하게 된다.
변수 = await 코루틴객체
변수 = await 퓨처객체
변수 = await 태스크객체
import asyncio
async def add(a, b):
print('add: {0} + {1}'.format(a, b))
await asyncio.sleep(1.0) # 1초 대기. asyncio.sleep도 네이티브 코루틴
return a + b # 두 수를 더한 결과 반환
async def print_add(a, b):
result = await add(a, b) # await로 다른 네이티브 코루틴 실행하고 반환값을 변수에 저장
print('print_add: {0} + {1} = {2}'.format(a, b, result))
asyncio.run(print_add(1,2)) # asyncio.run()을 통해 실행
add: 1 + 2
print_add: 1 + 2 = 3
먼저 print_add부터 보자. print_add에서는 await
로 add를 실행하고 반환값을 변수에 저장했다. 이렇게 코루틴 안에서 다른 코루틴을 실행할 때는 await
를 사용한다.
add에서는 await asyncio.sleep(1.0)
로 1초 대기한 뒤 return a + b로 두 수를 더한 결과를 반환한다.
asyncio.run()로 실행하기
asyncio.run()
함수는 현재의 쓰레드에 새 이벤트 루프를 설정하고, 해당 이벤트 루프에서 인자로 넘어오는 코루틴 객체에 해당하는 코루틴을 태스크로 예약하여 실행시킨 뒤, 해당 태스크의 실행이 완료되면 이벤트 루프를 닫는 역할을 수행한다. 단, 이 함수는 3.7 버전 이상의 Python에서만 사용할 수 있다.
import asyncio
async def add(a, b):
print('add: {0} + {1}'.format(a, b))
await asyncio.sleep(1.0) # 1초 대기. asyncio.sleep도 네이티브 코루틴
return a + b # 두 수를 더한 결과 반환
async def print_add(a, b):
result = await add(a, b) # await로 다른 네이티브 코루틴 실행하고 반환값을 변수에 저장
print('print_add: {0} + {1} = {2}'.format(a, b, result))
asyncio.run(print_add(1,2)) # asyncio.run()을 통해 실행
이런식으로 네이티브 코루틴 함수를 asyncio.run()
함수 안에 넣어주면 된다.
만약 파이썬 버전 3.7 이전이라면 아래와 같이 사용해주어야 한다.
loop = asyncio.get_event_loop() # 이벤트 루프를 얻음
loop.run_until_complete(print_add(1, 2)) # print_add가 끝날 때까지 이벤트 루프를 실행
loop.close() # 이벤트 루프를 닫음
asyncio.create_task()로 실행하기
파이썬은 싱글 스레드 인터프리터 언어이기 때문에 동시에 여러 작업을 하더라도 Parallel이 아닌 Concurrent하게 실행한다(엄밀한 의미의 동시가 아니라 여러 태스크들을 왔다 갔다 하며 한 쓰레드에서 실행하는 개념).
asyncio.run()
함수는 기본적으로 하나의 태스크만을 생성하여 실행한다. 따라서 코루틴 체인 과정에서 추가적인 태스크를 생성하여 실행하지 않았다면 현재의 태스크가 중단되었을 때 이벤트 루프는 실행시킬 다른 태스크가 없게 된다.
따라서 동시적인(Concurrent) 실행을 위해서는 asyncio.create_task()
함수를 호출함으로써 태스크를 추가로 생성하여 실행해야 한다. python 3.7 이전 버전은 asyncio.future_ensure
를 사용해야 한다.
다음으로, 모든 퓨처 객체(태스크 객체 포함)들이 완료 상태가 될 때까지 기다리는 함수가 asyncio.gather()
이다. 이 함수는 인자로 여러 개의 Awaitable 객체들을 받을 수 있는데, 만약 코루틴 객체를 받으면 이는 자동으로 태스크 객체로 래핑이 된다. 따라서 사실상 퓨처 객체(태스크 객체 포함)만 넘어간다고 생각해도 된다. 그리고 모든 퓨처 객체들이 완료 상태가 되면 그것들의 결과 값들을 리스트 형태로 반환한다.
import asyncio
async def sleep(sec):
await asyncio.sleep(sec)
return sec
async def main():
sec_list = [1, 2]
tasks = [asyncio.create_task(sleep(sec)) for sec in sec_list] # create_task를 통해 2개의 Task(Task 1 객체, Task 2 객체)를 만듦
tasks_results = await asyncio.gather(*tasks) # gather를 통해 2개의 Task 값(Task 1 객체의 결과 값, Task 2 객체의 결과 값)을 리스트로 바인딩
return tasks_results
result = asyncio.run(main()) # asyncio.run()을 통해 실행 후 result에 결과 저장
print('result : {}'.format(result))
result : [1, 2]
실습 - 웹 페이지 가져오기
순차적으로 가져오기
from time import time
from urllib.request import Request, urlopen
urls = ['https://www.google.co.kr/search?q=' + i
for i in ['apple', 'pear', 'grape', 'pineapple', 'orange', 'strawberry']]
begin = time()
result = []
for url in urls:
request = Request(url, headers={'User-Agent': 'Mozilla/5.0'}) # UA가 없으면 403 에러 발생
response = urlopen(request)
page = response.read()
result.append(len(page))
print(result)
end = time()
print('실행 시간: {0:.3f}초'.format(end - begin))
[81906, 144177, 104536, 68569, 91128, 142558]
실행 시간: 5.422초
비동기로 가져오기
from time import time
from urllib.request import Request, urlopen
import asyncio
urls = ['https://www.google.co.kr/search?q=' + i
for i in ['apple', 'pear', 'grape', 'pineapple', 'orange', 'strawberry']]
async def fetch(url):
loop = asyncio.get_event_loop()
request = Request(url, headers={'User-Agent': 'Mozilla/5.0'}) # UA가 없으면 403 에러 발생
response = await loop.run_in_executor(None, urlopen, request) # run_in_executor 사용
page = await loop.run_in_executor(None, response.read) # run in executor 사용
return len(page)
async def main():
futures = [asyncio.create_task(fetch(url)) for url in urls]
# 태스크(퓨처) 객체를 리스트로 만듦
result = await asyncio.gather(*futures) # 결과를 한꺼번에 가져옴
print(result)
begin = time()
asyncio.run(main())
end = time()
print('실행 시간: {0:.3f}초'.format(end - begin))
[81906, 144177, 104536, 68569, 91128, 142558]
실행 시간: 1.314초
비동기를 사용하니 5초대에서 1초대로 줄었다.
urlopen
이나 response.read
같은 함수(메서드)는 결과가 나올 때까지 코드 실행이 중단(block)되는데 이런 함수들을 블로킹 I/O(blocking I/O) 함수라고 부른다. 특히 네이티브 코루틴 안에서 블로킹 I/O 함수를 실행하려면 asyncio.get_event_loop()
를 통해 이벤트 생성하고, run_in_executor
함수를 사용하여 다른 스레드에서 병렬로 실행시켜야 한다.
Reference