728x90
반응형
개요
- NATS로 publish한 메세지들이 바로 Clickhouse의 테이블에 추가될 수 있도록 NATS 엔진 설정 방법을 알아본다.
1. NATS 메시징 시스템 구축
1.1 NATS 설치
# nats-server 설치
$ curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.18/nats-server-v2.10.18-linux-amd64.zip -o nats-server.zip
$ unzip nats-server.zip -d nats-server
$ sudo cp nats-server/nats-server-v2.10.18-linux-amd64/nats-server /usr/bin/
# nats 설치
$ curl -sf https://binaries.nats.dev/nats-io/natscli/nats@latest | sh
$ sudo mv nats /usr/bin
1.2 NATS Authentication 설정
# context 생성
$ nats context create daeun
NATS Configuration Context "daeun"
Server URLs: nats://127.0.0.1:4222
Path: /home/daeun/.config/nats/context/daeun.json
$ cd /home/daeun/.config/nats/context
$ vim daeun.json
{ "description": "",
"url": "nats://127.0.0.1:4222",
"token": "",
"user": "daeun",
"password": "daeun",
"creds": "",
"nkey": "",
"cert": "",
"key": "",
"ca": "",
"nsc": "",
"jetstream_domain": "",
"jetstream_api_prefix": "",
"jetstream_event_prefix": "",
"inbox_prefix": "",
"user_jwt": ""
}
1.3 NATS 서버 실행
nats-server
1.4 NATS Publish 테스트 코드 구현
from nats.aio.client import Client as NATS
import asyncio
import psutil
from datetime import datetime
import json
import time
async def publish_data():
nc = NATS()
async def connect_nats():
options = {
"servers": ["nats://192.168.0.8:4222"],
"user": "daeun", # 사용자 이름
"password": "daeun" # 비밀번호
}
await nc.connect(**options)
print("Connected to NATS server")
await connect_nats()
def create_data():
return {
"timestamp": time.time_ns(),
"cpu_usage": psutil.cpu_percent(interval=1),
"memory_usage": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent
}
try:
while True:
data = create_data()
message = json.dumps(data)
await nc.publish("system.info", message.encode())
print(f"Published: {message}")
await asyncio.sleep(5) # 5초마다 데이터 퍼블리시
except KeyboardInterrupt:
print("Exiting...")
await nc.close()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(publish_data())
2. Clickhouse에 NATS 엔진 연동
2.1 docker-compose를 이용하여 Clickhouse 설치
version: '3.8'
services:
clickhouse:
image: clickhouse/clickhouse-server
volumes:
- /home/daeun/clickhouse/config.xml:/etc/clickhouse-server/config.xml
ports:
- "9000:9000"
- "8123:8123"
networks:
- clickhouse-net
restart: always
networks:
clickhouse-net:
driver: bridge
2.2 config.xml에 NATS 정보 설정
$ vim config.xml
<clickhouse>
<nats>
<user>daeun</user>
<password>daeun</password>
</nats>
...생략...
</clickhouse>
2.3 Clickhouse 컨테이너 내부에서 NATS 엔진 연동하여 테이블 생성
- NATS의 특정 subject로 보낸 데이터를 Clickhouse 테이블로 읽으려면 아래 3가지가 필요하다.
1) NATS 메세지를 ClickHouse 테이블처럼 보이게 만드는 NATS 엔진 테이블
$ docker exec -it [CONTAINER ID] clickhouse-client
:) CREATE TABLE system_queue (
timestamp UInt64,
cpu_usage Float32,
memory_usage Float32,
disk_usage Float32
) ENGINE = NATS SETTINGS nats_url = 'nats://192.168.0.8:4222',
nats_subjects = 'system.info',
nats_format = 'JSONEachRow',
nats_num_consumers = 1,
nats_username = 'daeun',
nats_password = 'daeun';
2) 수집된 데이터의 홈을 제공하는 대상의 MergeTree 테이블
:) CREATE TABLE sysinfo (
timestamp DateTime,
cpu_usage Float32,
memory_usage Float32,
disk_usage Float32)
ENGINE = MergeTree()
ORDER BY timestamp;
3) NATS에서 대상 테이블로 데이터를 자동으로 이동하기 위한 구체화된 뷰
:) CREATE MATERIALIZED VIEW consumer TO sysinfo
AS SELECT toDateTime(toUInt64(divide(timestamp, 1000000000))) AS timestamp,
cpu_usage AS cpu_usage,
memory_usage AS memory_usage,
disk_usage AS disk_usage FROM system_queue;
위 과정을 완료하면 NATS의 'system.info' subject로 publish한 메세지들을 Clickhouse의 sysinfo 테이블에서 확인할 수 있다.
728x90
반응형
'Database > Clickhouse' 카테고리의 다른 글
Clickhouse에서의 Sharding (1) | 2024.11.16 |
---|---|
Clickhouse 개요 및 Docker로 Multi node 구성 (1) | 2024.11.16 |