Database/Opensearch

Opensearch Ingest pipeline으로 로그파일 전처리

daeunnniii 2024. 11. 19. 21:23
728x90
반응형

1. Opensearch Ingest Pipeline

1.1 Opensearch Ingest Pipeline이란?

  • Ingest pipeline은 데이터를 전처리하는 방법으로 데이터를 색인하기 전에 데이터를 변환하거나 필터링해준다.
  • Ingest Node: Opensearch에 데이터를 인덱싱하기 전에 다양한 전처리를 할 수 있는 메커니즘을 제공하는 노드 타입
    • Ingest pipeline을 설정하기 위해서는 Ingest Node가 활성화되어있어야 한다.
  • Ingest Pipeline을 설정하기 위한 형식은 다음과 같다.
PUT _ingest/pipline/{pipline name}
{
  "description" : "...",
  "processors" : [ ... ]
}

1.2 Processor

grok 프로세서란?

  • grok 프로세서는 pattern 매칭을 사용하여 구조화되지 않은 데이터를 구문 분석하고 구조화하는데 사용된다.
  • grok 프로세서를 사용하여 로그 메세지, 웹 서버 액세스 로그, 애플리케이션 로그 및 일관된 형식을 따르는 기타 로그 데이터에서 필드를 추출할 수 있다.

syntax

{
  "grok": {
    "field": "your_message",
    "patterns": ["your_patterns"]
  }
}
  • 이외에도 Set, Date, Script 등 여러가지 프로세서가 있다.

2. Opensearch Ingest Pipeline 적용

2.1 테스트 로그파일

  • 테스트할 Linux 로그 파일이다.
  • Opensearch Ingest Pipeline으로 Linux 로그 파일을 전처리할 것이다.

데이터 참고

https://www.kaggle.com/datasets/omduggineni/loghub-linux-log-data

 

LogHub - Linux Log Data

 

www.kaggle.com

  • 일반적으로 timestamp, hostname, source, message 형식을 가지고 있음.

  • 아래와 같이 timestamp, hostname, source[pid], message 와 같이 pid가 있는 경우도 있음.

 

2.2 Index 생성

INDEX_NAME = 'linux_logs'
 
def create_index(client, index_name, mapping):
    if not client.indices.exists(index=index_name):
        client.indices.create(index=index_name, body=mapping)
        print(f"Index '{index_name}' created")
    else:
        print(f"Index '{index_name}' already exists")
 
mapping = {
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "MMM d HH:mm:ss||MMM dd HH:mm:ss"
      },
      "hostname": { "type": "keyword" },
      "source": { "type": "keyword" },
      "pid": { "type": "integer" },
      "message": { "type": "text" }
    }
  }
}
 
create_index(client, INDEX_NAME, mapping)   # Opensearch 인덱스 생성

 

2.3 Ingest Pipeline 생성 및 적용

  • PID는 각 로그에 따라 있는 경우도 있고, 없는 경우도 있으므로 '?'를 사용하여 선택적으로 만들도록 정규표현식 패턴 작성
def create_ingest_pipeline(name, pipeline):
    try:
        response = client.ingest.put_pipeline(id=name, body=pipeline)
        print("Pipeline created successfully:", response)
    except OpenSearchException as e:
        print("Error creating pipeline:", e)
    except Exception as e:
        print("An unexpected error occurred:", e)
 
pipeline = {
  "description": "Pipeline to parse Linux logs",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{MONTH:month}%{SPACE}%{MONTHDAY:day} %{TIME:time} %{HOSTNAME:hostname} %{WORD:source}(\\[%{NUMBER:pid}\\])?: %{GREEDYDATA:message}"
        ]
      }
    },
    {
      "set": {
        "field": "timestamp",
        "value": "{{month}} {{day}} {{time}}"
      }
    },
    {
      "drop": {
        "if": "ctx.timestamp == null"
      }
    }
  ]
}
 
create_ingest_pipeline(PIPELINE_NAME, pipeline)
  • 로그 파일 인덱싱
def get_bulk_actions(log_file_path):
    actions = []
    with open(log_file_path, 'r', encoding='ISO-8859-1') as file:
        for idx, line in enumerate(file):
            action = {
                "index": {
                    "_index": INDEX_NAME,
                    "pipeline": PIPELINE_NAME
                }
            }
            actions.append(action)
            document = {
                "message": line.strip()
            }
            actions.append(document)
    return actions
 
LOG_FILE_PATH = '/notebook/research/opensearch/log_datasets/Linux.log'
bulk_actions = get_bulk_actions(LOG_FILE_PATH)
 
