Codong's Development Diary RSS 태그 관리 글쓰기 방명록
python/Scraping (2)
2021-08-07 16:55:06

개요


내가 예전에 멀티 프로세싱(Multi processing)을 이용한 scraping 방법에 대해 포스팅을 한 적이 있다.
(궁금하신 분들은 아래 링크 참조)

2021.03.31 - [python/Scraping] - [Python] beautifulsoup multiprocessing으로 속도 올리기

 

[Python] beautifulsoup multiprocessing으로 속도 올리기

개요 scraping을 진행하면서, beautiful soup으로 하나씩 셀렉터로 접근해서 a_tag의 href를 가져와서 그곳에 들어가서 데이터를 가져오는것이 목표였다. 문제는 속도가 너무 느린것이다..... 약 10000번 정

codong.tistory.com

하지만, 멀티 프로세싱을 이용하면 메모리를 많이 잡아먹는 문제가 생긴다. 그래서 다른 방법을 강구하기 시작했다.
그래서 찾은 방법은 바로
.
.
.
.
동시성 프로그래밍을 이용한 비동기처리다!!!!
하지만 우선 이녀석이 무엇인지를 알고 써야 좋지 않겠는가..?
간략하게 사전지식을 알아보고 넘어가자😆

동시성(Concurrency) vs 병렬성(Parallelism)

이것에 대해 잘 설명해준 글이 있어서 링크를 달아둔다.
https://nachwon.github.io/asyncio-futures/

내가 이해한 결과로는 동시성은 주어진 과제를 멀티 쓰레드나 비동기 통신을 이용하여 효율적으로 처리하여 대기시간을 줄여 결과적으로 전체 실행 시간을 줄여 병렬적으로 처리되는 듯한 효과를 낼 수 있고, 병렬성은 멀티 프로세싱을 활용하여 각각의 프로세스들이 독립적으로 과제를 처리하므로 전체 실행 시간을 줄인다.

여기서 중요한 것은 위의 말에는 전제조건이 있다는 것이다. 동시성은 각각의 프로세스들이 독립적으로 자원을 가지고 처리하는 것이 아니다보니, CPU bound 작업(네트워크나 파일에 엑세스하는 작업 없이 계산 작업 등)에서는 좋은 효과를 볼 수가 없다. 즉 외부 I/O 작업을 진행할 때에 발생하는 대기 시간을 줄여 동시적으로 처리하는 것 뿐이지, 외부가 아닌 컴퓨터의 연산작업을 진행할 때에는 좋은 효과를 볼 수 없다. 글에서 말하기로는 프로그램의 속도를 결정짓는 자원이 CPU이기 때문이라고 한다.

그렇다고 마냥 멀티프로세싱이 좋은가? 프로세스들을 여러개를 생성한다는 것은 곧 새로운 인터프리터를 띄우는 것이므로, 하나의 프로세스에서 여러 쓰레드를 생성하는 것보다는 훨씬 무겁고 제한적이며 어려움도 많을 것이기 때문이다.

결론은 상황에 맞게 잘 선택해서 쓴다면 좋은 효과를 얻을 수 있을 것이다. 🤩

그럼 일단 내 task를 생각해보자. 다른 사이트에서 request를 보내 response로 받은 데이터를 사용할 것이기 때문에, 내 컴퓨터가 아닌 외부 서버에서 하는 작업이니 I/O 작업임을 알 수 있다 ! 즉, 굳이 멀티프로세싱을 사용하지 않아도 동시성을 이용해 해결이 가능하다 !

그렇다면 멀티 스레딩이냐, asyncio냐 그것이 문제로다....🤔

왜 asyncio 냐!

이것에 대한 답은 이 글에서 도움을 받았다.

python에서는 GIL(Global interpreter Lock) 때문에 메인쓰레드에서 모든 연산을 처리하기에 Threading을 사용한 동시성 제어는 느리다고 한다. 그리고 우리는 HTTP 통신 대기와 같은 Blocking IO의 대기시간을 줄여 보고자, 쉽게 사용할 수 있는 asyncio 택할 수 있다.

