package ingest import ( "bytes" "encoding/json" "fmt" "net/http" "strings" "time" "git.apinb.com/ops/logs/internal/config" "git.apinb.com/ops/logs/internal/impl" "git.apinb.com/ops/logs/internal/models" ) const ( outboxStatusPending = "pending" outboxStatusRetrying = "retrying" outboxStatusSent = "sent" outboxStatusDead = "dead" ) func enqueueAlert(logEventID uint, body AlertReceiveBody) error { payload, err := json.Marshal(body) if err != nil { return err } return enqueuePayload(logEventID, string(payload)) } func enqueueRawEvent(logEventID uint, body AlertReceiveBody, parseStatus string) error { payload, err := json.Marshal(buildRawEventIngestBody(body, parseStatus)) if err != nil { return err } return enqueuePayload(logEventID, string(payload)) } func enqueuePayload(logEventID uint, payloadJSON string) error { row := models.AlertOutbox{ LogEventID: logEventID, PayloadJSON: payloadJSON, Status: outboxStatusPending, RetryCount: 0, NextRetryAt: time.Now(), LastError: "", } return impl.DBService.Create(&row).Error } func StartAlertDispatcher() { go func() { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for range ticker.C { processAlertOutboxBatch(20) } }() } func processAlertOutboxBatch(limit int) { if limit <= 0 { limit = 20 } var rows []models.AlertOutbox now := time.Now() err := impl.DBService. Where("status IN ? AND next_retry_at <= ?", []string{outboxStatusPending, outboxStatusRetrying}, now). Order("id asc"). Limit(limit). Find(&rows).Error if err != nil || len(rows) == 0 { return } for _, row := range rows { processOneOutbox(row) } } func processOneOutbox(row models.AlertOutbox) { var body AlertReceiveBody if err := json.Unmarshal([]byte(row.PayloadJSON), &body); err != nil { markOutboxDead(row.ID, row.RetryCount, "invalid_payload: "+err.Error()) return } if err := forwardOutboxPayload(row.PayloadJSON, body); err != nil { markOutboxRetry(row, err.Error()) return } _ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", row.ID).Updates(map[string]interface{}{ "status": outboxStatusSent, "last_error": "", "next_retry_at": time.Now(), }).Error _ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Updates(map[string]interface{}{ "alert_sent": true, "dispatch_status": "sent", }).Error } func forwardOutboxPayload(payloadJSON string, legacyBody AlertReceiveBody) error { var rawEvent RawEventIngestBody if err := json.Unmarshal([]byte(payloadJSON), &rawEvent); err == nil && rawEvent.SourceType != "" && len(rawEvent.RawPayload) > 0 { return forwardRawEvent(rawEvent) } return forwardAlert(legacyBody) } func forwardRawEvent(body RawEventIngestBody) error { cfg := config.Spec.AlertForward if cfg == nil || !cfg.Enabled || cfg.BaseURL == "" { return nil } if len(body.RawPayload) == 0 { return fmt.Errorf("raw_payload 不能为空") } raw, err := json.Marshal(body) if err != nil { return err } url := cfg.BaseURL + "/Alert/v1/raw-events/ingest" req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(raw)) if err != nil { return err } req.Header.Set("Content-Type", "application/json") if cfg.InternalKey != "" { req.Header.Set("X-Internal-Key", cfg.InternalKey) } client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("alert returned HTTP %d", resp.StatusCode) } return nil } func markOutboxRetry(row models.AlertOutbox, msg string) { retry := row.RetryCount + 1 const maxRetry = 5 if retry > maxRetry { markOutboxDead(row.ID, retry, msg) return } backoff := time.Duration(retry*retry) * time.Second if backoff > 60*time.Second { backoff = 60 * time.Second } _ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", row.ID).Updates(map[string]interface{}{ "status": outboxStatusRetrying, "retry_count": retry, "next_retry_at": time.Now().Add(backoff), "last_error": truncateError(msg, 1024), }).Error _ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Update("dispatch_status", "retrying").Error } func markOutboxDead(id uint, retry int, msg string) { _ = impl.DBService.Model(&models.AlertOutbox{}).Where("id = ?", id).Updates(map[string]interface{}{ "status": outboxStatusDead, "retry_count": retry, "next_retry_at": time.Now(), "last_error": truncateError(msg, 1024), }).Error var row models.AlertOutbox if err := impl.DBService.Select("log_event_id").First(&row, id).Error; err == nil && row.LogEventID > 0 { _ = impl.DBService.Model(&models.LogEvent{}).Where("id = ?", row.LogEventID).Update("dispatch_status", "dead").Error } } func truncateError(s string, n int) string { s = strings.TrimSpace(s) if len(s) <= n { return s } return s[:n] }