package elastic

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"log"
	"sync/atomic"
	"time"

	"git.apinb.com/bsm-sdk/core/vars"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)

// ES wraps an Elasticsearch client and serves as the receiver for the
// document helper methods declared on it.
type ES struct {
	// Client is the underlying go-elasticsearch client used to issue requests.
	Client *elasticsearch.Client
}

// NewElastic builds an ES wrapper around a go-elasticsearch client configured
// with the given node addresses and basic-auth credentials.
//
// It returns the error reported by elasticsearch.NewClient unchanged when the
// client cannot be constructed.
func NewElastic(endpoints []string, username, password string) (*ES, error) {
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: endpoints,
		Username:  username,
		Password:  password,
	})
	if err != nil {
		return nil, err
	}

	wrapper := &ES{Client: client}
	return wrapper, nil
}

// CreateDocument JSON-encodes doc and indexes it into index under the document
// ID id, asking Elasticsearch to refresh so the write is visible to searches
// right away.
//
// When id is empty the request carries no document ID; Elasticsearch is then
// expected to assign a random unique one.
//
// The method has no return value: encoding failures, transport failures and
// error responses are written to the standard logger and the call returns.
//
// Example document:
//
//	doc := map[string]interface{}{
//		"title":   "example title",
//		"content": "example body text",
//		"time":    time.Now().Unix(),
//		"date":    time.Now(),
//	}
func (es *ES) CreateDocument(index string, id string, doc *interface{}) {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(doc); err != nil {
		log.Println("Elastic NewEncoder:", err)
		// Do not send a request whose body could not be encoded.
		return
	}

	// Set up the request object.
	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: id,
		Body:       &buf,
		Refresh:    "true",
	}

	// Perform the request with the client.
	res, err := req.Do(context.Background(), es.Client)
	if err != nil {
		// On a transport error res is nil, so report err itself and stop
		// before touching res.Body.
		log.Println("Elastic Error:", err)
		return
	}
	defer res.Body.Close()

	if res.IsError() {
		log.Println("Elastic Error:", res.String())
	}
}

// Batch writes documens to index in bulk through an esutil.BulkIndexer.
//
// action selects the bulk operation applied to every document:
//   - "create": create the document; fails if it already exists.
//   - "index":  create the document, or replace it if it already exists.
//   - "update": update an existing document; fails if it does not exist.
//   - "delete": delete the document with the given id; fails if it does not exist.
//
// Each document's "id" entry, when present, must be a string and is used as the
// document ID. A document without an "id" entry is submitted with an empty
// document ID; a document whose "id" is not a string is logged and skipped.
//
// NOTE(review): for "update" the payload presumably has to be shaped the way
// the Bulk API expects (e.g. wrapped under "doc") — confirm against callers.
//
// The method has no return value. All failures are reported through the
// standard logger; none of them terminates the process or panics.
func (es *ES) Batch(index string, documens []map[string]interface{}, action string) {
	// Updated from the indexer's OnSuccess callbacks, which may run on
	// several worker goroutines, hence the atomic access.
	var countSuccessful uint64

	// NOTE: For optimal performance, consider using a third-party JSON decoding package.
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         index,             // The default index name
		Client:        es.Client,         // The Elasticsearch client
		NumWorkers:    vars.ESNumWorkers, // The number of worker goroutines
		FlushBytes:    vars.ESFlushBytes, // The flush threshold in bytes
		FlushInterval: 30 * time.Second,  // The periodic flush interval
	})
	if err != nil {
		// A library helper must not kill the host process; log and give up.
		log.Printf("Error creating the indexer: %s", err)
		return
	}

	for _, doc := range documens {
		// A missing id is allowed (empty document ID); a non-string id is
		// rejected instead of panicking on the type assertion.
		var id string
		if raw, present := doc["id"]; present {
			s, ok := raw.(string)
			if !ok {
				log.Printf("Cannot use non-string id %v (%T) for documen, skipping", raw, raw)
				continue
			}
			id = s
		}

		item := esutil.BulkIndexerItem{
			Action: action,

			// DocumentID is the (optional) document ID
			DocumentID: id,

			// OnSuccess is called for each successful operation
			OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
				atomic.AddUint64(&countSuccessful, 1)
			},

			// OnFailure is called for each failed operation
			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
				if err != nil {
					log.Printf("ERROR: %s", err)
				} else {
					log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
				}
			},
		}

		// A bulk "delete" consists of the action line only; attaching a
		// payload would make the next line be parsed as another action.
		if action != "delete" {
			data, err := json.Marshal(doc)
			if err != nil {
				log.Printf("Cannot encode documen %s: %s", id, err)
				continue
			}
			item.Body = bytes.NewReader(data)
		}

		if err := bi.Add(context.Background(), item); err != nil {
			log.Printf("Unexpected error: %s", err)
		}
	}

	// Close flushes whatever is still buffered and waits for the workers.
	if err := bi.Close(context.Background()); err != nil {
		log.Printf("Unexpected error: %s", err)
	}

	stats := bi.Stats()
	if stats.NumFailed > 0 {
		log.Printf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
	} else {
		log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
	}
}

