Data Engineering Background
- 데이터 팀의 역할 소개
- 데이터 엔지니어링
- 데이터 엔지니어의 일주일
- Data Warehouse Deep Dive
- Redshift Introduction
- How to Access Redshift
- SQL for Data Engineer
- Airflow
- Docker 설치
- Airflow 설치
- 기타
- Airflow Deepdive
- Productionizing Airflow
- Link
데이터 팀의 역할 소개
- 데이터 유무 > 데이터 지식
데이터 조직의 목표
- Data Flow
- Data Warehouse - 한 곳으로 모아 저장해야 편의성이 높아진다.
- Business Insights
- Site Services - Scalalble Product Improvement
- 예시
- Build Leverage for the company through Trustworthy Data - Airbnb
- 부수적인 뉘앙스
- 데이터팀의 공헌도 설명이 중요한 포인트
- Providing higth quality data in timely fashion so that(애널리스트)
- data informed decisions can be made
- 데이터 기반과는 다른 데이터를 참고하겠다는 뉘앙스
- 데이터 기반의 시기적 특성은 미래보다 과거부터 현재까지이므로 기반을 삼아서는 방향을 바꾸기는 어렵고 참고로 삼아야 더 나은 결정의 가능성 생긴다.
- data informed decisions can be made
- Providing higth quality data in timely fashion so that(사이언티스트)
- scalabe product improvement(personalization) by algorithm
- But don't undervalue hubam-in-the-loop
- scalabe product improvement(personalization) by algorithm
- Build Leverage for the company through Trustworthy Data - Airbnb
데이터 조직 직군별 역할
- Data Scientest(Product Science)
- 엔드유저를 대상으로 하므로 External 성격
- 워터폴보다 애자일이 효율적
- 데이터 모으기 - 분석 - 알고리즘 업데이트 - 알고리즘 개발(업데이트에서 개발은 자동화) - 실험
- 알고리즘을 통해 고격경험의 만족도 높여
- 예측 모델을 만들어 제품 퀄리티를 높인다
- 실용성과 인내심 필요
- 스킬
- 머신러닝에 관한 깊은 지식과 경험
- Python/Spark 코딩
- SQL/Hive
- R/SAS/Matlab (수학, 통계학)
- Data Analyse(Decision Science)
- 내부 오디언스를 대상으로 하므로 Internal 성격
- 비즈니스 인텔리전스의 책임
- key metrics 정의, 대시보드 생성
- 내부 직원들의 질문 답변
- 내부 질문이 많이 들어온다.
- 스킬
- 비즈니스 도메인 지식
- SQL
- Data Engineer
- 데이터 웨어하우스(vs. 데이터 레이크)
- 데이터베이스 매니징 - RedShift, BigQuery, Snowflake
- 데이터 파이프라인 매니징 (ETL)
- 3rd party SaaS ETL - FiveTran, StitchData, Segment
- A/B Test
- Data Tools
- 데이터 웨어하우스(vs. 데이터 레이크)
데이터 팀 역할
- 초기 스타트업
- 마케팅이 가장 중요
- 마케팅 채널 어트리뷰션과 코호트 분석
- 비용 차이 크다
- 마케팅 채널 어트리뷰션과 코호트 분석
- 액티브 유저 측정
- 액티브 유저 정의가 중요
- 마케팅이 가장 중요
- 성장하는 스타트업
- 키 Metrics 정의
- AAA(Actionable, Accessible, Auditable)
- 데이터 인프라에 적절한 투자 시기
- 키 Metrics 정의
데이터 조직 구조
- 중앙 조직
- 소통이 어렵다
- 도메인 지식 습득이 어렵다
- 데이터 팀 커리어에 도움
- 분산 조직
- 데이터 조직의 사람이 2등시민이 될 수 있다
- 하이브리드
- 2년에 한 번 조직 개편(에어비앤비)
경험에서 얻은 교훈
- Data Should Drive Business Results
- Infrastructure (and best practices) First
- Quality of Data is Important
- Metrics First
- The Simpler a Solution is The Better
데이터 엔지니어링
데이터 웨어하우스 관리
- Redshift vs BigQuery, Snowflake?
- snowflake, bigquery는 스토리지와 컴퓨테이션 분리해서 업그레이드 가능
- 예) 매우 큰 조인 쿼리 - 더 갖다 붙이면서 돈 많이 낸다. 쿼리 하나에 만불 나왔다
- 매일 아침 비싼 쿼리 쓴 사람 top 10을 공개한다.
- 당장 돈 많이 쓰는 것처럼 보일 수도 있지만 인프라가 커질수록 당연히 더 효율적이다.
- 레드시프트는 스토리지 컴퓨테이션 같이 해야
- Fixed cost
- 예) 매우 큰 조인 쿼리 - 그냥 서버가 죽는다.
- 직원 200명 수준까지는 레드시프트가 나을 수도 있다.
- snowflake, bigquery는 스토리지와 컴퓨테이션 분리해서 업그레이드 가능
- 데이터 웨어하우스 구조
- 프로덕션
- raw_data, analytics, adhoc, archive
- 프로덕션
Data Pipelines
- Copying data from sources to a destination
- The destination is Data Warehouse
- Sometimes data transformation is required
- Data Pipelines == ETL (Extract, Transform and Load) == Data Jobs
- In Airflow, this is called DAG
- Maintenance is a lot of work
- Handling failures and backfilling!
- How to detect data schema and semantic changes as early as possible is important
- Scheduling
- By time and by data availability
- Job Dependency
데이터 분석가와 과학자 지원
- 서머리 테이블 만들기
- 로우 데이터가 수많은 트러블의 원인이 될 수 있다.
- Being consistent is more important being correct
- 앤지니어 없이 분석가가 서머리 테이블 만들 수 있도록 지원
- 과학자가 배포 모델을 빌드할 수 있도록 지원
트렌드
- 클라우드 데이터 웨어하우스
- 배치가 아닌 스트리밍, 실시간
- 카프카 or Kinesis
- SaaS 기반 ETL
- Data DevOps
데이터 엔지니어의 일주일
데이터 팀원 백그라운드
- 물리학 박사, 생물학 박사 - 물리학자 대부분 코드를 짜서 실험한다. 데이터사이언티스트로 매우 적합하다.
- 93년생 경영학 전공 수구선수, 디자이너, 경제학, 공연
- 서포트팀에서 적극적으로 Mysql 코스를 만들기까지
- 포인트 - 변화를 두려워하지 말아야 한다.
월요일
- 스프린트(JIRA)
- Sprint 데모 미팅( 말보다 데모로 ) 30분
- Sprint 회고 미팅( 화이트 보드 3섹션) 30분
- 미팅 주도 돌아가면서
- What went well
- What could have gone better
- Any discussion points
- Sprint 플래닝 미팅 30분
- 40%는 인프로 코드 리팩토링에 사용
- 미팅 제외 하루 5시간 일한다고 가정하고 플래닝
- On-Call 엔지니어 새로 지정
- 다음 일주일간(주말 포함) 모든 data pipeline 실패와 관련한 이슈 해결
- 데이터 파이프라인 실패율을 Key metriecs로 관리
- 서머리 테이블에 집중
화요일
- Daily standup
- What did you do in the past 24 hours?
- What do you plan to do in the next 24 hours?
- Any blockers/issues to discuss?
- 여러 사람들과 일대일 미팅들
- 내부 팀원들과의 미팅
- DE <-> DA
- DE <-> DS
- DA <-> DS
- 다른 팀과의 sync-up 미팅
- 내부 팀원들과의 미팅
- 데이터 파이프라인 개발과 최적화
수요일/목요일
- Daily standup (Slack virtual standup)
- 데이터 파이프라인 개발과 최적화
- 미팅
금요일
- Daily standup
- 데이터 파이프라인 개발과 최적화
- 주간 스태프 미팅
- Metrics & Quarterly Goals 리뷰
- Data Warehouse capacity 리뷰
- Percentage of Core Data Pipeline Failure
- SLA of Core Data Pipeline Failure
- Data Warehouse capacity 리뷰
- Recruiting & Retention
- 주간 Incident 리뷰 - Postmortem 미팅
- 메인 프로젝트 리뷰
- External projects
- Internal projects
- 팀/개인 업데이트
- Metrics & Quarterly Goals 리뷰
데이터 엔지니어링 == 노가다?
- Keep data pipline running with high quality data
- Integration with Slack/Emails for failure
- Input and output validation!!!
- SLA(service level agreement) with different stakeholders
- Marketing team
- Finance team
- Noticing semantic change is a lot harder than data pipeline failures
- Backfilling becomes very important
- 백필 - 데이터파이프라인 실패 시 재실행
- Full fresh vs. Incremental update
Data Warehouse Deep Dive
Data Warehouse: a separate SQL database
- 2005~2011's 하둡 주목 - SQL은 죽었다?
- SQL은 작은 규모에서 운용?
- Map Reduce 각광
- C++로 웹서비스 만드는 느낌
- 2010년 이후 다시 SQL로 회귀
- 스크립트 언어로 웹서비스 만드는 느낌
- 1960's부터 시작된 성숙된 SQL
- 2010's 이후 하둡 등의 Map Reduce 위에 SQL 지원하는 기능 등장
- Hive, Spark 빅데이터 프로세싱에도 SQL 사용
- 상황에 맞게 데이터베이스 선택
- Mysql를 데이터 웨어하우스로 사용하는 기업도 존재
- Data Warehouse isn’t your production database
- It should be separated from your production database
- 쿼리 잘못 날리면 production에 영향을 주므로
- OLAP (OnLine Analytical Processing) vs. OLTP (OnLine Transaction Processing)
- OLAP은 느려도 된다. 프로덕션인 OLTP는 단방에 결과가 나와야 한다는 암묵적 인식
- It should be separated from your production database
Central Data Storage of your Company
- The first step to become a “true” data organization
- data warehouse가 생겨야 진정한 데이터 조직
- 프로덕션과 나뉘기 때문에 자유도가 높아져
- Store raw data from different sources
- 프로덕션 이외에 소스가 생겨서 더 많은 일이 가능해져
- Create summary tables out of the raw data (and other summary)
- Expose the summary tables to employees within the company
- Being consistent is more important than being correct
- Fixed Cost option vs. Scalable Option
- Scalable options will provide decoupled capacity increase of storage and computation
- BigQuery and Snowflake
- Fixed cost option will give stable cost projection
- Redshift or most on-premise data warehouse (open source based)
- Scalable options will provide decoupled capacity increase of storage and computation
Redshift Introduction
- 페타바이트까지 지원가능하지만 페타까지 가면 느려질 것
- 여전히 OLAP
- 쿼리 응답 느리다
- 컬럼별 저장
- 컬럼별 압추 가능하다
- 벌크 업데이트 지원
- SQL로만 쿼리 날려야 하므로 테이블 스키마 디자인 매우 중요
Redshift is Postgresql 8.x compatible
- But Redshift doesn’t support full features of Postgresql 8.x
- Accessible through most Postgresql 8.x clients/libraries
- JDBC/ODBC
- SQL is the main language to interact
- Table schema design matters
Redshift Optimization can be tricky
- Distkey, Diststyle and Sortkey need to be optimized in 2+ Redshift cluster
- distyle is to determine how distribution will be done
- all, even, and key (default is “even”)
- distkey is to determine which node to store a record
- sortkey is to determine in what order to store records in a node
- distyle is to determine how distribution will be done
- A skewed table can cause a big performance issue
- 예 : 서버 8대 - 테이블 1개 만들고 레코드 넣으면 8대 중 1대로 가야 하는데 어떤 키를 써야 할지 개발자가 정해줘야 한다. 빅쿼리 등과 달리 레드시프트만 개발자가 정해줘야 한다.
- 랜덤, 모든 노드, 키 값에 맞춰서 Ditribution Key
- 무조건 Even(랜덤) 쓰는 게 상책
- 랜덤, 모든 노드, 키 값에 맞춰서 Ditribution Key
- 기본 SQL이지만 파이썬으로 보강 가능
- 캐시 도입 가능
- Bulk Update Sequence -> S3 -> 레드시프트
- 카피 커맨드 -> 조단위 레코드 입력도 가능, 몇 백배 차이를 낼 수 있음
How to Access Redshift
- JDBC/ODBC desktop tools such as
- SQLWorkBench, Navicat and so on
- Requires IP registration for outside access
- JDBC/ODBC Library
- Any PostgreSQL 8.0.x compatible should work
- In Python, psycopg2 is a preferred module to use
User and User Group Creation
CREATE GROUP analytics_users;
GRANT USAGE ON SCHEMA analytics TO GROUP analytics_users;
GRANT USAGE ON SCHEMA raw_data TO GROUP analytics_users;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics TO GROUP analytics_users;
GRANT SELECT ON ALL TABLES IN SCHEMA raw_data TO GROUP analytics_users;
GRANT ALL ON SCHEMA adhoc to GROUP analytics_users;
GRANT ALL ON ALL TABLES IN SCHEMA adhoc TO GROUP analytics_users;
CREATE USER keeyong PASSWORD '...';
ALTER GROUP analytics_users ADD USER keeyong;
- 과제
- GROUP BY 1, ORDER BY 1 are preferred (concise)
- TO_CHAR (ts, ‘YYYY-MM’)
- LEFT(ts, 7) or DATE_TRUNC(‘month’, ts) does the same
-
Counting distinct userid not sessionid
- Criteria of Good Summary Tables
- Make your job or other’s job faster and easier and most importantly accurate/consistent
- Key Metrics Calculation (Revenue, User Growth, …)
- Marketing Performance
- Build around key assets of your service/product
- Make your job or other’s job faster and easier and most importantly accurate/consistent
- Items you are selling, Customers
SQL for Data Engineer
Easy to Use and Vetted over Time
- 맵리듀스에 비해 쉽다
- SQL was originally developed in IBM in early 70s
- There are two parts
- DDL: Data Definition Language
- CREATE TABLE, DROP TABLE, ALTER TABLE
- DML: Data Manipulation Language
- SELECT, INSERT INTO
- DDL: Data Definition Language
- In mid 2000s it was less popular as new big data technologies emerged
- But eventually all of them supported some forms of SQLs
Disadvatage of SQL
- Optimized for structured data handling
- Regular expression and JSON can be used for unstructured data handling to a certain degree
- Redshift can only handle a flat structure (not nested)
- Big Query supports a nested structure (very powerful)
- Star schema isn’t always great vs. Denormalized schema
- There is no standard syntax (various SQL dialects)
- Mostly similar though
- Not very flexible in handling semi-structured or unstructured data:
- More flexible large scale unstructured data processing requires another framework
- Spark, Hadoop (MapReduce -> Hive) and so on
Redshift SQL
- 데이터 퀄러티 - 현업에 클린 데이터란 없다.
- 처음 쓰는 데이터는 절대 믿지 말라.
- 스키마만 보고 짐작하면 오판 가능성 높다.
- 실제 레코드를 몇 개 살펴보는 것이 정답!
- 어떤 데이터인지 이해할 수 있을 정도는 확인해야
- 같은 PK가 있어도 삽입 가능
- PK 유니크 보장하려면 확인 절차가 퍼포먼스에 걸림돌이 된다.
- 엔지니어가 보장해야
- Comment
- – for single line comment, /* */ multi line comment
DDL (Data Definition Language)
- CREATE TABLE
- Primary key: Big Data 데이터웨어하우스에서는 지켜지지 않음 (Redshift, Snowflake, BigQuery)
- CREATE TABLE table_name AS SELECT
- vs. CREATE TABLE and then INSERT
- DROP TABLE
- DROP TABLE IF EXISTS table_name 존재하면 삭제 아니면 말고
- ALTER TABLE
- Add new columns, Rename existing columns, dropping existing columns
SELECT
- SELECT COUNT(1) FROM raw_data.session_timestamp;
- COUNT(1) vs. COUNT(*) - count(1)이 더 빠르다
- CASE WHEN
- Multiple CASE WHEN: to convert values from integer to string or vice versa
- GROUP BY
- Count by channel
- ORDER BY
- ASC vs. DESC
- Multi-Column ordering
- ORDER BY 1, DESC 2; -> 첫 번째 컬럼은 오름차순으로 하되 같은 값들이 존재하면 두 번째 컬럼의 내림차순으로
WHERE
- IN
- WHERE channel in (‘Google’, ‘Youtube’)
- WHERE channel = ‘Google’ AND channel = ‘Youtube’
- NOT IN
- WHERE channel in (‘Google’, ‘Youtube’)
- LIKE and ILIKE
- LIKE is a case sensitive string match. ILIKE is a case-insensitive string match
- WHERE channel LIKE ‘G%’
- NOT LIKE or NOT ILIKE
- ILKE - 대소문자 구별 노노
- BETWEEN
- Used for date range matching
STRING Funcions
LEFT(str, N) REPLACE(str, exp1, exp2) UPPER(str) LOWER(str) LEN(str)
INSERT INTO vs. COPY
- INSERT INTO is slower than COPY
- COPY is a batch insertion mechanism.
- INSERT INTO table_name SELECT * FROM …
- This is better than CTAS (CREATE TABLE table_name AS SELECT) if you want to control the types of the fields
- 필드 타입과 이름 지정 불가, text 타입과 크기도 사용 불가
- But matching varchar length can be challenging
- Snowflake and BigQuery support String type (no need to worry about string length)
Type Cast and Conversion
- DATE Conversion:
- CONVERT_TIMEZONE
- CONVERT_TIMEZONE('America/Los_Angeles', ts)
- DATE, TRUNCATE
- DATE_TRUNC
- 첫번째 인자가 어떤 값을 추출하는지 지정 (week, month, day, …)
- EXTRACT or DATE_PART: 날짜시간에서 특정 부분의 값을 추출가능
- CONVERT_TIMEZONE
- Type Casting:
- cast or :: operator
- category::int or cast(category as int)
- TO_CHAR, TO_TIMESTAMP
Prctice
1)
CREATE TABLE adhoc.이름_channel (
channel varchar(32) primary key
);
INSERT INTO adhoc.이름_channel VALUES ('FACEBOOK'), ('GOOGLE');
SELECT * FROM adhoc.이름_channel;
2)
DROP TABLE adhoc.이름_channel;
CREATE TABLE adhoc.이름_channel AS
SELECT DISTINCT channel
FROM raw_data.user_session_channel;
3)
ALTER TABLE adhoc.이름_channel RENAME channel to channelname;
INSERT INTO adhoc.이름_channel VALUES ('TIKTOK');
%%sql
CREATE TABLE adhoc.yongjunleeme_channel as
select distinct channel
from raw_data.user_session_channel;
ALTER TABLE adhoc.yongjunleeme_channel RENAME channel to channelname;
INSERT INTO adhoc.yongjunleeme_channel.value('TICTOK');
SELECT
LEN(channelname),
UPPER(channelname),
LOWER(channelname),
REPLACE(UPPER(LEFT(channelname, 4)), 'OO', 'XX')
FROM adhoc.yongjunleeme_channel;
- Session이 가장 많이 생성되는 시간대?
SELECT EXTRACT(HOUR FROM st.ts),
COUNT(DISTINCT(usc.userid))
FROM raw_data.user_session_channel usc
JOIN raw_data.session_timestamp st
ON usc.sessionid = st.sessionid
GROUP BY 1
ORDER BY 2 DESC;
JOIN
- 이미지
- Be Careful in using JOIN
- JOIN을 하는 테이블들간의 관계
- One to one
- 조인 시 레코드 수 그대로 또는 교집합 수로 줄어든다
- One to many?
- Many to one?
- Many to many?
- 조인 시 레코드 수 늘어난다.
- 데이터가 클린하지 않으면 결과의 레코드수가 확 늘어나 문제
- 조인 시 레코드 수 늘어난다.
- One to one
- When to use INNER JOIN vs. LEFT JOIN
- Outer JOIN or RIGHT JOIN are relatively rare to use
SELECT c.channelname, COUNT(DISTINCT(usc.userid))
FROM raw_data.channel c
JOIN raw_data.user_session_channel usc ON c.channelname = usc.channel
JOIN raw_data.session_timestamp st ON usc.sessionid = st.sessionid
GROUP BY 1
ORDER BY 2 DESC;
Keeyong에서 모두에게: (11:48 오전)
SELECT c.channelname, COUNT(DISTINCT(usc.userid))
FROM raw_data.channel c
LEFT JOIN raw_data.user_session_channel usc ON c.channelname = usc.channel
LEFT JOIN raw_data.session_timestamp st ON usc.sessionid = st.sessionid
GROUP BY 1
ORDER BY 2 DESC;
GRANT SELECT ON TABLE raw_data.channel to group analytics_users;
NULL
- 값이 존재하지 않음을 의미
- 0이나 비어있는 string과는 다름을 분명히 인지
- IS NULL or IS NOT NULL
- 노노 - NULL or <> NULL
- Boolean 타입의 필드도 “IS TRUE” 혹은 “IS FALSE”로 비교
- LEFT JOIN시 매칭되는 것이 있는지 확인하는데 아주 유용
- NULL 값을 다른 값으로 변환하고 싶다면
- COALESCE를 사용
SELECT SUM(amount)
FROM raw_data.session_transaction
WHERE refunded IS True;
Advanced SQL
- UNION (합집합)
- 여러개의 테이블들이나 SELECT 결과를 하나의 결과로 합쳐줌
- UNION vs. UNION ALL
- UNION은 중복을 제거
- EXCEPT (MINUS)
- 하나의 SELECT 결과에서 다른 SELECT 결과를 빼주는 것이 가능
- INTERSECT (교집합)
- 여러 개의 SELECT문에서 같은 레코드들만 찾아줌
COALESCE, NULLIF
- COALESCE(Expression1, Expression2, …):
- 첫번째 Expression부터 값이 NULL이 아닌 것이 나오면 그 값을 리턴하고 모두 NULL이면 NULL을 리턴한다.
- NULL값을 다른 값으로 바꾸고 싶을 때 사용한다.
- NULLIF(Expression1, Expression2):
- Expression1과 Expression2의 값이 같으면 NULL을 리턴한다
DELETE FROM vs. TRUNCATE
- DELETE * FROM vs. DELETE FROM
- DELETE FROM table_name
- 테이블에서 모든 레코드를 삭제
- vs. DROP TABLE table_name
- WHERE 사용해 특정 레코드만 삭제 가능:
- DELETE FROM raw_data.user_session_channel WHERE channel = ‘Google’
- TRUNCATE table_name도 테이블에서 모든 레코드를 삭제
- DELETE FROM은 속도가 느림
- TRUNCATE이 전체 테이블의 내용 삭제시에는 여러모로 유리
- 하지만 TRUNCATE는 WHERE을 지원하지 않음
WINDOW
- Syntax:
- function(expression) OVER ( [ PARTITION BY expression] [ ORDER BY expression ] )
- Useful functions:
- ROW_NUMBER, FIRST_VALUE, LAST_VALUE
- Math functions: AVG, SUM, COUNT, MAX, MIN, MEDIAN, NTH_VALUE
Practice
- 사용자별로 처음 채널과 마지막 채널 알아내기
- ROW_NUMBER vs. FIRST_VALUE/LAST_VALUE
- 사용자 251번의 시간순으로 봤을 때 첫 번째 채널과 마지막 채널은 무엇인가?
- 노가다를 하자면 아래 쿼리를 실행해서 처음과 마지막 채널을 보면 된다.
SELECT ts, channel
FROM raw_data.user_session_channel usc
JOIN raw_data.session_timestamp st ON usc.sessionid = st.sessionid
WHERE userid = 251
ORDER BY 1
- ROW_NUMBER 이용해서 해보자
- ROW_NUMBER() OVER (PARTITION BY field1 ORDER BY field2)
SELECT
ts, channel, ROW_NUMBER() OVER (partition by userid order by ts) as N
FROM raw_data.user_session_channel usc
JOIN raw_data.session_timestamp st
ON usc.sessionid = st.sessionid
WHERE userid = 251;
SELECT * FROM (SELECT ts, channel, ROW_NUMBER()
OVER (partition by userid order by ts) as N
FROM raw_data.user_session_channel usc
JOIN raw_data.session_timestamp st
ON usc.sessionid = st.sessionid
WHERE userid = 251
)
WHERE N = 1
JSON Parsing Functions
- JSON의 포맷을 이미 아는 상황에서만 사용가능한 함수
- JSON String을 입력으로 받아 특정 필드의 값을 추출가능 (nested 구조 지원)
- 예제) JSON_EXTRACT_PATH_TEXT
SELECT JSON_EXTRACT_PATH_TEXT
('{"f2":{"f3":1},"f4":{"f5":99,"f6":"star"}}','f4', 'f6');
숙제
Airflow
Quick Airflow Intro
- Airflow is a platform for data pipelines in Python (beyond a scheduler)
- Not just a scheduler but also also a worker(s)
- Provides a web UI as well
- Not just a scheduler but also also a worker(s)
- Airflow is a cluster of servers
- DAG and scheduling info are stored in a DB (SQLite is used by default)
- MySQL or Postgres is preferred for real production usage
- DAG and scheduling info are stored in a DB (SQLite is used by default)
- DAG -> ETL
- 태스크로 구성
- 오퍼레이터의 인스턴스
- 단점
- 서버 늘어나면 유지보수 복잡해진다
- GCP provides “Cloud Composer”를 쓰면 편하다
DAG Common Configuration Example
from datetime import datetime, timedelta
from airflow import DAG
default_args = {
'owner': '',
'start_date': datetime(2020, 8, 7, hour=0, minute=00), # 백필할 때, 인크리멘탈 업데이트할 때 유용(복잡도는 상승)
'end_date': datetime(2020, 8, 31, hour=23, minute=00),
'email': [''],
'retries': 1, # 리트라이 횟수
'retry_delay': timedelta(minutes=3), # 3분 기다렸다가 리트라이
}
test_dag = DAG(
"dag_v1", # DAG name, 웹 UI에 나옴, 유니크해야
# schedule (same as cronjob)
schedule_interval="0 9 * * *", # 매일 9시 0분에 실행하라, year는 생략 상태
# common settings
default_args=default_args # timezone 파라미터는 설치 시 환경파일에 지정, 기본은 UTC
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
t1 >> t2 # t1이 끝나면 t2를 실행하라, t2.set_upstream(t1)
t1 >> t3 # t1이 끝나면 t3를 실행하라, t3.set_upstream(t1)
아래 설치 명령들은 우분투를 기반으로 한다.
http://www.marknagelberg.com/getting-started-with-airflow-using-docker/
Docker 설치
먼저 Docker를 설치하기 위헤 apt 패키지 매니저 자체를 업데이트하고 설치를 진행한다.
$ sudo apt-get update
$ sudo apt install -y docker.io
$ sudo systemctl start docker
$ sudo systemctl enable docker
$ docker --version
다음으로 hello-world를 실행하여 설치가 제대로 되었는지 확인한다. 출력문에 "Hello from Docker!"가 있어야 한다.
$ sudo docker run hello-world
Airflow 설치
먼저 dags 폴더를 하나 만든다. 이 폴더 아래 Python으로 만든 DAG 코드가 존재해야 한다.
$ pwd
/home/ubuntu/
$ mkdir dags
$ ls -tl
total 4
drwxrwxr-x 2 ubuntu ubuntu 4096 Aug 3 02:43 dags
Docker를 이용해 Airflow 설치
간단 설치 방법:
$ sudo docker pull puckel/docker-airflow
Airflow가 설치된 Docker 이미지 이름을 보는 방법 (puckel/docker-airflow를 찾는다)
$ sudo docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
puckel/docker-airflow latest ce92b0f4d1d5 5 months ago 797MB
hello-world latest bf756fb1ae65 7 months ago 13.3kB
더 자세히 설치하려면 https://medium.com/@xnuinside/quick-guide-how-to-run-apache-airflow-cluster-in-docker-compose-615eb8abd67a를 참고
Airflow 실행
다음으로 이 폴더를 dags 폴더로 지정해서 Airflow를 실행한다.
sudo docker run -d -p 8080:8080 -v /home/ubuntu/dags:/usr/local/airflow/dags puckel/docker-airflow webserver
Airflow 웹서버 방문
http://호스트이름:8080/
기타
Airflow Container 실행 중단하기
먼저 Airflow Docker instance의 이름을 알아낸다. "sudo docker ps"를 명령을 실행하여 NAMES 컬럼밑에 나오는 이름을 기억한다. 여기서는 angry_wu인데 매번 바뀐다는 점에 유의한다.
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a56dbb111b5b puckel/docker-airflow "/entrypoint.sh webs…" 21 minutes ago Up 21 minutes 5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp angry_wu
이 이름을 가지고 중단한다
$ sudo docker stop angry_wu
data-engineering repo내의 HelloWorld.py를 Airflow로 올리기
/home/ubuntu에서 아래를 실행
$ git clone https://github.com/keeyong/data-engineering.git
다음으로 data-engineering/dags/HelloWorld.py를 /home/ubuntu/dags 폴더로 복사
cp data-engineering/dags/HelloWorld.py /home/ubuntu/dags/
Airflow를 중단하기 위해 Docker Container의 이름을 알아낸다.
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a56dbb111b5b puckel/docker-airflow "/entrypoint.sh webs…" 21 minutes ago Up 21 minutes 5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp angry_wu
$ sudo docker stop angry_wu
Airflow를 재실행한다.
sudo docker run -d -p 8080:8080 -v /home/ubuntu/dags:/usr/local/airflow/dags puckel/docker-airflow webserver
이미 실행된 Airflow Docker Container를 같은 인자로 다시 재실행하고 싶다면
awesome_shaw가 Container의 이름인 경우 아래와 같이 실행한다.
sudo docker restart awesome_shaw
Docker 내 Airflow DAG 폴더로 이동하기
이는 DAG 개발을 위해서 필요하다. 먼저 "sudo docker ps"를 명령을 실행하여 NAMES 컬럼밑에 나오는 이름을 기억한다.
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a56dbb111b5b puckel/docker-airflow "/entrypoint.sh webs…" 21 minutes ago Up 21 minutes 5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp angry_wu
이 이름을 가지고 "sudo docker exec -ti"으로 컨테이너 안으로 이동한다.
$ sudo docker exec -ti angry_wu bash
airflow@a56dbb111b5b:~$ pwd
/usr/local/airflow
airflow@a56dbb111b5b:~$ cd dags
개발
여기에서 ls -tl과 같은 명령을 실행해보면 HelloWorld.py를 볼 수 있어야 한다 (앞서 과정을 거쳤다면). 새로운 DAG 개발을 원한다면 여기서 파이썬 코드 파일을 만든다. 근데 Docker는 기본적으로 아무런 에디터 설치가 되어 있지 않다. 아래와 같이 vim을 설치해 사용한다:
apt-get update
apt-get install vim
Airflow 명령 실행
여기서 여러가지 airflow 명령어들을 실행해볼 수 있다. 예를 들어 list_dags를 실행하면 현재 설치되어 있는 모든 DAG들이 리스트된다.
airflow@a56dbb111b5b:~$ airflow list_dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test_utils
tutorial
my_first_dag
- "airflow list_tasks DAG이름"을 실행하면 DAG에 속한 태스크들의 이름이 모두 나열된다.
- 특정 태스트를 실행하고 싶다면 예를 들어 task ID가 print_hello라면 "airflow test DAG이름 print_hello 2020-08-09" 이렇게 실행하면 된다. 여기서 주의할 점은 2020-08-09의 경우 DAG의 start_date보다는 뒤어야 하지만 현재 시간보다 미래이면 안된다.
Docker Container의 내용을 저장하기
- https://stackoverflow.com/questions/44480740/how-to-save-a-docker-container-state
Airflow Deepdive
4주차 리뷰
- 변수 설정 기능
- Connction 설정도 가능
- secret_key로 네이밍하면 저절로 암호화
- Docker 명령 리뷰
sudo docker run -d -p 8080:8080 -v /home/ubuntu/dags:/usr/local/airflow/dags puckel/docker-airflow webserver
- /home/ubuntu/dags의 내용이 docker container내의 /usr/local/airflow/dags로 미러링이 됨
- 예를 들어 /home/ubuntu/dags에 있는 파일의 내용을 바꾸는대로 바로 반영이 됨
Airflow Operation, Variables and Connections
DAG Configuration Example
- dag 인자를 딕셔너리롤 만들어서 할당
from datetime import datetime, timedelta
from airflow import DAG
default_args = {
'owner': 'yongjun',
'start_date': datetime(2020, 8, 7, hour=0, minute=00),
'end_date': datetime(2020, 8, 31, hour=23, minute=00),
'email': ['yongjun@mail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
dag = DAG(
"dag_v1", # DAG name
# schedule (same as cronjob)
schedule_interval="0 9 * * *",
# common settings
default_args=default_args
)
- 주의할 점
- Note that Airflow will run the first job at start_date + interval.
- start_date에 바로 시작이 아니라 start_date + interval에 처음 시작
- A daily dag with the start_date '2020-07-26 01:00' starts at 2020-07-27 01:00.
- Start_date 이해하기(catchup 설정은 디폴트가 true)
- 2020-08-10 02:00:00로 start date로 설정된 daily job이 있다
- catchup이 True로 설정되어 있다고 가정 (디폴트가 True)
- 지금 시간이 2020-08-13 20:00:00이고 처음으로 이 job이 활성화되었다
- 질문: 이 경우 이 job은 몇번 실행될까?
- 2020-08-10 02:00:00 -> 2020 08 11 실행
- 2020-08-11 02:00:00 -> 2020 08 12 실행
- 2020-08-12 02:00:00 -> 2020 08 13 실행
- 2020-08-13 02:00:00 -> 2020 08 14 실행
- 질문: 이 경우 이 job은 몇번 실행될까?
- 2020-08-10 02:00:00로 start date로 설정된 daily job이 있다
- 타임존 변경 원하면 airflow.ctg 파일에서 변경 가능
Important DAG parameters (not task parameters)
- 꼭 DAG의 파라미터로 써줘야 한다
- max_active_runs: # of DAGs instance
- 실행할 DAG의 수
- concurrency: # of tasks that can run in parallel
- catchup # whether to backfill past runs
- 이전 시점 실행 여부, 디폴트가 true
- max_active_runs: # of DAGs instance
Operators - PythonOperator
- 파라미터를 넘기려면
provide_context = True
- params 딕셔너리 넘겨야
- excution_date는 start_date와 같이 실행 시간, 실제 실행 시점은 interval을 더해야
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
params={
'table': 'delighted_nps',
'schema': 'raw_data'
},
provide_context=True
)
from airflow.exceptions import AirflowException
def python_func(**cxt):
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_date = cxt["execution_date"]
# do what you need to do
...
How to Trigger DAGs
- 스케쥴과 상관 없이 UI에서 Trigger
- How to Trigger a DAG - From Terminal
- Airflow 도커 컨테이너에 접근한 다음
- airflow list_dags
- airflow list_tasks (DAG이름)
- airflow test DAG이름 Task이름 날짜
- 날짜는 YYYY-MM-DD
- start_date보다 과거이거나 오늘 날짜보다 미래인 경우 실행 안됨
- 날짜는 YYYY-MM-DD
- airflow test second_assignment_v3 extract 2020-08-09
Instructions to follow
- Airflow 컨테이너가 실행중이라면 일단 중단. 컨테이너 이름을 찾아 아래 명령 실행
$ sudo docker stop (컨테이너이름)
- keeyong/data-engineering repo를 Airflow 서버에서 다운로드
- 이미 다운받았다면 data-engineering 폴더안에서 git pull 실행
$ git clone https://github.com/keeyong/data-engineering
- 이 repo의 dags 폴더를 가지고 Docker container를 실행
- Docker run 할때 —name [이름] -> 이름 지정 가능
$ sudo docker run -d -p 8080:8080 -v /home/ubuntu/data-engineering/dags:/usr/local/airflow/dags puckel/docker-airflow webserver
$ sudo docker run --name yongjunlee -it -d -p 8080:8080 -v /home/ubuntu/data-engineering/dags:/usr/local/airflow/dags puckel/docker-airflow webserver
- 해당 docker container의 이름을 찾아서 container안으로 이동
- 컨테이너 이름 찾기
$ sudo docker ps
- 컨테이너 안으로 이동하기
$ sudo docker exec -ti (컨테이너이름) bash
- 여기서 필요한 2개의 파이썬 모듈을 설치
$ pip install botocore
$ pip install boto3
- 컨테이너를 빠져나와서 컨테이너를 재시작
$ sudo docker restart (컨테이너이름)
- Airflow 웹서버 접근: http://(airflow서버이름):8080/
- Variables
- csv_url = https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv
- iam_role_for_copy_access_token = arn:aws:iam::080705373126:role/redshift.read.s3
- Redshift 테이블 생성
CREATE TABLE (본인스키마).customer_variants (
customer_id int,
variant varchar(8),
primary key (customer_id)
);
CREATE TABLE (본인스키마).customer_interactions (
customer_id int,
datestamp date,
item_id int,
clicked int,
purchased int ,
paidamount int,
primary key(customer_id, datestamp, item_id, clicked,purchased)
);
CREATE TABLE (본인스키마).item_features (
item_id int,
quality float,
popularity varchar(16),
price int,
primary key(item_id)
);
CREATE TABLE (본인스키마).customer_features (
customer_id int,
engagement_level varchar(16),
gender varchar(16),
PRIMARY KEY (customer_id)
);
Airflow Installation on Ubuntu
- install
-
포스트그레스 연결 시 프로덕션에서는 마스터가 아닌 슬레이브를 만들어서 연결하는 것이 더 안전하다
-
airflow.cfg -> dags_folder
- Task의 현재 디렉토리는?
- Airflow Dag 디렉토리가 Task의 현재 디렉토리
- Task 코드안에서 다음을 실행해보면 분명히 알 수 있음
import os
dir_path = os.path.dirname(os.path.realpath(__file__))
logging.info(dir_path)