跳到主要内容

17、ElasticSearch 实战:摄取节点 - 管道(pipeline)介绍

摄取节点的作用

之前在介绍ES集群的时候,曾经介绍过集群中存在一个摄取节点,在数据保存到文档索引之前,我们可能需要对文档进行预处理,而摄取节点会拦截这些请求,根据需要将文档中的内容进行处理然后传递会索引或者API中。

摄取节点的配置

默认配置下所有节点都启用了ingest。因此任何一个ES节点都可以处理ingest任务。就像之前ES集群中描述的,我们可以创建一个专门处理相关业务的ingest节点。

控制节点的ingest开关在其elasticsearch.yml中的参数

## 关闭ingest
node.ingest: false

节点和管道

摄取节点节点对数据的处理主要是通过管道(pipeline),在索引和请求中指定管道参数,这样ingest节点在拦截请求后就指定使用哪条管道进行处理。

管道的操作

创建管道

我们可以使用_ingest/pipelineAPI来创建管道,我们现在创建一个对des设置默认值的管道

PUT _ingest/pipeline/test_pipeline
{
   
     
  "description" : "这是测试的管道内容",
  "processors" : [
    {
   
     
      "set" : {
   
     
        "field": "des",
        "value": "这是管道默认数据"
      }
    }
  ]
}

为管道设置版本号

这里需要注意,根据官方文档的介绍,此版本号没有实际作用,仅仅方便使用者根据其业务环境进行外部的版本管理

PUT _ingest/pipeline/test_pipeline
{
   
     
  "description" : "这是测试的管道内容",
  "version": 100, 
  "processors" : [
    {
   
     
      "set" : {
   
     
        "field": "des",
        "value": "这是管道默认数据"
      }
    }
  ]
}

查询管道

查询索引中的管道

通过GET请求我们可以查询出,当前管道的配置信息

GET _ingest/pipeline/test_pipeline

返回内容

{
   
     
  "test_pipeline" : {
   
     
    "description" : "这是测试的管道内容",
    "version" : 100,
    "processors" : [
      {
   
     
        "set" : {
   
     
          "field" : "des",
          "value" : "这是管道默认数据"
        }
      }
    ]
  }
}

查询管道版本

GET _ingest/pipeline/test_pipeline?filter_path=*.version

返回数据

{
   
     
  "test_pipeline" : {
   
     
    "version" : 100
  }
}

删除管道

删除管道

删除管道类似删除文档之前需要使用DELETE请求调用其插入的API就可以完成

DELETE _ingest/pipeline/test_pipeline

删除所有管道

和文档操作类似,你可以使用*符号匹配所有的管道进行清除。

