graph LR
kafka --> chq[clickhouse queue table]
chq --> chc[clickhouse consumer table]
chc --> cht[clickhouse table]
kafka engine으로 메시지를 가져와서 클릭하우스에 넣으면 보통 위와 같은 순서로 데이터를 넣는다.
문제는 ENGINE = Kafka로 지정하고 보통은 kafka_format을 JSONEachRow를 사용해서 데이터를 넣는데, 이게 key, value로 넣다보니 json 구조에 object > object가 있으면 넣질 못한다.
클릭하우스가 JSONExtract 함수를 지원하니까 이걸 쓰면 되긴 하는데, 버전이 낮으면 메모리 누수 문제가 있어서 되도록 안 쓰는 방향으로 컨슈밍을 하고 있었는데, 버전이 올라가면서 이슈가 해결된 거 같아서 JSONExtract 위주로 컨슈밍을 해봤다.
{
"index_name": "json_extract_test",
"data": {
"remote_addr": "127.0.0.1",
"user": {
"name": "foo-bar, kim",
"id": "foobar"
}
},
"time": "2023-02-22 09:37:10"
}
위처럼 되어 있는 경우, kafka_format이 JSONEachRow인 경우에 data > user 안에 있는 name, id는 못 넣는다. 그래서 kafka에 있는 메시지를 그대로 가져와서 큐테이블에 넣고, 그 다음에 컨슈밍시 JSONExtract로 데이터를 넣으려는 테이블에 넣는 방식으로 처리가 가능하다.
kafka에 있는 json 메시지를 그대로 저장할 큐 테이블 생성
CREATE TABLE default.test_json_ext__queue
(
raw_msg String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka01:9092,kafka02:9092,kafka03:9092',
kafka_topic_list = 'json_extract_test',
kafka_group_name = 'ch_json_extract_test',
kafka_format = 'JSONAsString',
kafka_num_consumers = 5,
kafka_thread_per_consumer = 1,
kafka_max_block_size = 1048576;
큐 테이블에 잘 들어갔나 확인하려면
SET stream_like_engine_allow_direct_select = 1;
SELECT * FROM default.test_json_ext__queue;
데이터가 잘 들어온 걸 확인 했으면, 데이터를 넣을 테이블을 생성
CREATE TABLE default.test_json_ext
(
index_name String,
remote_addr String,
user_name String,
user_id String,
time DateTime('Asia/Seoul')
)
ENGINE = MergeTree()
ORDER BY time;
위 테이블에 넣을 수 있게 큐 테이블에서 가져온 걸 JSONExtract 하는 컨슈밍 테이블 생성
CREATE MATERIALIZED VIEW default.test_json_ext__consumer TO default.test_json_ext AS
SELECT
JSONExtractString(raw_msg, 'index_name') AS index_name,
JSONExtractString(JSONExtractRaw(raw_msg, 'data'), 'remote_addr') AS remote_addr,
JSONExtractString(JSONExtractRaw(JSONExtractRaw(raw_msg, 'data'), 'user'), 'name') AS user_name,
JSONExtractString(JSONExtractRaw(JSONExtractRaw(raw_msg, 'data'), 'user'), 'id') AS user_id,
toDateTime(JSONExtractString(raw_msg, 'time')) AS time
FROM default.test_json_ext__queue;
반응형