// Filename: internal/service/analytics_service.go

package service

import (
    "encoding/json"
    "fmt"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/sirupsen/logrus"
    "gorm.io/gorm"

    "gemini-balancer/internal/db/dialect"
    "gemini-balancer/internal/models"
    "gemini-balancer/internal/store"
)

const (
    // flushLoopInterval is how often buffered counters are flushed to the DB.
    flushLoopInterval = 1 * time.Minute
)

// AnalyticsServiceLogger wraps a logrus entry scoped to this service.
type AnalyticsServiceLogger struct{ *logrus.Entry }

// AnalyticsService is the write side of the analytics pipeline: it consumes
// request-finished events, accumulates per-group/per-model counters in the
// store, and periodically flushes hourly aggregates to the database.
type AnalyticsService struct {
    db       *gorm.DB
    store    store.Store
    logger   *logrus.Entry
    stopChan chan struct{}
    wg       sync.WaitGroup
    dialect  dialect.DialectAdapter
}

// NewAnalyticsService wires the service to its database, store, logger, and
// SQL dialect adapter. Call Start to launch its background goroutines.
func NewAnalyticsService(db *gorm.DB, s store.Store, logger *logrus.Logger, d dialect.DialectAdapter) *AnalyticsService {
    return &AnalyticsService{
        db:       db,
        store:    s,
        logger:   logger.WithField("component", "Analytics📊"),
        stopChan: make(chan struct{}),
        dialect:  d,
    }
}

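// Example wiring (an illustrative sketch: db is a *gorm.DB opened elsewhere,
// and redisStore / pgDialect stand in for whatever store.Store and
// dialect.DialectAdapter implementations the application provides):
//
//    svc := NewAnalyticsService(db, redisStore, logrus.New(), pgDialect)
//    svc.Start()
//    defer svc.Stop()
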
// Start launches the background goroutines and returns immediately.
func (s *AnalyticsService) Start() {
    s.wg.Add(2) // two background goroutines: flushLoop and eventListener
    go s.flushLoop()
    go s.eventListener()
    s.logger.Info("AnalyticsService (Command Side) started.")
}

// Stop signals both goroutines to exit, waits for them, then performs a final
// synchronous flush so buffered counters are not lost on shutdown.
func (s *AnalyticsService) Stop() {
    close(s.stopChan)
    s.wg.Wait()
    s.logger.Info("AnalyticsService stopped. Performing final data flush...")
    s.flushToDB() // flush buffered stats before shutting down
    s.logger.Info("AnalyticsService final data flush completed.")
}

// eventListener consumes request-finished events from the store's pub/sub
// topic until stopChan is closed.
func (s *AnalyticsService) eventListener() {
    defer s.wg.Done()
    sub, err := s.store.Subscribe(models.TopicRequestFinished)
    if err != nil {
        // Fatalf normally exits the process; the return guards against a
        // logger configured with a non-exiting ExitFunc.
        s.logger.Fatalf("Failed to subscribe to topic %s: %v", models.TopicRequestFinished, err)
        return
    }
    defer sub.Close()
    s.logger.Info("AnalyticsService subscribed to request events.")

    for {
        select {
        case msg, ok := <-sub.Channel():
            if !ok {
                // Subscription channel closed underneath us; stop cleanly
                // instead of spinning on zero-value messages.
                s.logger.Warn("AnalyticsService subscription channel closed.")
                return
            }
            var event models.RequestFinishedEvent
            if err := json.Unmarshal(msg.Payload, &event); err != nil {
                s.logger.Errorf("Failed to unmarshal analytics event: %v", err)
                continue
            }
            s.handleAnalyticsEvent(&event)
        case <-s.stopChan:
            s.logger.Info("AnalyticsService stopping event listener.")
            return
        }
    }
}

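// An illustrative payload for the event consumed above (the values are made
// up, and the JSON field names are an assumption; the authoritative tags live
// on models.RequestFinishedEvent):
//
//    {"group_id": 42, "model_name": "gemini-pro", "is_success": true,
//     "prompt_tokens": 512, "completion_tokens": 128, "correlation_id": "req-123"}
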
// handleAnalyticsEvent increments the hourly counters for the event's
// group/model pair in a single store pipeline. Events without a group are
// ignored.
func (s *AnalyticsService) handleAnalyticsEvent(event *models.RequestFinishedEvent) {
    if event.GroupID == 0 {
        return
    }
    // One hash per UTC hour; one field per group/model/counter triple.
    key := fmt.Sprintf("analytics:hourly:%s", time.Now().UTC().Format("2006-01-02T15"))
    fieldPrefix := fmt.Sprintf("%d:%s", event.GroupID, event.ModelName)

    pipe := s.store.Pipeline()
    pipe.HIncrBy(key, fieldPrefix+":requests", 1)
    if event.IsSuccess {
        pipe.HIncrBy(key, fieldPrefix+":success", 1)
    }
    if event.PromptTokens > 0 {
        pipe.HIncrBy(key, fieldPrefix+":prompt", int64(event.PromptTokens))
    }
    if event.CompletionTokens > 0 {
        pipe.HIncrBy(key, fieldPrefix+":completion", int64(event.CompletionTokens))
    }

    if err := pipe.Exec(); err != nil {
        s.logger.Warnf("[%s] Failed to record analytics event to store for group %d: %v", event.CorrelationID, event.GroupID, err)
    }
}

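// The hash written above ends up shaped like this (key and counter values are
// hypothetical examples):
//
//    analytics:hourly:2025-01-02T15
//      42:gemini-pro:requests   -> 17
//      42:gemini-pro:success    -> 16
//      42:gemini-pro:prompt     -> 10240
//      42:gemini-pro:completion -> 2048
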
// flushLoop flushes buffered counters to the database once per
// flushLoopInterval until stopChan is closed.
func (s *AnalyticsService) flushLoop() {
    defer s.wg.Done()
    ticker := time.NewTicker(flushLoopInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            s.flushToDB()
        case <-s.stopChan:
            return
        }
    }
}

// flushToDB drains the previous and current hour buckets from the store and
// upserts the aggregates into the hourly stats table. Fields are deleted from
// the hash only after a successful write, so a failed flush is retried on the
// next tick.
func (s *AnalyticsService) flushToDB() {
    now := time.Now().UTC()
    // Pair each key with the hour it represents, so that leftovers from the
    // previous hour are attributed to that hour rather than the current one.
    hoursToFlush := []time.Time{
        now.Add(-1 * time.Hour).Truncate(time.Hour),
        now.Truncate(time.Hour),
    }

    for _, hour := range hoursToFlush {
        key := fmt.Sprintf("analytics:hourly:%s", hour.Format("2006-01-02T15"))
        data, err := s.store.HGetAll(key)
        if err != nil || len(data) == 0 {
            continue
        }

        statsToFlush, parsedFields := s.parseStatsFromHash(hour, data)

        if len(statsToFlush) > 0 {
            upsertClause := s.dialect.OnConflictUpdateAll(
                []string{"time", "group_id", "model_name"},                                       // conflict columns
                []string{"request_count", "success_count", "prompt_tokens", "completion_tokens"}, // update columns
            )
            err := s.db.Clauses(upsertClause).Create(&statsToFlush).Error
            if err != nil {
                s.logger.Errorf("Failed to flush analytics data for key %s: %v", key, err)
            } else {
                s.logger.Infof("Successfully flushed %d records from key %s.", len(statsToFlush), key)
                _ = s.store.HDel(key, parsedFields...)
            }
        }
    }
}

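// On PostgreSQL, the upsert clause built above would presumably expand to SQL
// of roughly this shape (a sketch only; the real statement is produced by the
// dialect.DialectAdapter, and the table name for models.StatsHourly depends on
// the GORM naming configuration):
//
//    INSERT INTO stats_hourly (time, group_id, model_name, request_count, ...)
//    VALUES (...)
//    ON CONFLICT (time, group_id, model_name) DO UPDATE SET
//        request_count     = EXCLUDED.request_count,
//        success_count     = EXCLUDED.success_count,
//        prompt_tokens     = EXCLUDED.prompt_tokens,
//        completion_tokens = EXCLUDED.completion_tokens;
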
// parseStatsFromHash turns raw hash fields of the form
// "<groupID>:<modelName>:<counter>" into StatsHourly rows stamped with hour t.
// It returns the rows plus the list of fields it consumed, so the caller can
// delete exactly those fields after a successful flush.
func (s *AnalyticsService) parseStatsFromHash(t time.Time, data map[string]string) ([]models.StatsHourly, []string) {
    tempAggregator := make(map[string]*models.StatsHourly)
    var parsedFields []string
    for field, valueStr := range data {
        parts := strings.Split(field, ":")
        if len(parts) != 3 {
            continue
        }
        groupIDStr, modelName, counterType := parts[0], parts[1], parts[2]

        aggKey := groupIDStr + ":" + modelName
        if _, ok := tempAggregator[aggKey]; !ok {
            gid, err := strconv.Atoi(groupIDStr)
            if err != nil {
                continue
            }
            tempAggregator[aggKey] = &models.StatsHourly{
                Time:      t,
                GroupID:   uint(gid),
                ModelName: modelName,
            }
        }
        val, _ := strconv.ParseInt(valueStr, 10, 64)
        switch counterType {
        case "requests":
            tempAggregator[aggKey].RequestCount = val
        case "success":
            tempAggregator[aggKey].SuccessCount = val
        case "prompt":
            tempAggregator[aggKey].PromptTokens = val
        case "completion":
            tempAggregator[aggKey].CompletionTokens = val
        }
        parsedFields = append(parsedFields, field)
    }
    // Keep only aggregates that saw at least one request.
    var result []models.StatsHourly
    for _, stats := range tempAggregator {
        if stats.RequestCount > 0 {
            result = append(result, *stats)
        }
    }
    return result, parsedFields
}
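
// An illustrative round trip (hypothetical values): for t = 2025-01-02 15:00
// UTC and data = {"42:gemini-pro:requests": "17", "42:gemini-pro:prompt": "10240"},
// parseStatsFromHash returns a single StatsHourly{Time: t, GroupID: 42,
// ModelName: "gemini-pro", RequestCount: 17, PromptTokens: 10240} together
// with both field names for deletion.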