테키테크 TEKITECH

Apache Airflow 이해하기 본문

Tech/Workflow Scheduling

Apache Airflow 이해하기

TEKI 2021. 8. 28. 16:17

목차


  • Managing Data Workflow
    • Data Warehouse
    • 데이터 ETL
  • About Apache Airflow
    • Airflow Concepts
      • DAG (Directed Acyclic Graph): 방향성 비순환 그래프
      • Hook, Operator, and Task
      • 그 외
    • Airflow Platforms
      • Python으로 작성된 오픈소스 플랫폼
      • 모듈화와 디버깅
      • 사용자 인터페이스 지원 및 로깅, 모니터링, 알림 기능
      • Batch 프로세스
      • 하나의 관리자
      • Execution Date

 

Managing Data Workflow

Date Warehouse를 구축/운영하는 과정에서 데이터 ETL과 같은 데이터의 Workflow를 실행하려면 Cron과 같은 Job Scheduling Tool이 필요하다. 보다 효율적인 Data Workflow Management를 위해서 Apache Airflow나 Oozie와 같은 툴이 사용되고 있다.

 

Data Warehouse는 네 가지 특성을 가진 데이터 창고이다.

  1. 주제 지향성 (Subject-Orientation) : 데이터를 주제별로 구성하여 누구나 이해하기 쉬운 형태로 저장한다.
  2. 통합성 (Integration) : (변수 명, 코드화 구조 등이) 일관적인 형태로 저장된다.
  3. 시계열성 (Time-Variancy) : 한 번 적재된 데이터는 일정 기간 동안 정확성을 나타낸다.
  4. 비휘발성 (Nonvolatilization) : 한 번 적재된 데이터는 batch작업 이외의 방법으로 변경되지 않는다.

 

데이터 ETL은 데이터를 추출, 변환, 적재하는 작업을 말한다.

  • E (Extract) : 데이터가 저장된 데이블로부터 데이터를 선택적으로 추출
  • T (Transform) : 추출한 데이터를 필요한 형태로 변환
  • L (Load) : 필요한 형태로 가공한 데이터를 새로운 테이블에 적재

 

 

 

Airflow Concepts

 

1. DAG (Directed Acyclic Graph): 방향성 비순환 그래프

DAG는 개별 요소들이 일정한 흐름에 따라 순환하지 않는 구조로 짜인 그래프를 뜻한다.
화살표의 방향성을 사용해 요소의 흐름을 그래프로 표현할 수 있고, 전 단계가 실행되어야 다음 단계가 실행되는 특성(종속성)을 가지고 있다. 서브웨이에서 샌드위치를 주문할 때, 빵을 준비하기 전에 샌드위치를 구울 수 없는 것처럼 말이다.

🥪How to order your Subway sandwich

 

2. Hook, Operator, and Task

Hook는 Hive, MySQL, HDFS, GCP 등 외부 플랫폼과 데이터베이스에 대한 인터페이스이다.
Connection을 설정한 이후, 외부 플랫폼에 통신을 시도할 때 사용한다.
Airflow에서 ETL작업을 하려면 Hook을 가장 먼저 설정해야 한다.

Operator는 OOP에서 사용되는 Class의 개념과 비슷하다.
이때, OOP의 인스턴스에 해당하는 것이 Task이다.
자주 사용되는 Task에 대해 사전 정의된 Operator가 있다.

더보기

사전 정의된 Operator

  • Python Operator
  • Bash Operator
  • BigQuery Operator
  • Dataflow Operator
  • ....

 

Operator의 일반적인 형태

class AnAirflowOperator(BaseOperator):
	template_fields = ('param1', 'param2')
	ui_color = '#A6E6A6'
		
	@apply_defaults
	def __init__(
		self,
		param1,
		param2,
		*args, **kwargs):

		super(AirflowOperator, self).__init__(*args, **kwargs)
		self.param1 = param1
		self.param2 = param2
	
	def execute(self, context):
		# the task what we want to execute

 

GCS에서 BigQuery로 데이터를 전송할 때 Operator 사용 예시

gcs_to_bq =
GoogleCloudStorageToBigQueryOperator(
	task_id = 'gcs_to_bq',
	bucket = 'my_bucket',
	source_objects = ['/path/of/source/object.json'],
	destination_project_dataset_table = 'dataset_table',
	schema = transfer_schema, #Hook
	write_disposition = 'WRITE_TRUNCATE',
	skip_leading_rows = 1,
	google_cloud_storage_conn_id = GCS_CONN_ID,
	bigquery_conn_id = BQ_CONN_ID,
	dag = dag # DAG
)

 

