Data Engineering24 Airflow Rest API를 사용하여 DAG 호출하기 API 인증 설정 username, password를 사용해서 인증을 할 수 있도록 airflow.cfg파일에서 아래와 같이 설정한다. 기본 설정값은 session 인증이다. [api] auth_backends = airflow.api.auth.backend.basic_auth 현재 설정된 값을 확인하고 싶으면 아래 명령어를 이용한다. $ airflow config get-value api auth_backends airflow.api.auth.backend.basic_auth DAG 파일 작성 호출 할 DAG파일을 작성한다. 아래의 예제는 DAG를 호출 할 때 configuration값을 넘겨서 호출 된 DAG에서 해당 값을 사용할 수 있도록 구성되어 있다. import pendulum from air.. 2024. 1. 13. MSK 클러스터에 S3 sink connector 연결해서 S3에 데이터 적재하기 💡 MSK Connector는 연결하고자 하는 MSK Cluster의 인증방식으로 인증없음이나 IAM 인증만 지원한다. MSK에서 S3 Sink Connector를 사용해서 S3에 접근하기 위해서는 Connector 생성 전에 아래 두개의 리소스가 먼저 생성되어야 한다. S3 서비스 VPC 엔드포인트 Connector에 부여할 IAM Role S3 서비스 VPC 엔드포인트 생성 VPC Console -> Virtual private cloud -> endpoints 접속 -> endpoint 생성 Service에서 s3 검색 Type이 Gateway인 서비스 선택 VPC: msk cluster가 있는 VPC 선택 라우팅 테이블: msk cluster에 연결된 라우팅 테이블 선택 정책: 전체 엑세스 Kaf.. 2023. 11. 12. Airflow Dag 작성 Best practice Airflow 공식홈페이지에서 안내하는 Dag 작성 Best practice를 기반으로 평소에 염두해 두고 코드를 작성하기 위해서 정리했다. 참고: https://airflow.apache.org/docs/apache-airflow/2.0.0/best-practices.html#writing-a-dag 🏅 Airflow Best Practice DAG 작성 Task 사이의 데이터 전달은 작은 사이즈는 Xcom 을 사용하고, 대용량은 S3/HDFS 같은 외부 저장소를 사용한다. Airflow에 등록한 Variable은 Operator의 execute()나 Jinja template 외에서의 사용은 지양한다. 불필요한 top level code는 지양한다. import pendulum from airflow.. 2023. 11. 5. Kinesis Firehose - Lambda를 이용한 Dynamic Partitioning구현 Lambda 생성 원하는 언어를 선택해서 Lambda를 생성한다. 여기서 Lamda 실행 IAM Role은 default로 생성되는 기본 role을 사용해도 되고, Lambda 함수 timeout은 1분 이상으로 설정한다. (링크에서 Firehose의 데이터를 변환하는 Python, Go 예제 코드를 확인 가능) 아래는 source record를 파싱해서 partition key를 지정하는 Python 예제 코드이다. from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(f.. 2023. 8. 27. 이전 1 2 3 4 5 6 다음