package ingest import ( "encoding/json" "fmt" "strconv" "strings" "time" "git.apinb.com/ops/logs/internal/impl" "git.apinb.com/ops/logs/internal/models" ) func BuildReplayRawEventPayload(ev models.LogEvent) (RawEventIngestBody, error) { sourceType := replaySourceType(ev.SourceKind) if sourceType == "" { return RawEventIngestBody{}, fmt.Errorf("unsupported source kind %q", ev.SourceKind) } rawObj := map[string]interface{}{ "source": sourceType, "replayed_at": time.Now().UTC().Format(time.RFC3339), "log_entry_id": ev.ID, "source_ip": ev.SourceIP, "remote_addr": ev.RemoteAddr, "raw_packet": ev.RawPayload, } if ev.TrapOID != "" { rawObj["trap_oid"] = ev.TrapOID } if ev.NormalizedDetail != "" { var detail interface{} if err := json.Unmarshal([]byte(ev.NormalizedDetail), &detail); err == nil { rawObj["parsed"] = detail } } rawBytes, err := json.Marshal(rawObj) if err != nil { return RawEventIngestBody{}, err } labels := map[string]string{ "source_type": "log", "source_subtype": replaySubtype(ev.SourceKind), "replay_of_log_event_id": strconv.FormatUint(uint64(ev.ID), 10), "ip": ev.SourceIP, "remote_addr": ev.RemoteAddr, "device": ev.DeviceName, "job": "logs-replay", } if uid := replayResourceUID(ev); uid != "" { labels["resource_uid"] = uid } return RawEventIngestBody{ SourceType: sourceType, ResourceUID: replayResourceUID(ev), EventTime: time.Now().UTC(), Severity: firstNonEmpty(ev.SeverityCode, "warning"), Title: replayTitle(ev), Message: firstNonEmpty(ev.NormalizedSummary, ev.RawPayload), Labels: labels, Annotations: map[string]string{ "replay": "true", "dispatch_status": ev.DispatchStatus, }, ParseStatus: "replayed", RawPayload: rawBytes, }, nil } func EnqueueReplayLogEvent(ev models.LogEvent) (uint, error) { body, err := BuildReplayRawEventPayload(ev) if err != nil { return 0, err } payload, err := json.Marshal(body) if err != nil { return 0, err } row := models.AlertOutbox{ LogEventID: ev.ID, PayloadJSON: string(payload), Status: outboxStatusPending, RetryCount: 0, NextRetryAt: time.Now(), } if err := enqueueOutboxRow(&row); err != nil { return 0, err } return row.ID, nil } func enqueueOutboxRow(row *models.AlertOutbox) error { return impl.DBService.Create(row).Error } func replaySourceType(kind string) string { switch strings.TrimSpace(kind) { case "syslog": return "syslog" case "snmp_trap", "trap": return "trap" default: return "" } } func replaySubtype(kind string) string { if kind == "snmp_trap" { return "snmp_trap" } return replaySourceType(kind) } func replayResourceUID(ev models.LogEvent) string { if strings.Contains(ev.ResourceID, ":") { return ev.ResourceID } if ev.ResourceType != "" && ev.ResourceID != "" { return ev.ResourceType + ":" + ev.ResourceID } return "" } func replayTitle(ev models.LogEvent) string { if ev.SourceKind == "snmp_trap" && ev.TrapOID != "" { return "重放 SNMP Trap " + ev.TrapOID } if ev.SourceKind == "syslog" { return "重放 Syslog" } return "重放日志事件" }