본문 바로가기
Data Engineering/Kinesis

Kinesis Firehose - Lambda를 이용한 Dynamic Partitioning구현

by 홍띠 2023. 8. 27.

Lambda 생성

원하는 언어를 선택해서 Lambda를 생성한다.

여기서 Lamda 실행 IAM Role은 default로 생성되는 기본 role을 사용해도 되고, Lambda 함수 timeout은 1분 이상으로 설정한다.

(링크에서 Firehose의 데이터를 변환하는 Python, Go 예제 코드를 확인 가능)

 

아래는 source record를 파싱해서 partition key를 지정하는 Python 예제 코드이다.

from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "date": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output

 

Firehose 설정

Firehose 구성에서 Lambda를 이용한 source 레코드 변환을 Enable 한다.

Lambda function에 위에서 생성한 Lamba지정하고, 가장 최신의 lambda 소스를 사용할 예정이라면 version은 $Latest로(default) 지정한다.

Destination settings에서 Dynamic partitioning을 Enabled한다.

( Partitioning은 한번 Enable하면 추후에 disable 시킬 수 없다. )

S3 bucket prefix를 아래와 같이 설정해서 Lamba에서 지정한 Partition Key를 S3 저장 경로에 사용하도록 설정한다.

kinesis-firehose-dir/!{partitionKeyFromLambda:year}/!{partitionKeyFromLambda:month}/!{partitionKeyFromLambda:date}/!{partitionKeyFromLambda:hour}/!{partitionKeyFromLambda:customerId}/

IAM Role 변경

Firehose의 실행 role에 위에서 생성한 Lambda를 Invoke 시킬 수 있도록 role을 추가한다.

        {
            "Sid": "",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": "arn:aws:lambda:ap-northeast-2:{account-id}:function:partitionkey-parsing-function:$LATEST"
        }

'Data Engineering > Kinesis' 카테고리의 다른 글

Kinesis Firehose Dynamic partitioning 설정  (0) 2022.12.04