// Package elastic provides a thin convenience wrapper around the
// go-elasticsearch v8 client: single-document indexing, bulk writes, search
// and deletion helpers.
package elastic

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"log"
	"sync/atomic"
	"time"

	"git.apinb.com/bsm-sdk/engine/vars"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)

// ES wraps an Elasticsearch client.
type ES struct {
	Client *elasticsearch.Client
}

// NewElastic builds an ES wrapper for the given endpoints using basic
// authentication. It returns the error reported by elasticsearch.NewClient,
// if any.
func NewElastic(endpoints []string, username, password string) (*ES, error) {
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: endpoints,
		Username:  username,
		Password:  password,
	})
	if err != nil {
		return nil, err
	}
	return &ES{Client: client}, nil
}

// bufferResponseBody reads the whole response body into memory, closes the
// underlying stream and installs an in-memory reader in its place, so that
// the response remains readable after the network resources are released.
func bufferResponseBody(res *esapi.Response) error {
	if res == nil || res.Body == nil {
		return nil
	}
	data, err := io.ReadAll(res.Body)
	// The payload has already been consumed (or the read failed); a Close
	// failure here cannot be acted upon, so it is intentionally ignored.
	_ = res.Body.Close()
	if err != nil {
		return err
	}
	res.Body = io.NopCloser(bytes.NewReader(data))
	return nil
}

// CreateDocument indexes doc into index under the given id and forces a
// refresh. When id is empty no document ID is sent with the request.
//
// Example document:
//
//	doc := map[string]interface{}{
//		"title":   "example title",
//		"content": "example body text",
//		"time":    time.Now().Unix(),
//		"date":    time.Now(),
//	}
//
// The method has no return value; failures are reported through the standard
// logger only.
func (es *ES) CreateDocument(index string, id string, doc *interface{}) {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(doc); err != nil {
		log.Println("Elastic NewEncoder:", err)
		// Do not send a request with a broken or empty payload.
		return
	}

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: id,
		Body:       &buf,
		Refresh:    "true",
	}

	res, err := req.Do(context.Background(), es.Client)
	if err != nil {
		// res is nil on a transport error: log the error itself and stop
		// before touching res.Body.
		log.Println("Elastic Error:", err)
		return
	}
	defer res.Body.Close()

	if res.IsError() {
		log.Println("Elastic Error:", res.String())
	}
}

// Batch writes documens to index through a bulk indexer.
//
// action selects the bulk operation applied to every document:
//   - create: create the document; fails if it already exists
//   - index:  create the document, or replace it if it already exists
//   - update: update the document; fails if it does not exist
//   - delete: delete the document; fails if the id does not exist
//
// The document ID is taken from the "id" key of each map when that value is a
// string; otherwise the item is submitted without an explicit ID. Errors are
// logged, not returned.
func (es *ES) Batch(index string, documens []map[string]interface{}, action string) {
	// NOTE(review): this mutates the process-wide standard logger. It is kept
	// for output compatibility, but should probably be removed from library code.
	log.SetFlags(0)

	var countSuccessful uint64

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         index,             // default index name
		Client:        es.Client,         // Elasticsearch client
		NumWorkers:    vars.ESNumWorkers, // number of worker goroutines
		FlushBytes:    vars.ESFlushBytes, // flush threshold in bytes
		FlushInterval: 30 * time.Second,  // periodic flush interval
	})
	if err != nil {
		// Library code must not terminate the host process; report and bail out.
		log.Printf("Error creating the indexer: %s", err)
		return
	}

	for _, doc := range documens {
		// Comma-ok form: a missing or non-string "id" must not panic.
		id, _ := doc["id"].(string)

		data, err := json.Marshal(doc)
		if err != nil {
			// Skip the offending document; the remaining ones are still indexed.
			log.Printf("Cannot encode documen %s: %s", id, err)
			continue
		}

		err = bi.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				Action:     action,
				DocumentID: id,
				Body:       bytes.NewReader(data),
				// Called for each successful operation.
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					atomic.AddUint64(&countSuccessful, 1)
				},
				// Called for each failed operation.
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					if err != nil {
						log.Printf("ERROR: %s", err)
						return
					}
					log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
				},
			},
		)
		if err != nil {
			log.Printf("Unexpected error: %s", err)
		}
	}

	// Close the indexer before reading its statistics.
	if err := bi.Close(context.Background()); err != nil {
		log.Printf("Unexpected error: %s", err)
	}

	stats := bi.Stats()
	if stats.NumFailed > 0 {
		log.Printf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
		return
	}
	log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
}

// Search runs query against index and returns the raw response. The request
// is fixed to the first 10 hits, sorted by "time:desc", with total-hit
// tracking and pretty-printed output enabled.
//
// The response body is fully buffered in memory before returning, so the
// caller can read it and is not required to close it.
func (es *ES) Search(index string, query map[string]interface{}) (res *esapi.Response, err error) {
	var buf bytes.Buffer
	if err = json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	res, err = es.Client.Search(
		es.Client.Search.WithContext(context.Background()),
		es.Client.Search.WithIndex(index),
		es.Client.Search.WithBody(&buf),
		es.Client.Search.WithTrackTotalHits(true),
		es.Client.Search.WithFrom(0),
		es.Client.Search.WithSize(10),
		es.Client.Search.WithSort("time:desc"),
		es.Client.Search.WithPretty(),
	)
	if err != nil {
		return res, err
	}
	if err = bufferResponseBody(res); err != nil {
		return nil, err
	}
	return res, nil
}

// Delete removes the document with ID idx from index and forces a refresh.
//
// The response body is fully buffered in memory before returning, so the
// caller can read it and is not required to close it.
func (es *ES) Delete(index, idx string) (res *esapi.Response, err error) {
	res, err = es.Client.Delete(
		index, // index name
		idx,   // document ID
		es.Client.Delete.WithRefresh("true"),
	)
	if err != nil {
		return res, err
	}
	if err = bufferResponseBody(res); err != nil {
		return nil, err
	}
	return res, nil
}

// DeleteByQuery deletes every document matching query from the given indices.
//
// The response body is fully buffered in memory before returning, so the
// caller can read it and is not required to close it.
func (es *ES) DeleteByQuery(index []string, query map[string]interface{}) (res *esapi.Response, err error) {
	var buf bytes.Buffer
	if err = json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	res, err = es.Client.DeleteByQuery(index, &buf)
	if err != nil {
		return res, err
	}
	if err = bufferResponseBody(res); err != nil {
		return nil, err
	}
	return res, nil
}