티스토리 뷰
flink 에서 Kafka 의 데이터를 hdfs 에 Sink 테스트하는데 이상하게 inprogress 파일형태로 중간파일만 생성하고, 최종적으로 flush 가 안되는 상황으로 삽질했었는데, 결론부터 말하면 checkpoint 를 설정해야 한다.
Kafka to HDFS 예제
kafka 의 토픽을 hdfs 의 avro 파일포맷으로 sink 하는 예제이고, 날짜, 시간 필드를 파티션으로 지정해서 폴더별로 저장되게 하는 실제로 자주 사용하는 심플한 패턴이다. 그런데 문제는 파일이 생성은 되는거 같은데... 하루가 지나도 파일을 최종 flush 처리 하지 않는느낌이다.
-----------------------
-- source
-----------------------
CREATE TABLE KafkaUserMoney (
the_kafka_key STRING,
logTime TIMESTAMP(0),
name STRING,
money BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'user-money',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '192.1.2.3:9092',
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://192.1.2.4:8081',
'value.avro-confluent.schema-registry.subject' = 'user',
'value.fields-include' = 'EXCEPT_KEY'
);
-----------------------
-- sink
-----------------------
CREATE TABLE HdfsUserMoney (
name STRING,
money BIGINT,
ymd STRING,
hh24 STRING
) PARTITIONED BY (ymd, hh24) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://myHadoop/user/output/user_money',
'format' = 'avro'
);
INSERT INTO HdfsUserMoney
SELECT
name. ,
money ,
DATE_FORMAT(logTime, 'yyyy-MM-dd') AS ymd,
DATE_FORMAT(logTime, 'HH00') AS hh24
FROM
KafkaUserMoney
;
문제 : inpregress 상태로 완료안됨
작업을 보면, 계속 파티션은 늘어가는데 파일이 보면 알겠지만 "part-XXX.inpgoress.XXX" 같은 패턴으로 파일이 생성되는데 이게 완료처리가 안된거다.
% hadoop fs -ls hdfs://myHadoop/user/output/user_money/*/*
Found 1 items
-rw-r--r-- 3 user hdfs 2173 2021-10-12 19:49 hdfs://myHadoop/user/output/user_money/ymd=2021-10-05/hh24=11/.part-9bb4ef5f-f6b1-4705-a448-d846eb8e0df0-0-0.inprogress.bba76c91-c2f0-4c22-9f9a-8a98e90c75bc
Found 1 items
-rw-r--r-- 3 user hdfs 1776 2021-10-12 19:49 hdfs://myHadoop/user/output/user_money/ymd=2021-10-05/hh24=12/.part-9bb4ef5f-f6b1-4705-a448-d846eb8e0df0-0-1.inprogress.60d690d1-d588-4c2b-914c-4fccfcdd29ff
Found 1 items
-rw-r--r-- 3 user hdfs 6221 2021-10-12 19:49 hdfs://myHadoop/user/output/user_money/ymd=2021-10-05/hh24=14/.part-9bb4ef5f-f6b1-4705-a448-d846eb8e0df0-0-2.inprogress.00d33d3b-f987-49d3-846e-141f5e3a0a06
Found 1 items
...
구글링을 열심히 해보니 check point 가 비활성화 되었을때 나오는 현상이라는 말이 있다.
참고로, checkpoint 는 stream 데이터를 처리할때 장애를 대비해서 어디까지 처리했는지 상태정보를 주기적으로 남기는작업을 의미한다. (명시적으로 사용자가 작업종료하면서 상태저장하는건 savepoint 라고 한다)
즉, flink 에서 무한의 스트림데이터를 어디까지 처리했는지 중간정산(?)을 하는 과정이 존재해야 hdfs 상에 파일을 저장하는걸 마무리 작업을 할수 있다는 말이 되는듯 하다.
checkpoint 설정하기
flink-conf.yaml 의 설정에 "execution.checkpointing.interval" 의 주기를 세팅해주면 된다.
이 설정파일에 기본으로 주석으로 되어있지만 자주 쓰이는 설정을 확인 할 수 있는데, 이 설정은 거기에 없고 구글링해도 명확하게 잘 안나와서 더 헤맸던것 같다. 참고로 해당값을 10분으로 세팅한 이유는 cloudera 문서 설명에 10분으로 세팅했길래 기냥 따라했다. 잘모르면 기냥 따라하는게 국룰...
https://docs.cloudera.com/csa/1.4.0/development/topics/csa-checkpoint.html
- flink-conf.yaml : 10분으로 세팅
...생략...
execution.checkpointing.interval: 600000
위와 같이 관련된 설정을 추가하고, 쿼리를 다시 등록하고 대시보드의 JOB 정보를 확인해보면,
Checkpoints 의 Checkpoint Mode 가 Exactly Onece 로 변경되어 있고, Interval 설정값이 반영된것도 확인가능하다.
그리고 좀 기다리면서 hdfs 경로의 파일을 모니터링해보면 inprogress 상태의 파일이 다음과 같이 flush 되서 정상적으로 저장되는걸 확인 할 수 있다.
hdfs://myHadoop/user/output/user_money/ymd=2021-10-05/hh24=11/part-d962d7b5-ba02-4e76-871d-1cd9bf87e158-0-39
'데이터처리 > Flink' 카테고리의 다른 글
[Flink] op 필드 없는 "debezium-avro-confluent" 포맷 사용 방법 (0) | 2022.01.12 |
---|---|
[Flink] JDBC Connector 에서 "ORACLE" 연동하기 - flink 1.13 (1) | 2022.01.05 |
[오류] Flink SQL 에서 HIVE 데이터를 조회 못하는 문제 (0) | 2021.11.11 |
[세팅] Flink SQL 에서 HIVE Connector 연동하기 - hive 2.3.6 (0) | 2021.11.10 |
[FLINK] kubernetes 환경에서 퍼시스턴트 볼륨 연동하기 - HA, checkPoint, savePoint (0) | 2021.11.05 |