// Filename: internal/service/analytics_service.go package service import ( "encoding/json" "fmt" "gemini-balancer/internal/db/dialect" "gemini-balancer/internal/models" "gemini-balancer/internal/store" "strconv" "strings" "sync" "time" "github.com/sirupsen/logrus" "gorm.io/gorm" ) const ( flushLoopInterval = 1 * time.Minute ) type AnalyticsServiceLogger struct{ *logrus.Entry } type AnalyticsService struct { db *gorm.DB store store.Store logger *logrus.Entry stopChan chan struct{} wg sync.WaitGroup dialect dialect.DialectAdapter } func NewAnalyticsService(db *gorm.DB, s store.Store, logger *logrus.Logger, d dialect.DialectAdapter) *AnalyticsService { return &AnalyticsService{ db: db, store: s, logger: logger.WithField("component", "Analytics📊"), stopChan: make(chan struct{}), dialect: d, } } func (s *AnalyticsService) Start() { s.wg.Add(2) // 2 (flushLoop, eventListener) go s.flushLoop() go s.eventListener() s.logger.Info("AnalyticsService (Command Side) started.") } func (s *AnalyticsService) Stop() { close(s.stopChan) s.wg.Wait() s.logger.Info("AnalyticsService stopped. Performing final data flush...") s.flushToDB() // 停止前刷盘 s.logger.Info("AnalyticsService final data flush completed.") } func (s *AnalyticsService) eventListener() { defer s.wg.Done() sub, err := s.store.Subscribe(models.TopicRequestFinished) if err != nil { s.logger.Fatalf("Failed to subscribe to topic %s: %v", models.TopicRequestFinished, err) return } defer sub.Close() s.logger.Info("AnalyticsService subscribed to request events.") for { select { case msg := <-sub.Channel(): var event models.RequestFinishedEvent if err := json.Unmarshal(msg.Payload, &event); err != nil { s.logger.Errorf("Failed to unmarshal analytics event: %v", err) continue } s.handleAnalyticsEvent(&event) case <-s.stopChan: s.logger.Info("AnalyticsService stopping event listener.") return } } } func (s *AnalyticsService) handleAnalyticsEvent(event *models.RequestFinishedEvent) { if event.RequestLog.GroupID == nil { return } key := fmt.Sprintf("analytics:hourly:%s", time.Now().UTC().Format("2006-01-02T15")) fieldPrefix := fmt.Sprintf("%d:%s", *event.RequestLog.GroupID, event.RequestLog.ModelName) pipe := s.store.Pipeline() pipe.HIncrBy(key, fieldPrefix+":requests", 1) if event.RequestLog.IsSuccess { pipe.HIncrBy(key, fieldPrefix+":success", 1) } if event.RequestLog.PromptTokens > 0 { pipe.HIncrBy(key, fieldPrefix+":prompt", int64(event.RequestLog.PromptTokens)) } if event.RequestLog.CompletionTokens > 0 { pipe.HIncrBy(key, fieldPrefix+":completion", int64(event.RequestLog.CompletionTokens)) } if err := pipe.Exec(); err != nil { s.logger.Warnf("[%s] Failed to record analytics event to store for group %d: %v", event.CorrelationID, *event.RequestLog.GroupID, err) } } func (s *AnalyticsService) flushLoop() { defer s.wg.Done() ticker := time.NewTicker(flushLoopInterval) defer ticker.Stop() for { select { case <-ticker.C: s.flushToDB() case <-s.stopChan: return } } } func (s *AnalyticsService) flushToDB() { now := time.Now().UTC() keysToFlush := []string{ fmt.Sprintf("analytics:hourly:%s", now.Add(-1*time.Hour).Format("2006-01-02T15")), fmt.Sprintf("analytics:hourly:%s", now.Format("2006-01-02T15")), } for _, key := range keysToFlush { data, err := s.store.HGetAll(key) if err != nil || len(data) == 0 { continue } statsToFlush, parsedFields := s.parseStatsFromHash(now.Truncate(time.Hour), data) if len(statsToFlush) > 0 { upsertClause := s.dialect.OnConflictUpdateAll( []string{"time", "group_id", "model_name"}, // conflict columns []string{"request_count", "success_count", "prompt_tokens", "completion_tokens"}, // update columns ) err := s.db.Clauses(upsertClause).Create(&statsToFlush).Error if err != nil { s.logger.Errorf("Failed to flush analytics data for key %s: %v", key, err) } else { s.logger.Infof("Successfully flushed %d records from key %s.", len(statsToFlush), key) _ = s.store.HDel(key, parsedFields...) } } } } func (s *AnalyticsService) parseStatsFromHash(t time.Time, data map[string]string) ([]models.StatsHourly, []string) { tempAggregator := make(map[string]*models.StatsHourly) var parsedFields []string for field, valueStr := range data { parts := strings.Split(field, ":") if len(parts) != 3 { continue } groupIDStr, modelName, counterType := parts[0], parts[1], parts[2] aggKey := groupIDStr + ":" + modelName if _, ok := tempAggregator[aggKey]; !ok { gid, err := strconv.Atoi(groupIDStr) if err != nil { continue } tempAggregator[aggKey] = &models.StatsHourly{ Time: t, GroupID: uint(gid), ModelName: modelName, } } val, _ := strconv.ParseInt(valueStr, 10, 64) switch counterType { case "requests": tempAggregator[aggKey].RequestCount = val case "success": tempAggregator[aggKey].SuccessCount = val case "prompt": tempAggregator[aggKey].PromptTokens = val case "completion": tempAggregator[aggKey].CompletionTokens = val } parsedFields = append(parsedFields, field) } var result []models.StatsHourly for _, stats := range tempAggregator { if stats.RequestCount > 0 { result = append(result, *stats) } } return result, parsedFields }