// [REVISED] - 用这个更智能的版本完整替换 package scheduler import ( "context" "fmt" // [NEW] 导入 fmt "gemini-balancer/internal/repository" "gemini-balancer/internal/service" "gemini-balancer/internal/settings" "gemini-balancer/internal/store" "strconv" // [NEW] 导入 strconv "strings" // [NEW] 导入 strings "sync" "time" "github.com/go-co-op/gocron" "github.com/sirupsen/logrus" ) // ... (Scheduler struct 和 NewScheduler 保持不变) ... const LogCleanupTaskTag = "log-cleanup-task" type Scheduler struct { gocronScheduler *gocron.Scheduler logger *logrus.Entry statsService *service.StatsService logService *service.LogService settingsManager *settings.SettingsManager keyRepo repository.KeyRepository store store.Store stopChan chan struct{} wg sync.WaitGroup } func NewScheduler( statsSvc *service.StatsService, logSvc *service.LogService, keyRepo repository.KeyRepository, settingsMgr *settings.SettingsManager, store store.Store, logger *logrus.Logger, ) *Scheduler { s := gocron.NewScheduler(time.UTC) s.TagsUnique() return &Scheduler{ gocronScheduler: s, logger: logger.WithField("component", "Scheduler📆"), statsService: statsSvc, logService: logSvc, settingsManager: settingsMgr, keyRepo: keyRepo, store: store, stopChan: make(chan struct{}), } } // ... (Start 和 listenForSettingsUpdates 保持不变) ... func (s *Scheduler) Start() { s.logger.Info("Starting scheduler and registering jobs...") // --- 注册静态定时任务 --- _, err := s.gocronScheduler.Cron("5 * * * *").Tag("stats-aggregation").Do(func() { s.logger.Info("Executing hourly request stats aggregation...") ctx := context.Background() if err := s.statsService.AggregateHourlyStats(ctx); err != nil { s.logger.WithError(err).Error("Hourly stats aggregation failed.") } else { s.logger.Info("Hourly stats aggregation completed successfully.") } }) if err != nil { s.logger.Errorf("Failed to schedule [stats-aggregation]: %v", err) } _, err = s.gocronScheduler.Cron("15 3 * * *").Tag("cleanup-soft-deleted-keys").Do(func() { s.logger.Info("Executing daily cleanup of soft-deleted API keys...") const retentionDays = 7 count, err := s.keyRepo.HardDeleteSoftDeletedBefore(time.Now().AddDate(0, 0, -retentionDays)) if err != nil { s.logger.WithError(err).Error("Daily cleanup of soft-deleted keys failed.") } else if count > 0 { s.logger.Infof("Daily cleanup completed: Permanently deleted %d expired soft-deleted keys.", count) } else { s.logger.Info("Daily cleanup completed: No expired soft-deleted keys found to delete.") } }) if err != nil { s.logger.Errorf("Failed to schedule [cleanup-soft-deleted-keys]: %v", err) } // --- 动态任务初始化 --- if err := s.UpdateLogCleanupTask(); err != nil { s.logger.WithError(err).Error("Failed to initialize log cleanup task on startup.") } // --- 启动后台监听器和调度器 --- s.wg.Add(1) go s.listenForSettingsUpdates() s.gocronScheduler.StartAsync() s.logger.Info("Scheduler started.") } func (s *Scheduler) listenForSettingsUpdates() { defer s.wg.Done() s.logger.Info("Starting listener for system settings updates...") for { select { case <-s.stopChan: s.logger.Info("Stopping settings update listener.") return default: } ctx, cancel := context.WithCancel(context.Background()) subscription, err := s.store.Subscribe(ctx, settings.SettingsUpdateChannel) if err != nil { s.logger.WithError(err).Warnf("Failed to subscribe to settings channel, retrying in 5s...") cancel() time.Sleep(5 * time.Second) continue } s.logger.Infof("Successfully subscribed to channel '%s'.", settings.SettingsUpdateChannel) listenLoop: for { select { case msg, ok := <-subscription.Channel(): if !ok { s.logger.Warn("Subscription channel closed by publisher. Re-subscribing...") break listenLoop } s.logger.Infof("Received settings update notification: %s", string(msg.Payload)) if err := s.UpdateLogCleanupTask(); err != nil { s.logger.WithError(err).Error("Failed to update log cleanup task after notification.") } case <-s.stopChan: s.logger.Info("Stopping settings update listener.") subscription.Close() cancel() return } } subscription.Close() cancel() } } // [MODIFIED] - UpdateLogCleanupTask 现在会动态生成 cron 表达式 func (s *Scheduler) UpdateLogCleanupTask() error { if err := s.gocronScheduler.RemoveByTag(LogCleanupTaskTag); err != nil { // This is not an error, just means the job didn't exist } settings := s.settingsManager.GetSettings() if !settings.LogAutoCleanupEnabled || settings.LogAutoCleanupRetentionDays <= 0 { s.logger.Info("Log auto-cleanup is disabled. Task removed or not scheduled.") return nil } days := settings.LogAutoCleanupRetentionDays // [NEW] 解析时间并生成 cron 表达式 cronSpec, err := parseTimeToCron(settings.LogAutoCleanupTime) if err != nil { s.logger.WithError(err).Warnf("Invalid cleanup time format '%s'. Falling back to default '04:05'.", settings.LogAutoCleanupTime) cronSpec = "5 4 * * *" // 安全回退 } s.logger.Infof("Scheduling/updating daily log cleanup task to retain last %d days of logs, using cron spec: '%s'", days, cronSpec) _, err = s.gocronScheduler.Cron(cronSpec).Tag(LogCleanupTaskTag).Do(func() { s.logger.Infof("Executing daily log cleanup, deleting logs older than %d days...", days) ctx := context.Background() deletedCount, err := s.logService.DeleteOldLogs(ctx, days) if err != nil { s.logger.WithError(err).Error("Daily log cleanup task failed.") } else { s.logger.Infof("Daily log cleanup task completed. Deleted %d old logs.", deletedCount) } }) if err != nil { s.logger.WithError(err).Error("Failed to schedule new log cleanup task.") return err } s.logger.Info("Log cleanup task updated successfully.") return nil } // [NEW] - 用于解析 "HH:mm" 格式时间为 cron 表达式的辅助函数 func parseTimeToCron(timeStr string) (string, error) { parts := strings.Split(timeStr, ":") if len(parts) != 2 { return "", fmt.Errorf("invalid time format, expected HH:mm") } hour, err := strconv.Atoi(parts[0]) if err != nil || hour < 0 || hour > 23 { return "", fmt.Errorf("invalid hour value: %s", parts[0]) } minute, err := strconv.Atoi(parts[1]) if err != nil || minute < 0 || minute > 59 { return "", fmt.Errorf("invalid minute value: %s", parts[1]) } return fmt.Sprintf("%d %d * * *", minute, hour), nil } func (s *Scheduler) Stop() { s.logger.Info("Stopping scheduler...") close(s.stopChan) s.gocronScheduler.Stop() s.wg.Wait() s.logger.Info("Scheduler stopped gracefully.") }