티스토리 뷰
반응형
Flink 에서는 sql-client.sh 를 실행해서 쿼리기반으로 실행하는 방법이 있고, dataStream API 를 이용해서 직접 java 코드를 짜서 만드는 방법이 있다. 두개를 병행해서 테스트하다보니 avro 라이브러리 충돌이 일어났다.
내가 만들려던건 kafka의 토픽데이터를 json 형태로 로컬 파일로 덤프내리는 flink application 을 만들려는 시도였는데 다음과 같은 오류가 발생되었다. 이걸 해결하는건 라이브러리 충돌이 원인이었고 우회하는건 빌드시점에 충돌안되게 하는게 가장 편했다.
$ ./flink run /home1/myhome/flink-app/target/FlinkSample-1.0-SNAPSHOT.jar -kafka.bootstrap.servers=127.0.0.1:9093 -kafka.schema.registry.url=http://127.0.0.1 -topic=sample-input
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/home1/myhome/flink-1.14.2/lib/flink-shaded-hadoop-2-uber-2.6.5-10.0.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[START: topic=sample-input] kafkaServer=kafka.myhome.com:9093, schemaRegistryURL=http://127.0.0.1:8081
java.lang.NoSuchMethodError: org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema.forGeneric(Lorg/apache/avro/Schema;Ljava/lang/String;)Lorg/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema;
at sample.FlinkAppSample.source(FlinkAppSample.java:74)
at sample.FlinkAppSample.call(FlinkAppSample.java:56)
at sample.FlinkAppSample.main(FlinkAppSample.java:43)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
원인
원인은 avro 라이브러리 의존성이 충돌났던것이었다.
가장 손쉬운 해결방법은 flink/lib 폴더의 아래 파일을 제거하면 우선 application 실행에는 성공하지만 문제는 sql-client.sh 을통해 kafka 데이터를 다루지 못하는 문제가 있다.
- flink-sql-avro-버전.jar
- flink-sql-avro-confluent-registry-버전.jar
해결방법
모든 문제를 해결하는 가장 손쉬운 방법은 어플래케이션을 빌드할때 리패키징 하는것이다.
더 정확히는 충돌나는 라이브러리가 충돌나지 않도록 package 위치를 다르게 세팅후 빌드하도록 유도하는것이다.
MAVEN 기준으로 설명하면, 다음과 같은 설정을 추가후 빌드해주면 된다.
pom.xml
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
...
<relocations>
<relocation>
<pattern>io.confluent.kafka</pattern>
<shadedPattern>shaded.io.confluent.kafka</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.flink.formats.avro</pattern>
<shadedPattern>shaded.org.apache.flink.formats.avro</shadedPattern>
</relocation>
</relocations>
...
</configuration>
</execution>
</executions>
</plugin>
사실 이건 avro의 라이브러리 충돌뿐 아니라, 다른 솔루션의 라이브러리 충돌을 우회할때 자주 이용되는거라서 알아두면 딴곳에서도 도움이 될듯 하다.
반응형
'데이터처리 > Flink' 카테고리의 다른 글
[FLINK] TIMESTAMP vs TIMESTAMP_LTZ 필드 타입 차이 (0) | 2022.06.10 |
---|---|
[Flink] Explode 쿼리 표현하기 - Cross join unnest (0) | 2022.05.31 |
[Flink] Could not acquire the minimum required resources 이유와 해결방법 (0) | 2022.05.24 |
Flink 1.15 버전에서 HA 설정시 오류가 발생하는 경우 - 주키퍼 버전 (0) | 2022.05.22 |
[Flink] streaming-source.enable 기능이 동작 안하는 이유? - 파티션갯수 (0) | 2022.04.21 |
댓글