Kinesis Data Streams를 이용하면서, 다른 AWS 서비스에 DataStreams에 write된 데이터를 전달하는 경우에 Kinesis Firehose (Delivery Stream)을 사용한다. 내 경우에는 AWS S3로 데이터를 전달하는 용도로 사용하는데, Firehose는 UTC의 타임존을 기준으로 S3 경로를 설정하기 때문에 실제 데이터에 기록되는 KST기준의 시간과 디렉토리의 경로 명이 맞지 않는 문제가 발생했다.
그래서 방법을 찾아본 결과, Firehose의 타임존을 설정으로 변경할 수는 없다. 하지만, Dynamic partitioning을 사용해서 S3 Prefix를 커스터마이징 할 수 있다. 타임존을 변경하는것 외에도 시간이 아닌 다른 key를 설정 할 수도 있고, 데이터를 파싱하고 변환해서 사용 할 수도 있다.
💡 Dynamic partitioning 사용여부는 Firehose생성시에만 설정 할 수 있고, 생성 후에 편집은 불가하다.
Firehose를 새로 생성하는 과정에서 Destination Settings 섹션이 있다. 여기서 데이터를 저장할 S3 버킷을 설정하고, Dynamic partitioning을 enabled 시켜준다.
Enabled로 설정하면 아래와 같은 새로운 항목들이 추가된다.
Multiline record deaggregation
집계된 데이터가 들어오는 경우 deaggregation을 이용해서 다중 레코드를 분리하고 파싱 할 수 있다.
Json 형식과, 사용자 지정 구분자(base64 인코딩 된 문자열)을 지원한다.
New line delimiter
S3로 데이터를 전송 할 때, 각 레코드 사이에 new line 구분자(’\n’)를 추가 할 수 있다.
사용자 지정 구분자는 불가하다.
Inline parsing for JSON
Inline Parsing으로 파티셔닝 키를 생성 할 수 있다. 여기서 생성된 파티셔닝 키는 S3 Prefix지정 시에 사용 가능 하다.
위에서 언급한 Timezone 문제처럼 S3의 prefix를 설정을 기본설정 외의 방법으로 변경하고자 할때, 할 수 있는 방법 중 하나이다.
아래와 같은 샘플 데이터가 들어온다고 가정 했을때, Inline Parser를 어떻게 구성하면 되는지에 대한 공식문서 예시이다.
#sample data
{
"type": {
"device": "mobile",
"event": "user_clicked_submit_button"
},
"customer_id": "1234567890",
"event_timestamp": 1630442383, #epoch timestamp
"region": "sample_region"
}
Inline Parsing은 jq expression으로 설정한다. (jq parser: https://stedolan.github.io/jq/)
Parameter | jq expression |
customer_id | .customer_id |
device | .type.device |
year | .event_timestamp| strftime("%Y") |
month | .event_timestamp| strftime("%m") |
day | .event_timestamp| strftime("%d") |
hour | .event_timestamp| strftime("%H") |
만약에 나의 경우와 같이 UTC 타임존이 아닌 KST 타임존에 맞게 변환되길 원하는 경우에는 위의 jq 표현식에 아래와 같이 추가해주면 된다. (UTC와 KST의 9시간 차이만큼 타임스탬프에 더해주는것!)
[.event_timestamp, 32400] | add | strftime("%Y%m%d %H")
들어온 데이터를 원하는 파티셔닝 키로 설정하였으면, 아래와 같이 S3 Prefix를 설정하면 된다.
#S3 Prefix
customer_id=!{partitionKeyFromQuery:customer_id}/device=!{partitionKeyFromQuery:device}/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/hour=!{partitionKeyFromQuery:hour}/
#S3 folder structure 결과
s3://<Bucket>/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa
Lambda 함수를 이용해서 파티셔닝 키 생성
람다함수를 이용해서 파티셔닝 키를 설정하는 것은 다아나믹 파티셔닝 항목에 보이지는 않지만, 위의 데이터 변환 항목에서 람다함수를 설정하고 파티셔닝 키에도 사용 할 수 있다.
먼저 파티셔닝 키를 메타데이터로 설정하는 람다 함수를 생성한다. (아래의 예시도 공식문서에서 제공하는 예시이다.)
# This is an Amazon Kinesis Data Firehose stream processing Lambda function that
# replays every read record from input to output
# and extracts partition keys from the records.
from __future__ import print_function
import base64
import json
import datetime
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
# Create return value.
firehose_records_output = {}
# Create result object.
firehose_records_output['records'] = []
print("\\n")
# Go through records and process them.
for firehose_record_input in firehose_records_input['records']:
# Get user payload.
payload = base64.b64decode(firehose_record_input['data'])
jsonVal = json.loads(payload)
print("Record that was received")
print(jsonVal)
print("\\n")
# Create output Firehose record and add modified payload and record ID to it.
firehose_record_output = {}
eventTimestamp = datetime.datetime.fromtimestamp(jsonVal['eventTimestamp'])
partitionKeys = {}
partitionKeys["customerId"] = jsonVal['customerId']
partitionKeys["year"] = eventTimestamp.strftime('%Y')
partitionKeys["month"] = eventTimestamp.strftime('%m')
partitionKeys["date"] = eventTimestamp.strftime('%d')
partitionKeys["hour"] = eventTimestamp.strftime('%H')
partitionKeys["minute"] = eventTimestamp.strftime('%M')
# Must set proper record ID.
firehose_record_output['recordId'] = firehose_record_input['recordId']
firehose_record_output['data'] = firehose_record_input['data']
firehose_record_output['result'] = 'Ok'
firehose_record_output[metadata]['partitionKeys'] = partitionKeys
# Add the record to the list of output records.
firehose_records_output['records'].append(firehose_record_output)
# At the end return processed records.
return firehose_records_output
람다 함수를 생성하고 설정 하였으면, Inline Parser와 같이 S3 Prefix에서 람다함수의 키를 사용하겠다고 지정해 주면 된다.
#S3 Prefix
customerId=!{partitionKeyFromLambda:customerId}/year=!{partitionKeyFromLambda:year}/month=!{partitionKeyFromLambda:month}/day=!{partitionKeyFromLambda:day}/hour=!{partitionKeyFromLambda:hour}/
S3의 파티셔닝은 S3의 성능 향상을 위해서 중요하게 설정되어야 하기 때문에 위의 Dynamic Partitioning을 이용하면 S3에 저장되는 데이터 들을 필요에 맞게 더 효율적으로 사용 할 수 있을 것 같다.
'Data Engineering > Kinesis' 카테고리의 다른 글
Kinesis Firehose - Lambda를 이용한 Dynamic Partitioning구현 (0) | 2023.08.27 |
---|