자, 그러면 python에서 지원하는 Asyncio에 대해 알아보자!🔜

 

Asyncio


시작하기에 앞서 파이썬은 공식문서가 잘 되어 있기에 읽으면서 내가 이해한 부분을 적은 것이라 틀린 것이 있을 수 있다. 그렇기에 잘못된 것이 있거나 수정해야할 부분이 있으면 언제든지 댓글 달아주시면 감사하겠습니다,,

1. 코루틴(coroutine)과 태스크(task)

관련 공식문서 >> https://docs.python.org/ko/3/library/asyncio-task.html#id2

 

코루틴과 태스크 — Python 3.9.6 문서

코루틴과 태스크 이 절에서는 코루틴과 태스크로 작업하기 위한 고급 asyncio API에 관해 설명합니다. async/await 문법으로 선언된 코루틴은 asyncio 응용 프로그램을 작성하는 기본 방법입니다. 예를

docs.python.org

제일 먼저, 작업의 기본이 되는 코루틴과 태스크부터 시작하겠다. 공식문서의 설명은 async/await 문법으로 선언된 코루틴은 asyncio 응용 프로그램을 작성하는 기본 방법이라 설명하는데 감이 잘 안온다.

공식문서의 예제를 그대로 들고오겠다.

>>> import asyncio
>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> main()
<coroutine object main at 0x1053bb7c8>

함수를 만들어줄 때 앞에 async를 적어준다는 뜻은, 해당 함수를 코루틴 객체로 선언한다는 의미임을 알 수 있다. 평소 함수처럼 호출하면 실행되지 않고, 객체를 반환하는 것을 위 예제를 통해 확인할 수 있다.

즉, 코루틴은 asyncio 프로그램을 실행할 때 실행될 객체라고 생각하는게 편할 것 같다. 그리고 이런 코루틴이 실행되도록 예약을 해주는 것이 태스크이다. 예제를 통해 태스크를 만드는 것을 살펴보자

import asyncio

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

coro 라는 코루틴을 asyncio.create_task()를 통해 태스크로 만들어서 실행되는 것을 예약할 수 있다. 아직은 실행되는 방법도 모르는데, 이런 예약한다는 말이 뭔소리인가 싶을 텐데 나중에 차차 설명하겠다. 우선은 이정도로 넘어가자.

➕ awaitable 이란?

객체가 await 표현식에서 사용될 수 있을 때 어웨이터블 객체라고 말합니다. 많은 asyncio API는 어웨이터블을 받아들이도록 설계되어있다고 말한다. 어웨이터블 객체에는 세 가지 주요 유형이 있다: 코루틴, 태스크 및 퓨처(future)

쉽게 말해 await 사용가능한 객체들을 말하는 것 같다. 그럼 await는 뭘까?
awaitable 객체 안에서 await가 나오면 그 부분은 실행될 때까지 기다리고 다음 코루틴을 실행시킨다고 이해했다.

2. asyncio 프로그램 실행하기

이제 어떻게 실행 시키는 지를 살펴보자.

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

위 예제처럼 실행시키는 건 간단하다. 하나의 코루틴 객체 main()을 asyncio.run()을 통해 실행시킨다. 이 실행시키는 run()에 대해서 조금 더 자세히 알아보자. 우선 공식 문서에는 다음과 같이 적혀있다.

이 함수는 전달된 코루틴을 실행하고, asyncio 이벤트 루프와 비동기 제너레이터의 파이널리제이션과 스레드 풀 닫기를 관리합니다. 다른 asyncio 이벤트 루프가 같은 스레드에서 실행 중일 때, 이 함수를 호출할 수 없습니다. 이 함수는 항상 새 이벤트 루프를 만들고 끝에 이벤트 루프를 닫습니다. asyncio 프로그램의 메인 진입 지점으로 사용해야 하고, 이상적으로는 한 번만 호출해야 합니다.

눈에 띄는 부분만 보자면, 메인 진입 지점으로 사용해야 하고, 이상적으로는 한 번만 호출해야 한다는 것을 명심하자. 즉, 다른 코루틴들을 실행하는 main() 코루틴만을 실행시키면 될 것 같다.

