본문 바로가기
Data Engineering/Flink

Flink Datastream 변환 연산 정리 - KeyedStream 변환 연산

by 홍띠 2023. 7. 2.

Flink Datastream 변환 연산 정리 - 기본 변환 의 글에서 이어지는 글이다.


KeyedStream 변환 연산은 입력레코드의 특정 Key를 기준으로 레코드를 분류해서 처리하는 연산이다. 

DataStream으로 들어오는 입력 레코드를 keyBy 연산을 이용해서 각 key의 값별로 분류된 KeyedStream으로 내보낸다.

 

KeyBy 연산을 자세하게 알아보기에 앞서, 이전 글과 마찬가지로 이번 글에서 작성할 예제 코드 들은 아래와 같은 형식을 갖는 Json 타입의 DataStream을 입력 이벤트로 받아 처리하는 코드이다.

{
    "meta": {
        "time": 1687140191352,
        "message_id": "msg-230619110311352"
    },
    "event": {
          "event_type": 1001,
          "data": [
              {
                  "name": "DEVICE_STATE",
                  "event_id": 1745135,
                  "value": "1"
              }
          ],
          "device_time": 1687140191259
      }
}

KeyBy

KeyBy 연산은 입력 레코드를 받아 지정된 Key로 입력 레코드를 분류하여 KeyedStream으로 출력한다. SQL에서 GroupBy 연산과 비슷한 역할을 한다고 이해했다.

만약 유입되는 데이터의 Key인 "event_type"이 1001, 1002, 1003 세개의 이벤트 타입이 있다고 하면, KeyBy 연산을 마친 KeyedStream에 sum, min과 같은 연산을 했을때 각 이벤트 타입별로 계산이 된다.

 

KeyBy 연산은 람다식으로 간단하게 구현 가능하다.

아래 코드는 입력 레코드에서 "event_type"을 문자열로 추출하여, event_type의 값을 기준으로 분류하도록 한다. 그럼 출력되는 레코드는 KeyedStream 타입으로 출력된다. 

KeyedStream<JsonObject> keyByOutput = input
                .map(value -> value.get("event").getAsJsonObject().get("event_type").getAsString());

KeyBy 연산으로 출력된 KeyedStream에는 아래와 같은 연산들이 가능하다.

  • Reduce
  • Aggregations (sum, min, max 등)
  • Window

이외에도 Map, FlatMap 등 기본연산을 포함해 다양한 연산들이 가능하다. 이 중에서 Window 연산은 추후에 따로 글을 쓸 예정이니, 여기서는 다루지 않을 예정이다.

 

아래와 같은 튜플 형태의 KeyedStream을 생성했을때 이후에 여러가지 연산들이 가능하다.

KeyedStream<Tuple2<String, Integer>> keyedStream = env.fromElements(
            new Tuple2<>("A", 1),
            new Tuple2<>("B", 2),
            new Tuple2<>("A", 3),
            new Tuple2<>("B", 4)
    ).keyBy(0);

Reduce

사용자가 정의한 Reduction 함수를 사용해서 동일한 키에 속한 데이터를 결합한다.

DataStream<Tuple2<String, Integer>> output = keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

첫번째 필드를 기준으로 분류된 KeyedStream에 Reduce 연산을 적용하여 두개의 정수값을 받아 더한 결과를 반환하도록 한다. Reduce 연산은 각 키별로 데이터를 누적하여 결과를 출력한다.

("A",4), ("B",6)

Aggregations

KeyedStream의 키 별로 데이터를 집계한다. 집계연산에는 sum(), min(), max(), minBy(), maxBy() 등이 포함 된다.

 

아래는 sum()과 max() 집계연산 예시이다.

// 합계 집계 연산
DataStream<Tuple2<String, Integer>> sumStream = keyedStream.sum(1);

// 최대값 집계 연산
DataStream<Tuple2<String, Integer>> maxStream = keyedStream.maxBy(1);

첫번째 필드를 기준으로 분류된 KeyedStream에 두번째 필드의 값을 각각 합계를 내거나 최대값을 가져오는 연산을 적용했다. 각 연산은 각각 아래와 같은 결과를 출력한다.

//sum()연산 결과
("A", 4), ("B", 6)

//maxBy() 연산 결과
("A", 3), ("B", 4)