티스토리 뷰

반응형

flink sql 에서 kafka connector type 을 지원하고, document 에도 예제는 잘 정리되어있다. 하지만, Kafka 에 SCRAM-SHA-512 인증이 있어서 properties 값을 적용해야하는 경우 어떻게 해야하는지 예시가 없는듯 하다. 이걸 해결하는 방법을 알려주고자 한다.

 

보통 kafka consumer 를 직접 만들때, 다음과 같은 형태의 연결정보에 이용한다는걸 가정하고 예제를 들어보겠다.

# 연결정보
bootstrap.servers=kafka.myhome.com:9093
schema.registry.url=http://schema.myhome.com:8081

# 인증관련설정
sasl.mechanism=SCRAM-SHA-512
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="아이디" \
        password="암호";

 

해결방법

위와 같이 jaas 형태로 인증할때  Flink SQL 에서 테이블을 선언할때, properties.* 형태로 속성을 추가하여 선언하면 된다.

이건 카프카에 대한 속성정보을 추가할때도 동일한 패턴으로 해당 설정의 key 앞에 properties 만 붙여주면 된다.

Flink SQL> CREATE TABLE user_created (
  -- one column mapped to the Kafka raw UTF-8 key
  the_kafka_key STRING,
  
  -- a few columns mapped to the Avro fields of the Kafka value
  id STRING,
  name STRING, 
  email STRING

) WITH (
  'connector' = 'kafka',
  'topic' = 'my-flink-topic',
  'properties.bootstrap.servers' = 'kafka.myhome.com:9093',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="아이디" password="암호";' ,
  ... 생략 ...
  'value.avro-confluent.schema-registry.url' = 'http://schema.myhome.com:8081',
);

그리고, 중요한건 sasl.jaas.config 설정에 패키지 이름이 보일텐데, flink 기본 패키지에는 이 모듈이 기본포함 되어있지 않다.

org.apache.kafka.common.security.scram.ScramLoginModule 의존성을 해결하기위해서는 kafka-client-6.0.0-css.jar  와 같은 client 라이브러리를 "$FLINK_HOME/lib" 폴더에 복사해줘야한다.

 

나 같은 경우는 Flink 1.14.2 버전에서는 위 버전으로 해결되었는데, 업데이트되면서 kafka 의 의존성버전이 많이 달라지면 다른 버전을 받아야 할 수 있을지도 모르니 참고하도록 하자.

반응형
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함