3. 동시에 태스크 실행하기

실제로 쓸만하려면 동시에 여러 태스크를 실행시킬 수 있어야지 않겠는가? 그것을 위한 asyncio.gather() 함수를 예제를 통해 이제껏 읽은 정보들을 종합적으로 사용한 대략적인 흐름을 살펴보자.

아 잠깐, 일단 asyncio.gather() 함수에 대한 설명을 간단히 알아보자. 들어오는 인풋이 코루틴이면 자동으로 태스크로 예약을 시켜준다. 그리고 모든 awaitable이 성공적으로 완료되면, 결과는 반환된 값들이 합쳐진 리스트이다. 결과값의 순서는 awaitable의 순서와 일치한다.

즉, 코루틴을 인풋으로 넣어주면 그것을 알아서 태스크로 등록해준다는 것이다. 그리고 각 코루틴들이 전부 다 실행될 때까지 기다리고 전부 완료시 결과값을 리스트로 반환해주는 아주 기특한 method임을 알 수 있다.

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

일단 제일 먼저 asyncio.run를 통해 하나의 event loop를 만들고, main() 코루틴을 실행시킨다. main()안에 있는 asyncio.gather가 인풋으로 awaitable 코루틴인 factorial 들을 받는다. 그러면 자동으로 task로 schedule 되고 들어온 순서대로 실행된다. 이 과정을 내가 생각한데로 그려보면 아래 그림과 같다. (실제와 다를 수 있으니 틀리면 말해주세요..)

즉, 제일 처음 코루틴을 실행시키고 await를 만나면 그 코루틴의 상태를 (위 그림에서 waiting)변화시켜놓고 다음 코루틴을 실행시킨다. 계속 반복하다가, 끝에 다다르면 다시 처음으로 돌아가서 await가 아닌 상태 (위 그림에서 ready)인 코루틴이 있으면 다시 나머지를 실행시킨다. 그리고 그 코루틴이 return까지 실행되면 완료되었기에 값을 리스트에 넣어놓고, 전부 완료될 때까지 앞서 말한 과정들을 계속 반복한다.

이 예제에서는 await가 걸린 부분이 asyncio.sleep(1) 으로 되어있어서 감이 안 올 수 있다. 우리의 목표는 scraping의 속도를 올리기위해 이것을 사용해보려는데 어떻게 적용시킬까?

 

asyncio scraping에 적용시키기


조금만 생각해보면 간단할 수 있다. scraping을 할 때에 시간이 오래걸리는 이유는 request를 보내고 response를 받는 데까지의 대기시간이 있기 때문이다. 그 대기시간은 우리의 컴퓨터에서 cpu 연산과 같이 작업을 하느라 걸리는 시간이 아니기 때문에 외부 서버가 처리하고 응답을 줄 때까지 그냥 기다리는 것과 같다. 즉 asyncio.sleep()와 같다는 것이다. 이 부분을 request.get(url)로 바꾸면 어떨까? 우리가 원하는 데로 이루어지지 않을까??

그래서 간단하게 네이버와 다음 홈페이지 html을 받아오는 것을 해보려고한다.

1. resquests 모듈로 될까?

async def get_request_data(url):
    response = await requests.get(url)
    print(response)
    content = response.text
    return content

async def main(url_list):
    input_coroutines = [get_request_data(url_) for url_ in url_list]
    res = await asyncio.gather(*input_coroutines)
    return res

if __name__ == '__main__':
    url_list = ['https://www.naver.com/','https://www.daum.net/']
    res = asyncio.run(main(url_list))
    print(res)

하지만 실행시켜보면 바로 다음과 같은 error를 뱉는다.

File "asyncio_test_blog.py", line 15, in get_request_data
    response = await requests.get(url)
TypeError: object Response can't be used in 'await' expression

내용을 보면 await를 사용할 수 없다고 하는데, asyncio를 사용할 때에는 requests 모듈은 지원?호환?되지 않는다고 어딘가에서 봤던거 같다.

(출처 남길랬는데 다시 찾으려니 못 찾겠음...)

