티스토리 뷰
데이터를 분석하기 위해서는 다양한 스토리지를 하나의 저장소로 모아야 하는 작업이 필요한 경우가 많다.
이때 처음에 데이터를 다 복사해와도 계속 변경되는 데이터가 생기기 때문에, 데이터를 동기화 해야 하는 이유가 생기기 마련이다. 이런문제를 해결하기위한 첫걸음은 변경분의 캡쳐 데이터를 (change data capture, CDC) 만들고, 이걸 어떻게 반영할것인가? 이것이 핵심이다.
Flink 는 CDC 형태의 데이터 포맷 중 Debezium Format 을 기본지원한다.
이 포맷은 op, after, before 필드가 필수여야 한다. 근데 op 필드가 없는 CDC 포맷은 어떻게 처리할까?
OP필드 없는 CDC 처리하기
사실 op 필드가 없어도 다음과 같이 추론이 가능하다. 그래서 OP 코드값을 찾을 수 없으면 추론한 값을 활용하는걸로 로직을 추가하면 된다. 수정한 코드는 github 에 flink 1.14 기준으로 수정해 놓았다.
구분 | before | after |
추가(Insert) | null | 값있음 |
수정(Update) | 값있음 | 값있음 |
삭제(Delete) | 값있음 | null |
코드 수정한 내용이 궁금하면 풀리퀘 날려놓은 참고하면 되고, 바로 사용해 보려는 사람은 아래 빌드방법을 참고하자.
빌드한 "flink-sql-avro-confluent-registry-1.14-SNAPSHOT.jar" 파일은 $FLINK_HOME/lib 폴더에 바꿔서 재기동하면된다.
https://github.com/jungkoo/flink/pull/2
# 빌드방법 (flink 1.14 버전기준으로 수정함)
git clone https://github.com/jungkoo/flink.git -b support-changefeed-format-1.14
cd flink/flink-formats
mvn clean package -Dmaven.test.skip=true
사용법은?
flink 의 sql-client.sh 에서 사용한다고 가정하면 쿼리는 아래와 같이 표현하면 된다.
스키마레지스트리를 연동한 confluent kafka 형태의 avro 를 입력값으로 사용한다고 가정하면 다음과 같이 선언해서 읽어 낼 수 있다. 주의할점은 "scan.startup.mode" 를 이용해 선언했지만, 실제로 사용할땐 offset 값을 kafka 상에서 유지하게 하는것이 좋으므로 "properties.group.id" 설정을 쓰는것이 좋다.
이부분은 flink 의 debezium 포맷 사용하는 예시에 잘 나와있다.
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/debezium/
CREATE TABLE sample (
id BIGINT,
code BIGINT,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'crdb-sample',
'properties.bootstrap.servers' = '127.0.0.1:9093',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.url' = 'http://127.0.0.1:8081'
);
수정한 코드를 쓰면 debezium 포맷도 되고, op 필드가 없이 after, before 포맷만 있는 형태의 데이터 처리도 가능해진다.
실제 real 환경에서 쓰려면 코드를 더 다듬어야 겠지만, 내가 사용하는 사례에서 동작하는지 테스트하기엔 충분할것으로 보인다.
'데이터처리 > Flink' 카테고리의 다른 글
[Flink] extension (5) should not be presented in certificate_request 오류 - postgresql 연동시 (0) | 2022.01.27 |
---|---|
[Flink] PostgreSQL 연동시 Doesn't support Postgres type 'jsonb' yet 문제 (0) | 2022.01.26 |
[Flink] JDBC Connector 에서 "ORACLE" 연동하기 - flink 1.13 (1) | 2022.01.05 |
[오류] Flink에서 HDFS 데이터 Sink 할때 inprogress 로 flush 안되는 현상 (0) | 2021.11.12 |
[오류] Flink SQL 에서 HIVE 데이터를 조회 못하는 문제 (0) | 2021.11.11 |