티스토리 뷰

반응형

kafka 의 데이터를 다룰때 일반적으로 spark, flink, kafka-streams 등 다양한 도구를 사용하지만, 사실 배치기반으로 데이터를 가볍게 조회하고 배치를 돌려보기에는 hive 에서 kafka 를 직접 붙여보는게 가장 손쉽다.

 

어떻게 사용할까?

사실 이 내용은 readme 문서에도 잘 정리되어있다.

https://github.com/apache/hive/blob/master/kafka-handler/README.md

 

만약, kafka 에 들어있는 파일포맷이 아래와 같은 json 포맷이라면, key 값을 테이블의 필드로 나열하면 된다.

예를 들어, sample_topic 에 아래와 같은 형태의 json 값이 존재한다면

{"name": "gildong", "age": 12, "address": "서울시"}

t_sample_table 이라는 테이블명으로 맵핑한다면 아래와 같이 쿼리를 선언하면 된다.

CREATE EXTERNAL TABLE t_sample_kafka_table (
     name    string,
     age     int,
     address string
)
STORED BY 
  'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES ( 
  "kafka.topic" = "sample_topic",
  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
  'kafka.consumer.security.protocol'='PLAINTEXT',
  "kafka.bootstrap.servers" = "10.0.0.1:9092"
);

추가정보 

kafka 의 토픽은 offset 정보와 partitions, key 정보등이 존재하는데 생성된 테이블에 우리가 선언하지 않은 필드명이 기본 포함되어있기 때문에 해당 정보를 활용하여 쿼리하는것도 가능하다. 사실 kafka 의 데이터를 확인하기위해 kafka-console-consumer 를 사용하여 볼수는 있는데 매우 불편한데, 데이터 분석과 집계를 hive 에서 할 수 있는건 큰 장점이다.

beeline> describe t_sample_kafka_table;
+--------------+----------------+--------------------+
|   col_name   |   data_type    |      comment       |
+--------------+----------------+--------------------+
| name         | string         | from deserializer  |
| age          | int            | from deserializer  |
| address      | string         | from deserializer  |
| __key        | binary         | from deserializer  |
| __partition  | int            | from deserializer  |
| __offset     | bigint         | from deserializer  |
| __timestamp  | bigint         | from deserializer  |
+--------------+----------------+--------------------+

__timestamp 의 활용

kafka 의 스트림 데이터를,  1시간 단위의 파티션으로 단위로 hive 에 저장하고 시다고 하면, __timestamp 필드를 잘 활용하면 별다른 스트리밍 플랫폼없이 집계하는것도 불가능하지는 않다. 

참고로, __timestamp 값은 epochtime 으로 되어있다. (예를 들면, 1686531361380)

https://www.epochconverter.com/ 에서 날짜값을 변환해보면 2023년 6월 12일 09:56:01 데이터가 된다.

hive 에서는 unix_timestamp 를 이용하면 epochtime 으로 변환할 수 있는데, 단위를 맞추기 위해 1000을 곱해야하고, 아래와 같은 형식으로 1시간 단위의 데이터를 추출해서 별도의 테이블로 sink 하는것이 가능해진다.

INSERT OVERWRITE TABLE t_sample_hive_table PARTITION (ymd='2023-06-15', hh24='00')
SELECT
    *
FROM
    t_sample_kafka_table
WHERE
        `__timestamp` >=  1000 * unix_timestamp('2023-06-15 00:00:00', 'yyyy-MM-dd HH:mm:ss')
    and `__timestamp` <  1000 * (unix_timestamp('2023-06-15 01:00:00', 'yyyy-MM-dd HH:mm:ss')

epochtime 을 반대로 날짜로 변환해서 보려면 from_unixtime 를 쓰면 되는데, 이는 아래링크에서 Date Functions 내용을 더 보면 된다.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF 

 

LanguageManual UDF - Apache Hive - Apache Software Foundation

Hive Operators and User-Defined Functions (UDFs) Case-insensitive All Hive keywords are case-insensitive, including the names of Hive operators and functions. In Beeline or the CLI, use the commands below to show the latest documentation: SHOW FUNCTIONS; D

cwiki.apache.org

 

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함