
hive 에서 array 를 다루는 함수가 기본적으로 너무 없었다. 배열크기를 찾는 size, 배열에 포함여부를 찾는 array_contains, 그리고 배열을 정렬해주는 sort_array 정도만 존재했다. 사실상 explode() 처리후 다시 collect_list() 로 묶어서 처리하라는 말인데... 문제는 2개의 필드를 explode() 해야할 경우 이게 참 껄끄러운 문제가 많고 아쉬운점이 많았다. https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-CollectionFunctions Hive 4.x 추가된 Array 함수 hive 4.x 버전대부터 꽤 많은 UDF 가 추가되어있다. (근데 왜 h..
brickhouse 는 cookbook 이 정리가 잘 안되어있어서 좋은데도 사용을 잘 못하고 있는게 아닌가 싶다. 이번에는 timeseries 에 있는 함수의 예제를 정리하고자 한다. https://github.com/klout/brickhouse/tree/master/src/main/java/brickhouse/udf/timeseries 1. moving_avg 이동평균값을 구할때 사용한다. 쉽게 생각하면 주식차트에서 7일 이동평균, 30일 이동평균선 같이 평균값을 만들어 내는데 이런 값을 유도한다고 생각하면 된다. 그래서 array 값과 이동평균을 할 값을 지정하면 된다. 내부적으로 double 로 캐스팅해서 계산하므로 항상 소숫점으로 나온다. beeline> CREATE TEMPORARY FUNCT..
hive 의 udf 로 사용하다보면 아쉬운점이 많은데, brickhouse 의 UDF 를 쓰면 가려운 부분을 많이 긁어줘서 편하게 로직을 만들수 있다. 여러 케이스의 udf 가 있지만, 여기서는 json 관련 udf 의 샘플과 예시를 적어두고자 한다. 편의상 add jar 는 되어있다고 가정하겠다. https://github.com/klout/brickhouse GitHub - klout/brickhouse: Hive UDF's for the data warehouse Hive UDF's for the data warehouse. Contribute to klout/brickhouse development by creating an account on GitHub. github.com json 함수는 아..

최근 검색량이 높은 키워드가 무엇인지를 알고 싶을때, flink 에서는 호핑윈도우(슬라이딩 윈도우) 기반으로 지정하고, 슬라이드 사이즈와 데이터 간격을 지정해서 로직을 유도하여 만드는걸 구성했다. 이해를 돕기위해 대충 쿼리를 표현하면 아래와 같다. 하지만, 처음에는 잘 동작하다가 어느순간 backpress 가 발생해서 데이터 지연으로 제대로 처리안되는 문제가 발생했다. INSERT INTO top_keyword_slide ...생략.... FROM TABLE( HOP( DATA => TABLE kafka_log, TIMECOL => DESCRIPTOR(log_time), SLIDE => INTERVAL '30' SECOND, SIZE => INTERVAL '10' MINUTES) ) WHERE valid..

flink 를 쿼리로 작성해서 insert select 형태로 로직이 실행되면 대시보드의 Running Job List 의 이름이 획일적이라 구분이 쉽지 않다. 특히 동일한 테이블에서 조건만 다르게 N개의 로직을 돌리면 작업이름으로는 구분이 안되는 문제가 존재한다. 이런 문제를 해결하기위해서 쿼리 실행전에 작업이름을 지정하여 해결하는것이 가능하다. insert-into_카탈로그명.데이터베이스명.테이블명 해결방법 다음과 같이 insert select 쿼리를 실행하기전에 SET 'pipeline.name' 형태로 이름을 지정하면 된다. 그러면 insert-into-default_catalog.default_database... 같은 이름이 아니라 사용자가 지정한 작업이름으로 등록된다. SET 'pipelin..
hive 쿼리를 다루다보면 array, map 같은 데이터 타입이 존재하는데, 이런 데이터를 다룰때 기본적으로는 explode() 후 처리하는것이 기본적인 접근방식이다. 1. hive 기본 array 함수 별도의 jar 를 추가할 필요없이 hive 에서 기본으로 제공하는 array 관련 함수이며, 관련된 내용은 아래 링크에 정리되어있다. https://cwiki.apache.org/confluence/display/hive/languagemanual+udf 1.1 size 배열의 사이즈를 구하는 함수이다. beeline> select size( array(1,2,2,3) ); +------+ | _c0 | +------+ | 4 | +------+ 1.2 array_contains 배열에 특정한 아이템이..

airflow 의 best practices 글을 읽어보면, Variable.get(키) 형태로 직접 값을 가져오지 않고, 바인딩처리해서 {{var.value.키}} 표현해서 사용하는것을 기본 가이드로 하고 있다. 그 이유는 일단 DAG 를 구성할때 top level code 에 관련된 로직이 존재한다면 DAG 실행뿐 아니라, 로딩되는 시점에서도 그 코드가 동작되기 때문에 성능에 문제가 될 수 있다. https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#airflow-variables from airflow.models import Variable # Bad example foo_var = Variable.get("foo")..

airflow 로 스케쥴 관리를 하고 있는데, logical_date 가 1초 밀리는 희안한 일이 일어났다. logical_date 의 경우 RUN_ID 값에 시간값이 붙어서 쉽게 인지가 가능한데, 정상적일땐 아래와 같이 RUN_ID 값이 00 으로 딱 떨어졌는데, 어느순간 다음과 같이 RUN_ID 의 값이 00 으로 딱 안떨어지고, 뒷 단위가 조금씩 밀리는 현상이 발견되었다. 구분 정상일때 비정상일때 RUN_ID scheduled__2003-12-07T20:00:00+00:00 scheduled__2003-12-07T21:00:01.0099+00:00 이게 문제가 되는 이유는 ExternalTaskSensor 의 경우는 앞쪽 DAG 의 의존성을 체크할때 logical_date 가 같은 이력을 참조하기 ..