티스토리 뷰

반응형

Flink 에서는 Savepoint 와 Checkpoint 라는 개념이 있는데, 복구를 할때 사용할 메타정보들을 활용한다. 

이 정보는 "state.checkpoint.dir" 이나 "state.savepoints.dir" 를 설정한 디렉토리의 하위에 _metadata 라는 파일로 존재한다. 이 파일을 확인해 보고 싶은경우가 있는데 다음과 같이 코드를 만들면 확인할 수 있다.

_metadata 파일 확인하기 (java)

소스코드를 확인하면 복호화 하는 메소드를 금방 찾을 수 있는데, 문제는 파일로 저장할때는 이 byte[] 앞에 "매직넘버"와 "버전정보"가 추가로 저장되기 때문에, 해당 값을 읽어야 그 이후 데이터를 정상 처리할 수 있다는 점만 주의하면 된다.

import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.*;
import java.io.DataInputStream;
import java.io.FileInputStream;
import static org.apache.flink.runtime.checkpoint.Checkpoints.HEADER_MAGIC_NUMBER;


public class Main {
    public static void main(String[] args) throws Exception {

        String metaPath = "/Users/tost/Downloads/_metadata"; // savepoint or checkpoint , _metadata 파일 
        FileInputStream fis = new FileInputStream(metaPath);
        DataInputStream dis = new DataInputStream(fis);

        // header_magic_number + version + [data...]
        int headerMagicNumber = dis.readInt();
        if (headerMagicNumber != HEADER_MAGIC_NUMBER) {
            throw new Exception("매직 해더가 다릅니다");
        }

        int version = dis.readInt();
        MetadataSerializer serializer = MetadataSerializers.getSerializer(version);

        CheckpointMetadata meta = serializer.deserialize(dis,
                CheckpointMetadata.class.getClassLoader(),
                null
        );

        System.out.println("getCheckpointId()          : " + meta.getCheckpointId());
        System.out.println("getMasterStates().size()   : " + meta.getMasterStates().size());
        int idx = 0;
        for(MasterState masterState : meta.getMasterStates()) {
            System.out.println("["+idx+"]" + masterState);
            idx+=1;
        }
        System.out.println("getOperatorStates().size() : " + meta.getOperatorStates().size());
        idx = 0;
        for(OperatorState operatorState : meta.getOperatorStates()) {
            System.out.println("["+idx+"].operatorState       : " + operatorState);
            idx+=1;
        }
        dis.close();
        fis.close();
    }
}

 

위와 같이 코드를 돌려보면 아래와 같은 형태의 결과를 확인할 수 있다.

getCheckpointId()          : 26416
getMasterStates().size()   : 0
getOperatorStates().size() : 5
[0].operatorState       : OperatorState(operatorID: cbc357ccb763df2852fee8c4fc7d55f2, parallelism: 1, maxParallelism: 128, coordinatorState: 73 bytes, sub task states: 1, total size (bytes): 447)
[1].operatorState       : OperatorState(operatorID: ba40499bacce995f15693b1735928377, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0)
[2].operatorState       : OperatorState(operatorID: 570f707193e0fe32f4d86d067aba243b, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0)
[3].operatorState       : OperatorState(operatorID: 7f86b06891c19f1e76c3f65c90ce752b, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0)
[4].operatorState       : OperatorState(operatorID: cf155f65686cb012844f7c745ec70a3c, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0)

 

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함