Logstash 하나의 이벤트를 다수의 이벤트로

Logstash를 통하여 데이터를 수집하다보니…

하나의 이벤트를 수신하였을 때 해당 이벤트를 나누어 다수의 이벤트로 저장하는 것이 후에 그래프를 생성하거나 데이터를 살펴볼 때 편리한 경우가 발생하였다.

예를 들어 아래와 같이 리스트 형태의 데이터가 있다고 가정해보자.

{
  "@timestamp" => 2022-09-05T03:26:51.973Z,
  "@version" => 1,
  "data" => [
    [0] {
           "message" => "test message 0"
         },
    [1] {
           "message" => "test message 1"
         },
    [2] {
           "message" => "test message 2"
         }
  ]
}

위의 데이터 관리를 쉽게 하기 위해서는 아래와 같이

{
  "@timestamp" => 2022-09-05T03:26:51.973Z,
  "@version" => 1,
  "meesage" => "test message 0"
}
{
  "@timestamp" => 2022-09-05T03:26:51.973Z,
  "@version" => 1,
  "meesage" => "test message 1"
}
{
  "@timestamp" => 2022-09-05T03:26:51.973Z,
  "@version" => 1,
  "meesage" => "test message 2"
}

이벤트를 나누는게 관리하기가 확실히 편하다. 물론 리스트 형태로 데이터를 저장해도 되고 나중에 관리 방법이 없는 건 아니다. 하지만 향후 편리성을 생각한다면 미리 분할해서 넣는것도 좋은 방법이라 생각이 들었다.

그래서 방법을 고민하다가 pipeline을 이용하여 재귀적으로 동작하는 파이프라인을 구성하였다.

먼저 pipeline을 3가지로 만들었다.

1. 데이터 읽기 파이프라인
2. 데이터 스플릿 파이프라인
3. 데이터 쓰기 파이프라인

먼저 데이터 읽기 파이프라인은 아래와 같은 형태로 만들었다.

input {
  # 데이터 입력부는 상황에 따라 다르므로 생략
}
filter {
  # 데이터 전처리는 이 부분에서, 여기서는 생략
}
output {
  pipeline {
    send_to => spliter_pipeline
  }
}

입력과 필터 부분은 사실 상황에 따라 다르므로 생략하고 핵심은 데이터를 받아서 전처리를 수행하고 spliter_pipeline으로 전달한다는 것이다. 그럼 다음으로 데이터 스플릿 파이프라인은 아래와 같이 구현하였다.

input {
  pipeline {
    address => spliter_pipeline
  }
}
filter {
  ruby {
    code => 'event.set("data_length", event.get("data").length)'
  }
  if [data_length] == 0 {
    drop { }
  }
  clone {
    clones => [ "parsed" ]
    add_field => { "message" => "%{[data][0][message]}" }
    remove_field => [ "data", "data_length" ]
  }
  if [type] != "parsed" {
    mutate {
      remove_field => [ "[data][0]", "data_length" ]
    }
  }
}
output {
  if [type] != "parsed" {
    pipeline {
      send_to => spliter_pipeline
    }
  } else {
    pipeline {
      send_to => writer_pipeline
    }
  }
}

하나 하나 뜯어보면, 먼저 input은 spliter_pipeline으로 받는다. 즉, 전처리가 된 데이터가 이쪽으로 들어오게 된다. 전처리가 된 데이터는 처음 예제와 같이

{
  "@timestamp" => 2022-09-05T03:26:51.973Z,
  "@version" => 1,
  "data" => [
    [0] {
           "message" => "test message 0"
         },
    [1] {
           "message" => "test message 1"
         },
    [2] {
           "message" => "test message 2"
         }
  ]
}

리스트 형태의 데이터가 들어온다. 그럼 첫번째 필터 부분을 보자.

  ruby {
    code => 'event.set("data_length", event.get("data").length)'
  }
  if [data_length] == 0 {
    drop { }
  }

첫번째 필터 부분을 보면 루비 코드로 현재 “data” 리스트의 길이를 구하고, 그 길이를 “data_length” 값으로 저장을 한다. 만약 “data_length”, “data” 리스트의 길이가 0이라면 해당 이벤트는 지워버린다.

다음 필터 부분은 clone 필터를 이용하여 이벤트를 복제한다.

  clone {
    clones => [ "parsed" ]
    add_field => { "message" => "%{[data][0][message]}" }
    remove_field => [ "data", "data_length" ]
  }

이벤트를 복제하여 해당 이벤트의 “type”에는 “parsed”라고 지정하고, “message”에는 “data”의 0번째 “message”를 입력한다. 그리고 “data”, “data_length”를 지워준다.

다음 필터 부분은 원본 이벤트의 “data”에서 복제된 0번 데이터를 제거하고 “data_length” 값을 삭제해준다.

  if [type] != "parsed" {
    mutate {
      remove_field => [ "[data][0]", "data_length" ]
    }
  }

if 구문에 의해 복제된 이벤트가 아닌 원본 이벤트를 수정하게 되고, “data”의 0번째 데이터를 삭제하고, data_length를 삭제하여 초기화 해준다.

실제 필터 부분을 예제로 본다면 먼저 들어오는 이벤트는

{
  "@timestamp" => 2022-09-05T03:26:51.973Z,
  "@version" => 1,
  "data" => [
    [0] {
           "message" => "test message 0"
         },
    [1] {
           "message" => "test message 1"
         },
    [2] {
           "message" => "test message 2"
         }
  ]
}

복제된 이벤트의 원본은

{
  "@timestamp" => 2022-09-05T03:26:51.973Z,
  "@version" => 1,
  "data" => [
    [0] {
           "message" => "test message 1"
         },
    [1] {
           "message" => "test message 2"
         }
  ]
}

복제된 이벤트는

{
  "@timestamp" => 2022-09-05T03:26:51.973Z,
  "@version" => 1,
  "type" => "parsed",
  "message" => "test_message 0"
}

의 형태를 가지게 된다.

여기서 재귀적으로 동작하게 output을 아래와 같이 선언하였다.

output {
  if [type] != "parsed" {
    pipeline {
      send_to => spliter_pipeline
    }
  } else {
    pipeline {
      send_to => writer_pipeline
    }
  }
}

만약 “type” 값에 “parsed”라고 선언되어 있지 않으면 다시 spliter_pipeline으로 보내고, “parsed”라고 선언되어 있으면 “writer_pipeline”으로 보내는 것이다.

그림으로 간단하게 표현하면

와 같이 되는 것이다. 사실 이 상태로 바로 Elastic Search에 데이터를 추가하여도 상관없지만 기존 다수의 룰이 별도의 writer를 두고 있기 때문에 데이터 쓰기를 별도로 아래와 같이 구현하였다.

input {
  pipeline {
    address => writer_pipeline
  }
}
filter {
  mutate {
    remove_field => [ "type" ]
  }
}
output {
  elasticsearch {
    #설정은 생략
  }
}

이렇게 다수의 pipeline으로 구성하여 데이터를 넘겨주는 형태를 취하면 재귀적인 동작과 같이 필요한 동작을 구현할 수가 있다.

생각해보면 어렵지 않은 방법이고 심지어 이전에 사용해봤던 방법임에도 30분정도 고민하고 있었기에 기록으로 남겨둔다.

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다