Elasticsearch에 Insert 되는 데이터를 주기적으로 MongoDB 저장하는 방법

2017. 12. 26. 11:26서버 프로그래밍

모든 raw 데이터를 Elasticsearch에 저장하고 있는 시스템에서, 가급적 Elasticsearch 자체를 건드리지 않는 방법으로 새로 Insert 되는 데이터에 대해서만 MongoDB 샤딩 시스템에 저장하는 방법을 검토해보았다.

가급적 구현하기 쉬우면서 Elasticsearch에 부하를 주지 않는 방법이 필요해서, Elasticsearch에서 Trigger를 제공하는지 알아 보았다. Elasticsearch에서는 Insert 이벤트가 발생하면 바로 동작하는 트리거를 제공하지는 않고, 주기적으로 스케쥴링 되는 Watcher를 제공하는 것으로 보인다.

https://www.elastic.co/guide/en/watcher/current/trigger.html


그래서, Node.js를 이용하여 사용할 수 있는 Elasticsearch watcher를 찾았다. 기존에 이미 구동되고 있는 Elasticsearch 시스템을 수정하지 않고, 외부에서 주기적으로 감시하면서 Insert된 도큐먼트를 추출할 수 있기 때문에 적당한 방법이라고 판단된다.

https://github.com/Gustu/elasticsearch-nodejs-watcher


Elasticsearch와 연동되는 kibana의 DevTools를 이용하여 데이터 구조에 맞는 쿼리를 테스트해보았다.

당연하게도 Elasticsearch NodeJS Watcher의 샘플 소스에 명시된 쿼리만으로는 원하는 결과를 얻을 수 있기 때문이다.

GET index-1234567890/_search

{

  "size":10000,

  "query": {

      "bool": {

          "must": {

            "match_all": {}

          },

          "filter": {

            "range": {"created_at": {"gte": "now-30s"}}

          }

      }

  }

}


최종적으로 다음과 같은 방법으로 30초 이내에 Insert된 도큐먼트를 추출해서 MongoDB에 저장하는 기능을 구현할 수 있었다.

* size를 10000 (최대값)으로 지정해주지 않으면, 디폴트 값인 10이 적용되어 10개씩의 도큐먼트만 리턴된다.

* 예제 소스를 보면 range 다음에 @timestamp라고 되어 있어서 고유한 함수가 있는 줄 알았는데, 해당 부분에 날자 정보를 포함하고 있는 키 이름을 입력해주어야 한다.


const elasticWatcher = require("elasticsearch-nodejs-watcher");

const connection = {

    host: 'http://elasticsearch server ip:9200',

    log: 'trace'

};

const watcher = {

    schedule: "*/30 * * * * *",

    query: {

        index: 'index-*',

        body: {

            size: 10000,

            query: {

                bool: {

                    must: {

                      match_all: {}

                    },

                    filter: {

                      range: {"created_at": {gte: "now-30s"}}

                    }

                }

            }

        }

    },

    predicate: ({hits: {total}}) => total > 0,

    action: data => { 

          MongoDB 저장 로직

    },

    errorHandler: err => console.log(JSON.stringify(err))

};

elasticWatcher.schedule(connection, watcher);