response = client.bulk(body=bulk_actions)

 

2.4 Ingest pipeline 적용 결과 조회

  • 조건 없이 데이터 10개 조회
def fetch_data(index_name, query, size=10):
    try:
        # 데이터 검색
        response = client.search(index=index_name, body=query, size=size)
         
        # 검색 결과 출력
        hits = response['hits']['hits']
        for hit in hits:
            print(hit['_source'])  # 각 문서의 내용을 출력
         
    except Exception as e:
        print("Error occurred:", e)
 
 
query = {
    "query": {
        "match_all": {}
    }
}
fetch_data(INDEX_NAME, query)
 
'''
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'syslog', 'message': 'syslogd startup succeeded', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'syslog', 'message': 'klogd startup succeeded', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'kernel', 'message': 'klogd 1.4.1, log source = /proc/kmsg started.', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'kernel', 'message': 'Linux version 2.6.5-1.358 (bhcompile@bugs.build.redhat.com) (gcc version 3.3.3 20040412 (Red Hat Linux 3.3.3-7)) #1 Sat May 8 09:04:50 EDT 2004', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
{'hostname': 'combo', 'month': 'Jun', 'time': '06:06:20', 'source': 'kernel', 'message': 'BIOS-provided physical RAM map:', 'day': '9', 'timestamp': 'Jun 9 06:06:20'}
... (생략) ...
 
'''
  • 로그에 pid가 존재하는 경우 pid 필드에 대한 값도 잘 저장된 것을 확인할 수 있음.
query = {
  "query": {
    "exists": {
      "field": "pid"
    }
  }
}
fetch_data(INDEX_NAME, query, size=20)
 
'''
{'hostname': 'combo', 'month': 'Jun', 'pid': '1677', 'time': '06:06:22', 'source': 'hcid', 'message': 'HCI daemon ver 2.4 started', 'day': '9', 'timestamp': 'Jun 9 06:06:22'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1681', 'time': '06:06:22', 'source': 'sdpd', 'message': 'sdpd v1.5 started', 'day': '9', 'timestamp': 'Jun 9 06:06:22'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1717', 'time': '06:06:23', 'source': 'apmd', 'message': 'Version 3.0.2 (APM BIOS 1.2, Linux driver 1.16ac)', 'day': '9', 'timestamp': 'Jun 9 06:06:23'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'smartd version 5.21 Copyright (C) 2002-3 Bruce Allen', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Home page is http://smartmontools.sourceforge.net/', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Opened configuration file /etc/smartd.conf', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Configuration file /etc/smartd.conf parsed.', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Device: /dev/hda, opened', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1717', 'time': '06:06:24', 'source': 'apmd', 'message': 'Charge: * * * (-1% unknown)', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Device: /dev/hda, found in smartd database.', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Device: /dev/hda, is SMART capable. Adding to "monitor" list.', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1755', 'time': '06:06:24', 'source': 'smartd', 'message': 'Monitoring 1 ATA and 0 SCSI devices', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '1757', 'time': '06:06:24', 'source': 'smartd', 'message': 'smartd has fork()ed into background mode. New PID=1757.', 'day': '9', 'timestamp': 'Jun 9 06:06:24'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:40', 'source': 'xinetd', 'message': 'No such internal service: services/stream - DISABLING', 'day': '9', 'timestamp': 'Jun 9 06:06:40'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:41', 'source': 'xinetd', 'message': 'bind failed (Address already in use (errno = 98)). service = telnet', 'day': '9', 'timestamp': 'Jun 9 06:06:41'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:41', 'source': 'xinetd', 'message': 'Service telnet failed to start and is deactivated.', 'day': '9', 'timestamp': 'Jun 9 06:06:41'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:41', 'source': 'xinetd', 'message': 'xinetd Version 2.3.13 started with libwrap loadavg options compiled in.', 'day': '9', 'timestamp': 'Jun 9 06:06:41'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2003', 'time': '06:06:41', 'source': 'xinetd', 'message': 'Started working: 30 available services', 'day': '9', 'timestamp': 'Jun 9 06:06:41'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2054', 'time': '06:06:45', 'source': 'udev', 'message': "creating device node '/udev/lp0'", 'day': '9', 'timestamp': 'Jun 9 06:06:45'}
{'hostname': 'combo', 'month': 'Jun', 'pid': '2068', 'time': '06:06:45', 'source': 'gpm', 'message': '*** info [startup.c(95)]:', 'day': '9', 'timestamp': 'Jun 9 06:06:45'}
 
'''

 

 

 

728x90
반응형