티스토리 뷰
반응형
Flink 에서는 Savepoint 와 Checkpoint 라는 개념이 있는데, 복구를 할때 사용할 메타정보들을 활용한다.
이 정보는 "state.checkpoint.dir" 이나 "state.savepoints.dir" 를 설정한 디렉토리의 하위에 _metadata 라는 파일로 존재한다. 이 파일을 확인해 보고 싶은경우가 있는데 다음과 같이 코드를 만들면 확인할 수 있다.
_metadata 파일 확인하기 (java)
소스코드를 확인하면 복호화 하는 메소드를 금방 찾을 수 있는데, 문제는 파일로 저장할때는 이 byte[] 앞에 "매직넘버"와 "버전정보"가 추가로 저장되기 때문에, 해당 값을 읽어야 그 이후 데이터를 정상 처리할 수 있다는 점만 주의하면 된다.
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.*;
import java.io.DataInputStream;
import java.io.FileInputStream;
import static org.apache.flink.runtime.checkpoint.Checkpoints.HEADER_MAGIC_NUMBER;
public class Main {
public static void main(String[] args) throws Exception {
String metaPath = "/Users/tost/Downloads/_metadata"; // savepoint or checkpoint , _metadata 파일
FileInputStream fis = new FileInputStream(metaPath);
DataInputStream dis = new DataInputStream(fis);
// header_magic_number + version + [data...]
int headerMagicNumber = dis.readInt();
if (headerMagicNumber != HEADER_MAGIC_NUMBER) {
throw new Exception("매직 해더가 다릅니다");
}
int version = dis.readInt();
MetadataSerializer serializer = MetadataSerializers.getSerializer(version);
CheckpointMetadata meta = serializer.deserialize(dis,
CheckpointMetadata.class.getClassLoader(),
null
);
System.out.println("getCheckpointId() : " + meta.getCheckpointId());
System.out.println("getMasterStates().size() : " + meta.getMasterStates().size());
int idx = 0;
for(MasterState masterState : meta.getMasterStates()) {
System.out.println("["+idx+"]" + masterState);
idx+=1;
}
System.out.println("getOperatorStates().size() : " + meta.getOperatorStates().size());
idx = 0;
for(OperatorState operatorState : meta.getOperatorStates()) {
System.out.println("["+idx+"].operatorState : " + operatorState);
idx+=1;
}
dis.close();
fis.close();
}
}
위와 같이 코드를 돌려보면 아래와 같은 형태의 결과를 확인할 수 있다.
getCheckpointId() : 26416
getMasterStates().size() : 0
getOperatorStates().size() : 5
[0].operatorState : OperatorState(operatorID: cbc357ccb763df2852fee8c4fc7d55f2, parallelism: 1, maxParallelism: 128, coordinatorState: 73 bytes, sub task states: 1, total size (bytes): 447)
[1].operatorState : OperatorState(operatorID: ba40499bacce995f15693b1735928377, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0)
[2].operatorState : OperatorState(operatorID: 570f707193e0fe32f4d86d067aba243b, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0)
[3].operatorState : OperatorState(operatorID: 7f86b06891c19f1e76c3f65c90ce752b, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0)
[4].operatorState : OperatorState(operatorID: cf155f65686cb012844f7c745ec70a3c, parallelism: 1, maxParallelism: 128, coordinatorState: (none), sub task states: 1, total size (bytes): 0)
반응형
'데이터처리 > Flink' 카테고리의 다른 글
Flink 1.15 + Hive 충돌해결 - org.apache.flink.table.planner.delegation.ParserFactory (0) | 2022.09.13 |
---|---|
[FLINK] Avro 포맷에서 TO_TIMESTAMP_LTZ 사용시 정밀도 오류 (0) | 2022.07.15 |
[FLINK] TIMESTAMP vs TIMESTAMP_LTZ 필드 타입 차이 (0) | 2022.06.10 |
[Flink] Explode 쿼리 표현하기 - Cross join unnest (0) | 2022.05.31 |
Flink Application 개발시 Avro 라이브러리 충돌 문제 해결방법 (0) | 2022.05.27 |
댓글