package syncer import ( "context" "fmt" "gemini-balancer/internal/store" "sync" "time" "github.com/sirupsen/logrus" ) const ( ReconnectDelay = 5 * time.Second ReloadTimeout = 30 * time.Second ) type LoaderFunc[T any] func() (T, error) type CacheSyncer[T any] struct { mu sync.RWMutex cache T loader LoaderFunc[T] store store.Store channelName string logger *logrus.Entry stopChan chan struct{} wg sync.WaitGroup } func NewCacheSyncer[T any]( loader LoaderFunc[T], store store.Store, channelName string, logger *logrus.Logger, ) (*CacheSyncer[T], error) { s := &CacheSyncer[T]{ loader: loader, store: store, channelName: channelName, logger: logger.WithField("component", fmt.Sprintf("CacheSyncer[%s]", channelName)), stopChan: make(chan struct{}), } if err := s.reload(); err != nil { return nil, fmt.Errorf("initial load failed: %w", err) } s.wg.Add(1) go s.listenForUpdates() return s, nil } func (s *CacheSyncer[T]) Get() T { s.mu.RLock() defer s.mu.RUnlock() return s.cache } func (s *CacheSyncer[T]) Invalidate() error { s.logger.Info("Publishing invalidation notification") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := s.store.Publish(ctx, s.channelName, []byte("reload")); err != nil { s.logger.WithError(err).Error("Failed to publish invalidation") return err } return nil } func (s *CacheSyncer[T]) Stop() { close(s.stopChan) s.wg.Wait() s.logger.Info("CacheSyncer stopped") } func (s *CacheSyncer[T]) reload() error { s.logger.Info("Reloading cache...") ctx, cancel := context.WithTimeout(context.Background(), ReloadTimeout) defer cancel() type result struct { data T err error } resultChan := make(chan result, 1) go func() { data, err := s.loader() resultChan <- result{data, err} }() select { case res := <-resultChan: if res.err != nil { s.logger.WithError(res.err).Error("Failed to reload cache") return res.err } s.mu.Lock() s.cache = res.data s.mu.Unlock() s.logger.Info("Cache reloaded successfully") return nil case <-ctx.Done(): s.logger.Error("Cache reload timeout") return fmt.Errorf("reload timeout after %v", ReloadTimeout) } } func (s *CacheSyncer[T]) listenForUpdates() { defer s.wg.Done() for { select { case <-s.stopChan: return default: } if err := s.subscribeAndListen(); err != nil { s.logger.WithError(err).Warnf("Subscription error, retrying in %v", ReconnectDelay) select { case <-time.After(ReconnectDelay): case <-s.stopChan: return } } } } func (s *CacheSyncer[T]) subscribeAndListen() error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() subscription, err := s.store.Subscribe(ctx, s.channelName) if err != nil { return fmt.Errorf("failed to subscribe: %w", err) } defer subscription.Close() s.logger.Info("Subscribed to channel") for { select { case msg, ok := <-subscription.Channel(): if !ok { return fmt.Errorf("subscription channel closed") } s.logger.WithField("message", string(msg.Payload)).Info("Received invalidation notification") if err := s.reload(); err != nil { s.logger.WithError(err).Error("Failed to reload after notification") } case <-s.stopChan: return nil } } }