티스토리 뷰
[FLINK] kubernetes 환경에서 퍼시스턴트 볼륨 연동하기 - HA, checkPoint, savePoint
정선생 2021. 11. 5. 09:00Flink 는 스트림데이터를 다룰수 있는 플랫폼이다. High-Avaliability(HA)를 구성하거나 상태정보를 저장하려면 공유스토리지가 필수인데 보통 hdfs 나 s3 를 쓰는 경우가 많다. (예제로 보통 s3나 hdfs 로 되어있고)
그런데, 아마존 환경이 아니라면 s3는 안쓸거고 하둡클러스터를 운영하지 않는다면 hdfs를 쓸수없다.
만약, 아마존을 안쓰고 서버에서 카프카와 ES만 연동할때 HA 와 이력정보를 남긴다면 어떻게 해야할까? (=즉, hdfs, s3사용불가)
사실 쿠버네티스환경에서는 퍼시스턴트 볼륨 이 존재한다. 공유도 가능하고 영구 저장도 가능하다.
그래서 HA 이력정보나 savepoint, checkpoint 정보를 담는용도로 사용가능하다. (참고로 나는 ceph 를 연동해서 사용했다)
문제는 kubectl 이 아닌 kubernetes-session.sh 를 이용해 실행하기 때문에, 이 설정을 어떻게 할 수 있냐는게 문제였다 -_-;;;
yaml 은 어디에?
flink 도큐먼트의 Native Kubernetes 를 참고하면 다음과 같이 실행하는걸로 가이드 되어있다.
보통 쿠버네티스에 배포하는 방식은 kubectl 명령을 통해 yaml 구성한걸 등록하는걸 생각한다.
그런데, flink 공식문서의 예시를 보면 다음과 같이 flink에서 다운받은 바이너리에 있는 쉘스크립트를 통해 기동한다는것이다.
그래서 볼륨을 어떻게 연결해야하나? 처음에 헤맸다.
$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dexecution.attached=true
볼륨을 마운트 해야 flink-conf.yaml 설정에 이력정보를 담아주는 경로를 지정할텐데 어떻게 해야할까?
- high-availability.storageDir
- state.checkpoints.dir
- state.savepoints.dir
tempate 옵션?
우선 퍼시던트볼륨은 생성했다고 가정한다. (이름은 flink-volume)
그럼 볼륨 마운트를 해서 연결해줘야하는데, 다행히 kubernetes-session.sh 실행명령어의 옵션에 template 형태의 pod yaml 을 제공하는 "kubernetes.pod-template-file" 옵션이 존재한다.
실행예시
$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dexecution.attached=true
-Dkubernetes.pod-template-file=./template-ceph.yaml
template-ceph.yaml
볼륨을 마운트 할때, initContainers 에 권한을 제공하는 명령어가 있는데...
기본 마운트를 하면 root 권한으로 생기는데 flink 폴더의 권한은 flink 유조로 되어있다.
그래서 기냥쓰면 권한문제로 상태정보를 못남기는 문제가 존재한다. 그래서 777 권한을 주는걸 구성했으니 참고하자.
apiVersion: v1
kind: Pod
metadata:
name: jobmanager-pod-template
spec:
initContainers:
- name: flink-init
image: busybox:latest
command: ["/bin/sh", "-c"]
args:
- >
chmod -R 777 /data
volumeMounts:
- mountPath: /flink-ext-lib
name: flink-ext-lib
- mountPath: /data/ha
name: ceph
subPath: ha
- mountPath: /data/checkpoints
name: ceph
subPath: cehckpoints
- mountPath: /data/savepoints
name: ceph
subPath: savepoints
containers:
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /data
name: ceph
volumes:
- name: ceph
persistentVolumeClaim:
claimName: flink-volume # pasta 에서 생성할때 이름
readOnly: false # 이력상태를 저장해야하므로 write 가능해야한다.
flink-conf.yaml
/data/ 하위 경로를 기준으로 마운트 했으므로, 위와 같이 flink-conf.yaml 설정에서 공유스토리지와 ha 클래스를 지정하면 된다.
꼭 file:// 을 헤더로 붙여야 하니 생략하지 말자.
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///data/ha
state.checkpoints.dir: file:///data/checkpoints
state.savepoints.dir: file:///data/savepoints
내가 아무리 구글링해도 관련된 정보가 안나와서 삽질했는데 비슷한 고민을 한다면 도움이 되었으면 한다.
flink 는 중국인 개발자가 참여를 많이 해서 그런지 검색해보면 중국어 페이지도 많이 나오는데 한국어 블로그 포스팅은 많이 안나와서 더 헤맸던거 같다.
'데이터처리 > Flink' 카테고리의 다른 글
[Flink] op 필드 없는 "debezium-avro-confluent" 포맷 사용 방법 (0) | 2022.01.12 |
---|---|
[Flink] JDBC Connector 에서 "ORACLE" 연동하기 - flink 1.13 (1) | 2022.01.05 |
[오류] Flink에서 HDFS 데이터 Sink 할때 inprogress 로 flush 안되는 현상 (0) | 2021.11.12 |
[오류] Flink SQL 에서 HIVE 데이터를 조회 못하는 문제 (0) | 2021.11.11 |
[세팅] Flink SQL 에서 HIVE Connector 연동하기 - hive 2.3.6 (0) | 2021.11.10 |