728x90
반응형
1. Opensearch Ingest Pipeline
1.1 Opensearch Ingest Pipeline이란?
- Ingest pipeline은 데이터를 전처리하는 방법으로 데이터를 색인하기 전에 데이터를 변환하거나 필터링해준다.
- Ingest Node: Opensearch에 데이터를 인덱싱하기 전에 다양한 전처리를 할 수 있는 메커니즘을 제공하는 노드 타입
- Ingest pipeline을 설정하기 위해서는 Ingest Node가 활성화되어있어야 한다.
- Ingest Pipeline을 설정하기 위한 형식은 다음과 같다.
PUT _ingest/pipline/{pipline name}
{
"description" : "...",
"processors" : [ ... ]
}
1.2 Processor
grok 프로세서란?
- grok 프로세서는 pattern 매칭을 사용하여 구조화되지 않은 데이터를 구문 분석하고 구조화하는데 사용된다.
- grok 프로세서를 사용하여 로그 메세지, 웹 서버 액세스 로그, 애플리케이션 로그 및 일관된 형식을 따르는 기타 로그 데이터에서 필드를 추출할 수 있다.
syntax
{
"grok": {
"field": "your_message",
"patterns": ["your_patterns"]
}
}
- 이외에도 Set, Date, Script 등 여러가지 프로세서가 있다.
2. Opensearch Ingest Pipeline 적용
2.1 테스트 로그파일
- 테스트할 Linux 로그 파일이다.
- Opensearch Ingest Pipeline으로 Linux 로그 파일을 전처리할 것이다.
데이터 참고
https://www.kaggle.com/datasets/omduggineni/loghub-linux-log-data
- 일반적으로 timestamp, hostname, source, message 형식을 가지고 있음.
- 아래와 같이 timestamp, hostname, source[pid], message 와 같이 pid가 있는 경우도 있음.
2.2 Index 생성
INDEX_NAME = 'linux_logs'
def create_index(client, index_name, mapping):
if not client.indices.exists(index=index_name):
client.indices.create(index=index_name, body=mapping)
print(f"Index '{index_name}' created")
else:
print(f"Index '{index_name}' already exists")
mapping = {
"mappings": {
"properties": {
"timestamp": {
"type": "date",
"format": "MMM d HH:mm:ss||MMM dd HH:mm:ss"
},
"hostname": { "type": "keyword" },
"source": { "type": "keyword" },
"pid": { "type": "integer" },
"message": { "type": "text" }
}
}
}
create_index(client, INDEX_NAME, mapping) # Opensearch 인덱스 생성
2.3 Ingest Pipeline 생성 및 적용
- PID는 각 로그에 따라 있는 경우도 있고, 없는 경우도 있으므로 '?'를 사용하여 선택적으로 만들도록 정규표현식 패턴 작성
def create_ingest_pipeline(name, pipeline):
try:
response = client.ingest.put_pipeline(id=name, body=pipeline)
print("Pipeline created successfully:", response)
except OpenSearchException as e:
print("Error creating pipeline:", e)
except Exception as e:
print("An unexpected error occurred:", e)
pipeline = {
"description": "Pipeline to parse Linux logs",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{MONTH:month}%{SPACE}%{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:source}(\\[%{NUMBER:pid}\\])?: %{GREEDYDATA:message}"
]
}
},
{
"set": {
"field": "timestamp",
"value": "{{month}} {{day}} {{time}}"
}
},
{
"drop": {
"if": "ctx.timestamp == null"
}
}
]
}
create_ingest_pipeline(PIPELINE_NAME, pipeline)
- 로그 파일 인덱싱
def get_bulk_actions(log_file_path):
actions = []
with open(log_file_path, 'r', encoding='ISO-8859-1') as file:
for idx, line in enumerate(file):
action = {
"index": {
"_index": INDEX_NAME,
"pipeline": PIPELINE_NAME
}
}
actions.append(action)
document = {
"message": line.strip()
}
actions.append(document)
return actions
LOG_FILE_PATH = '/notebook/research/opensearch/log_datasets/Linux.log'
bulk_actions = get_bulk_actions(LOG_FILE_PATH)
response = client.bulk(body=bulk_actions)
2.4 Ingest pipeline 적용 결과 조회
- 조건 없이 데이터 10개 조회
def fetch_data(index_name, query, size=10):
try:
# 데이터 검색
response = client.search(index=index_name, body=query, size=size)
# 검색 결과 출력
hits = response['hits']['hits']
for hit in hits:
print(hit['_source']) # 각 문서의 내용을 출력
except Exception as e:
print("Error occurred:", e)
query = {
"query": {
"match_all": {}
}
}
fetch_data(INDEX_NAME, query)
'''
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'syslog', 'message': 'syslogd startup succeeded', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'syslog', 'message': 'klogd startup succeeded', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'kernel', 'message': 'klogd 1.4.1, log source = /proc/kmsg started.', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'kernel', 'message': 'Linux version 2.6.5-1.358 (bhcompile@bugs.build.redhat.com) (gcc version 3.3.3 20040412 (Red Hat Linux 3.3.3-7)) #1 Sat May 8 09:04:50 EDT 2004', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'kernel', 'message': 'BIOS-provided physical RAM map:', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
... (생략) ...
'''
- 로그에 pid가 존재하는 경우 pid 필드에 대한 값도 잘 저장된 것을 확인할 수 있음.
query = {
"query": {
"exists": {
"field": "pid"
}
}
}
fetch_data(INDEX_NAME, query, size=20)
'''
{'hostname': 'combo', 'month': 'Jun', 'pid': '1677', 'time': '06:06:22', 'source': 'hcid', 'message': 'HCI daemon ver 2.4 started', 'day': '9', 'timestamp': 'Jun 9 06:06:22'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1681', 'time': '06:06:22', 'source': 'sdpd', 'message': 'sdpd v1.5 started', 'day': '9', 'timestamp': 'Jun 9 06:06:22'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1717', 'time': '06:06:23', 'source': 'apmd', 'message': 'Version 3.0.2 (APM BIOS 1.2, Linux driver 1.16ac)', 'day': '9', 'timestamp': 'Jun 9 06:06:23'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'smartd version 5.21 Copyright (C) 2002-3 Bruce Allen', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Home page is http://smartmontools.sourceforge.net/', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Opened configuration file /etc/smartd.conf', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Configuration file /etc/smartd.conf parsed.', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Device: /dev/hda, opened', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1717', 'time': '06:06:24', 'source': 'apmd', 'message': 'Charge: * * * (-1% unknown)', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Device: /dev/hda, found in smartd database.', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Device: /dev/hda, is SMART capable. Adding to "monitor" list.', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Monitoring 1 ATA and 0 SCSI devices', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1757', 'time': '06:06:24', 'source': 'smartd', 'message': 'smartd has fork()ed into background mode. New PID=1757.', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:40', 'source': 'xinetd', 'message': 'No such internal service: services/stream - DISABLING', 'day': '9', 'timestamp': 'Jun 9 06:06:40'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:41', 'source': 'xinetd', 'message': 'bind failed (Address already in use (errno = 98)). service = telnet', 'day': '9', 'timestamp': 'Jun 9 06:06:41'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:41', 'source': 'xinetd', 'message': 'Service telnet failed to start and is deactivated.', 'day': '9', 'timestamp': 'Jun 9 06:06:41'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:41', 'source': 'xinetd', 'message': 'xinetd Version 2.3.13 started with libwrap loadavg options compiled in.', 'day': '9', 'timestamp': 'Jun 9 06:06:41'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:41', 'source': 'xinetd', 'message': 'Started working: 30 available services', 'day': '9', 'timestamp': 'Jun 9 06:06:41'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2054', 'time': '06:06:45', 'source': 'udev', 'message': "creating device node '/udev/lp0'", 'day': '9', 'timestamp': 'Jun 9 06:06:45'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2068', 'time': '06:06:45', 'source': 'gpm', 'message': '*** info [startup.c(95)]:', 'day': '9', 'timestamp': 'Jun 9 06:06:45'}
'''
728x90
반응형
'Database > Opensearch' 카테고리의 다른 글
Opensearch 유사도 검색 (1) | 2024.11.21 |
---|---|
Opensearch Tokenizer, Analyzer와 Custom Analyzer 적용 (3) | 2024.11.20 |
Opensearch 개념과 사용법 정리 (4) | 2024.11.18 |