// Search JSON-encodes query, runs it against index and returns the
// Elasticsearch response.
//
// The request is issued with fixed options: total-hit tracking enabled,
// from=0, size=10, sorting by "time:desc" and pretty-printed output.
//
// The network response body is read fully and closed before returning, and
// res.Body is replaced by an in-memory reader over the same bytes. Callers can
// therefore read res.Body after the call and do not need to close it.
//
// A non-nil error is returned when the query cannot be encoded, the request
// fails, or the response body cannot be read; res is nil in those cases.
func (es *ES) Search(index string, query map[string]interface{}) (res *esapi.Response, err error) {
	var buf bytes.Buffer
	if err = json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	// Perform the search request.
	res, err = es.Client.Search(
		es.Client.Search.WithContext(context.Background()),
		es.Client.Search.WithIndex(index),
		es.Client.Search.WithBody(&buf),
		es.Client.Search.WithTrackTotalHits(true),
		es.Client.Search.WithFrom(0),
		es.Client.Search.WithSize(10),
		es.Client.Search.WithSort("time:desc"),
		es.Client.Search.WithPretty(),
	)
	if err != nil {
		return nil, err
	}
	if res.Body == nil {
		return res, nil
	}

	// Closing the live body before returning would hand the caller an
	// unreadable response, so buffer it first and expose the copy instead.
	raw := res.Body
	defer raw.Close()

	data, err := io.ReadAll(raw)
	if err != nil {
		return nil, err
	}
	res.Body = io.NopCloser(bytes.NewReader(data))

	return res, nil
}

// Delete removes the document with ID idx from index, asking Elasticsearch to
// refresh so the deletion is visible to searches right away.
//
// The network response body is read fully and closed before returning, and
// res.Body is replaced by an in-memory reader over the same bytes. Callers can
// therefore read res.Body after the call and do not need to close it.
//
// A non-nil error is returned when the request fails or the response body
// cannot be read; res is nil in those cases.
func (es *ES) Delete(index, idx string) (res *esapi.Response, err error) {
	res, err = es.Client.Delete(
		index, // Index name
		idx,   // Document ID
		es.Client.Delete.WithRefresh("true"),
	)
	if err != nil {
		return nil, err
	}
	if res.Body == nil {
		return res, nil
	}

	// Closing the live body before returning would hand the caller an
	// unreadable response, so buffer it first and expose the copy instead.
	raw := res.Body
	defer raw.Close()

	data, err := io.ReadAll(raw)
	if err != nil {
		return nil, err
	}
	res.Body = io.NopCloser(bytes.NewReader(data))

	return res, nil
}

// DeleteByQuery JSON-encodes query and deletes every document matching it in
// the given indices.
//
// The network response body is read fully and closed before returning, and
// res.Body is replaced by an in-memory reader over the same bytes. Callers can
// therefore read res.Body after the call and do not need to close it.
//
// A non-nil error is returned when the query cannot be encoded, the request
// fails, or the response body cannot be read; res is nil in those cases.
func (es *ES) DeleteByQuery(index []string, query map[string]interface{}) (res *esapi.Response, err error) {
	var buf bytes.Buffer
	if err = json.NewEncoder(&buf).Encode(query); err != nil {
		return nil, err
	}

	// Perform the delete-by-query request.
	res, err = es.Client.DeleteByQuery(
		index,
		&buf,
	)
	if err != nil {
		return nil, err
	}
	if res.Body == nil {
		return res, nil
	}

	// Closing the live body before returning would hand the caller an
	// unreadable response, so buffer it first and expose the copy instead.
	raw := res.Body
	defer raw.Close()

	data, err := io.ReadAll(raw)
	if err != nil {
		return nil, err
	}
	res.Body = io.NopCloser(bytes.NewReader(data))

	return res, nil
}