삽질/개발,엔지니어링

clickhouse에 kafka json 메시지 가져올 때 object > object 구조 처리

maengis 2023. 2. 22. 15:21
graph LR kafka --> chq[clickhouse queue table] chq --> chc[clickhouse consumer table] chc --> cht[clickhouse table]

kafka engine으로 메시지를 가져와서 클릭하우스에 넣으면 보통 위와 같은 순서로 데이터를 넣는다.

문제는 ENGINE = Kafka로 지정하고 보통은 kafka_format을 JSONEachRow를 사용해서 데이터를 넣는데, 이게 key, value로 넣다보니 json 구조에 object > object가 있으면 넣질 못한다.

클릭하우스가 JSONExtract 함수를 지원하니까 이걸 쓰면 되긴 하는데, 버전이 낮으면 메모리 누수 문제가 있어서 되도록 안 쓰는 방향으로 컨슈밍을 하고 있었는데, 버전이 올라가면서 이슈가 해결된 거 같아서 JSONExtract 위주로 컨슈밍을 해봤다.

{
   "index_name": "json_extract_test",
   "data": {
      "remote_addr": "127.0.0.1",
      "user": {
         "name": "foo-bar, kim",
         "id": "foobar"
      }
   },
   "time": "2023-02-22 09:37:10"
}

위처럼 되어 있는 경우, kafka_format이 JSONEachRow인 경우에 data > user 안에 있는 name, id는 못 넣는다. 그래서 kafka에 있는 메시지를 그대로 가져와서 큐테이블에 넣고, 그 다음에 컨슈밍시 JSONExtract로 데이터를 넣으려는 테이블에 넣는 방식으로 처리가 가능하다.

kafka에 있는 json 메시지를 그대로 저장할 큐 테이블 생성

CREATE TABLE default.test_json_ext__queue
(
    raw_msg String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka01:9092,kafka02:9092,kafka03:9092',
kafka_topic_list = 'json_extract_test',
kafka_group_name = 'ch_json_extract_test',
kafka_format = 'JSONAsString',
kafka_num_consumers = 5,
kafka_thread_per_consumer = 1,
kafka_max_block_size = 1048576;

큐 테이블에 잘 들어갔나 확인하려면

SET stream_like_engine_allow_direct_select = 1;
SELECT * FROM default.test_json_ext__queue;

데이터가 잘 들어온 걸 확인 했으면, 데이터를 넣을 테이블을 생성

CREATE TABLE default.test_json_ext
(
    index_name String,
    remote_addr String,
    user_name String,
    user_id String,
    time DateTime('Asia/Seoul')
)
ENGINE = MergeTree()
ORDER BY time;

위 테이블에 넣을 수 있게 큐 테이블에서 가져온 걸 JSONExtract 하는 컨슈밍 테이블 생성

CREATE MATERIALIZED VIEW default.test_json_ext__consumer TO default.test_json_ext AS
SELECT
    JSONExtractString(raw_msg, 'index_name') AS index_name,
    JSONExtractString(JSONExtractRaw(raw_msg, 'data'), 'remote_addr') AS remote_addr,
    JSONExtractString(JSONExtractRaw(JSONExtractRaw(raw_msg, 'data'), 'user'), 'name') AS user_name,
    JSONExtractString(JSONExtractRaw(JSONExtractRaw(raw_msg, 'data'), 'user'), 'id') AS user_id,
    toDateTime(JSONExtractString(raw_msg, 'time')) AS time
FROM default.test_json_ext__queue;
반응형