MSK 클러스터에 S3 sink connector 연결해서 S3에 데이터 적재하기
MSK 클러스터에 S3 sink connector 연결해서 S3에 데이터 적재하기
💡 MSK Connector는 연결하고자 하는 MSK Cluster의 인증방식으로 인증없음이나 IAM 인증만 지원한다. MSK에서 S3 Sink Connector를 사용해서 S3에 접근하기 위해서는 Connector 생성 전에 아래 두개의 리소스가
devpongi.tistory.com
이전에 작성했던 블로그 글처럼 커넥터를 설정해서 사용 중 이었는데, 연속적으로 데이터가 들어오지 않는 토픽의 경우에는 Flush size가 매번 채워지지 않아 S3에 데이터가 업로드 되는데 지연이 발생하였다.
그래서 이번에 S3 업로드 설정을 추가하면서, S3 커넥터에서의 Flush 부분을 정리해 보려고 한다.
Rotation Strategy
S3 Sink Connector에서는 파티션 파일을 flush하고 S3에 Object를 업로드하는 시점을 결정하는 것을 rotation strategy라고 한다. 여기에는 몇가지 전략이 있고, 이들은 함께 사용되기도 한다.
Maximum number of records
- flush.size 속성으로 설정
- 한개의 파일에 기록되는 최대 레코드 수 지정
- 업로드 되는 파일마다 정해진 수의 레코드로 기록할 수 있지만, Flush size가 충족되어야만 파일이 업로드 됨
- MSK에서는 해당 값을 제외하면 오류 발생함
Maximum span of record time
- rotate.interval.ms 속성으로 설정
- 파일이 추가적으로 레코드를 받을 수 있는 최대 시간 범위를 설정
- 파티셔너의 timestamp.extractor 에 의해 결정된 파일의 첫번째 record timestamp이 시작 지점
- 레코드의 timestamp가 지정된 시간범위를 넘어가는 경우, 열려있는 파일을 S3에 업로드 하고 새로운 파일에 기록
- 시간 범위를 벗어나는 레코드가 유입되기 전까지 파일은 계속 열려있음
Scheduled rotation
- rotate.schedule.interval.ms 속성으로 설정
- 파일이 추가적으로 레코드를 받을 수 있는 최대 시간 범위를 설정
- rotate.interval.ms 과는 달리 첫번째 레코드의 system time이 시작지점
- 레코드의 양과 상관없이, system time에 따라 스케쥴된 시간에 커밋 발생
- timezone 속성이 필수로 지정되어 있어야 함
Connector 설정
rotate.schedule.interval.ms 속성을 이용하여 데이터가 연속적으로 유입되지 않더라도 스케쥴에 따라 파일을 업로드 하도록 설정한다.
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=ap-northeast-2 # s3 region
partition.duration.ms=600000 # time-based partitioner의 파티션 주기
topics.dir=kafka_backup # s3에 데이터를 저장할 디렉토리 경로
flush.size=100 # connector buffer 사이즈
schema.compatibility=NONE # 스키마 호환
tasks.max=1 # 실행 태스크 수, Partition수보다 많아지면 resource 낭비 발생
topics=kafka-test-topic # connector를 연결할 kafka 토픽
timezone=UTC # 타임존. KST는 오류 발생
locale=en-US # 마찬가지로 ko-KR은 오류 발생
key.converter.schemas.enable=false
format.class=io.confluent.connect.s3.format.json.JsonFormat # S3에 저장되는 파일의 포맷
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
path.format=YYYY/MM/dd/HH # 파티셔닝 path 포맷
timestamp.extractor=RecordField # 파티셔닝에 사용되는 time의 종류. RecordField는 데이터에서 추출해서 쓴다는 것
s3.bucket.name=kafka-backup-bucket # 목적지 S3 버킷
timestamp.field=timestamp # RecordField로 지정하는 key 값
rotate.schedule.interval.ms=60000 # 주기적으로 파일을 업로드 하도록 스케쥴 설정
'Data Engineering > Kafka' 카테고리의 다른 글
MSK 클러스터에 S3 sink connector 연결해서 S3에 데이터 적재하기 (0) | 2023.11.12 |
---|