티스토리 뷰
데이터처리/Hive
[hive] KafkaStorageHandler 에서 Failed to construct kafka consumer 오류 해결하기
정선생 2023. 7. 25. 00:00반응형
참고로 아래와 같이 security.protocol 이 SASL_PLAINTEXT kafka 를 연동했을때 나타났던 문제이다.
kafka 의 토픽을 맵핑한 ddl 문과 에러메시지 이다.
beeline> CREATE EXTERNAL TABLE kafka_table (
foo STRING,
bar STRING
)
STORED BY
'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
"kafka.bootstrap.servers" = "10.1.1.1:9092",
"kafka.topic" = "sample_topic",
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
'kafka.consumer.security.protocol'='SASL_PLAINTEXT',
'kafka.consumer.sasl.mechanism'='PLAIN',
'kafka.consumer.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="myuser" password="password";'
);
beeline> select * from kafka_table limit 1;
INFO : Completed executing command(queryId=hive_20230721185214_7a80733e-669e-4652-a299-368cf733e2b6); Time taken: 0.005 seconds
Error: Unable to get the next row set with exception: java.io.IOException: org.apache.kafkaesque.common.KafkaException: Failed to construct kafka consumer (state=,code=0)
org.apache.hive.service.cli.HiveSQLException: Unable to get the next row set with exception: java.io.IOException: org.apache.kafkaesque.common.KafkaException: Failed to construct kafka consumer
at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:387)
at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:373)
at org.apache.hive.jdbc.HiveQueryResultSet.next(HiveQueryResultSet.java:351)
at org.apache.hive.beeline.BufferedRows.<init>(BufferedRows.java:56)
at org.apache.hive.beeline.IncrementalRowsWithNormalization.<init>(IncrementalRowsWithNormalization.java:50)
at org.apache.hive.beeline.BeeLine.print(BeeLine.java:2331)
at org.apache.hive.beeline.Commands.executeInternal(Commands.java:1028)
at org.apache.hive.beeline.Commands.execute(Commands.java:1216)
at org.apache.hive.beeline.Commands.sql(Commands.java:1145)
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:1519)
at org.apache.hive.beeline.BeeLine.execute(BeeLine.java:1379)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:1149)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:1097)
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:555)
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:537)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:328)
at org.apache.hadoop.util.RunJar.main(RunJar.java:241)
원인
결론부터 말하면 sasl.jaas.config 값에서 문제가 발생된것이다.
보통 kafka 의 연결정보에서 sasl.jaas.config 정보를 표현할때 아래와 같이 표현되어있고, 이를 hive table 로 선언할때 자연스럽게 ' 로 감싸고 표현을 하게 되는데 이게 파싱을 제대로 못하는 문제이다.
'org.apache.kafka.common.security.plain.PlainLoginModule required username="아이디" password="암호";'
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="아이디" \
password="암호";
하둡클러스터내의 로그를 역추적해보면 아래와 같이 JAAS config 관련값의 key 값을 못찾는 상황으로 인지된다.
Caused by: org.apache.kafkaesque.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafkaesque.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
at org.apache.kafkaesque.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
at org.apache.kafkaesque.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647)
at org.apache.kafkaesque.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627)
at org.apache.hadoop.hive.kafka.KafkaInputFormat.computeSplits(KafkaInputFormat.java:125)
at org.apache.hadoop.hive.kafka.KafkaInputFormat.getSplits(KafkaInputFormat.java:69)
at org.apache.hadoop.hive.ql.exec.FetchOperator.generateWrappedSplits(FetchOperator.java:435)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:402)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:306)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:562)
... 20 more
Caused by: java.lang.IllegalArgumentException: Value not specified for key 'username' in JAAS config
at org.apache.kafkaesque.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:116)
at org.apache.kafkaesque.common.security.JaasConfig.<init>(JaasConfig.java:63)
at org.apache.kafkaesque.common.security.JaasContext.load(JaasContext.java:88)
at org.apache.kafkaesque.common.security.JaasContext.loadClientContext(JaasContext.java:82)
at org.apache.kafkaesque.common.network.ChannelBuilders.create(ChannelBuilders.java:167)
at org.apache.kafkaesque.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
at org.apache.kafkaesque.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
at org.apache.kafkaesque.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:738)
... 29 more
해결방법
hive 에서 sql 로 표현된 문자열을 던져줄때 문제가 발생된것이다. 이를 회피하기위해서는 "" 로 감싸있던걸 \'\' 형태로 표현해야한다.
JaasConfig.parseAppConfigurationEntry() 코드내에서는 " 를 인지하는걸 보면, hive 에서 선언된 문자열을 읽고 던져줄때 다른형태로 전달되는게 아닌가 추정된다. 아무튼... 이런 케이스는 이스케입처리를 해서 선언한 테이블을 사용하면 된다.
beeline> CREATE EXTERNAL TABLE kafka_table (
foo STRING,
bar STRING
)
STORED BY
'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
"kafka.bootstrap.servers" = "10.1.1.1:9092",
"kafka.topic" = "sample_topic",
"kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe",
'kafka.consumer.security.protocol'='SASL_PLAINTEXT',
'kafka.consumer.sasl.mechanism'='PLAIN',
--'kafka.consumer.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="myuser" password="password";'
'kafka.consumer.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username=\'myuser\' password=\'password\';'
);
삽질은 길었지만 그래도 해결
반응형
'데이터처리 > Hive' 카테고리의 다른 글
[HIVE] external table 이름 변경 후 location 까지 변경하는 방법 - 파티션 포함 (0) | 2023.12.01 |
---|---|
[HIVE] Array 필드를 행으로 풀어내는 방법 - explode, posexplode (1) | 2023.11.27 |
[hive] KafkaStorageHandler 에서 SASL_PLAINTEXT 인증모듈 사용하기 (0) | 2023.07.24 |
[hive] hive 에서 kafka 를 연동하고 집계하는 방법 - KafkaStorageHandler 를 써보자 (0) | 2023.07.21 |
[hive] avro 압축 포맷별 용량 비교 (snappy, bzip2, deflate) - orc 는 덤 (0) | 2023.07.13 |
댓글