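// Package elastic wraps the official go-elasticsearch v8 client with small
// helpers for indexing, bulk-writing, searching and deleting documents.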
package elastic
|
||
|
||
import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"log"
	"sync/atomic"
	"time"

	"git.apinb.com/bsm-sdk/engine/vars"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)
|
||
|
||
type ES struct {
|
||
Client *elasticsearch.Client
|
||
}
|
||
|
||
func NewElastic(endpoints []string, username, password string) (*ES, error) {
|
||
|
||
cfg := elasticsearch.Config{
|
||
Addresses: endpoints,
|
||
Username: username,
|
||
Password: password,
|
||
}
|
||
var err error
|
||
client, err := elasticsearch.NewClient(cfg)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return &ES{
|
||
Client: client,
|
||
}, nil
|
||
}
|
||
|
||
// idx 为空,默认随机唯一字符串
|
||
//
|
||
// doc := map[string]interface{}{
|
||
// "title": "中国",
|
||
// "content": "中国早日统一台湾",
|
||
// "time": time.Now().Unix(),
|
||
// "date": time.Now(),
|
||
// }
|
||
func (es *ES) CreateDocument(index string, id string, doc *interface{}) {
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(doc); err != nil {
|
||
log.Println("Elastic NewEncoder:", err)
|
||
}
|
||
// Set up the request object.
|
||
req := esapi.IndexRequest{
|
||
Index: index,
|
||
DocumentID: id,
|
||
Body: &buf,
|
||
Refresh: "true",
|
||
}
|
||
|
||
// Perform the request with the client.
|
||
res, err := req.Do(context.Background(), es.Client)
|
||
if err != nil {
|
||
log.Println("Elastic Error:", res.String())
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
log.Println("Elastic Error:", res.String())
|
||
}
|
||
}
|
||
|
||
// 批量写入文档。
|
||
// Action field configures the operation to perform (index, create, delete, update)
|
||
// create 如果文档不存在就创建,但如果文档存在就返回错误
|
||
// index 如果文档不存在就创建,如果文档存在就更新
|
||
// update 更新一个文档,如果文档不存在就返回错误
|
||
// delete 删除一个文档,如果要删除的文档id不存在,就返回错误
|
||
func (es *ES) Batch(index string, documens []map[string]interface{}, action string) {
|
||
log.SetFlags(0)
|
||
|
||
var (
|
||
countSuccessful uint64
|
||
|
||
err error
|
||
)
|
||
|
||
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
|
||
//
|
||
// Create the BulkIndexer
|
||
//
|
||
// NOTE: For optimal performance, consider using a third-party JSON decoding package.
|
||
// See an example in the "benchmarks" folder.
|
||
//
|
||
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||
Index: index, // The default index name
|
||
Client: es.Client, // The Elasticsearch client
|
||
NumWorkers: vars.ESNumWorkers, // The number of worker goroutines
|
||
FlushBytes: vars.ESFlushBytes, // The flush threshold in bytes
|
||
FlushInterval: 30 * time.Second, // The periodic flush interval
|
||
})
|
||
if err != nil {
|
||
log.Fatalf("Error creating the indexer: %s", err)
|
||
}
|
||
|
||
for _, doc := range documens {
|
||
id := doc["id"].(string)
|
||
// Prepare the data payload: encode article to JSON
|
||
data, err := json.Marshal(doc)
|
||
if err != nil {
|
||
log.Fatalf("Cannot encode documen %s: %s", id, err)
|
||
}
|
||
|
||
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
|
||
//
|
||
// Add an item to the BulkIndexer
|
||
//
|
||
err = bi.Add(
|
||
context.Background(),
|
||
esutil.BulkIndexerItem{
|
||
|
||
Action: action,
|
||
|
||
// DocumentID is the (optional) document ID
|
||
DocumentID: id,
|
||
|
||
// Body is an `io.Reader` with the payload
|
||
Body: bytes.NewReader(data),
|
||
|
||
// OnSuccess is called for each successful operation
|
||
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
|
||
atomic.AddUint64(&countSuccessful, 1)
|
||
},
|
||
|
||
// OnFailure is called for each failed operation
|
||
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
|
||
if err != nil {
|
||
log.Printf("ERROR: %s", err)
|
||
} else {
|
||
log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
|
||
}
|
||
},
|
||
},
|
||
)
|
||
if err != nil {
|
||
log.Printf("Unexpected error: %s", err)
|
||
}
|
||
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
|
||
}
|
||
|
||
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
|
||
// Close the indexer
|
||
//
|
||
if err := bi.Close(context.Background()); err != nil {
|
||
log.Printf("Unexpected error: %s", err)
|
||
}
|
||
|
||
stats := bi.Stats()
|
||
if stats.NumFailed > 0 {
|
||
log.Printf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
|
||
} else {
|
||
log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
|
||
}
|
||
}
|
||
|
||
func (es *ES) Search(index string, query map[string]interface{}) (res *esapi.Response, err error) {
|
||
var buf bytes.Buffer
|
||
if err = json.NewEncoder(&buf).Encode(query); err != nil {
|
||
return
|
||
}
|
||
// Perform the search request.
|
||
res, err = es.Client.Search(
|
||
es.Client.Search.WithContext(context.Background()),
|
||
es.Client.Search.WithIndex(index),
|
||
es.Client.Search.WithBody(&buf),
|
||
es.Client.Search.WithTrackTotalHits(true),
|
||
es.Client.Search.WithFrom(0),
|
||
es.Client.Search.WithSize(10),
|
||
es.Client.Search.WithSort("time:desc"),
|
||
es.Client.Search.WithPretty(),
|
||
)
|
||
if err != nil {
|
||
return
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
return
|
||
}
|
||
|
||
// 删除 index 根据 索引名 id
|
||
func (es *ES) Delete(index, idx string) (res *esapi.Response, err error) {
|
||
res, err = es.Client.Delete(
|
||
index, // Index name
|
||
idx, // Document ID
|
||
es.Client.Delete.WithRefresh("true"),
|
||
)
|
||
if err != nil {
|
||
return
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
return
|
||
}
|
||
|
||
func (es *ES) DeleteByQuery(index []string, query map[string]interface{}) (res *esapi.Response, err error) {
|
||
var buf bytes.Buffer
|
||
if err = json.NewEncoder(&buf).Encode(query); err != nil {
|
||
return
|
||
}
|
||
// Perform the search request.
|
||
res, err = es.Client.DeleteByQuery(
|
||
index,
|
||
&buf,
|
||
)
|
||
if err != nil {
|
||
return
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
return
|
||
}
|