티스토리 뷰
사실 Kafka 의 데이터를 활용할 때, KafkaStrems 를 사용해서 consumer 를 만들거나, ksql 을 쓰거나 혹은 spark 를 쓰는게 일반적이다. 하지만, 실시간성으로 데이터를 다루는게 아니라, 조금 지연되더라도 배치기반으로 처리하고, 그 결과를 아카이빙 할 수 있도록 유지보수하는 요구사항도 꽤 많은데 이럴때는 기냥 hive 에서 KafkaStorageHandler 를 사용하는게 훨씬 간편하다.
기본내용
hive 에서 KafkaStorageHandler 를 사용하는건 아래 문서를 참고하면 된다.
https://github.com/apache/hive/blob/master/kafka-handler/README.md
보통 kafka 에 데이터를 적재할때, JSON 과 AVRO 포맷을 많이 사용할텐데, kafka.serde.class 에서 2가지 모두 지원한다.
external table 을 사용하여 필드를 맵핑하면 쉽게 사용가능하다.
- org.apache.hadoop.hive.serde2.JsonSerDe
- org.apache.hadoop.hive.serde2.avro.AvroSerDe
CREATE EXTERNAL TABLE
kafka_table (
`timestamp` TIMESTAMP,
`page` STRING,
`newPage` BOOLEAN,
`added` INT,
`deleted` BIGINT,
`delta` DOUBLE)
STORED BY
'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'= 'test-topic',
'kafka.serde.class' = 'org.apache.hadoop.hive.serde2.JsonSerDe',
'kafka.consumer.security.protocol'='PLAINTEXT',
'kafka.bootstrap.servers" = 'localhost:9092'
);
Kafka 데이터를 시간 파티션으로 저장하려면?
kafka 의 토픽을 영속성있게 모든 데이터를 유지하지 않는다. 보통 최근 7일수준의 데이터를 보유하고 데이터 사이즈도 무제한이 아니다.
그렇기 때문에 해당데이터를 영속성있게 분석하고 싶다면, 해당 데이터를 hdfs 와 같은 스토리지에 저장을 해두고 써야 유용하다.
KafkaStorageHandler 를 쓰면 이것도 손쉬운데, 이때 kafka 의 __timestamp 정보를 잘 활용하여 조건에 넣어 사용하면 된다.
__timestamp 는 kafka 의 토픽에 존재하는 날짜값이며, 이 날짜값의 특성을 고려해서 실제 배치가 실행될 시점의 스케쥴만 잘 조정해서 실행하면 된다.
-- {날짜} : 2023-01-02 같은 포맷
-- {시간} : 15 같은 포맷이라면?
INSERT OVERWRITE TABLE t_hive_table PARTITION(ymd='{날짜}', hh24='{시간}')
SELECT
`timestamp`
,`page`
,`newPage`
,`added`
,`deleted`
,`delta`
FROM
kafka_table
WHERE
-- 3600초는 60분을 의미, 즉 아래와 같은 형태의 조건을 만들기 위한것임
-- 즉 2023-01-02 15:00 <= __timestamp < 2023-01-02 16:00
`__timestamp` >= 1000 * unix_timestamp('{날짜} {시간}:00:00', 'yyyy-MM-dd HH:mm:ss')
and `__timestamp` < 1000 * (unix_timestamp('{날짜} {시간}:00:00', 'yyyy-MM-dd HH:mm:ss') + 3600)
만약, kafka topic 의 __timestamp 값이 아닌 토픽내에 존재하는 별도의 시간필드가 존재한다면?
지연도착을 고려해서 __timestamp 값을 좀더 넓게 잡아주고, 해당 날짜값의 조건을 and 조건으로 더 넣어주면 충분히 데이터를 적재하고 저장하는게 가능하다.
이럴때 hive 의 날짜계산 함수나 이런것도 많이 알아야 하는데 이건 나중에 언제 다루도록 하겠다.
'데이터처리 > Hive' 카테고리의 다른 글
[hive] KafkaStorageHandler 에서 Failed to construct kafka consumer 오류 해결하기 (0) | 2023.07.25 |
---|---|
[hive] KafkaStorageHandler 에서 SASL_PLAINTEXT 인증모듈 사용하기 (0) | 2023.07.24 |
[hive] avro 압축 포맷별 용량 비교 (snappy, bzip2, deflate) - orc 는 덤 (0) | 2023.07.13 |
[Hive] kafka 의 json 데이터를 hive 에서 다루는 방법 - KafkaStorageHandler 활용법 (0) | 2023.06.16 |
[Hive] 파일포맷(orc, parquet, avro..) 별 테이블 생성과 압축방법 (0) | 2023.05.22 |