참고로 아래와 같이 security.protocol 이 SASL_PLAINTEXT kafka 를 연동했을때 나타났던 문제이다. kafka 의 토픽을 맵핑한 ddl 문과 에러메시지 이다. beeline> CREATE EXTERNAL TABLE kafka_table ( foo STRING, bar STRING ) STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' TBLPROPERTIES ( "kafka.bootstrap.servers" = "10.1.1.1:9092", "kafka.topic" = "sample_topic", "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe", 'kafk..
hive 에서 kafka 를 쉽게 연동해서 사용할 수 있어서, 꽤 유용하게 쓸수 있다. 그런데, 하둡클러스터는 kerberos 인증을 사용하고 kafka 는 다른 인증방식을 사용할 경우 어떻게 사용해야하는지 readme.md 문서를 뒤져봐도 친절하지가 않다. 그래서 관련된 선언방법과 주의사항을 안내하려고 한다. https://github.com/apache/hive/blob/master/kafka-handler/README.md 문제점 우선 org.apache.hadoop.hive.kafka.KafkaStorageHandler 를 사용가능하다는 전제로 설명하도록 하겠다. 보통 kafka 의 인증정보를 다음과 같이 표현하여 사용하는데... 아래 정보를 table 선언할때 어떻게 맵핑하는지 정리가 안되어있어..
사실 Kafka 의 데이터를 활용할 때, KafkaStrems 를 사용해서 consumer 를 만들거나, ksql 을 쓰거나 혹은 spark 를 쓰는게 일반적이다. 하지만, 실시간성으로 데이터를 다루는게 아니라, 조금 지연되더라도 배치기반으로 처리하고, 그 결과를 아카이빙 할 수 있도록 유지보수하는 요구사항도 꽤 많은데 이럴때는 기냥 hive 에서 KafkaStorageHandler 를 사용하는게 훨씬 간편하다. 기본내용 hive 에서 KafkaStorageHandler 를 사용하는건 아래 문서를 참고하면 된다. https://github.com/apache/hive/blob/master/kafka-handler/README.md 보통 kafka 에 데이터를 적재할때, JSON 과 AVRO 포맷을 많이 ..
kafka 의 데이터를 다룰때 일반적으로 spark, flink, kafka-streams 등 다양한 도구를 사용하지만, 사실 배치기반으로 데이터를 가볍게 조회하고 배치를 돌려보기에는 hive 에서 kafka 를 직접 붙여보는게 가장 손쉽다. 어떻게 사용할까? 사실 이 내용은 readme 문서에도 잘 정리되어있다. https://github.com/apache/hive/blob/master/kafka-handler/README.md 만약, kafka 에 들어있는 파일포맷이 아래와 같은 json 포맷이라면, key 값을 테이블의 필드로 나열하면 된다. 예를 들어, sample_topic 에 아래와 같은 형태의 json 값이 존재한다면 {"name": "gildong", "age": 12, "address"..
flink sql 에서 kafka connector type 을 지원하고, document 에도 예제는 잘 정리되어있다. 하지만, Kafka 에 SCRAM-SHA-512 인증이 있어서 properties 값을 적용해야하는 경우 어떻게 해야하는지 예시가 없는듯 하다. 이걸 해결하는 방법을 알려주고자 한다. 보통 kafka consumer 를 직접 만들때, 다음과 같은 형태의 연결정보에 이용한다는걸 가정하고 예제를 들어보겠다. # 연결정보 bootstrap.servers=kafka.myhome.com:9093 schema.registry.url=http://schema.myhome.com:8081 # 인증관련설정 sasl.mechanism=SCRAM-SHA-512 security.protocol=SASL_P..
Kafka 의 토픽에 있는 데이터를 실시간성으로 데이터 집계 하기위해서는 Kafka Streams 를 많이 사용한다. 사용하기도 쉬운편이고, 카프카에만 의존되다보니 사실상 카프카만 세팅되어있다면 바로 활용가능하다. 하지만, 스트림 처리에서는 배치에는 없는 개념이 많다보니 의도하지 않은 형태로 결과가 나올때 가 많다. 결과가 반복되는 현상 스트림데이터는 무한의 데이터이다. 그래서 분석을 위해 어떤 시간기준으로 잘라서 집계를 하고 이걸 저장해서 분석하는게 일반적이다. 예를 들어, 1시간 윈도우로 단어 갯수를 샌다고 할때 나는 01:00~02:00 의 최종결과인 알파카 3(붉은색) 결과만 출력하고 싶은데, 중간 합계 결과인 (알파카 1), (알파카2) 가 같이 나오는 문제가 골치아픈 경우가 있다. 왜냐면, K..
flink 에서 Kafka 의 데이터를 hdfs 에 Sink 테스트하는데 이상하게 inprogress 파일형태로 중간파일만 생성하고, 최종적으로 flush 가 안되는 상황으로 삽질했었는데, 결론부터 말하면 checkpoint 를 설정해야 한다. Kafka to HDFS 예제 kafka 의 토픽을 hdfs 의 avro 파일포맷으로 sink 하는 예제이고, 날짜, 시간 필드를 파티션으로 지정해서 폴더별로 저장되게 하는 실제로 자주 사용하는 심플한 패턴이다. 그런데 문제는 파일이 생성은 되는거 같은데... 하루가 지나도 파일을 최종 flush 처리 하지 않는느낌이다. ----------------------- -- source ----------------------- CREATE TABLE KafkaUser..
카프카를 사용하다보면 토픽이나 컨슈머그룹을 확인하기위한 명령어를 사용해야 하는 경우가 많다. 처음엔 기본 command line 명령어툴을 사용했는데 사용하면서 느끼는 가장 큰 불편함은 아래 2가지였다. 명령어 파라미터가 조금 장황하고 클러스터를 여러대 운영한다면 "bootstrap-server" 주소를 기억해야한다 그래서 좀더 편하게 카프카 토픽을 다루는 명령어 툴이 없나? 하고 삽질하던끝에 꽤 좋은 녀석을 발견했다. kafkactl 이라는 녀석이고, 다행히 내가 사용중인 macOS 에서 잘 동작한다 https://github.com/deviceinsight/kafkactl 서버주소 입력하는것에서 해방 ! kafkactl 에서는 설정파일이 존재하기 때문에 매번 서버 주소를 입력하지 않고 명령어 처리가 ..