hive 의 udf 로 사용하다보면 아쉬운점이 많은데, brickhouse 의 UDF 를 쓰면 가려운 부분을 많이 긁어줘서 편하게 로직을 만들수 있다. 여러 케이스의 udf 가 있지만, 여기서는 json 관련 udf 의 샘플과 예시를 적어두고자 한다. 편의상 add jar 는 되어있다고 가정하겠다. https://github.com/klout/brickhouse GitHub - klout/brickhouse: Hive UDF's for the data warehouse Hive UDF's for the data warehouse. Contribute to klout/brickhouse development by creating an account on GitHub. github.com json 함수는 아..
![](http://i1.daumcdn.net/thumb/C148x148/?fname=https://blog.kakaocdn.net/dn/AqAcx/btsC9AblErL/UKlW3oGh9OCshKHEdpbhl0/img.png)
최근 검색량이 높은 키워드가 무엇인지를 알고 싶을때, flink 에서는 호핑윈도우(슬라이딩 윈도우) 기반으로 지정하고, 슬라이드 사이즈와 데이터 간격을 지정해서 로직을 유도하여 만드는걸 구성했다. 이해를 돕기위해 대충 쿼리를 표현하면 아래와 같다. 하지만, 처음에는 잘 동작하다가 어느순간 backpress 가 발생해서 데이터 지연으로 제대로 처리안되는 문제가 발생했다. INSERT INTO top_keyword_slide ...생략.... FROM TABLE( HOP( DATA => TABLE kafka_log, TIMECOL => DESCRIPTOR(log_time), SLIDE => INTERVAL '30' SECOND, SIZE => INTERVAL '10' MINUTES) ) WHERE valid..
![](http://i1.daumcdn.net/thumb/C148x148/?fname=https://blog.kakaocdn.net/dn/I1CVQ/btsC9lFfQZp/FllV0dSZbYBediHlhixT5k/img.png)
flink 를 쿼리로 작성해서 insert select 형태로 로직이 실행되면 대시보드의 Running Job List 의 이름이 획일적이라 구분이 쉽지 않다. 특히 동일한 테이블에서 조건만 다르게 N개의 로직을 돌리면 작업이름으로는 구분이 안되는 문제가 존재한다. 이런 문제를 해결하기위해서 쿼리 실행전에 작업이름을 지정하여 해결하는것이 가능하다. insert-into_카탈로그명.데이터베이스명.테이블명 해결방법 다음과 같이 insert select 쿼리를 실행하기전에 SET 'pipeline.name' 형태로 이름을 지정하면 된다. 그러면 insert-into-default_catalog.default_database... 같은 이름이 아니라 사용자가 지정한 작업이름으로 등록된다. SET 'pipelin..
hive 쿼리를 다루다보면 array, map 같은 데이터 타입이 존재하는데, 이런 데이터를 다룰때 기본적으로는 explode() 후 처리하는것이 기본적인 접근방식이다. 1. hive 기본 array 함수 별도의 jar 를 추가할 필요없이 hive 에서 기본으로 제공하는 array 관련 함수이며, 관련된 내용은 아래 링크에 정리되어있다. https://cwiki.apache.org/confluence/display/hive/languagemanual+udf 1.1 size 배열의 사이즈를 구하는 함수이다. beeline> select size( array(1,2,2,3) ); +------+ | _c0 | +------+ | 4 | +------+ 1.2 array_contains 배열에 특정한 아이템이..
![](http://i1.daumcdn.net/thumb/C148x148/?fname=https://blog.kakaocdn.net/dn/x28SU/btsBUArDwN8/MmXt0dYoxlvW7SvRLdoiP0/img.png)
airflow 의 best practices 글을 읽어보면, Variable.get(키) 형태로 직접 값을 가져오지 않고, 바인딩처리해서 {{var.value.키}} 표현해서 사용하는것을 기본 가이드로 하고 있다. 그 이유는 일단 DAG 를 구성할때 top level code 에 관련된 로직이 존재한다면 DAG 실행뿐 아니라, 로딩되는 시점에서도 그 코드가 동작되기 때문에 성능에 문제가 될 수 있다. https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#airflow-variables from airflow.models import Variable # Bad example foo_var = Variable.get("foo")..
![](http://i1.daumcdn.net/thumb/C148x148/?fname=https://blog.kakaocdn.net/dn/cfMI45/btsBRSkrUI0/VETUiO24fqu0orgMfKUeuk/img.png)
airflow 로 스케쥴 관리를 하고 있는데, logical_date 가 1초 밀리는 희안한 일이 일어났다. logical_date 의 경우 RUN_ID 값에 시간값이 붙어서 쉽게 인지가 가능한데, 정상적일땐 아래와 같이 RUN_ID 값이 00 으로 딱 떨어졌는데, 어느순간 다음과 같이 RUN_ID 의 값이 00 으로 딱 안떨어지고, 뒷 단위가 조금씩 밀리는 현상이 발견되었다. 구분 정상일때 비정상일때 RUN_ID scheduled__2003-12-07T20:00:00+00:00 scheduled__2003-12-07T21:00:01.0099+00:00 이게 문제가 되는 이유는 ExternalTaskSensor 의 경우는 앞쪽 DAG 의 의존성을 체크할때 logical_date 가 같은 이력을 참조하기 ..
![](http://i1.daumcdn.net/thumb/C148x148/?fname=https://blog.kakaocdn.net/dn/GFXcT/btsBMP9C3ym/PkL85Gb83sLC392tVMwT41/img.png)
airflow 에서는 code 탭에서 dag 를 구성한 python 파일을 볼 수 있다. 그런데, 이 파일과 dag 값이 다르게 로딩되서 한참 삽질한 경험을 공유하고자 한다. 결론부터 말하면 dag 를 선언한 파일에서 다른 dag 의 파일을 import 하면서 영향을 받았다. 원인 확인방법 airflow 에서는 dag 파일을 읽고, dag bag 에 담아서 관리된다. 문제는 이때 관련된 정보가 잘못 인지되었던 문제였다. dag 로딩이 잘되었는지는 airflow dags list 명령으로 확인이 가능하다. 원래 primitive.py, hour.py, day.py 3개의 파일이 따로 존재하고, dag 도 파일별로 따로 선언했는데 아래와 같이 primitive.py 경로가 아닌 hour.py 로 잘못잡히는걸..
hive 에서는 string, bigint, double, decimal 같은 일반적인 primitive 데이터 타입이외에 map, struct, array 같은 complex type 을 지원한다. 테스트용 쿼리를 만들때 primitive 타입의 경우 쉽게 표현이 가능한데 complext type 을 표현하는 방법은 종종 헛갈릴때가 많아서 간단히 표현법을 다루고자 한다. hive 에서는 기본적으로 "select 값" 형태로 더미 값을 출력해보는게 가능한데, complext type 을 선언하여 보는건 아래와 같이 사용하면 된다. 참고로 array 와 같이 N개의 아이템을 풀어서 분석하려면? explode 를 이용해 분석하면 되는데, 그건 아래 글을 참고하도록 하자. 2023.11.27 - [데이터처리/..