engine/database/elastic/elasticsearch.go

221 lines
5.6 KiB
Go
Raw Normal View History

2024-02-11 01:31:01 +08:00
package elastic
import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"log"
	"sync/atomic"
	"time"

	"git.apinb.com/bsm-sdk/engine/vars"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)
// ES wraps an Elasticsearch client; the methods defined on *ES issue their
// requests through Client.
type ES struct {
// Client is the go-elasticsearch v8 client used for every request.
Client *elasticsearch.Client
}
// NewElastic builds an ES wrapper whose client talks to the given endpoints
// using basic authentication with username and password.
//
// It returns the error reported by elasticsearch.NewClient, if any; in that
// case the returned *ES is nil.
func NewElastic(endpoints []string, username, password string) (*ES, error) {
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: endpoints,
		Username:  username,
		Password:  password,
	})
	if err != nil {
		return nil, err
	}

	wrapper := &ES{Client: client}
	return wrapper, nil
}
// CreateDocument JSON-encodes doc and indexes it into index under the given
// id, requesting an immediate refresh ("true") so the document is searchable
// right away. If id is empty, Elasticsearch assigns an auto-generated ID.
//
// Example document:
//
//	doc := map[string]interface{}{
//		"title":   "example title",
//		"content": "example body text",
//		"time":    time.Now().Unix(),
//		"date":    time.Now(),
//	}
//
// The method has no return value: encoding errors, transport errors and error
// responses from Elasticsearch are written to the standard logger.
func (es *ES) CreateDocument(index string, id string, doc *interface{}) {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(doc); err != nil {
		log.Println("Elastic NewEncoder:", err)
		// Nothing valid to send; do not issue a request with a broken body.
		return
	}

	// Set up the request object.
	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: id,
		Body:       &buf,
		Refresh:    "true",
	}

	// Perform the request with the client.
	res, err := req.Do(context.Background(), es.Client)
	if err != nil {
		// On a transport error res is nil, so log the error itself and stop
		// before touching res.Body.
		log.Println("Elastic Error:", err)
		return
	}
	defer res.Body.Close()

	if res.IsError() {
		log.Println("Elastic Error:", res.String())
	}
}
// Batch writes documens to index in bulk through an esutil.BulkIndexer.
//
// action selects the bulk operation applied to every document:
//   - "create": create the document if it does not exist; fail if it does.
//   - "index":  create the document if it does not exist, otherwise update it.
//   - "update": update an existing document; fail if it does not exist.
//   - "delete": delete a document; fail if the ID does not exist.
//
// Each document must carry a string value under the "id" key, which is used
// as the document ID. Documents without one are logged and skipped.
//
// The method has no return value: every failure is reported through the
// standard logger, and a summary of the indexer statistics is logged at the end.
func (es *ES) Batch(index string, documens []map[string]interface{}, action string) {
	// NOTE: the process-wide logger configuration is intentionally left
	// untouched; a library call should not reset the caller's log flags.

	// Counts successful operations from the indexer's worker goroutines.
	var countSuccessful uint64

	// Create the BulkIndexer.
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         index,             // The default index name
		Client:        es.Client,         // The Elasticsearch client
		NumWorkers:    vars.ESNumWorkers, // The number of worker goroutines
		FlushBytes:    vars.ESFlushBytes, // The flush threshold in bytes
		FlushInterval: 30 * time.Second,  // The periodic flush interval
	})
	if err != nil {
		// Report and bail out instead of terminating the whole process.
		log.Printf("Error creating the indexer: %s", err)
		return
	}

	for _, doc := range documens {
		// A checked assertion avoids a panic on a missing or non-string id.
		id, ok := doc["id"].(string)
		if !ok {
			log.Printf("Skipping document with missing or non-string id: %v", doc["id"])
			continue
		}

		// Prepare the data payload: encode the document to JSON.
		data, err := json.Marshal(doc)
		if err != nil {
			// Skip only this document; the indexer must still be closed below.
			log.Printf("Cannot encode documen %s: %s", id, err)
			continue
		}

		// Add an item to the BulkIndexer.
		err = bi.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				Action: action,
				// DocumentID is the (optional) document ID
				DocumentID: id,
				// Body is an `io.Reader` with the payload
				Body: bytes.NewReader(data),
				// OnSuccess is called for each successful operation
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					atomic.AddUint64(&countSuccessful, 1)
				},
				// OnFailure is called for each failed operation
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					if err != nil {
						log.Printf("ERROR: %s", err)
					} else {
						log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
					}
				},
			},
		)
		if err != nil {
			log.Printf("Unexpected error: %s", err)
		}
	}

	// Close the indexer.
	if err := bi.Close(context.Background()); err != nil {
		log.Printf("Unexpected error: %s", err)
	}

	stats := bi.Stats()
	if stats.NumFailed > 0 {
		log.Printf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
	} else {
		log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
	}
}
// Search runs query (encoded as the JSON request body) against index and
// returns the raw response.
//
// The request is fixed to the first 10 hits (from 0, size 10), sorted by
// "time" descending, with total-hit tracking and pretty-printed output.
//
// The response body is read fully into memory and the network body is closed
// before returning, so the caller receives a readable res.Body and does not
// need to close it to release the connection.
//
// An error is returned if the query cannot be encoded, the request fails, or
// the response body cannot be read; res is nil in those cases.
func (es *ES) Search(index string, query map[string]interface{}) (res *esapi.Response, err error) {
	var buf bytes.Buffer
	if err = json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	// Perform the search request.
	res, err = es.Client.Search(
		es.Client.Search.WithContext(context.Background()),
		es.Client.Search.WithIndex(index),
		es.Client.Search.WithBody(&buf),
		es.Client.Search.WithTrackTotalHits(true),
		es.Client.Search.WithFrom(0),
		es.Client.Search.WithSize(10),
		es.Client.Search.WithSort("time:desc"),
		es.Client.Search.WithPretty(),
	)
	if err != nil {
		return nil, err
	}

	// Closing the body in a defer would hand the caller an unreadable
	// response. Buffer it instead and swap in an in-memory reader.
	if res.Body != nil {
		payload, readErr := io.ReadAll(res.Body)
		res.Body.Close()
		if readErr != nil {
			return nil, readErr
		}
		res.Body = io.NopCloser(bytes.NewReader(payload))
	}
	return res, nil
}
// Delete removes the document with ID idx from index, requesting an immediate
// refresh ("true"), and returns the raw response.
//
// The response body is read fully into memory and the network body is closed
// before returning, so the caller receives a readable res.Body and does not
// need to close it to release the connection.
//
// An error is returned if the request fails or the response body cannot be
// read; res is nil in those cases.
func (es *ES) Delete(index, idx string) (res *esapi.Response, err error) {
	res, err = es.Client.Delete(
		index, // Index name
		idx,   // Document ID
		es.Client.Delete.WithRefresh("true"),
	)
	if err != nil {
		return nil, err
	}

	// Closing the body in a defer would hand the caller an unreadable
	// response. Buffer it instead and swap in an in-memory reader.
	if res.Body != nil {
		payload, readErr := io.ReadAll(res.Body)
		res.Body.Close()
		if readErr != nil {
			return nil, readErr
		}
		res.Body = io.NopCloser(bytes.NewReader(payload))
	}
	return res, nil
}
// DeleteByQuery deletes every document matching query (encoded as the JSON
// request body) from the given indices and returns the raw response.
//
// The response body is read fully into memory and the network body is closed
// before returning, so the caller receives a readable res.Body and does not
// need to close it to release the connection.
//
// An error is returned if the query cannot be encoded, the request fails, or
// the response body cannot be read; res is nil in those cases.
func (es *ES) DeleteByQuery(index []string, query map[string]interface{}) (res *esapi.Response, err error) {
	var buf bytes.Buffer
	if err = json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	// Perform the delete-by-query request.
	res, err = es.Client.DeleteByQuery(
		index,
		&buf,
	)
	if err != nil {
		return nil, err
	}

	// Closing the body in a defer would hand the caller an unreadable
	// response. Buffer it instead and swap in an in-memory reader.
	if res.Body != nil {
		payload, readErr := io.ReadAll(res.Body)
		res.Body.Close()
		if readErr != nil {
			return nil, readErr
		}
		res.Body = io.NopCloser(bytes.NewReader(payload))
	}
	return res, nil
}