티스토리 뷰

반응형

데이터를 분석하기 위해서는 다양한 스토리지를 하나의 저장소로 모아야 하는 작업이 필요한 경우가 많다.

이때 처음에 데이터를 다 복사해와도 계속 변경되는 데이터가 생기기 때문에, 데이터를 동기화 해야 하는 이유가 생기기 마련이다. 이런문제를 해결하기위한 첫걸음은 변경분의 캡쳐 데이터를 (change data capture, CDC) 만들고, 이걸 어떻게 반영할것인가? 이것이 핵심이다.

 

Flink 는 CDC 형태의 데이터 포맷 중 Debezium Format 을 기본지원한다. 

이 포맷은 op, after, before 필드가 필수여야 한다. 근데 op 필드가 없는 CDC 포맷은 어떻게 처리할까?

 

OP필드 없는 CDC 처리하기

사실 op 필드가 없어도 다음과 같이 추론이 가능하다. 그래서 OP 코드값을 찾을 수 없으면 추론한 값을 활용하는걸로 로직을 추가하면 된다. 수정한 코드는 github 에 flink 1.14 기준으로 수정해 놓았다.

구분 before after
추가(Insert) null 값있음
수정(Update) 값있음 값있음
삭제(Delete) 값있음 null

코드 수정한 내용이 궁금하면 풀리퀘 날려놓은 참고하면 되고, 바로 사용해 보려는 사람은 아래 빌드방법을 참고하자.

빌드한 "flink-sql-avro-confluent-registry-1.14-SNAPSHOT.jar" 파일은 $FLINK_HOME/lib 폴더에 바꿔서 재기동하면된다.

https://github.com/jungkoo/flink/pull/2

# 빌드방법 (flink 1.14 버전기준으로 수정함)
git clone https://github.com/jungkoo/flink.git -b support-changefeed-format-1.14
cd flink/flink-formats
mvn clean package -Dmaven.test.skip=true

사용법은?

flink 의 sql-client.sh 에서 사용한다고 가정하면 쿼리는 아래와 같이 표현하면 된다.

스키마레지스트리를 연동한 confluent kafka 형태의 avro 를 입력값으로 사용한다고 가정하면 다음과 같이 선언해서 읽어 낼 수 있다. 주의할점은 "scan.startup.mode" 를 이용해 선언했지만, 실제로 사용할땐 offset 값을 kafka 상에서 유지하게 하는것이 좋으므로 "properties.group.id" 설정을 쓰는것이 좋다. 

 

이부분은 flink 의 debezium 포맷 사용하는 예시에 잘 나와있다.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/debezium/

CREATE TABLE sample (  
  id              BIGINT,
  code            BIGINT,
  name            STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'crdb-sample',
 'properties.bootstrap.servers' = '127.0.0.1:9093',
 'scan.startup.mode' = 'earliest-offset', 
 'format' = 'debezium-avro-confluent',
 'debezium-avro-confluent.url' = 'http://127.0.0.1:8081'
);

수정한 코드를 쓰면 debezium 포맷도 되고, op 필드가 없이 after, before 포맷만 있는 형태의 데이터 처리도 가능해진다.

실제 real 환경에서 쓰려면 코드를 더 다듬어야 겠지만, 내가 사용하는 사례에서 동작하는지 테스트하기엔 충분할것으로 보인다.

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함