Database/Clickhouse

ClickHouse에 NATS 엔진 설정 방법

daeunnniii 2024. 11. 23. 00:38
728x90
반응형

개요

  • NATS로 publish한 메세지들이 바로 Clickhouse의 테이블에 추가될 수 있도록 NATS 엔진 설정 방법을 알아본다.

1. NATS 메시징 시스템 구축

1.1 NATS 설치

# nats-server 설치
$ curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.18/nats-server-v2.10.18-linux-amd64.zip -o nats-server.zip
$ unzip nats-server.zip -d nats-server
$ sudo cp nats-server/nats-server-v2.10.18-linux-amd64/nats-server /usr/bin/
 
# nats 설치
$ curl -sf https://binaries.nats.dev/nats-io/natscli/nats@latest | sh
$ sudo mv nats /usr/bin

 

1.2 NATS Authentication 설정

# context 생성
$ nats context create daeun
NATS Configuration Context "daeun"
 
  Server URLs: nats://127.0.0.1:4222
         Path: /home/daeun/.config/nats/context/daeun.json
 
$ cd /home/daeun/.config/nats/context
 
$ vim daeun.json
{  "description": "",
  "url": "nats://127.0.0.1:4222",
  "token": "",
  "user": "daeun",
  "password": "daeun",
  "creds": "",
  "nkey": "",
  "cert": "",
  "key": "",
  "ca": "",
  "nsc": "",
  "jetstream_domain": "",
  "jetstream_api_prefix": "",
  "jetstream_event_prefix": "",
  "inbox_prefix": "",
  "user_jwt": ""
}

 

1.3 NATS 서버 실행

nats-server

 

1.4 NATS Publish 테스트 코드 구현

from nats.aio.client import Client as NATS
import asyncio
import psutil
from datetime import datetime
import json
import time

async def publish_data():
    nc = NATS()
 
    async def connect_nats():
        options = {
            "servers": ["nats://192.168.0.8:4222"],
            "user": "daeun",       # 사용자 이름
            "password": "daeun" # 비밀번호
        }
        await nc.connect(**options)
        print("Connected to NATS server")
 
    await connect_nats()
 
    def create_data():
        return {
            "timestamp": time.time_ns(),
            "cpu_usage": psutil.cpu_percent(interval=1),
            "memory_usage": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent
        }
 
    try:
        while True:
            data = create_data()
            message = json.dumps(data)
            await nc.publish("system.info", message.encode())
            print(f"Published: {message}")
            await asyncio.sleep(5)  # 5초마다 데이터 퍼블리시
    except KeyboardInterrupt:
        print("Exiting...")
 
    await nc.close()
 
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(publish_data())

 

2. Clickhouse에 NATS 엔진 연동

2.1 docker-compose를 이용하여 Clickhouse 설치

version: '3.8'
 
services:
  clickhouse:
    image: clickhouse/clickhouse-server
    volumes:
      - /home/daeun/clickhouse/config.xml:/etc/clickhouse-server/config.xml
    ports:
      - "9000:9000"
      - "8123:8123"
    networks:
      - clickhouse-net
    restart: always
 
networks:
  clickhouse-net:
    driver: bridge

 

2.2 config.xml에 NATS 정보 설정

$ vim config.xml
 
<clickhouse>
    <nats>
        <user>daeun</user>
        <password>daeun</password>
    </nats>
 
    ...생략...
</clickhouse>

 

2.3 Clickhouse 컨테이너 내부에서 NATS 엔진 연동하여 테이블 생성

  • NATS의 특정 subject로 보낸 데이터를 Clickhouse 테이블로 읽으려면 아래 3가지가 필요하다.

1) NATS 메세지를 ClickHouse 테이블처럼 보이게 만드는 NATS 엔진 테이블

$ docker exec -it [CONTAINER ID] clickhouse-client
 
:) CREATE TABLE system_queue (
    timestamp UInt64,
    cpu_usage Float32,
    memory_usage Float32,
    disk_usage Float32
) ENGINE = NATS SETTINGS nats_url = 'nats://192.168.0.8:4222',
           nats_subjects = 'system.info',
           nats_format = 'JSONEachRow',
           nats_num_consumers = 1,
           nats_username = 'daeun',
           nats_password = 'daeun';

 

2) 수집된 데이터의 홈을 제공하는 대상의 MergeTree 테이블

:) CREATE TABLE sysinfo (
    timestamp DateTime,
    cpu_usage Float32,
    memory_usage Float32,
    disk_usage Float32)
ENGINE = MergeTree()
ORDER BY timestamp;

 

3) NATS에서 대상 테이블로 데이터를 자동으로 이동하기 위한 구체화된

:) CREATE MATERIALIZED VIEW consumer TO sysinfo
     AS SELECT toDateTime(toUInt64(divide(timestamp, 1000000000))) AS timestamp,
     cpu_usage AS cpu_usage,
     memory_usage AS memory_usage,
     disk_usage AS disk_usage FROM system_queue;

 

위 과정을 완료하면 NATS의 'system.info' subject로 publish한 메세지들을 Clickhouse의 sysinfo 테이블에서 확인할 수 있다.

 

728x90
반응형

'Database > Clickhouse' 카테고리의 다른 글

Clickhouse에서의 Sharding  (1) 2024.11.16
Clickhouse 개요 및 Docker로 Multi node 구성  (1) 2024.11.16