티스토리 뷰
[Flink] PostgreSQL 연동시 Doesn't support Postgres type 'jsonb' yet 문제
정선생 2022. 1. 26. 08:00문제
Flink 에서 기본적으로 제공하는 JDBC Connector 는 Mysql , Derby, PostgreSQL 3개지를 지원한다.
나는 sql-client.sh 를 통해서 쿼리기반으로 데이터를 다루는걸 자주 이용하는데 카탈로그 등록이 되서 바로 테이블을 접근할수 있어 연동하기 좋은데, 문제는 jsonb 타입을 지원하지 않는다.
추가로 내가 발견한 미지원되는 필드타입은 uuid 와 timestampz 필드이다.
Flink SQL> describe MyTable;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Doesn't support Postgres type 'jsonb' yet
Flink SQL> describe YouTable;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Doesn't support Postgres type 'uuid' yet
해결방법
가장 손쉬운건 uuid, json, jsonb 타입을 text 타입처럼 string으로 인지하도록 코드를 수정하는것이다.
내부적으로는 문자열은 LogicalType 타입값이 "VARCHAR" 로 인지되고 리턴되는 값은 String 타입이 기본으로 되어있다. 하지만 우리는 uuid, json, jsonb 타입도 문자열로 조회할 수 있게 했기 때문에, String 타입이 아닌 데이터는 문자열로 강제 변환하여 조회하는 방향으로 수정하면 조회가 가능하다.
https://github.com/jungkoo/flink/pull/3
사실, 내가 수정한 풀리퀘스트 링크가 있긴 한데 timestampz 타입의 timezone offset 문제는 해결이 안되었지만 필요한 사람을 위해서 우선 링크도 같이 제공하겠다.
결과확인
위와 같이 코드를 수정하고 재기동 후 sql-client.sh 에서 쿼리를 실행하면 다음과 같이 정상 동작된다.
-- myJson 필드가 jsob 타입인데 확인가능
Flink SQL> describe MyTable;
+-----------------------+------------------+-------+---------+--------+-----------+
| name | type | null | key | extras | watermark |
+-----------------------+------------------+-------+---------+--------+-----------+
| id | BIGINT | false | PRI(id) | | |
...생략...
| myJson | STRING | true | | | |
| type | STRING | false | | | |
+-----------------------+------------------+-------+---------+--------+-----------+
-- id 가 uuid 타입인데 확인가능
Flink SQL> describe YouTable;
+-----------------+--------------+-------+-------------+--------+-----------+
| name | type | null | key | extras | watermark |
+-----------------+--------------+-------+-------------+--------+-----------+
| id | STRING | false | PRI(id) | | |
| status | BIGINT | true | | | |
...생략...
+-----------------+--------------+-------+-------------+--------+-----------+
참고로 timestampz 타입 문제가 없다면 flink-jdbc-connector 만 빌드해서 lib 폴더에 빌드한걸 바꿔쳐서 실행하면 된다.
요즘에 자잘하게 안되는 기능이 많아서 self 수정하게 되는게 많은것 같다. 나중에 찜찜한 코드도 수정해서 contribute 하는게 좋을것 같다. (받아줄지는 모르겠지만)
'데이터처리 > Flink' 카테고리의 다른 글
[Flink] sql-client 에서 쿼리 결과가 안나오는 문제 - checkpoint (0) | 2022.02.05 |
---|---|
[Flink] extension (5) should not be presented in certificate_request 오류 - postgresql 연동시 (0) | 2022.01.27 |
[Flink] op 필드 없는 "debezium-avro-confluent" 포맷 사용 방법 (0) | 2022.01.12 |
[Flink] JDBC Connector 에서 "ORACLE" 연동하기 - flink 1.13 (1) | 2022.01.05 |
[오류] Flink에서 HDFS 데이터 Sink 할때 inprogress 로 flush 안되는 현상 (0) | 2021.11.12 |