티스토리 뷰

반응형

Flink 는 스트림데이터를 다룰수 있는 플랫폼이다. High-Avaliability(HA)를 구성하거나 상태정보를 저장하려면 공유스토리지가 필수인데 보통 hdfs 나 s3 를 쓰는 경우가 많다. (예제로 보통 s3나 hdfs 로 되어있고)

 

그런데, 아마존 환경이 아니라면 s3는 안쓸거고 하둡클러스터를 운영하지 않는다면 hdfs를 쓸수없다.

만약, 아마존을 안쓰고 서버에서 카프카와 ES만 연동할때 HA 와 이력정보를 남긴다면 어떻게 해야할까? (=즉, hdfs, s3사용불가)

 

사실 쿠버네티스환경에서는 퍼시스턴트 볼륨 이 존재한다. 공유도 가능하고 영구 저장도 가능하다. 

그래서 HA 이력정보나 savepoint, checkpoint 정보를 담는용도로 사용가능하다. (참고로 나는 ceph 를 연동해서 사용했다)

문제는 kubectl 이 아닌 kubernetes-session.sh 를 이용해 실행하기 때문에, 이 설정을 어떻게 할 수 있냐는게 문제였다 -_-;;;

yaml 은 어디에?

flink 도큐먼트의 Native Kubernetes 를 참고하면 다음과 같이 실행하는걸로 가이드 되어있다.

보통 쿠버네티스에 배포하는 방식은 kubectl 명령을 통해 yaml 구성한걸 등록하는걸 생각한다.

그런데, flink 공식문서의 예시를 보면 다음과 같이 flink에서 다운받은 바이너리에 있는 쉘스크립트를 통해 기동한다는것이다.

그래서 볼륨을 어떻게 연결해야하나? 처음에 헤맸다.

$ ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dexecution.attached=true

볼륨을 마운트 해야 flink-conf.yaml 설정에 이력정보를 담아주는 경로를 지정할텐데 어떻게 해야할까?

  • high-availability.storageDir
  • state.checkpoints.dir
  • state.savepoints.dir

tempate 옵션?

우선 퍼시던트볼륨은 생성했다고 가정한다. (이름은 flink-volume)

그럼 볼륨 마운트를 해서 연결해줘야하는데, 다행히 kubernetes-session.sh 실행명령어의 옵션에 template 형태의 pod yaml 을 제공하는 "kubernetes.pod-template-file" 옵션이 존재한다. 

실행예시

$ ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dexecution.attached=true
    -Dkubernetes.pod-template-file=./template-ceph.yaml

 

template-ceph.yaml

볼륨을 마운트 할때, initContainers 에 권한을 제공하는 명령어가 있는데...

기본 마운트를 하면 root 권한으로 생기는데 flink 폴더의 권한은 flink 유조로 되어있다.

그래서 기냥쓰면 권한문제로 상태정보를 못남기는 문제가 존재한다. 그래서 777 권한을 주는걸 구성했으니 참고하자.

apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  initContainers:
    - name: flink-init
      image: busybox:latest      
      command: ["/bin/sh", "-c"]
      args:
        - >           
          chmod -R 777 /data 
      volumeMounts:
        - mountPath: /flink-ext-lib
          name: flink-ext-lib     
        - mountPath: /data/ha
          name: ceph
          subPath: ha
        - mountPath: /data/checkpoints
          name: ceph
          subPath: cehckpoints
        - mountPath: /data/savepoints
          name: ceph
          subPath: savepoints
  containers:    
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /data
          name: ceph
  volumes:
    - name: ceph
      persistentVolumeClaim:
        claimName: flink-volume # pasta 에서 생성할때 이름
        readOnly: false # 이력상태를 저장해야하므로 write 가능해야한다.

 

flink-conf.yaml

/data/ 하위 경로를 기준으로 마운트 했으므로, 위와 같이 flink-conf.yaml 설정에서 공유스토리지와 ha 클래스를 지정하면 된다.

꼭 file:// 을 헤더로 붙여야 하니 생략하지 말자.

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///data/ha
state.checkpoints.dir: file:///data/checkpoints
state.savepoints.dir: file:///data/savepoints

내가 아무리 구글링해도 관련된 정보가 안나와서 삽질했는데 비슷한 고민을 한다면 도움이 되었으면 한다.

flink 는 중국인 개발자가 참여를 많이 해서 그런지 검색해보면 중국어 페이지도 많이 나오는데 한국어 블로그 포스팅은 많이 안나와서 더 헤맸던거 같다.

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함