221 lines
5.6 KiB
Go
221 lines
5.6 KiB
Go
|
package elastic
|
|||
|
|
|||
|
import (
|
|||
|
"bytes"
|
|||
|
"context"
|
|||
|
"encoding/json"
|
|||
|
"log"
|
|||
|
"sync/atomic"
|
|||
|
"time"
|
|||
|
|
|||
|
"git.apinb.com/bsm-sdk/engine/vars"
|
|||
|
"github.com/elastic/go-elasticsearch/v8"
|
|||
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
|||
|
"github.com/elastic/go-elasticsearch/v8/esutil"
|
|||
|
)
|
|||
|
|
|||
|
type ES struct {
|
|||
|
Client *elasticsearch.Client
|
|||
|
}
|
|||
|
|
|||
|
func NewElastic(endpoints []string, username, password string) (*ES, error) {
|
|||
|
|
|||
|
cfg := elasticsearch.Config{
|
|||
|
Addresses: endpoints,
|
|||
|
Username: username,
|
|||
|
Password: password,
|
|||
|
}
|
|||
|
var err error
|
|||
|
client, err := elasticsearch.NewClient(cfg)
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
|
|||
|
return &ES{
|
|||
|
Client: client,
|
|||
|
}, nil
|
|||
|
}
|
|||
|
|
|||
|
// idx 为空,默认随机唯一字符串
|
|||
|
//
|
|||
|
// doc := map[string]interface{}{
|
|||
|
// "title": "中国",
|
|||
|
// "content": "中国早日统一台湾",
|
|||
|
// "time": time.Now().Unix(),
|
|||
|
// "date": time.Now(),
|
|||
|
// }
|
|||
|
func (es *ES) CreateDocument(index string, id string, doc *interface{}) {
|
|||
|
var buf bytes.Buffer
|
|||
|
if err := json.NewEncoder(&buf).Encode(doc); err != nil {
|
|||
|
log.Println("Elastic NewEncoder:", err)
|
|||
|
}
|
|||
|
// Set up the request object.
|
|||
|
req := esapi.IndexRequest{
|
|||
|
Index: index,
|
|||
|
DocumentID: id,
|
|||
|
Body: &buf,
|
|||
|
Refresh: "true",
|
|||
|
}
|
|||
|
|
|||
|
// Perform the request with the client.
|
|||
|
res, err := req.Do(context.Background(), es.Client)
|
|||
|
if err != nil {
|
|||
|
log.Println("Elastic Error:", res.String())
|
|||
|
}
|
|||
|
defer res.Body.Close()
|
|||
|
|
|||
|
if res.IsError() {
|
|||
|
log.Println("Elastic Error:", res.String())
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// 批量写入文档。
|
|||
|
// Action field configures the operation to perform (index, create, delete, update)
|
|||
|
// create 如果文档不存在就创建,但如果文档存在就返回错误
|
|||
|
// index 如果文档不存在就创建,如果文档存在就更新
|
|||
|
// update 更新一个文档,如果文档不存在就返回错误
|
|||
|
// delete 删除一个文档,如果要删除的文档id不存在,就返回错误
|
|||
|
func (es *ES) Batch(index string, documens []map[string]interface{}, action string) {
|
|||
|
log.SetFlags(0)
|
|||
|
|
|||
|
var (
|
|||
|
countSuccessful uint64
|
|||
|
|
|||
|
err error
|
|||
|
)
|
|||
|
|
|||
|
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
|
|||
|
//
|
|||
|
// Create the BulkIndexer
|
|||
|
//
|
|||
|
// NOTE: For optimal performance, consider using a third-party JSON decoding package.
|
|||
|
// See an example in the "benchmarks" folder.
|
|||
|
//
|
|||
|
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
|||
|
Index: index, // The default index name
|
|||
|
Client: es.Client, // The Elasticsearch client
|
|||
|
NumWorkers: vars.ESNumWorkers, // The number of worker goroutines
|
|||
|
FlushBytes: vars.ESFlushBytes, // The flush threshold in bytes
|
|||
|
FlushInterval: 30 * time.Second, // The periodic flush interval
|
|||
|
})
|
|||
|
if err != nil {
|
|||
|
log.Fatalf("Error creating the indexer: %s", err)
|
|||
|
}
|
|||
|
|
|||
|
for _, doc := range documens {
|
|||
|
id := doc["id"].(string)
|
|||
|
// Prepare the data payload: encode article to JSON
|
|||
|
data, err := json.Marshal(doc)
|
|||
|
if err != nil {
|
|||
|
log.Fatalf("Cannot encode documen %s: %s", id, err)
|
|||
|
}
|
|||
|
|
|||
|
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
|
|||
|
//
|
|||
|
// Add an item to the BulkIndexer
|
|||
|
//
|
|||
|
err = bi.Add(
|
|||
|
context.Background(),
|
|||
|
esutil.BulkIndexerItem{
|
|||
|
|
|||
|
Action: action,
|
|||
|
|
|||
|
// DocumentID is the (optional) document ID
|
|||
|
DocumentID: id,
|
|||
|
|
|||
|
// Body is an `io.Reader` with the payload
|
|||
|
Body: bytes.NewReader(data),
|
|||
|
|
|||
|
// OnSuccess is called for each successful operation
|
|||
|
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
|
|||
|
atomic.AddUint64(&countSuccessful, 1)
|
|||
|
},
|
|||
|
|
|||
|
// OnFailure is called for each failed operation
|
|||
|
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
|
|||
|
if err != nil {
|
|||
|
log.Printf("ERROR: %s", err)
|
|||
|
} else {
|
|||
|
log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
|
|||
|
}
|
|||
|
},
|
|||
|
},
|
|||
|
)
|
|||
|
if err != nil {
|
|||
|
log.Printf("Unexpected error: %s", err)
|
|||
|
}
|
|||
|
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
|
|||
|
}
|
|||
|
|
|||
|
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
|
|||
|
// Close the indexer
|
|||
|
//
|
|||
|
if err := bi.Close(context.Background()); err != nil {
|
|||
|
log.Printf("Unexpected error: %s", err)
|
|||
|
}
|
|||
|
|
|||
|
stats := bi.Stats()
|
|||
|
if stats.NumFailed > 0 {
|
|||
|
log.Printf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
|
|||
|
} else {
|
|||
|
log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func (es *ES) Search(index string, query map[string]interface{}) (res *esapi.Response, err error) {
|
|||
|
var buf bytes.Buffer
|
|||
|
if err = json.NewEncoder(&buf).Encode(query); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
// Perform the search request.
|
|||
|
res, err = es.Client.Search(
|
|||
|
es.Client.Search.WithContext(context.Background()),
|
|||
|
es.Client.Search.WithIndex(index),
|
|||
|
es.Client.Search.WithBody(&buf),
|
|||
|
es.Client.Search.WithTrackTotalHits(true),
|
|||
|
es.Client.Search.WithFrom(0),
|
|||
|
es.Client.Search.WithSize(10),
|
|||
|
es.Client.Search.WithSort("time:desc"),
|
|||
|
es.Client.Search.WithPretty(),
|
|||
|
)
|
|||
|
if err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
defer res.Body.Close()
|
|||
|
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
// 删除 index 根据 索引名 id
|
|||
|
func (es *ES) Delete(index, idx string) (res *esapi.Response, err error) {
|
|||
|
res, err = es.Client.Delete(
|
|||
|
index, // Index name
|
|||
|
idx, // Document ID
|
|||
|
es.Client.Delete.WithRefresh("true"),
|
|||
|
)
|
|||
|
if err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
defer res.Body.Close()
|
|||
|
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
func (es *ES) DeleteByQuery(index []string, query map[string]interface{}) (res *esapi.Response, err error) {
|
|||
|
var buf bytes.Buffer
|
|||
|
if err = json.NewEncoder(&buf).Encode(query); err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
// Perform the search request.
|
|||
|
res, err = es.Client.DeleteByQuery(
|
|||
|
index,
|
|||
|
&buf,
|
|||
|
)
|
|||
|
if err != nil {
|
|||
|
return
|
|||
|
}
|
|||
|
defer res.Body.Close()
|
|||
|
|
|||
|
return
|
|||
|
}
|