지난 포스팅 - 산타 백준 프로젝트 (1) - 데이터 정의와 스크래핑
저번 시간에는 어떤 데이터를 사용할 것인지 어떤 방식으로 스크래핑을 할 것인지에 대해 이야기해보았다. 이번 시간부터는 본격적으로 Airflow를 이용하여 분산 크롤링 작업을 해볼 것이다. Airflow 설치부터 시작해서 기본적인 설정과 DAG 작성법까지 종합적으로 정리할 것이다.
그러면 Airflow 설치와 환경 설정부터 시작해보자!
🔧Airflow 설치와 설정
아키텍처를 보면 알 수 있다시피 나는 마스터 노드를 따로 두고 부스트캠프 측에서 제공해주는 서버들을 워커 노드로 두고 있다. 따라서 먼저 GCP를 이용하여 마스터 노드를 구성했던 과정을 살펴보려고 한다.
그런데 이 부분도 사실 GCP에 Airlfow를 구성하는 과정이 잘 나와있는 블로그가 있어서 나도 해당 블로그를 참고했다. 그래서 내가 굳이 그 과정을 한 번 더 쓰는 것보다 링크를 다는게 나을 거 같아 참고 부분에 링크를 달아놓았다.
마스터 노드에 Airflow를 설치했다면, 나머지 워커 노드에도 똑같이 설치해주면 된다. 그대로 반복해주면 되기 때문에 어려운 부분은 없었다.
워커 노드와 마스터 노드를 연결해줄 때 가장 고생했던 부분이 방화벽을 열어주는 부분이었다. 계속 까먹기도 하고 GCP에서 작업하는게 이번에 처음이라 조금 낯설기도 하였다. 특히 내가 계속 까먹었던 부분은 DB의 방화벽 개방이었다. 해당 블로그에도 나와있지만 워커 노드도 결국 같은 airflow metadata DB를 공유하고 있기 때문에 해당 워커 노드들에 대한 IP도 꼭 허용해주어야 한다.
이 부분만 잘 설정하면 Airflow를 설치하고 설정하는 부분은 어렵지 않게 끝이 난다. 이제 각 노드를 연결해주는 작업을 할 차례이다.
🥬Celery를 이용한 분산 크롤링
처음에 필요한 데이터를 크롤링 할 때는 팀원들과 범위를 나누어서 수집한 후 이를 취합하는 방식으로 진행하였다. 초기 데이터는 이런 식으로 모을 수 있지만 후에 주기적인 재학습을 위한 데이터를 이런 식으로 수집하는 것은 매우매우 비효율적이었다. 그래서 Airflow를 이용해 여러 서버에서 주기적으로 크롤링을 하기로 하였다.
Celery를 이용하면 기대할 수 있는 부분은 크게 2가지이다. 첫번째로 단일 컴퓨터를 이용하여 작업을 수행할 때보다 훨씬 빠르게 작업을 완료할 수 있다. 이번 크롤링의 경우도 단일 노드에서는 약 3~4시간까지 걸리는 작업을 3대의 컴퓨터로 나누어 작업하여 약 40분 정도만에 모든 작업을 마무리할 수 있었다.
두번째는 SPOF(단일 고장점)을 방지할 수 있다는 것이다. 단일 장애점(single point of failure, SPOF)은 시스템 구성 요소 중에서, 동작하지 않으면 전체 시스템이 중단되는 요소를 말한다. 여기서는 크롤링 코드가 동작하는 서버가 될 수 있고, 만약 해당 서버가 고장난다면 데이터 수집과 재학습 모두에게 영향이 간다.
Celery를 이용하면 큐를 이용하여 특정 노드가 고장나더라도 다른 노드로 태스크를 이동시켜 작업할 수 있어 장애에 유연하게 대처할 수 있다는 장점이 있다.
Airflow에는 분산 작업을 위한 Executor가 여러 가지 있다. 그 중 여러 서버에서 Task를 분산으로 처리하기 위한 방법에 CeleryExcutor와 KubernetesExcutor가 있다. 마음 같아서는 이번에 쿠버네티스를 제대로 사용해보고 싶어서 KubernetesExcutor를 사용하고 싶었지만, 주최측에서 제공한 서버가 컨테이너 형태여서 사용하지 못했다...😂
그래서 어쩔 수 없이 CeleryExecutor를 사용하기로 하였다! 앞서 대부분의 설정을 해놓았기 때문에 Celery를 사용하는데는 그렇게 큰 어려움은 없다. celery를 사용하기 위한 몇 가지 툴과 설정들만 변경해주면 된다.
RabbitMQ와 Celery 설치
Airflow에서는 기본적으로 메타DB로 SQLite를 사용한다. 기본적인 기능들을 사용하는데는 별로 문제가 없지만 SQLite에서는 병렬 처리가 불가능하다. 우리는 앞서서 PostgreSQL로 DB를 설정했기 때문에 괜찮다.
또 CeleryExecutor를 사용하기 위해서는 기본적으로 메시지 브로커(Message broker)라는 것이 필요하다. 메시지 브로커는 송신자가 메시지 큐(Message Queue)에 메시지를 전달하면 이를 수신자가 감지하여 메시지를 읽고 사용하는 구조를 가진다.
Celery를 이용하려면 이러한 메시지 브로커가 필요하고 이번 프로젝트에서 나는 RabbitMQ를 사용하였다. 이에 대한 설치법은 이번에도 아래 링크를 참고하면 될 거 같다.
본격적인 분산 크롤링 DAG 코드 작성
분산 작업을 위한 모든 설정이 끝났다면 이제 본격적으로 DAG를 작성할 차례이다. 총 3개의 DAG를 작성했는데 여기서는 하나만 소개하려고 한다. 유저 데이터 수집을 위한 get_user.py 부분인데, 우선 DAG 그래프부터 한 번 살펴보도록 하자!!
그래프를 보면 알 수 있듯이 3개의 서버에 크롤링을 분산 처리하고 있다. 전체적인 큰 과정은 사이트에서 데이터를 크롤링하고, 각 서버에서 크롤링한 데이터를 마스터 서버로 옮겨서 전처리를 한 다음 GCS, Bigquery로 옮긴다. Task 하나하나 살펴보도록 하자.
가장 처음은 is_solved_ac_api_available으로 solved ac 사이트의 api가 이용가능한지 확인하는 태스크이다. Httpsensor를 이용하였다. 이 부분은 어렵지 않아 별다른 설명없이 넘어가도록 한다.
is_solved_ac_user_api_available = HttpSensor(
task_id="is_solved_ac_user_api_available",
http_conn_id="solved_ac_user_api_id",
endpoint="ranking/tier",
response_check=lambda response: response.status_code == 200,
poke_interval=5,
timeout=20
)
그 다음은 본격적으로 3개의 서버에서 병렬로 크롤링 작업을 하도록 트리거하는 태스크들이다. 여기서 Airflow의 Variable을 사용하는데, 노드의 개수가 유동적일 수 있다는 가정하에 Variable에 worker_node_nums라는 변수를 등록하고, 크롤링에 사용하는 노드 개수를 지정해준다. 그러면 해당 노드 개수만큼 서버에서 크롤링을 한다.
worker_node_nums = Variable.get("worker_node_nums")
for node in range(1, int(worker_node_nums)+1):
trigger_get_data = PythonOperator(
task_id=f"trigger_get_data_{node}",
python_callable=get_problem_list,
op_kwargs={"worker_num": worker_node_nums, "node": node},
queue=f"worker_{node}"
)
check_user_exists = SFTPSensor(
task_id=f"check_user_exists_{node}",
sftp_conn_id=f"ssh_id_{node}",
path=f"/opt/ml/data/user/user_{node}.csv",
poke_interval=60,
timeout=2000,
queue=f"worker_{node}"
)
mv_user_file_to_master = SFTPOperator(
task_id=f"mv_user_file_to_master_{node}",
ssh_conn_id=f"ssh_id_{node}",
local_filepath=f"/home/kgw7401/user/user_{node}.csv",
remote_filepath=f"/opt/ml/data/user/user_{node}.csv",
operation="get",
queue=f"worker_{node}"
)
여기서 내가 조금 고생했던 부분은 connect id를 설정하는 부분이었다. 아래와 같이 각 서버마다 ssh connection id를 설정하고 각각은 아래와 같이 설정하면 된다.
각 서버별로 ssh connect id를 모두 설정하였다면 이를 이용해서 크롤링이 완성된 파일을 체크하고, 마스터 노드로 옮기는 태스크도 작성하면 분산 데이터 크롤링 부분은 마무리되었다.(해당 코드는 위의 코드블럭에 다 있다)
다음은 마스터 노드로 옮긴 파일들의 전처리(파일 통합, 필요한 열 추가 및 수정)를 하여야 하는데, 이 과정도 그렇게 중요하지는 않으니 간단하게만 보도록 하겠다. PythonOperator를 이용하였고, 따로 add_problem_list_and_save라는 메소드를 만들어 해당 작업을 실행하도록 하였다.
add_problem_list_and_save = PythonOperator(
task_id="add_problem_list_and_save",
python_callable=add_problem_list_and_save,
queue="master_node"
)
지금부터는 완성된 파일을 GCS, Bigquery로 옮기는 과정을 살펴볼 것이다. 그러기 위해서는 일단 GCS 버킷과 빅쿼리 테이블을 생성해주어야 한다. GCS 버킷은 스토리지에서 버킷 만들기로 생성해주면 된다. 버킷 안에는 폴더를 생성할 수도 있는데, 나는 버킷에 사용할 테이블별로 폴더를 만들어주었다.
빅쿼리 테이블도 미리 만들어두는데, 빅쿼리에서는 테이블을 만들기 전에 먼저 데이터세트를 만들어주어야 한다. GCS에서 설정한 버킷의 폴더와 동일하게 데이터세트와 테이블들을 만들어주었다,
GCS와 빅쿼리 생성이 모두 끝났다면 이제 본격적으로 DAG의 Task를 작성해보도록 하자. 위에서 보았던 전체 흐름을 전처리 이후부터 살펴보면, 이제 완성된 데이터를 로컬에서 GCS로 옮기고 데이터가 잘 들어갔는지 체크하고 이를 다시 빅쿼리로 옮긴다. 긴 말 말고 바로 코드로 살펴보자.
각 작업에 해당하는 Operator들이 Airflow에 이미 만들어져 있기 때문에 어렵지 않게 사용할 수 있다. 각 Operator들의 인자들 또한 변수명이 나름 직관적이기 때문에 어렵지 않게 사용할 수 있다.
먼저 mv_user_local_to_gcs는 로컬 파일 GCS로 옮기기 위한 Task이다. dst(목적지)를 보면 알 수 있듯이 버킷에 저장될 때 airflow의 templates를 이용해서 <해당 task가 실행된 날짜-user.json>으로 저장된다. 이렇게 저장하도록 한 이유는 나중에 혹시 해당 날짜에 대한 분석이 있을 수 있기 때문에 날짜별로 데이터를 다운받아 사용할 수 있도록 하기 위함이다. gcp_conn_id의 conn_id를 통해 airflow내에서 gcp와 연동할 수 있는데, 이를 생성하는 방법은 아래에서 자세히 다루도록 하겠다.
check_gcs_user_exists는 로컬 파일이 GCS에 잘 들어갔는지 확인하기 위한 과정이다.
mv_gcs_to_bq는 가장 중요한 항목이라고 할 수 있다. GCS에 위치한 파일을 빅쿼리로 옮기는 역할을 한다. 빅쿼리의 좋은 점은 스키마를 자동으로 인식하여 테이블을 구성해준다는 것이다. 여기서 주목해야 할 인자가 2개 있는데, skip_leading_rows와 write_disposition이다.
skip_leading_rows는 몇 개의 행을 무시하고 저장할 것인지 지정하는 인자이다. 여기서는 첫번째 행이 제목이기 때문에 1로 지정해 제외하도록 하였다. write_disposition는 새로운 데이터가 들어왔을 때 추가할 것인지, 덮어쓸 것인지를 지정할 수 있다. user 테이블 같은 경우는 새로운 데이터가 추가되기 보다는 같은 데이터가 변경되는 구조이기 때문에 APPEND가 아닌 TRUNCATE를 통해 데이터가 새로 들어올 때마다 덮어쓰도록 하였다.
mv_user_local_to_gcs = LocalFilesystemToGCSOperator(
task_id="mv_user_local_to_gcs",
gcp_conn_id="google_cloud_conn_id",
src=os.path.join(os.getcwd(), "user", "new_user.json"),
dst="user/{{ ds }}-user.json",
bucket="santa-boj-final",
queue="master_node"
)
check_gcs_user_exists = GCSObjectExistenceSensor(
task_id="check_gcs_user_exists",
bucket="santa-boj-final",
object="user/{{ ds }}-user.json",
google_cloud_conn_id ="google_cloud_conn_id",
queue="master_node"
)
mv_gcs_to_bq = GCSToBigQueryOperator(
task_id="mv_gcs_to_bq",
bucket="santa-boj-final",
source_objects=['user/{{ ds }}-user.json'],
source_format="NEWLINE_DELIMITED_JSON",
destination_project_dataset_table="santa-boj.dataset.user",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
gcp_conn_id="google_cloud_conn_id",
queue="master_node"
)
해당 Operator들은 기본적으로 GCP와 연관된 작업을 행하기 때문에 Airflow와 GCP를 연결하는 connect id를 생성해주어야 한다. connect id를 생성하는 과정에서 조금 어려움을 겪었는데, 이를 조금 상세하게 기술해보려고 한다. 우선 gcp와의 작업을 위해 airflow에 google cloud 관련 provider를 설치해야 한다. 이는 pip install apache-airflow-providers-google로 설치할 수 있다.
이제 가장 중요한 Airflow GCP 연동을 위한 키파일을 생성해주어야 한다. GCP에서 IAM 및 관리자 -> 서비스 계정으로 들어간다. 그리고 새로운 서비스 계정을 만들면 된다. airflow용 서비스 계정을 생성하는 방법은 아래의 그림과 같다. (그림을 나란히 배열하니 조금 깨져서 보인다...😂 클릭으로 확대해서 보면 잘 나온다!)
서비스 계정까지 만들었다면 이제 해당 서비스 계정에서 지정한 권한을 이용하기 위해 키를 발급받아야 한다. 키는 아래와 같이 생성한 서비스 계정의 키로 들어가서 새 키 만들기를 통해 만들 수 있다.
이제 Airflow내에서 GCP를 사용할 모든 준비를 마쳤다. 우선 발급받은 키를 서버에 원하는 곳에 위치시킨다. 그리고 연동을 위한 Connection을 작성해보도록 하자. Keyfile Path에는 키가 위치한 경로를 적어준다. 그리고 그 다음 중요한 부분은 Scopes인데, 이는 구글 API에 엑세스하기 위해 요청해야 할 OAuth 2.0 범위이다. 우리는 https://www.googleapis.com/auth/cloud-platform을 입력해주면 된다. 이렇게 GCP를 사용하기 위한 준비가 끝났다.
이제 DAG의 마지막 단계이다. 위의 과정이 끝나면 로컬에 있는 파일을 삭제하고, 작업이 성공적으로 마무리되었다는 메시지를 보내야 한다. 파일을 삭제하는 task는 BashOperator로 손쉽게 할 수 있다.
handle_local_user_data = BashOperator(
task_id="handle_local_user_data",
bash_command="""
rm -rf /home/kgw7401/user/user*.csv /home/kgw7401/user/old_user.csv /home/kgw7401/user/new_user.json
mv /home/kgw7401/user/new_user.csv /home/kgw7401/user/old_user.csv
""",
queue="master_node"
)
DAG의 완료를 알리는 메시지는 이용하고 있는 Slack을 이용하기로 했다. Slack에 메시지를 보내기 위해서는 Slack API를 사용해야 하는데, https://api.slack.com/에서 어플리케이션을 생성하고 토큰을 발급받아 이를 이용하면 된다. 자세한 내용은 다른 블로그에도 자세하게 나와있어 생략하도록 하겠다.
send_slack_notification = SlackWebhookOperator(
task_id="send_slack_notification",
http_conn_id="slack_conn_id",
message=get_message(),
channel="#level2-recsys-05-알잘딱깔센-캠퍼만",
queue="master_node"
)
이렇게 DAG를 완성하였다. 다른 DAG 또한 궁금하다면 아래의 github를 참고하면 모든 코드가 나와있다!
https://github.com/kgw7401/final-project-level3-recsys-05
❓다음 포스팅에서는?
모든 과정을 일일이 설명하느라 이번 글은 조금 길었다. 조금 더 자세하게 설명했으면 좋았겠지만 뭔가 얼른 마무리를 짓고 싶은 마음에 조금 아쉬운 부분도 있지만, 그 부분은 나중에 차차 고쳐나가는 걸로 하자...😅
Airflow로 DAG를 작성하는 과정을 살펴보았으니 다음 포스팅에서는 스케줄링을 통해 BigQuery에 적재된 데이터를 이용하는 방법이나, 효율적인 적재를 위한 포맷 변환 등에 대해서 살펴보려고 한다.
참고
[GCP] #07 GCP Compute Engine VM에 아파치 에어플로우(Airflow) 설치하여 ETL 환경 구축하기
[airflow] 4. CeleryExecutor 사용하기
'프로젝트 > 산타 백준 프로젝트' 카테고리의 다른 글
산타 백준 프로젝트 (1) - 데이터 정의와 스크래핑 (0) | 2022.06.11 |
---|---|
산타 백준 프로젝트 (0) - 프로젝트 개요 (0) | 2022.06.09 |