최근 검색량이 높은 키워드가 무엇인지를 알고 싶을때, flink 에서는 호핑윈도우(슬라이딩 윈도우) 기반으로 지정하고, 슬라이드 사이즈와 데이터 간격을 지정해서 로직을 유도하여 만드는걸 구성했다. 이해를 돕기위해 대충 쿼리를 표현하면 아래와 같다. 하지만, 처음에는 잘 동작하다가 어느순간 backpress 가 발생해서 데이터 지연으로 제대로 처리안되는 문제가 발생했다. INSERT INTO top_keyword_slide ...생략.... FROM TABLE( HOP( DATA => TABLE kafka_log, TIMECOL => DESCRIPTOR(log_time), SLIDE => INTERVAL '30' SECOND, SIZE => INTERVAL '10' MINUTES) ) WHERE valid..
flink 를 쿼리로 작성해서 insert select 형태로 로직이 실행되면 대시보드의 Running Job List 의 이름이 획일적이라 구분이 쉽지 않다. 특히 동일한 테이블에서 조건만 다르게 N개의 로직을 돌리면 작업이름으로는 구분이 안되는 문제가 존재한다. 이런 문제를 해결하기위해서 쿼리 실행전에 작업이름을 지정하여 해결하는것이 가능하다. insert-into_카탈로그명.데이터베이스명.테이블명 해결방법 다음과 같이 insert select 쿼리를 실행하기전에 SET 'pipeline.name' 형태로 이름을 지정하면 된다. 그러면 insert-into-default_catalog.default_database... 같은 이름이 아니라 사용자가 지정한 작업이름으로 등록된다. SET 'pipelin..
하둡에서 권한관리가 되어있을때 나타나는 문제이다. 다음과 같이 hive catalog 를 등록하려고 했는데 다음과 같은 오류가 발생한다. 참고로 커버로스 인증으로 구성된 하둡 클러스터 환경에서 flink 유저를 만들어서 사용한 케이스이고, 원인은 Ranger 에서 flink 유저는 hive 의 warehouse 패스의 hdfs 접근이 제한되어있기 때문에 나타나는 현상이다. Flink SQL> create catalog myHive with( 'type' = 'hive' , 'hive-conf-dir' = '/usr/hdp/3.3.0/hive/conf'); [ERROR] Could not execute SQL statement. Reason: org.apache.hadoop.hive.metastore.ap..
HDFS 라이브러리가 없다? Flink 를 테스트로 운영할때는 HA 나 savepoint, checkpoint 정보가 필요없을수 있지만, 운영환경에서 관리하기위해서는 해당 정보가 남을수 있도록 해야하고, 이럴때 HDFS 를 연계해서 사용한다. 그러다보니 내가 사용하는 하둡클러스터의 환경에 영향을 받는다. 즉, 관련된 세팅을 했을때 아래와 같은 오류가 발생한다면 hdfs 연결을 못하는 상황일 확률이 크다. java.io.IOException: Could not create FileSystem for highly available storage path org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file sy..
Flink 를 Yarn 에서 동작할때 다음과 같은 메시지를 출력하며 무한 대기하는 상황이 있다. 이런 이유는 보통 컨테이너에서 리소스를 받아서 데몬을 기동해야하는데 받지 못하는 상황이다. 이때 몇가지 상황에 따라 해결방법이 다르다. 2022-11-30 11:31:33,275 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED 2022-11-30 11:32:33,422 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deployment took more than 60 seconds. Please check if the requested re..
flink sql 에서 kafka connector type 을 지원하고, document 에도 예제는 잘 정리되어있다. 하지만, Kafka 에 SCRAM-SHA-512 인증이 있어서 properties 값을 적용해야하는 경우 어떻게 해야하는지 예시가 없는듯 하다. 이걸 해결하는 방법을 알려주고자 한다. 보통 kafka consumer 를 직접 만들때, 다음과 같은 형태의 연결정보에 이용한다는걸 가정하고 예제를 들어보겠다. # 연결정보 bootstrap.servers=kafka.myhome.com:9093 schema.registry.url=http://schema.myhome.com:8081 # 인증관련설정 sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_P..
참고로 이 문제는 Flink 1.14.x 버전대에서 JDK11 버전을 이용할때 아래와 같은 문제가 발생된다. 나같은 경우 hadoop 2.6.x 버전에 hive 2.3.6 을 사용하는데, JDK 호환성의 영향이 존재했었다. https://issues.apache.org/jira/browse/HIVE-22415 https://issues.apache.org/jira/browse/HADOOP-12760 $ java -version openjdk version "11.0.13" 2021-10-19 LTS OpenJDK Runtime Environment 18.9 (build 11.0.13+8-LTS) OpenJDK 64-Bit Server VM 18.9 (build 11.0.13+8-LTS, mixed mod..
증상 다음과 같이 Flink 에서 하둡클러스터의 yarn 기반에서 application 잘 invoke 되는지 테스트를 하려고 했는데, 다음과 같은 오류가 나는 경우가 발생했다. 참고로 오류메시지는 다르지만 기존에 "com/sun/jersey/core/util/FeaturesAndProperties" 클래스를 찾을수 없다며 오류를 냈던 케이스와 동일하게 해결이 가능했다. 2022.03.18 - [데이터처리/Flink] - [Flink] yarn-session.sh 오류 : java.lang.NoClassDefFoundError $ ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar ...생략....