문제 다음과 같이 sql-client.sh 상에서 PostgreSQL 을 연동하려고 했는데 다음과 같은 오류가 발생했다. 이건 flink 의 문제가 아니라, PostgreSQL 에서 연결할때 문제가 되는것이다. 가장 손쉬운 해결은 java 옵션을 추가해서 재실행하는것이다. Flink SQL> CREATE CATALOG myDB WITH( > 'type' = 'jdbc', > 'base-url' = 'jdbc:postgresql://127.0.0.1:26257', > 'default-database' = 'mydb', > 'username' = '유저', > 'password' = '암호' > ); [ERROR] Could not execute SQL statement. Reason: javax.net.s..

문제 Flink 에서 기본적으로 제공하는 JDBC Connector 는 Mysql , Derby, PostgreSQL 3개지를 지원한다. 나는 sql-client.sh 를 통해서 쿼리기반으로 데이터를 다루는걸 자주 이용하는데 카탈로그 등록이 되서 바로 테이블을 접근할수 있어 연동하기 좋은데, 문제는 jsonb 타입을 지원하지 않는다. 추가로 내가 발견한 미지원되는 필드타입은 uuid 와 timestampz 필드이다. Flink SQL> describe MyTable; [ERROR] Could not execute SQL statement. Reason: java.lang.UnsupportedOperationException: Doesn't support Postgres type 'jsonb' yet Fl..

데이터를 분석하기 위해서는 다양한 스토리지를 하나의 저장소로 모아야 하는 작업이 필요한 경우가 많다. 이때 처음에 데이터를 다 복사해와도 계속 변경되는 데이터가 생기기 때문에, 데이터를 동기화 해야 하는 이유가 생기기 마련이다. 이런문제를 해결하기위한 첫걸음은 변경분의 캡쳐 데이터를 (change data capture, CDC) 만들고, 이걸 어떻게 반영할것인가? 이것이 핵심이다. Flink 는 CDC 형태의 데이터 포맷 중 Debezium Format 을 기본지원한다. 이 포맷은 op, after, before 필드가 필수여야 한다. 근데 op 필드가 없는 CDC 포맷은 어떻게 처리할까? OP필드 없는 CDC 처리하기 사실 op 필드가 없어도 다음과 같이 추론이 가능하다. 그래서 OP 코드값을 찾을 ..
Flink 에서는 JDBC Connector 가 존재한다. 하지만 오피셜하게 지원되는 dbms는 총 3개 뿐이다. MySQL과 PostgreSQL 그리고 Derby ... https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ JDBC JDBC SQL Connector # Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append & Upsert Mode The JDBC connector allows for reading data from and writing data into any relational databases..

flink 에서 Kafka 의 데이터를 hdfs 에 Sink 테스트하는데 이상하게 inprogress 파일형태로 중간파일만 생성하고, 최종적으로 flush 가 안되는 상황으로 삽질했었는데, 결론부터 말하면 checkpoint 를 설정해야 한다. Kafka to HDFS 예제 kafka 의 토픽을 hdfs 의 avro 파일포맷으로 sink 하는 예제이고, 날짜, 시간 필드를 파티션으로 지정해서 폴더별로 저장되게 하는 실제로 자주 사용하는 심플한 패턴이다. 그런데 문제는 파일이 생성은 되는거 같은데... 하루가 지나도 파일을 최종 flush 처리 하지 않는느낌이다. ----------------------- -- source ----------------------- CREATE TABLE KafkaUser..

Flink 에서 Hive Connector 를 연동해서, 카탈로그에 등록하면 hive 의 데이터를 조회해볼 수 있다. 기본적으로는 flink 의 taskManager 의 슬롯안에서 데이터를 처리하기 때문에 (즉, M/R 이나 TEZ 엔진이 도는 yarn 기반아닌) 여유있는 슬롯보다 더 큰 슬롯요청을 하게되면 조회가 안되는 상황이 발생될 수 있다. 현상 : flink에서 hive 테이블 조회가 안되는 현상 결론부터 말하면, 나같은 경우는 hive 테이블을 조회하기 위해서 taskManager 의 슬롯이 50개나 요청되었는데, 실제 가용가능한 슬롯은 2개밖에 없어서 나타난 현상이었다. 명시적으로 쿼리 에러가 났으면 빨리 찾았을텐데... 다음과 같이 쿼리 요청했는데 기냥 대기상태로 있고 에러가 바로 나타나지 ..
Flink 에서 sql-client.sh 에서 쿼리기반으로 데이터를 가공하고, 저장(sink) 할 수 있다. 일반적으로 sink 용으로 많이 사용하는 스토리지는 hive 거나 elasticsearch 가 아닐까 싶다. 추가될 라이브러리는 "$FLINK_HOME/lib" 하위에 jar 파일을 복사하고 실행해야 한다. 필요한 라이브러리 세팅 flink 에 hive 를 연동하려면 hive 버전에 따라서 필요한 라이브러리가 다르다. 내가 사용하는 hive 의 버전은 2.3.6 이었기 때문에 다음과 같이 필요한 라이브러리를 다운로드 받아서 사용했다. 참고로, 라이브러리들은 "$FLINK_HOME/lib" 하위에 복사하면 된다. 만약, hive 버전이 다른걸 쓴다면 flink document 에서 필요한 의존파일을..

Flink 는 스트림데이터를 다룰수 있는 플랫폼이다. High-Avaliability(HA)를 구성하거나 상태정보를 저장하려면 공유스토리지가 필수인데 보통 hdfs 나 s3 를 쓰는 경우가 많다. (예제로 보통 s3나 hdfs 로 되어있고) 그런데, 아마존 환경이 아니라면 s3는 안쓸거고 하둡클러스터를 운영하지 않는다면 hdfs를 쓸수없다. 만약, 아마존을 안쓰고 서버에서 카프카와 ES만 연동할때 HA 와 이력정보를 남긴다면 어떻게 해야할까? (=즉, hdfs, s3사용불가) 사실 쿠버네티스환경에서는 퍼시스턴트 볼륨 이 존재한다. 공유도 가능하고 영구 저장도 가능하다. 그래서 HA 이력정보나 savepoint, checkpoint 정보를 담는용도로 사용가능하다. (참고로 나는 ceph 를 연동해서 사용했..