티스토리 뷰
반응형
flink sql 에서 kafka connector type 을 지원하고, document 에도 예제는 잘 정리되어있다. 하지만, Kafka 에 SCRAM-SHA-512 인증이 있어서 properties 값을 적용해야하는 경우 어떻게 해야하는지 예시가 없는듯 하다. 이걸 해결하는 방법을 알려주고자 한다.
보통 kafka consumer 를 직접 만들때, 다음과 같은 형태의 연결정보에 이용한다는걸 가정하고 예제를 들어보겠다.
# 연결정보
bootstrap.servers=kafka.myhome.com:9093
schema.registry.url=http://schema.myhome.com:8081
# 인증관련설정
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="아이디" \
password="암호";
해결방법
위와 같이 jaas 형태로 인증할때 Flink SQL 에서 테이블을 선언할때, properties.* 형태로 속성을 추가하여 선언하면 된다.
이건 카프카에 대한 속성정보을 추가할때도 동일한 패턴으로 해당 설정의 key 앞에 properties 만 붙여주면 된다.
Flink SQL> CREATE TABLE user_created (
-- one column mapped to the Kafka raw UTF-8 key
the_kafka_key STRING,
-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'my-flink-topic',
'properties.bootstrap.servers' = 'kafka.myhome.com:9093',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="아이디" password="암호";' ,
... 생략 ...
'value.avro-confluent.schema-registry.url' = 'http://schema.myhome.com:8081',
);
그리고, 중요한건 sasl.jaas.config 설정에 패키지 이름이 보일텐데, flink 기본 패키지에는 이 모듈이 기본포함 되어있지 않다.
org.apache.kafka.common.security.scram.ScramLoginModule 의존성을 해결하기위해서는 kafka-client-6.0.0-css.jar 와 같은 client 라이브러리를 "$FLINK_HOME/lib" 폴더에 복사해줘야한다.
나 같은 경우는 Flink 1.14.2 버전에서는 위 버전으로 해결되었는데, 업데이트되면서 kafka 의 의존성버전이 많이 달라지면 다른 버전을 받아야 할 수 있을지도 모르니 참고하도록 하자.
반응형
'데이터처리 > Flink' 카테고리의 다른 글
댓글