engine/database/elastic/elasticsearch.go

221 lines
5.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package elastic
import (
"bytes"
"context"
"encoding/json"
"log"
"sync/atomic"
"time"
"git.apinb.com/bsm-sdk/engine/vars"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
type ES struct {
Client *elasticsearch.Client
}
func NewElastic(endpoints []string, username, password string) (*ES, error) {
cfg := elasticsearch.Config{
Addresses: endpoints,
Username: username,
Password: password,
}
var err error
client, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, err
}
return &ES{
Client: client,
}, nil
}
// idx 为空,默认随机唯一字符串
//
// doc := map[string]interface{}{
// "title": "中国",
// "content": "中国早日统一台湾",
// "time": time.Now().Unix(),
// "date": time.Now(),
// }
func (es *ES) CreateDocument(index string, id string, doc *interface{}) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(doc); err != nil {
log.Println("Elastic NewEncoder:", err)
}
// Set up the request object.
req := esapi.IndexRequest{
Index: index,
DocumentID: id,
Body: &buf,
Refresh: "true",
}
// Perform the request with the client.
res, err := req.Do(context.Background(), es.Client)
if err != nil {
log.Println("Elastic Error:", res.String())
}
defer res.Body.Close()
if res.IsError() {
log.Println("Elastic Error:", res.String())
}
}
// 批量写入文档。
// Action field configures the operation to perform (index, create, delete, update)
// create 如果文档不存在就创建,但如果文档存在就返回错误
// index 如果文档不存在就创建,如果文档存在就更新
// update 更新一个文档,如果文档不存在就返回错误
// delete 删除一个文档如果要删除的文档id不存在就返回错误
func (es *ES) Batch(index string, documens []map[string]interface{}, action string) {
log.SetFlags(0)
var (
countSuccessful uint64
err error
)
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
//
// Create the BulkIndexer
//
// NOTE: For optimal performance, consider using a third-party JSON decoding package.
// See an example in the "benchmarks" folder.
//
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: index, // The default index name
Client: es.Client, // The Elasticsearch client
NumWorkers: vars.ESNumWorkers, // The number of worker goroutines
FlushBytes: vars.ESFlushBytes, // The flush threshold in bytes
FlushInterval: 30 * time.Second, // The periodic flush interval
})
if err != nil {
log.Fatalf("Error creating the indexer: %s", err)
}
for _, doc := range documens {
id := doc["id"].(string)
// Prepare the data payload: encode article to JSON
data, err := json.Marshal(doc)
if err != nil {
log.Fatalf("Cannot encode documen %s: %s", id, err)
}
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
//
// Add an item to the BulkIndexer
//
err = bi.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: action,
// DocumentID is the (optional) document ID
DocumentID: id,
// Body is an `io.Reader` with the payload
Body: bytes.NewReader(data),
// OnSuccess is called for each successful operation
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
atomic.AddUint64(&countSuccessful, 1)
},
// OnFailure is called for each failed operation
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
log.Printf("ERROR: %s", err)
} else {
log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
}
},
},
)
if err != nil {
log.Printf("Unexpected error: %s", err)
}
// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
}
// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
// Close the indexer
//
if err := bi.Close(context.Background()); err != nil {
log.Printf("Unexpected error: %s", err)
}
stats := bi.Stats()
if stats.NumFailed > 0 {
log.Printf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
} else {
log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
}
}
func (es *ES) Search(index string, query map[string]interface{}) (res *esapi.Response, err error) {
var buf bytes.Buffer
if err = json.NewEncoder(&buf).Encode(query); err != nil {
return
}
// Perform the search request.
res, err = es.Client.Search(
es.Client.Search.WithContext(context.Background()),
es.Client.Search.WithIndex(index),
es.Client.Search.WithBody(&buf),
es.Client.Search.WithTrackTotalHits(true),
es.Client.Search.WithFrom(0),
es.Client.Search.WithSize(10),
es.Client.Search.WithSort("time:desc"),
es.Client.Search.WithPretty(),
)
if err != nil {
return
}
defer res.Body.Close()
return
}
// 删除 index 根据 索引名 id
func (es *ES) Delete(index, idx string) (res *esapi.Response, err error) {
res, err = es.Client.Delete(
index, // Index name
idx, // Document ID
es.Client.Delete.WithRefresh("true"),
)
if err != nil {
return
}
defer res.Body.Close()
return
}
func (es *ES) DeleteByQuery(index []string, query map[string]interface{}) (res *esapi.Response, err error) {
var buf bytes.Buffer
if err = json.NewEncoder(&buf).Encode(query); err != nil {
return
}
// Perform the search request.
res, err = es.Client.DeleteByQuery(
index,
&buf,
)
if err != nil {
return
}
defer res.Body.Close()
return
}