그래도 일단 비교를 위해 네이버와 다음 페이지를 가져오는 시간을 측정해 놓자. request.get으로 두 페이지를 가져오는데 걸리는 시간은 총 0.15484380722045898 sec 정도 걸린다.

그럼 asyncio는 어떻게 해야하나..? asyncio와 찰떡인 라이브러리가 당근 존재한다. "aiohttp"를 이용하면 쉽게 가능하다 !

2. aiohttp 사용하기!

async def get_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
            print(response)
            return content
async def main(url_list):
    input_coroutines = [get_data(url_) for url_ in url_list]
    res = await asyncio.gather(*input_coroutines)
    return res

if __name__ == '__main__':
    url_list = ['https://www.naver.com/','https://www.daum.net/']
    result = asyncio.run(main(url_list))

url들을 하나씩 꺼내서 reponse를 얻어오는 코루틴으로 만들어주고, 그것을 asyncio.gather의 인풋으로 넣어줘서 task로 만들어 모두 끝날 때까지 실행시킨 결과를 list로 받는다. 여기서 중요한 점은 순서가 응답이 빨리 온 순서라서 url 순서와는 다를 수 있다.

<ClientResponse(https://www.daum.net/) [200 OK]>
<ClientResponse(https://www.naver.com/) [200 OK]>

실행 결과 잘 작동한다! 심지어 걸린 시간은 9.5367431640625e-07 sec 정도 걸렸다. 0.0000009초정도,, 거의 뭐.. 0초에 가까운 수치이다. 이렇게 빠르게 request를 보내어 결과값을 받아올 수 있다는 것을 알게 되었다. 페이지를 가져오고 나머지 파싱하고 원하는 데이터를 가져오는 로직만 추가하면 빠르게 scraping이 가능할 것 같다!

 

마무리


asyncio를 통해 python 공식문서를 뒤져가면서 이 모듈에 대해 공부하다보니 이것들 뿐만아니라, 몰라서 못쓰고 있던 좋은 기능들이 많겠구나 라는 생각이 들었다. 지금까지는 매번 알던 것들로 기능구현하기에 급급했는데, 성능이나 효율성을 높이기 위해 투자할 시간과 노력은 더 더욱 필요한 것 같다. 시간과 노력이 많이 들더라도 인내심을 가지고 내가 만들었던 기능들을 성능을 향상시키는 고민과 행동들을 해보는 것이 내 성장에 많이 도움될 것 같음을 느꼈다.

아참, scarping에 asyncio를 이용해 시간을 단축시키는 것은 좋지만, 반대로 생각해보면 동일한 곳에서 짧은 시간동안 많은 request를 날리기 때문에 server 측에서 공격으로 인식하여 IP block이나 등등 제재가 가해질 수 있기 때문에 조심해서 사용하길 바란다 ㅎㅎ.. 사실 scraping 보다 다른 곳에 더 많이 사용되지 않을까 싶다..?

reference

'python > Scraping' 카테고리의 다른 글

[Python] beautifulsoup multiprocessing으로 속도 올리기  (1) 2021.03.31
2021-03-31 21:08:26

개요


scraping을 진행하면서, beautiful soup으로 하나씩 셀렉터로 접근해서 a_tag의 href를 가져와서 그곳에 들어가서 데이터를 가져오는것이 목표였다.

문제는 속도가 너무 느린것이다..... 약 10000번 정도를 진행하는데, 너무 오래걸려서 다른 방법 없을까 찾다가, scraping을 위한 scrapy, autoscraper 등 다른 라이브러리들을 찾아봤다. 하지만 이미 쓰는거에서 Multiprocessing을 이용해서 사용하는 방법이 있길래 이것을 이용해보기로했다.

추가로 동시성을 활용한 Asyncio scraping에 대해서 알고싶은 분은 참고 부탁드립니다!

2021.08.07 - [python/Scraping] - [Python] 동시성(Concurrency) Asyncio scraping

 

[Python] 동시성(Concurrency) Asyncio scraping

개요 내가 예전에 멀티 프로세싱(Multi processing)을 이용한 scraping 방법에 대해 포스팅을 한 적이 있다. (궁금하신 분들은 아래 링크 참조) 2021.03.31 - [python/Scraping] - [Python] beautifulsoup multipr..

codong.tistory.com

 

Task


가져온 데이터를 종류별(ex. 제목:내용)로 dictionary로 구분해서 담고싶었다. 우선 코드부터 확인해보자.

import requests
from bs4 import BeautifulSoup
from multiprocessing import Pool, Manager
from itertools import repeat

def crawl_data(word,dic):
    url=f'https://www.coupang.com/np/search?component=&q={word}&channel=user'
    response = requests.get(url)
    soup=BeautifulSoup(response.text,'html.parser')

    div_=soup.find_all('div',id="name")
    # selector 부분 생략 ...

    dic[title]=price

if __name__=='__main__':
    manager = Manager()
    result_dic = manager.dict()

    # request의 parameter 리스트
    search_list=['삼겹살','불고기','오리고기','돼지고기', ... ]

    pool = Pool(processes=4) #4개의 프로세스 동시에 작동

    # 각 프로세스별로 딕셔너리 적용 시키기 위해 repeat 사용.
    pool.starmap(crawl_data, zip(search_list,repeat(result_dic)))
    print(result_dic)

위와 같이 크롤링 할 검색어와 manager를 사용해서 dictionary를 만들어야 전역으로 사용이 된다고 한다.
게다가 repeat를 사용해서 인자로 주고, 함수와 zip으로 묶어서 pool.starmap의 인자로 넣어준다.

이렇게 딕셔너리로 담아서 json파일로 만들어서 사용할 수도 있다.

 

➕ 추가적인 예제( 2021/04/15 수정 )


다시 사용할 일이 있어서 pool을 이용한 추가 예제를 적어 본다. multiprocessing 은 특히 담으려는 데이터의 순서가 상관없을 때 사용하기 좋은 것 같다. 어떠한 데이터를 불러와서 순서 상관없이 리스트에 담기만 할 것이라면, 이것만 생각하자.

🙋 각 프로세서에게 해야할 일(실행함수) 알려주고, 필요한 재료(파라미터) 손에 쥐어주기!

코드부터 보는게 낫겠지? 예를 들어 문장 데이터를 가져와 명사만 추출해서 리스트에 담는다고 가정해보자.

from multiprocessing import Pool, Manager
from konlpy.tag import Okt


def split_str(str,okt,result_list):
    nouns=okt.nouns(str)
    result_list.append(nouns)

if __name__=='__main__':
      corpus=[                    # 문장 100개가 들어있다 가정하자.
      ['안녕하세요 파이썬 너무 어려워요']
      ['에이 무슨 소리에요']
      ['파이썬이 얼마나 쉬운데']
      ['사실 진짜 어려움']
      ...
      ['그래도 포기하지 말고 해보자']
      ] 
    okt=Okt()                    # Okt 전처리 라이브러리 객체 생성

    process = multiprocessing.cpu_count())     # 프로세서 개수 확인

    m = Manager()                # 데이터 공유를 위한 Manager 객체 생성
    result=m.list()            # 결과 리스트 생성

    pool = Pool(process)        # default 값은 4. 4가 적당한 거 같다.

    # 가장 중요한 실행 부분. starmap의 첫번째 인자로 실행 함수를 주고, 
    # 두번째 인자로 실행 함수의 파라미터를 리스트에서 하나씩 꺼내 튜플로 담아서 하나씩 넘긴다 
    pool.starmap(split_str, [(str, okt, result) for str in corpus])

    pool.close()                # 끝나면 닫아주고
    pool.join()                # 다른 프로세서들이 끝날 때까지 기다린다.

    print(result)                # 결과 확인

 

마무리


가뜩에나 시간이 부족한 우리에게 딱 필요한 기술이지 않나 싶다. 컴퓨터가 신나게 일하도록 해주자~~ 깊게 공부하진 못해서 아쉽긴한데... 언젠가 다시 알게될 날이 있지 않을까..? 그때그때 수정하자 😄

 

reference

'python > Scraping' 카테고리의 다른 글

[Python] 동시성(Concurrency) Asyncio scraping  (0) 2021.08.07