抖音网友 發表於 2019-9-17 16:04:00

go对elasticsearch的增删改查

<h2 id="环境">环境</h2>
<p>elasticsearch 6.8 (6.x版本应该都没问题)</p>
<p>go客户端sdk: github.com/elastic/go-elasticsearch/v6</p>
<p>其实自己封装api也行,反正elasticsearch对外交互的协议是restful接口</p>
<h2 id="注意点">注意点</h2>
<p>发起的请求,如果成功了,一定要记得关闭返回Response的Body,否则会占用一个连接。</p>
<h2 id="全局变量和函数">全局变量和函数</h2>
<pre><code class="language-go">// c is the package-level Elasticsearch client used by every example below; init assigns it.
var c *elasticsearch.Client

func init() {
    var err error
    config := elasticsearch.Config{}
    config.Addresses = []string{"http://127.0.0.1:9200"}
    c, err = elasticsearch.NewClient(config)
    checkError(err)
}

// checkError is a no-op for a nil error. For a non-nil error it prints the
// error to standard output and exits the process with status 1.
func checkError(err error) {
    if err == nil {
        return
    }
    fmt.Println(err)
    os.Exit(1)
}
</code></pre>
<h2 id="创建索引">创建索引</h2>
<pre><code class="language-go">func createIndex() {
    body := mapinterface{}{
      "mappings": mapinterface{}{
            "test_type": mapinterface{}{
                "properties": mapinterface{}{
                  "str": mapinterface{}{
                        "type": "keyword",   // 表示这个字段不分词
                  },
                },
            },
      },
    }
    jsonBody, _ := json.Marshal(body)
    req := esapi.IndicesCreateRequest{
      Index: "test_index",
      Body:bytes.NewReader(jsonBody),
    }
    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}
</code></pre>
<p> {"acknowledged":true,"shards_acknowledged":true,"index":"test_index"}</p>
<h2 id="删除索引">删除索引</h2>
<pre><code class="language-go">func deleteIndex() {
    req := esapi.IndicesDeleteRequest{
      Index: []string{"test_index"},
    }
    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}
</code></pre>
<p> {"acknowledged":true}</p>
<h2 id="往索引插入数据">往索引插入数据</h2>
<h3 id="插入单条数据">插入单条数据</h3>
<pre><code class="language-go">func insertSingle() {
    body := mapinterface{}{
      "num": 0,
      "v":   0,
      "str": "test",
    }
    jsonBody, _ := json.Marshal(body)

    req := esapi.CreateRequest{    // 如果是esapi.IndexRequest则是插入/替换
      Index:      "test_index",
      DocumentType: "test_type",
      DocumentID:   "test_1",
      Body:         bytes.NewReader(jsonBody),
    }
    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}
