티스토리 뷰
kafka 의 데이터를 다룰때 일반적으로 spark, flink, kafka-streams 등 다양한 도구를 사용하지만, 사실 배치기반으로 데이터를 가볍게 조회하고 배치를 돌려보기에는 hive 에서 kafka 를 직접 붙여보는게 가장 손쉽다.
어떻게 사용할까?
사실 이 내용은 readme 문서에도 잘 정리되어있다.
https://github.com/apache/hive/blob/master/kafka-handler/README.md
만약, kafka 에 들어있는 파일포맷이 아래와 같은 json 포맷이라면, key 값을 테이블의 필드로 나열하면 된다.
예를 들어, sample_topic 에 아래와 같은 형태의 json 값이 존재한다면
{"name": "gildong", "age": 12, "address": "서울시"}
t_sample_table 이라는 테이블명으로 맵핑한다면 아래와 같이 쿼리를 선언하면 된다.
CREATE EXTERNAL TABLE t_sample_kafka_table (
name string,
age int,
address string
)
STORED BY
'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
"kafka.topic" = "sample_topic",
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
'kafka.consumer.security.protocol'='PLAINTEXT',
"kafka.bootstrap.servers" = "10.0.0.1:9092"
);
추가정보
kafka 의 토픽은 offset 정보와 partitions, key 정보등이 존재하는데 생성된 테이블에 우리가 선언하지 않은 필드명이 기본 포함되어있기 때문에 해당 정보를 활용하여 쿼리하는것도 가능하다. 사실 kafka 의 데이터를 확인하기위해 kafka-console-consumer 를 사용하여 볼수는 있는데 매우 불편한데, 데이터 분석과 집계를 hive 에서 할 수 있는건 큰 장점이다.
beeline> describe t_sample_kafka_table;
+--------------+----------------+--------------------+
| col_name | data_type | comment |
+--------------+----------------+--------------------+
| name | string | from deserializer |
| age | int | from deserializer |
| address | string | from deserializer |
| __key | binary | from deserializer |
| __partition | int | from deserializer |
| __offset | bigint | from deserializer |
| __timestamp | bigint | from deserializer |
+--------------+----------------+--------------------+
__timestamp 의 활용
kafka 의 스트림 데이터를, 1시간 단위의 파티션으로 단위로 hive 에 저장하고 시다고 하면, __timestamp 필드를 잘 활용하면 별다른 스트리밍 플랫폼없이 집계하는것도 불가능하지는 않다.
참고로, __timestamp 값은 epochtime 으로 되어있다. (예를 들면, 1686531361380)
https://www.epochconverter.com/ 에서 날짜값을 변환해보면 2023년 6월 12일 09:56:01 데이터가 된다.
hive 에서는 unix_timestamp 를 이용하면 epochtime 으로 변환할 수 있는데, 단위를 맞추기 위해 1000을 곱해야하고, 아래와 같은 형식으로 1시간 단위의 데이터를 추출해서 별도의 테이블로 sink 하는것이 가능해진다.
INSERT OVERWRITE TABLE t_sample_hive_table PARTITION (ymd='2023-06-15', hh24='00')
SELECT
*
FROM
t_sample_kafka_table
WHERE
`__timestamp` >= 1000 * unix_timestamp('2023-06-15 00:00:00', 'yyyy-MM-dd HH:mm:ss')
and `__timestamp` < 1000 * (unix_timestamp('2023-06-15 01:00:00', 'yyyy-MM-dd HH:mm:ss')
epochtime 을 반대로 날짜로 변환해서 보려면 from_unixtime 를 쓰면 되는데, 이는 아래링크에서 Date Functions 내용을 더 보면 된다.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
'데이터처리 > Hive' 카테고리의 다른 글
[hive] hive 에서 kafka 를 연동하고 집계하는 방법 - KafkaStorageHandler 를 써보자 (0) | 2023.07.21 |
---|---|
[hive] avro 압축 포맷별 용량 비교 (snappy, bzip2, deflate) - orc 는 덤 (0) | 2023.07.13 |
[Hive] 파일포맷(orc, parquet, avro..) 별 테이블 생성과 압축방법 (0) | 2023.05.22 |
[HIVE] HiveAccessControlException Permission denied 원인 (0) | 2023.02.16 |
[튜닝] 리듀서가 적게 잡혀 느린 Group By 쿼리 튜닝하기 - CBO 삽질 케이스 (0) | 2023.02.09 |