DELETE _ingest/pipeline/*

使用管道

现在我们存在一个这样的索引

"test_field2": {
   
     
        "aliases": {
   
     },
        "mappings": {
   
     
            "properties": {
   
     
                "channel": {
   
     
                    "type": "nested"
                },
                "des": {
   
     
                    "type": "text"
                },
                "name": {
   
     
                    "type": "keyword"
                }
            }
        },
        "settings": {
   
     
            "index": {
   
     
                "creation_date": "1574774756443",
                "number_of_shards": "1",
                "number_of_replicas": "1",
                "uuid": "RCF17AZOR1GPs84LKw88lA",
                "version": {
   
     
                    "created": "7020099"
                },
                "provided_name": "test_field2"
            }
        }
    }

现在我们尝试向索引中插入数据。

使用已经存在管道

PUT test_field2/_doc/1?pipeline=test_pipeline
{
   
     
    "name": "内容1",
    "desc": "描述1",
    "channel": [
        {
   
     
            "name": "one",
            "num": 33
        },
        {
   
     
            "name": "two",
            "num": 44
        }
    ]
}

然后查询ID为1的数据会得到下面内容

{
   
     
  "_index" : "test_field2",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
   
     
    "des" : "这是管道默认数据",
    "name" : "内容1",
    "channel" : [
      {
   
     
        "num" : 33,
        "name" : "one"
      },
      {
   
     
        "num" : 44,
        "name" : "two"
      }
    ],
    "desc" : "描述1"
  }
}

会发现其des内容并非我们插入的数据,而是管道设置的参数。

从上面的内容可以发现,管道其实很像是我们平时使用的拦截器操作,会拦截一些操作然后对其进行修改。

请求数据的时候定义管道

除了上面的使用方式,我们也可以在请求数据的时候使用管道

PUT test_field2/_doc/5
{
   
     
    "pipeline": {
   
     
        "description": "这是自定义的管道内容",
        "processors": [
            {
   
     
                "set": {
   
     
                    "field": "des",
                    "value": "这是管道默认数据"
                }
            }
        ]
    },
    "docs": [
        {
   
     
            "_source": {
   
     
                "name": "内容1",
                "desc": "描述1",
                "channel": [
                    {
   
     
                        "name": "one",
                        "num": 33
                    },
                    {
   
     
                        "name": "two",
                        "num": 44
                    }
                ]
            }
        }
    ]
}

此时我们尝试查询数据会发现数据使用的是自定义的管道参数

{
   
     
  "_index" : "test_field2",
  "_type" : "_doc",
  "_id" : "5",
  "_version" : 1,
  "_seq_no" : 2,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
   
     
    "pipeline" : {
   
     
      "description" : "这是自定义的管道内容",
      "processors" : [
        {
   
     
          "set" : {
   
     
            "field" : "des",
            "value" : "这是管道默认数据"
          }
        }
      ]
    },
    "docs" : [
      {
   
     
        "_source" : {
   
     
          "name" : "内容1",
          "desc" : "描述1",
          "channel" : [
            {
   
     
              "name" : "one",
              "num" : 33
            },
            {
   
     
              "name" : "two",
              "num" : 44
            }
          ]
        }
      }
    ]
  }
}

模拟管道

当然官方也提供了_simulate 方法让你模拟数据在经过了管道的处理后所得到的结果

POST _ingest/pipeline/_simulate
{
   
     
  "pipeline" :
  {
   
     
    "description": "_description",
    "processors": [
      {
   
     
        "set" : {
   
     
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
   
     
      "_index": "index",
      "_id": "id",
      "_source": {
   
     
        "foo": "bar"
      }
    },
    {
   
     
      "_index": "index",
      "_id": "id",
      "_source": {
   
     
        "foo": "rab"
      }
    }
  ]
}

返回结果

{
   
     
   "docs": [
      {
   
     
         "doc": {
   
     
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
   
     
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
   
     
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
   
     
         "doc": {
   
     
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
   
     
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
   
     
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

查看管道对文件的影响

POST _ingest/pipeline/_simulate?verbose
{
   
     
  "pipeline" :
  {
   
     
    "description": "_description",
    "processors": [
      {
   
     
        "set" : {
   
     
          "field" : "field2",
          "value" : "_value2"
        }
      },
      {
   
     
        "set" : {
   
     
          "field" : "field3",
          "value" : "_value3"
        }
      }
    ]
  },
  "docs": [
    {
   
     
      "_index": "index",
      "_id": "id",
      "_source": {
   
     
        "foo": "bar"
      }
    },
    {
   
     
      "_index": "index",
      "_id": "id",
      "_source": {
   
     
        "foo": "rab"
      }
    }
  ]
}

返回内容

{
   
     
   "docs": [
      {
   
     
         "processor_results": [
            {
   
     
               "doc": {
   
     
                  "_id": "id",
                  "_index": "index",
                  "_type": "_doc",
                  "_source": {
   
     
                     "field2": "_value2",
                     "foo": "bar"
                  },
                  "_ingest": {
   
     
                     "timestamp": "2017-05-04T22:46:09.674Z"
                  }
               }
            },
            {
   
     
               "doc": {
   
     
                  "_id": "id",
                  "_index": "index",
                  "_type": "_doc",
                  "_source": {
   
     
                     "field3": "_value3",
                     "field2": "_value2",
                     "foo": "bar"
                  },
                  "_ingest": {
   
     
                     "timestamp": "2017-05-04T22:46:09.675Z"
                  }
               }
            }
         ]
      },
      {
   
     
         "processor_results": [
            {
   
     
               "doc": {
   
     
                  "_id": "id",
                  "_index": "index",
                  "_type": "_doc",
                  "_source": {
   
     
                     "field2": "_value2",
                     "foo": "rab"
                  },
                  "_ingest": {
   
     
                     "timestamp": "2017-05-04T22:46:09.676Z"
                  }
               }
            },
            {
   
     
               "doc": {
   
     
                  "_id": "id",
                  "_index": "index",
                  "_type": "_doc",
                  "_source": {
   
     
                     "field3": "_value3",
                     "field2": "_value2",
                     "foo": "rab"
                  },
                  "_ingest": {
   
     
                     "timestamp": "2017-05-04T22:46:09.677Z"
                  }
               }
            }
         ]
      }
   ]
}

ps. 本文中所涉及的API以及文档内容基于7.2版本。