본문 바로가기
Data Engineering/Flink

Flink Datastream 변환 연산 정리 - 기본 변환

by 홍띠 2023. 6. 22.

이번에 Amazon Kinesis Data Analytics를 사용하면서 Flink를 공부하는 중에 많이 쓰이는 변환연산을 정리해 두려고 한다.

Flink를 공부하면서 “아파치 플링크로 하는 데이터 처리, 파비안 휴스케, 바실리키 칼라브리, 에이콘” 책을 참고하였고, 이 글에 작성되는 내용의 일부는 책의 내용을 가져 왔다.

변환 연산?
변환 연산은 말 그대로 DataStream을 입력받아서 여러 종류의 변환을 거친 뒤 입력 값과 다른형태의 출력값을 내보낸다. DataStream API 작성의 본질은 애플리케이션 로직을 구현하는 데이터플로우를 생성 할 때 여러 변환 연산을 조합하는 것 이라 할 수 있다. 거의 모든 변환 연산은 사용자 정의 함수를 바탕으로 하며, 변환 복적의 함수 인터페이스를 클래스로 구현해 함수를 정의 한다.

 

위에서 언급된 책에서는 Flink의 DataStream API 변환 연산을 아래와 같이 분류 한다.

  1. 각 이벤트별로 적용하는 기본 변환 연산
  2. Key별로 grouping된 이벤트에 적용하는 KeyedStream 변환 연산
  3. 여러 스트림을 하나의 스트림으로 병합하거나 하나의 스트림을 여러 스트림으로 분할하는 MultiStream 변환 연산
  4. 스트림 이벤트를 재구성하는 분산 변환 연산
  5. 윈도우 함수 같은 기타 특별 변환 연산

이번 글에서는 "각 이벤트별로 적용하는 기본 변환 연산"에 대해서 정리하려고 한다.


기본 변환 연산은 단일 입력 레코드가 독립적으로 각각의 출력 레코드를 생성한다. 이 연산들은 다른 연산 이후에도 사용되기 때문에 빈번하게 사용되는 기본 연산들이다.

기본 변환 연산에는 아래 세가지의 연산이 있고, 각각의 연산을 자세히 살펴보자.

  • Map
  • Filter
  • FlatMap

이번 글에서 작성할 예제 코드 들은 아래와 같은 형식을 갖는 Json 타입의 DataStream을 입력 이벤트로 받아 처리하는 코드이다.

{
    "meta": {
        "time": 1687140191352,
        "message_id": "msg-230619110311352"
    },
    "event": {
          "event_type": 1001,
          "data": [
              {
                  "name": "DEVICE_STATE",
                  "event_id": 1745135,
                  "value": "1"
              }
          ],
          "device_time": 1687140191259
      }
}

Map

MapFunction은 입력 데이터 타입과 출력 데이터 타입을 지정하게 되어있는 Interface이며, map() 메서드를 가지고 있다. map()은 하나의 입력값을 받아 하나의 출력 값을 만들어 내고, 입력과 출력의 데이터 타입은 다른 타입 일 수 있다.

아래는 입력 스트림에서 “data”필드의 첫번째 Json 데이터를 추출하도록 map을 정의하였다.

DataStream<JsonObject> output = input
                .map(new MapFunction<JsonObject, JsonObject>() {
                    @Override
                    public JsonObject map(JsonObject value) throws Exception {
                        JsonArray data = value.get("event").getAsJsonObject().get("data").getAsJsonArray();
                        return data.get(0).getAsJsonObject();
                    }
                });

“meta”필드만 추출하는 간단한 연산이라면 람다 함수로 표현해도 된다.

DataStream<JsonObject> output = input
                .map(element -> element.get("meta").getAsJsonObject());

Filter

Filter 연산은 데이터를 변경하지 않고 들어온 데이터를 조건에 따라 버리거나 그대로 내보내는 연산이다. 따라서 데이터타입의 변화가 없기 때문에 FilterFunction 인터페이스는 하나의 데이터 타입만 지정하며, filter() 메서드를 가지고 있다. filter는 사용자 정의 로직을 통해 조건을 판별하여 boolean값을 반환함으로써데이터를 버릴지 유지할지를 결정한다.

아래는 “event_type”이 1005인 데이터만 유지하고 나머지 타입의 데이터는 버리는 filter이다.

DataStream<JsonObject> output = input
                .filter(new FilterFunction<JsonObject>() {
                    @Override
                    public boolean filter(JsonObject value) throws Exception {
                        int eventType = value.get("event").getAsJsonObject().get("event_type").getAsInt();
                        return eventType == 1005;
                    }
                });

filter도 마찬가지로 간단한 연산은 람다로 표현해도 된다. 아래는 “meta”혹은 “event”필드가 없다면 데이터를 버리는 filter이다.

DataStream<JsonObject> output = input
                .filter(element -> element.has("meta")&& element.has("event") );

FlatMap

FlatMap 연산은 Filter와 Map을 일반화 한 변환 연산이다. Flatmap은 한개의 입력 레코드를 받아 0개,1개 혹은 여러개의 출력 레코드를 만들 수 있다. FlatMapFunction 인터페이스는 입력과 출력 데이터타입을 지정하고 입력값을 받아 0개 이상의 Collector 객체를 반환하는 flatMap() 메서드를 가지고 있다.

아래는 입력 스트림에서 “meta”와 “event”를 분리하여 2개의 레코드를 출력하는 flatMap이다.

DataStream<JsonObject> output = input
                .flatMap(new FlatMapFunction<JsonObject, JsonObject>() {
                    @Override
                    public void flatMap(JsonObject value, Collector<JsonObject> out) throws Exception {
                        out.collect(value.get("meta").getAsJsonObject());
                        out.collect(value.get("event").getAsJsonObject());
                    }
                });

flatMap 또한 다른 연산들과 마찬가지로 람다 함수로 표현 할 수도 있다.

 

혹은, 아래와 같이 클래스를 따로 생성해서 사용할 수도 있으며 이는 다른 연산자도 마찬가지 이다.

private static class parseFlatMap implements FlatMapFunction<JsonObject, JsonObject> {
    @Override
    public void flatMap(JsonObject value, Collector<JsonObject> out) throws Exception {
        out.collect(value.get("meta").getAsJsonObject());
        out.collect(value.get("event").getAsJsonObject());
    }
}
DataStream<JsonObject> output = input
                .flatMap(new parseFlatMap());

 

참고: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/