Kafka 의 토픽에 있는 데이터를 실시간성으로 데이터 집계 하기위해서는 Kafka Streams 를 많이 사용한다. 사용하기도 쉬운편이고, 카프카에만 의존되다보니 사실상 카프카만 세팅되어있다면 바로 활용가능하다. 하지만, 스트림 처리에서는 배치에는 없는 개념이 많다보니 의도하지 않은 형태로 결과가 나올때 가 많다. 결과가 반복되는 현상 스트림데이터는 무한의 데이터이다. 그래서 분석을 위해 어떤 시간기준으로 잘라서 집계를 하고 이걸 저장해서 분석하는게 일반적이다. 예를 들어, 1시간 윈도우로 단어 갯수를 샌다고 할때 나는 01:00~02:00 의 최종결과인 알파카 3(붉은색) 결과만 출력하고 싶은데, 중간 합계 결과인 (알파카 1), (알파카2) 가 같이 나오는 문제가 골치아픈 경우가 있다. 왜냐면, K..
hive 는 다양한 파일포맷과 스토리지 핸들러를 통해 hdfs 가 아닌 es 나 kafka 같은 외부 스토리지의 연결도 가능하게 해준다. 그래서 테이블이 어떤 파티션 정책을 갖고 있고 어떤 파일포맷이고 어떤 스토리지 핸들러를 쓰는지 확인해보고 싶은 경우가 생긴다. 특히 딴사람이 만든 테이블이라서 분석이 필요한 경우 더 그런듯 테이블 선언문(DDL) 확인 "show create table 테이블명" 을 하면 테이블 생성할때의 명령을 확인할 수 있다. 이러면 스키마 정보나, SERDE 정보 그리고 어떤 파일포맷과 압축정책을 썼는지 까지 쉽게 확인이 가능하다. 하지만, 코멘트가 한글일 경우 다음과 같이 깨져서 보이는게 조금 아쉽다. 꼭, 코멘트를 확인하고 싶다면 describe 명령을 이용해서 확인하면 가능하..
flink 에서 Kafka 의 데이터를 hdfs 에 Sink 테스트하는데 이상하게 inprogress 파일형태로 중간파일만 생성하고, 최종적으로 flush 가 안되는 상황으로 삽질했었는데, 결론부터 말하면 checkpoint 를 설정해야 한다. Kafka to HDFS 예제 kafka 의 토픽을 hdfs 의 avro 파일포맷으로 sink 하는 예제이고, 날짜, 시간 필드를 파티션으로 지정해서 폴더별로 저장되게 하는 실제로 자주 사용하는 심플한 패턴이다. 그런데 문제는 파일이 생성은 되는거 같은데... 하루가 지나도 파일을 최종 flush 처리 하지 않는느낌이다. ----------------------- -- source ----------------------- CREATE TABLE KafkaUser..
Flink 에서 Hive Connector 를 연동해서, 카탈로그에 등록하면 hive 의 데이터를 조회해볼 수 있다. 기본적으로는 flink 의 taskManager 의 슬롯안에서 데이터를 처리하기 때문에 (즉, M/R 이나 TEZ 엔진이 도는 yarn 기반아닌) 여유있는 슬롯보다 더 큰 슬롯요청을 하게되면 조회가 안되는 상황이 발생될 수 있다. 현상 : flink에서 hive 테이블 조회가 안되는 현상 결론부터 말하면, 나같은 경우는 hive 테이블을 조회하기 위해서 taskManager 의 슬롯이 50개나 요청되었는데, 실제 가용가능한 슬롯은 2개밖에 없어서 나타난 현상이었다. 명시적으로 쿼리 에러가 났으면 빨리 찾았을텐데... 다음과 같이 쿼리 요청했는데 기냥 대기상태로 있고 에러가 바로 나타나지 ..
Flink 에서 sql-client.sh 에서 쿼리기반으로 데이터를 가공하고, 저장(sink) 할 수 있다. 일반적으로 sink 용으로 많이 사용하는 스토리지는 hive 거나 elasticsearch 가 아닐까 싶다. 추가될 라이브러리는 "$FLINK_HOME/lib" 하위에 jar 파일을 복사하고 실행해야 한다. 필요한 라이브러리 세팅 flink 에 hive 를 연동하려면 hive 버전에 따라서 필요한 라이브러리가 다르다. 내가 사용하는 hive 의 버전은 2.3.6 이었기 때문에 다음과 같이 필요한 라이브러리를 다운로드 받아서 사용했다. 참고로, 라이브러리들은 "$FLINK_HOME/lib" 하위에 복사하면 된다. 만약, hive 버전이 다른걸 쓴다면 flink document 에서 필요한 의존파일을..
Flink 는 스트림데이터를 다룰수 있는 플랫폼이다. High-Avaliability(HA)를 구성하거나 상태정보를 저장하려면 공유스토리지가 필수인데 보통 hdfs 나 s3 를 쓰는 경우가 많다. (예제로 보통 s3나 hdfs 로 되어있고) 그런데, 아마존 환경이 아니라면 s3는 안쓸거고 하둡클러스터를 운영하지 않는다면 hdfs를 쓸수없다. 만약, 아마존을 안쓰고 서버에서 카프카와 ES만 연동할때 HA 와 이력정보를 남긴다면 어떻게 해야할까? (=즉, hdfs, s3사용불가) 사실 쿠버네티스환경에서는 퍼시스턴트 볼륨 이 존재한다. 공유도 가능하고 영구 저장도 가능하다. 그래서 HA 이력정보나 savepoint, checkpoint 정보를 담는용도로 사용가능하다. (참고로 나는 ceph 를 연동해서 사용했..
엘라스틱서치는 restapi 를 지원해서 주소를 요청해서 가능한게, url 을 호출하는 가장 대중적인 커맨드라인툴이 curl 이다. 그래서 curl 로 호출하는 경우도 많으니 간단히 명령어를 정리하도록 하겠다. 예제는 편의상 http://127.0.0.1:9200 을 기준으로 설명하도록 하겠다. 추가로, ES에 웹로그인형태로 권한이 존재한다면 "-u 아이디:패스워드" 파라미터를 추가로 기입해주면된다. 이런게 없다면 파란색 부분은 생략하고 실행하면 된다. (사실 예제들은 모두 빼고 적긴했다) curl -u 아이디:패스워드 -XGET 'http://127.0.0.1:9200/_cat/health?v 인덱스 생성 사실 flink 와 연계하거나 API 를 사용해서 인덱스를 수동으로 생성할 일은 없지만, 다음과 ..
하이브에서 쿼리를 날릴때 이런 쿼리가 발생했고 구글링 하면 나오는 사이트중 그나마 힌트에 근접한 결과는 아래링크에서 얻을수 있었다. https://docs.treasuredata.com/display/public/PD/Hive+Known+Limitations 에러의 원인은 뭘까? 원인이 되는 쿼리를 패턴을 찾아보면 "LATERAL VIEW" 와 "UNION ALL" 를 복잡하게 섞었을때 종종 나타나는것 같다. 가끔 beeline 에서 오류나는게 hive 커멘드에서는 재현이 안되는 경우가 있으니 동일한 환경에서 쿼리를 테스트해야 재현되는경우가 있으니 참고하자. 상세한 오류(에러)메시지는 아래와 같다. 참고로 해결방법은 버그가 아닐까 싶을 정도로 단순하며 찜찜하다 ㅋㅋ etting log thread is ..