💡 MSK Connector는 연결하고자 하는 MSK Cluster의 인증방식으로 인증없음이나 IAM 인증만 지원한다.
MSK에서 S3 Sink Connector를 사용해서 S3에 접근하기 위해서는 Connector 생성 전에 아래 두개의 리소스가 먼저 생성되어야 한다.
- S3 서비스 VPC 엔드포인트
- Connector에 부여할 IAM Role
S3 서비스 VPC 엔드포인트 생성
VPC Console -> Virtual private cloud -> endpoints 접속 -> endpoint 생성
- Service에서 s3 검색
- Type이 Gateway인 서비스 선택
- VPC: msk cluster가 있는 VPC 선택
- 라우팅 테이블: msk cluster에 연결된 라우팅 테이블 선택
- 정책: 전체 엑세스
Kafka Connector에 연결할 IAM Role 생성
- 신뢰정책
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "kafkaconnect.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
- IAM Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListAllMyBuckets",
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:DeleteObject",
"s3:PutObject",
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts",
"s3:ListBucketMultipartUploads"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:ap-northeast-2:{account-number}:cluster/{msk-cluster-name}/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:ap-northeast-2:{account-number}:topic/{msk-cluster-name}/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:ap-northeast-2:{account-number}:group/{msk-cluster-name}/*"
]
}
]
}
MSK Connector 생성
MSK Connector는 Connector name이 MSK Cluster의 Consumer group으로 작동한다. 따라서, 기존의 connector를 삭제하고 동일한 이름의 connector를 생성하면 동일한 consumer group이기 때문에 삭제된 connector의 오프셋을 이어서 데이터를 가져오게 된다.
사용자 지정 플러그인 생성
- Confluent.io Amazon S3 Connector 에서 S3 sink connector 다운로드
- S3 버킷에 다운로드한 zip파일 업로드
- MSK Console → 사용자 지정 플러그인 → 생성
- S3 URI: 위에서 업로드한 파일 선택
- 사용자 지정 플러그인 이름 지정 (예시: confluentinc-kafka-connect-s3-10-5-5)
Connector 생성
MSK Console -> MSK Connect -> Connectors -> 생성
- Custom plugins: 위에서 생성한 플러그인 선택
- Apache Kafka cluster
- MSK Cluster
- Connector를 연결할 MSK Cluster 선택
- IAM 인증 선택
- 커넥터 구성 설정
connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=ap-northeast-2 # s3 region
partition.duration.ms=600000 # time-based partitioner의 파티션 주기
topics.dir=kafka_backup # s3에 데이터를 저장할 디렉토리 경로
flush.size=100 # connector buffer 사이즈
schema.compatibility=NONE # 스키마 호환
tasks.max=1 # 실행 태스크 수, Partition수보다 많아지면 resource 낭비 발생
topics=kafka-test-topic # connector를 연결할 kafka 토픽
timezone=UTC # 타임존. KST는 오류 발생
locale=en-US # 마찬가지로 ko-KR은 오류 발생
key.converter.schemas.enable=false
format.class=io.confluent.connect.s3.format.json.JsonFormat # S3에 저장되는 파일의 포맷
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
key.converter=org.apache.kafka.connect.storage.StringConverter
path.format=YYYY/MM/dd/HH # 파티셔닝 path 포맷
timestamp.extractor=RecordField # 파티셔닝에 사용되는 time의 종류. RecordField는 데이터에서 추출해서 쓴다는 것
s3.bucket.name=kafka-backup-bucket # 목적지 S3 버킷
timestamp.field=timestamp # RecordField로 지정하는 key 값
- 커넥터 용량: autoscaled 선택
- 작업자 구성: MSK 기본 구성 사용
- 로그 전송: Amazon CloudWatch 로그그룹 생성 후 선택
- Access permissions: 앞에서 생성한 role 선택
참고:
https://docs.aws.amazon.com/ko_kr/msk/latest/developerguide/mkc-tutorial-setup.html
https://velog.io/@joney0715/AWS-Amazon-MSK-S3-sink-connector-%EC%83%9D%EC%84%B1%ED%95%98%EA%B8%B0
'Data Engineering > Kafka' 카테고리의 다른 글
Kafka - S3 Sink Connector Rotation Strategy(Flush 설정) (0) | 2024.02.18 |
---|