Task는 DAG의 한 스텝으로 정의한다. 다시 말해 실제로 실행하게 되는 하나의 작업을 의미하는 단위이다.
각 Task는 필요한 파라미터와 함께 DAG를 정의하는 Python 파일에 저장된다.
하나의 DAG 파일에는 여러 개의 Task가 존재할 수 있다 (코드에 정의된 순서와 DAG의 수행 순서는 무관하다).

 

3. 그 외

Airflow의 핵심 요소인 DAG와 Hook, Operator, Task 외에 Workflow를 관리할 수 있는 파라미터 형태의 데이터가 있다.

더보기
  • Xcoms
    : Task의 return값을 다른 Task에서 input으로 사용할 목적으로 데이터를 저장하는 저장소
  • Airflow Varialble
    : Airflow DB에 저장되는 전역 변수
  • Python Variable
    : Python 변수
  • Environment Variable
    : 일반 환경 변수
  • ...

 

 

Airflow Platforms

Airflow는 Python으로 만들어진 DAG로 Workflow를 스케쥴링하고 실행 및 모니터링하는 도구이다.
CLI로만 사용 가능한 Cron과 비교하였을 때, UI를 지원한다는 점에서 사용성이 좋고, Task 단위로 구성하기 때문에 모듈화가 쉽다는 장점이 있다. 또한, 여러 서버와 DB에서 작업을 해야 하는 경우에도 하나의 DAG 안에서 관리할 수 있어 편리하다.

 

1. Python으로 작성된 오픈소스 플랫폼

Airflow는 DAG, Hook, Operator 등 모두 Python으로 사용 가능하다.

 

2. 모듈화와 디버깅

DAG의 구조를 사용하기 때문에 Task가 순차적으로 진행되며, Task를 분야 별로 나누어 관리할 수 있다.
즉, 각 Task별로 테스트가 가능하고, 한 Workflow 내에서도 필요에 따라 나누어 관리하는 것이 가능하다.
또한, 서로 다른 DAG 사이에서 'ExternalTaskSensor'를 사용하여 종속 관계를 형성할 수 있다.

Three workers can manage different types of tasks

 

3. 사용자 인터페이스 지원 및 로깅, 모니터링, 알림 기능

기본적으로 모든 실행에 대해 로깅, 모니터링, 알림 기능을 제공한다.
또한, CLI와 UI를 모두 지원하여 스케쥴링, 실행, 및 모니터링이 쉽다.
특히, DAG를 트리(Tree View), 그래프(Graph View), 간트 차트(Gantt View), 소스 코드(Code View) 등 다양한 형태로 시각화하여 보여주고, Task 별 실행 시간과 속도, 상태 등을 그래프로 보여주는 등 다양한 모니터링 방식을 지원한다.

 

4. Batch 프로세스

일련의 Task들을 Cron 스케쥴링으로 관리하여 Batch 프로세스 목적으로 사용하기에 적합하다.

 

5. 하나의 관리자

여러 서버와 DB에서 작업을 해야 하는 경우에도 하나의 DAG 안에서 관리할 수 있고, 모든 데이터가 기록된 데이터베이스는 UI에서 확인이 가능하다.

Airflow Scheduler는 각 Task를 실행할 조건을 데이터베이스에서 확인한다.
DAG의 순서에 따라 앞의 Task를 실행하면, 작업이 성공적으로 종료되었는지, 실행될 순서가 맞는지, 어떤 실행 조건이 만족되었는지, 실행 결과가 무엇인지 등 실행한 결과를 데이터베이스에 저장한다. 그리고 이 데이터를 기반으로 다음에 실행할 Task를 판단한다. 즉, 각 Task를 수행한 모든 로그는 데이터베이스에 기록된다.

Airflow Architecture by Kyle

 

6. Execution Date

정해진 스케줄에 따라 지정된 시간에 실행되는 Cron Job과는 다르게 Airflow의 스케줄에는 '순서'의 개념이 포함된다.

매일 오전 1시에 데이터를 추출하고 매일 오후 1시에 데이터를 적재하는 스케줄을 예로 들어보자.
만약 오전 1시에 데이터 추출이 지연되어 오후 1시에 예정된 데이터 적재가 실패한 경우,
Cron은 다음 날 오전 1시에 다시 데이터 추출을, 오후 1시에 데이터 적재를 시도할 것이다.
반면 Airflow에서 데이터 추출 Task가 완료된 후 데이터 적재 Task를 진행하도록 조건이 있었다면, 추출 Task가 완료될 때까지 기다린 후 적재 Task를 시도할 것이다.

반응형
Comments