</code></pre>
<p> {"_index":"test_index","_type":"test_type","_id":"test_1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1}</p>
<h3 id="批量插入很明显也可以批量做其他操作">批量插入(很明显,也可以批量做其他操作)</h3>
<pre><code class="language-go">// insertBatch sends one bulk request that creates documents "test_2" through
// "test_9" in "test_index" and prints the response. The request body is
// newline-delimited JSON: for each document, one "create" action line followed
// by one source line. An encoding or request error terminates the program
// through checkError.
func insertBatch() {
    var bodyBuf bytes.Buffer
    // Encoder.Encode writes the JSON value followed by a newline, which is
    // exactly the line format built here.
    enc := json.NewEncoder(&amp;bodyBuf)
    for i := 2; i &lt; 10; i++ {
        createLine := map[string]interface{}{
            "create": map[string]interface{}{
                "_index": "test_index",
                "_id":    "test_" + strconv.Itoa(i),
                "_type":  "test_type",
            },
        }
        checkError(enc.Encode(createLine))

        body := map[string]interface{}{
            "num": i % 3,
            "v":   i,
            "str": "test" + strconv.Itoa(i),
        }
        checkError(enc.Encode(body))
    }

    req := esapi.BulkRequest{
        Body: &amp;bodyBuf,
    }
    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}
</code></pre>
<p> {"took":31,"errors":false,"items":[{"create":{"_index":"test_index","_type":"test_type","_id":"test_2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_3","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_4","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_5","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_6","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_7","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_8","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}},{"create":{"_index":"test_index","_type":"test_type","_id":"test_9","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1,"status":201}}]}</p>
<h2 id="查询">查询</h2>
<h3 id="通过sql查询">通过sql查询</h3>
<pre><code class="language-go">func selectBySql() {
    query := mapinterface{}{
      "query": "select count(*) as cnt, max(v) as value, num from test_index where num &gt; 0 group by num",
    }
    jsonBody, _ := json.Marshal(query)
    req := esapi.XPackSQLQueryRequest{
      Body: bytes.NewReader(jsonBody),
    }
    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}
</code></pre>
<p> {"columns":[{"name":"cnt","type":"long"},{"name":"value","type":"long"},{"name":"num","type":"long"}],"rows":[[2,7,1],[3,8,2]],"cursor":"q47zAgFjAQp0ZXN0X2luZGV4igEBAQljb21wb3NpdGUHZ3JvdXBieQEDbWF4Ajg2AAD/AQF2AAAA/wAA/wEAAjc4AQNudW0AAAH/AADoBwEKAQI3OAIAAAAAAAAAAgACAQAAAAABAP////8PAAAAAAEFcmFuZ2U/gAAAAANudW0BAAAAAP8AAAAAAAAAAAAAAAABWgMAAgIAAAAAAAH/////DwMBawI3OAEBWgABbQI4NgV2YWx1ZQAAAVoBawI3OAABWgABBw=="}</p>
<h3 id="通过search-api查询">通过Search Api查询</h3>
<pre><code class="language-go">func selectBySearch() {
    query := mapinterface{}{
      "query": mapinterface{}{
            "bool": mapinterface{}{
                "filter": mapinterface{}{
                  "range": mapinterface{}{
                        "num": mapinterface{}{
                            "gt": 0,
                        },
                  },
                },
            },
      },
      "size": 0,
      "aggs": mapinterface{}{
            "num": mapinterface{}{
                "terms": mapinterface{}{
                  "field": "num",
                  //"size":1,
                },
                "aggs": mapinterface{}{
                  "max_v": mapinterface{}{
                        "max": mapinterface{}{
                            "field": "v",
                        },
                  },
                },
            },
      },
    }   
    jsonBody, _ := json.Marshal(query)
   
    req := esapi.SearchRequest{
      Index:      []string{"test_index"},
      DocumentType: []string{"test_type"},
      Body:         bytes.NewReader(jsonBody),
    }   
    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}
</code></pre>
<p> {"took":10,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":5,"max_score":0.0,"hits":[]},"aggregations":{"num":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":2,"doc_count":3,"max_v":{"value":8.0}},{"key":1,"doc_count":2,"max_v":{"value":7.0}}]}}}</p>
<p>但是elasticsearch对聚合查询分页并不是很友好,基本上都是得自己手动分页。</p>
<h2 id="局部更新批量更新略">局部更新(批量更新略)</h2>
<h3 id="根据id更新">根据id更新</h3>
<pre><code class="language-go">func updateSingle() {         
    body := mapinterface{}{
      "doc": mapinterface{}{
            "v": 100,   
      },
    }                     
    jsonBody, _ := json.Marshal(body)
    req := esapi.UpdateRequest{
      Index:      "test_index",
      DocumentType: "test_type",
      DocumentID:   "test_1",
      Body:         bytes.NewReader(jsonBody),
    }                     
                        
    res, err := req.Do(context.Background(), c)
    checkError(err)      
    defer res.Body.Close()
    fmt.Println(res.String())               
}
</code></pre>
<p> {"_index":"test_index","_type":"test_type","_id":"test_1","_version":2,"result":"updated","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":3,"_primary_term":1}</p>
<p>除了doc方式之外,还有script方式</p>
<h3 id="根据条件更新">根据条件更新</h3>
<pre><code class="language-go">func updateByQuery() {
    body := mapinterface{}{
      "script": mapinterface{}{
            "lang": "painless",
            "source": `
                ctx._source.v = params.value;
            `,
            "params": mapinterface{}{
                "value": 101,
            },
      },
      "query": mapinterface{}{
            "match_all": mapinterface{}{},
      },
    }
    jsonBody, _ := json.Marshal(body)
    req := esapi.UpdateByQueryRequest{
      Index: []string{"test_index"},
      Body:bytes.NewReader(jsonBody),
    }
    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}
</code></pre>
<p> {"took":109,"timed_out":false,"total":9,"updated":9,"deleted":0,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}</p>
<h2 id="删除">删除</h2>
<h3 id="根据id删除">根据id删除</h3>
<pre><code class="language-go">func deleteSingle() {
    req := esapi.DeleteRequest{
      Index:      "test_index",
      DocumentType: "test_type",
      DocumentID:   "test_1",
    }   

    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}       
</code></pre>
<p> {"_index":"test_index","_type":"test_type","_id":"test_1","_version":6,"result":"deleted","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":7,"_primary_term":1}</p>
<h3 id="根据条件删除">根据条件删除</h3>
<pre><code class="language-go">func deleteByQuery() {
    body := mapinterface{}{
      "query": mapinterface{}{
            "match_all": mapinterface{}{},
      },
    }
    jsonBody, _ := json.Marshal(body)
    req := esapi.DeleteByQueryRequest{
      Index: []string{"test_index"},
      Body:bytes.NewReader(jsonBody),
    }
    res, err := req.Do(context.Background(), c)
    checkError(err)
    defer res.Body.Close()
    fmt.Println(res.String())
}
</code></pre>
<p> {"took":17,"timed_out":false,"total":9,"deleted":9,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[]}</p><br><br>
来源:https://www.cnblogs.com/Me1onRind/p/11534544.html
頁: [1]
查看完整版本: go对elasticsearch的增删改查