diff --git a/config.yaml b/config.yaml index 06c1cd2..f0c1824 100644 --- a/config.yaml +++ b/config.yaml @@ -12,7 +12,7 @@ server: # 日志级别 log: - level: "info" + level: "debug" # 日志轮转配置 max_size: 100 # MB diff --git a/frontend/js/pages/logs/index.js b/frontend/js/pages/logs/index.js index 41513fd..32197c1 100644 --- a/frontend/js/pages/logs/index.js +++ b/frontend/js/pages/logs/index.js @@ -5,17 +5,19 @@ import CustomSelectV2 from '../../components/customSelectV2.js'; import { debounce } from '../../utils/utils.js'; import FilterPopover from '../../components/filterPopover.js'; import { STATIC_ERROR_MAP, STATUS_CODE_MAP } from './logList.js'; +import SystemLogTerminal from './systemLog.js'; + const dataStore = { groups: new Map(), keys: new Map(), }; + class LogsPage { constructor() { this.state = { logs: [], pagination: { page: 1, pages: 1, total: 0, page_size: 20 }, isLoading: true, - // [优化] 统一将所有可筛选字段在此处初始化,便于管理 filters: { page: 1, page_size: 20, @@ -26,33 +28,106 @@ class LogsPage { status_codes: new Set(), }, selectedLogIds: new Set(), + currentView: 'error', }; this.elements = { - tableBody: document.getElementById('logs-table-body'), - selectedCount: document.querySelector('.flex-1.text-sm span.font-semibold:nth-child(1)'), - totalCount: document.querySelector('.flex-1.text-sm span:last-child'), - pageSizeSelect: document.querySelector('[data-component="custom-select-v2"] select'), - pageInfo: document.querySelector('.flex.w-\\[100px\\]'), - paginationBtns: document.querySelectorAll('[data-pagination-controls] button'), - selectAllCheckbox: document.querySelector('thead .table-head-cell input[type="checkbox"]'), - searchInput: document.getElementById('log-search-input'), - errorTypeFilterBtn: document.getElementById('filter-error-type-btn'), - errorCodeFilterBtn: document.getElementById('filter-error-code-btn'), + tabsContainer: document.querySelector('[data-sliding-tabs-container]'), + contentContainer: document.getElementById('log-content-container'), + errorFilters: document.getElementById('error-logs-filters'), + systemControls: document.getElementById('system-logs-controls'), + errorTemplate: document.getElementById('error-logs-template'), + systemTemplate: document.getElementById('system-logs-template'), }; - this.initialized = !!this.elements.tableBody; + this.initialized = !!this.elements.contentContainer; if (this.initialized) { - this.logList = new LogList(this.elements.tableBody, dataStore); - const selectContainer = document.querySelector('[data-component="custom-select-v2"]'); - if (selectContainer) { new CustomSelectV2(selectContainer); } + this.logList = null; + this.systemLogTerminal = null; this.debouncedLoadAndRender = debounce(() => this.loadAndRenderLogs(), 300); } } async init() { if (!this.initialized) return; + this._initPermanentEventListeners(); + await this.loadGroupsOnce(); + this.state.currentView = null; + this.switchToView('error'); + } + _initPermanentEventListeners() { + this.elements.tabsContainer.addEventListener('click', (event) => { + const tabItem = event.target.closest('[data-tab-target]'); + if (!tabItem) return; + event.preventDefault(); + const viewName = tabItem.dataset.tabTarget; + if (viewName) { + this.switchToView(viewName); + } + }); + } + switchToView(viewName) { + if (this.state.currentView === viewName && this.elements.contentContainer.innerHTML !== '') return; + if (this.systemLogTerminal) { + this.systemLogTerminal.disconnect(); + this.systemLogTerminal = null; + } + this.state.currentView = viewName; + this.elements.contentContainer.innerHTML = ''; + if (viewName === 'error') { + this.elements.errorFilters.classList.remove('hidden'); + this.elements.systemControls.classList.add('hidden'); + const template = this.elements.errorTemplate.content.cloneNode(true); + this.elements.contentContainer.appendChild(template); + requestAnimationFrame(() => { + this._initErrorLogView(); + }); + } else if (viewName === 'system') { + this.elements.errorFilters.classList.add('hidden'); + this.elements.systemControls.classList.remove('hidden'); + const template = this.elements.systemTemplate.content.cloneNode(true); + this.elements.contentContainer.appendChild(template); + requestAnimationFrame(() => { + this._initSystemLogView(); + }); + } + } + _initErrorLogView() { + this.elements.tableBody = document.getElementById('logs-table-body'); + this.elements.selectedCount = document.querySelector('.flex-1.text-sm span.font-semibold:nth-child(1)'); + this.elements.totalCount = document.querySelector('.flex-1.text-sm span:last-child'); + this.elements.pageSizeSelect = document.querySelector('[data-component="custom-select-v2"] select'); + this.elements.pageInfo = document.querySelector('.flex.w-\\[100px\\]'); + this.elements.paginationBtns = document.querySelectorAll('[data-pagination-controls] button'); + this.elements.selectAllCheckbox = document.querySelector('thead .table-head-cell input[type="checkbox"]'); + this.elements.searchInput = document.getElementById('log-search-input'); + this.elements.errorTypeFilterBtn = document.getElementById('filter-error-type-btn'); + this.elements.errorCodeFilterBtn = document.getElementById('filter-error-code-btn'); + this.logList = new LogList(this.elements.tableBody, dataStore); + const selectContainer = document.querySelector('[data-component="custom-select-v2"]'); + if (selectContainer) { new CustomSelectV2(selectContainer); } this.initFilterPopovers(); this.initEventListeners(); - await this.loadGroupsOnce(); - await this.loadAndRenderLogs(); + this.loadAndRenderLogs(); + } + _initSystemLogView() { + this.systemLogTerminal = new SystemLogTerminal( + this.elements.contentContainer, + this.elements.systemControls + ); + Swal.fire({ + title: '实时系统日志', + text: '您即将连接到实时日志流。这会与服务器建立一个持续的连接。', + icon: 'info', + confirmButtonText: '我明白了,开始连接', + showCancelButton: true, + cancelButtonText: '取消', + target: '#main-content-wrapper', + }).then((result) => { + if (result.isConfirmed) { + this.systemLogTerminal.connect(); + } else { + const errorLogTab = Array.from(this.elements.tabsContainer.querySelectorAll('[data-tab-target="error"]'))[0]; + if (errorLogTab) errorLogTab.click(); + } + }); } initFilterPopovers() { const errorTypeOptions = [ diff --git a/frontend/js/pages/logs/systemLog.js b/frontend/js/pages/logs/systemLog.js new file mode 100644 index 0000000..8dd87bc --- /dev/null +++ b/frontend/js/pages/logs/systemLog.js @@ -0,0 +1,157 @@ +// Filename: frontend/js/pages/logs/systemLog.js + +export default class SystemLogTerminal { + constructor(container, controlsContainer) { + this.container = container; + this.controlsContainer = controlsContainer; + this.ws = null; + this.isPaused = false; + this.shouldAutoScroll = true; + this.reconnectAttempts = 0; + this.maxReconnectAttempts = 5; + + this.elements = { + output: this.container.querySelector('#log-terminal-output'), + statusIndicator: this.controlsContainer.querySelector('#terminal-status-indicator'), + clearBtn: this.controlsContainer.querySelector('[data-action="clear-terminal"]'), + pauseBtn: this.controlsContainer.querySelector('[data-action="toggle-pause-terminal"]'), + scrollBtn: this.controlsContainer.querySelector('[data-action="toggle-scroll-terminal"]'), + disconnectBtn: this.controlsContainer.querySelector('[data-action="disconnect-terminal"]'), + }; + + this._initEventListeners(); + } + + _initEventListeners() { + this.elements.clearBtn.addEventListener('click', () => this.clear()); + this.elements.pauseBtn.addEventListener('click', () => this.togglePause()); + this.elements.scrollBtn.addEventListener('click', () => this.toggleAutoScroll()); + this.elements.disconnectBtn.addEventListener('click', () => this.disconnect()); + } + + connect() { + this.clear(); + this._appendMessage('info', '正在连接到实时日志流...'); + this._updateStatus('connecting', '连接中...'); + + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsUrl = `${protocol}//${window.location.host}/ws/system-logs`; + + this.ws = new WebSocket(wsUrl); + + this.ws.onopen = () => { + this._appendMessage('info', '✓ 已连接到系统日志流'); + this._updateStatus('connected', '已连接'); + this.reconnectAttempts = 0; + }; + + this.ws.onmessage = (event) => { + if (this.isPaused) return; + + try { + const data = JSON.parse(event.data); + const levelColors = { + 'error': 'text-red-500', + 'warning': 'text-yellow-400', + 'info': 'text-blue-400', + 'debug': 'text-zinc-400' + }; + const color = levelColors[data.level] || 'text-zinc-200'; + const timestamp = new Date(data.timestamp).toLocaleTimeString(); + const msg = `[${timestamp}] [${data.level.toUpperCase()}] ${data.message}`; + this._appendMessage(color, msg); + } catch (e) { + this._appendMessage('text-zinc-200', event.data); + } + }; + + this.ws.onerror = (error) => { + this._appendMessage('error', `✗ WebSocket 错误`); + this._updateStatus('error', '连接错误'); + }; + + this.ws.onclose = () => { + this._appendMessage('error', '✗ 连接已断开'); + this._updateStatus('disconnected', '未连接'); + + if (this.reconnectAttempts < this.maxReconnectAttempts) { + this.reconnectAttempts++; + setTimeout(() => { + this._appendMessage('info', `尝试重新连接 (${this.reconnectAttempts}/${this.maxReconnectAttempts})...`); + this.connect(); + }, 3000); + } + }; + } + + disconnect() { + if (this.ws) { + this.ws.close(); + this.ws = null; + } + this.reconnectAttempts = this.maxReconnectAttempts; + this._updateStatus('disconnected', '未连接'); + } + + clear() { + if(this.elements.output) { + this.elements.output.innerHTML = ''; + } + } + + togglePause() { + this.isPaused = !this.isPaused; + const span = this.elements.pauseBtn.querySelector('span'); + const icon = this.elements.pauseBtn.querySelector('i'); + if (this.isPaused) { + span.textContent = '继续'; + icon.classList.replace('fa-pause', 'fa-play'); + } else { + span.textContent = '暂停'; + icon.classList.replace('fa-play', 'fa-pause'); + } + } + + toggleAutoScroll() { + this.shouldAutoScroll = !this.shouldAutoScroll; + const span = this.elements.scrollBtn.querySelector('span'); + if (this.shouldAutoScroll) { + span.textContent = '自动滚动'; + } else { + span.textContent = '手动滚动'; + } + } + + _appendMessage(colorClass, text) { + if (!this.elements.output) return; + + const p = document.createElement('p'); + p.className = colorClass; + p.textContent = text; + this.elements.output.appendChild(p); + + if (this.shouldAutoScroll) { + this.elements.output.scrollTop = this.elements.output.scrollHeight; + } + } + + _updateStatus(status, text) { + const indicator = this.elements.statusIndicator.querySelector('span.relative'); + const statusText = this.elements.statusIndicator.childNodes[2]; + + const colors = { + 'connecting': 'bg-yellow-500', + 'connected': 'bg-green-500', + 'disconnected': 'bg-zinc-500', + 'error': 'bg-red-500' + }; + + indicator.querySelectorAll('span').forEach(span => { + span.className = span.className.replace(/bg-\w+-\d+/g, colors[status] || colors.disconnected); + }); + + if (statusText) { + statusText.textContent = ` ${text}`; + } + } +} diff --git a/go.mod b/go.mod index 38d19c5..910b1b7 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/go-co-op/gocron v1.37.0 github.com/go-sql-driver/mysql v1.9.3 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.7.5 github.com/microcosm-cc/bluemonday v1.0.27 github.com/redis/go-redis/v9 v9.3.0 diff --git a/go.sum b/go.sum index fc28ad2..e0a521c 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,8 @@ github.com/gorilla/css v1.0.1 h1:ntNaBIghp6JmvWnxbZKANoLyuXTPZ4cAMlo6RyhlbO8= github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A3b0= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= diff --git a/internal/container/container.go b/internal/container/container.go index e8a12e3..583d3db 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -2,14 +2,11 @@ package container import ( - "fmt" "gemini-balancer/internal/app" "gemini-balancer/internal/channel" "gemini-balancer/internal/config" "gemini-balancer/internal/crypto" "gemini-balancer/internal/db" - "gemini-balancer/internal/db/dialect" - "gemini-balancer/internal/db/migrations" "gemini-balancer/internal/domain/proxy" "gemini-balancer/internal/domain/upstream" "gemini-balancer/internal/handlers" @@ -21,13 +18,10 @@ import ( "gemini-balancer/internal/service" "gemini-balancer/internal/settings" "gemini-balancer/internal/store" - "gemini-balancer/internal/syncer" "gemini-balancer/internal/task" "gemini-balancer/internal/webhandlers" - "github.com/sirupsen/logrus" "go.uber.org/dig" - "gorm.io/gorm" ) func BuildContainer() (*dig.Container, error) { @@ -35,20 +29,9 @@ func BuildContainer() (*dig.Container, error) { // =========== 阶段一: 基础设施层 (Infrastructure) =========== container.Provide(config.LoadConfig) - - container.Provide(func(cfg *config.Config, logger *logrus.Logger) (*gorm.DB, dialect.DialectAdapter, error) { - gormDB, adapter, err := db.NewDB(cfg, logger) - if err != nil { - return nil, nil, err - } - // 迁移运行逻辑 - if err := migrations.RunVersionedMigrations(gormDB, cfg, logger); err != nil { - return nil, nil, fmt.Errorf("failed to run versioned migrations: %w", err) - } - return gormDB, adapter, nil - }) + container.Provide(logging.NewLoggerWithWebSocket) + container.Provide(db.NewDBWithMigrations) container.Provide(store.NewStore) - container.Provide(logging.NewLogger) container.Provide(crypto.NewService) container.Provide(repository.NewAuthTokenRepository) container.Provide(repository.NewGroupRepository) @@ -85,10 +68,7 @@ func BuildContainer() (*dig.Container, error) { // --- Syncer & Loader for GroupManager --- container.Provide(service.NewGroupManagerLoader) // 为GroupManager配置Syncer - container.Provide(func(loader syncer.LoaderFunc[service.GroupManagerCacheData], store store.Store, logger *logrus.Logger) (*syncer.CacheSyncer[service.GroupManagerCacheData], error) { - const groupUpdateChannel = "groups:cache_invalidation" - return syncer.NewCacheSyncer(loader, store, groupUpdateChannel, logger) - }) + container.Provide(service.NewGroupManagerSyncer) // =========== 阶段三: 适配器与处理器层 (Handlers & Adapters) =========== diff --git a/internal/db/db.go b/internal/db/db.go index 38f49bd..108dcbd 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -2,8 +2,10 @@ package db import ( + "fmt" "gemini-balancer/internal/config" "gemini-balancer/internal/db/dialect" + "gemini-balancer/internal/db/migrations" stdlog "log" "os" "path/filepath" @@ -86,3 +88,16 @@ func NewDB(cfg *config.Config, appLogger *logrus.Logger) (*gorm.DB, dialect.Dial Logger.Info("Database connection established successfully.") return db, adapter, nil } + +func NewDBWithMigrations(cfg *config.Config, logger *logrus.Logger) (*gorm.DB, dialect.DialectAdapter, error) { + gormDB, adapter, err := NewDB(cfg, logger) + if err != nil { + return nil, nil, err + } + + if err := migrations.RunVersionedMigrations(gormDB, cfg, logger); err != nil { + return nil, nil, fmt.Errorf("failed to run versioned migrations: %w", err) + } + + return gormDB, adapter, nil +} diff --git a/internal/errors/api_error.go b/internal/errors/api_error.go index 3cfba3b..eb1637e 100644 --- a/internal/errors/api_error.go +++ b/internal/errors/api_error.go @@ -37,6 +37,7 @@ var ( ErrForbidden = &APIError{HTTPStatus: http.StatusForbidden, Code: "FORBIDDEN", Message: "You do not have permission to access this resource"} ErrTaskInProgress = &APIError{HTTPStatus: http.StatusConflict, Code: "TASK_IN_PROGRESS", Message: "A task is already in progress"} ErrBadGateway = &APIError{HTTPStatus: http.StatusBadGateway, Code: "BAD_GATEWAY", Message: "Upstream service error"} + ErrGatewayTimeout = &APIError{HTTPStatus: http.StatusGatewayTimeout, Code: "BAD_GATEWAY_TIMEOUT", Message: "Bad gateway timeout"} ErrNoActiveKeys = &APIError{HTTPStatus: http.StatusServiceUnavailable, Code: "NO_ACTIVE_KEYS", Message: "No active API keys available for this group"} ErrMaxRetriesExceeded = &APIError{HTTPStatus: http.StatusBadGateway, Code: "MAX_RETRIES_EXCEEDED", Message: "Request failed after maximum retries"} ErrNoKeysAvailable = &APIError{HTTPStatus: http.StatusServiceUnavailable, Code: "NO_KEYS_AVAILABLE", Message: "No API keys available to process the request"} diff --git a/internal/errors/upstream_errors.go b/internal/errors/upstream_errors.go index c85e456..a7c6b1c 100644 --- a/internal/errors/upstream_errors.go +++ b/internal/errors/upstream_errors.go @@ -37,6 +37,7 @@ var permanentErrorSubstrings = []string{ "permission_denied", // Catches the 'status' field in Google's JSON error, e.g., "status": "PERMISSION_DENIED". "service_disabled", // Catches the 'reason' field for disabled APIs, e.g., "reason": "SERVICE_DISABLED". "api has not been used", + "reported as leaked", // Leaked } // --- 2. Temporary Errors --- @@ -44,8 +45,10 @@ var permanentErrorSubstrings = []string{ // Action: Increment consecutive error count, potentially disable the key. var temporaryErrorSubstrings = []string{ "quota", + "Quota exceeded", "limit reached", "insufficient", + "request limit", "billing", "exceeded", "too many requests", @@ -74,6 +77,24 @@ var clientNetworkErrorSubstrings = []string{ "invalid query parameters", // 参数解析错误,归类为客户端错误 } +// --- 5. Retryable Network/Gateway Errors --- +// Errors that indicate temporary network or gateway issues, should retry with same or different key. +// Action: Retry the request. +var retryableNetworkErrorSubstrings = []string{ + "bad gateway", + "service unavailable", + "gateway timeout", + "connection refused", + "connection reset", + "stream transmission interrupted", // ✅ 新增:流式传输中断 + "failed to establish stream", // ✅ 新增:流式连接建立失败 + "upstream connect error", + "no healthy upstream", + "502", + "503", + "504", +} + // IsPermanentUpstreamError checks if an upstream error indicates the key is permanently invalid. func IsPermanentUpstreamError(msg string) bool { return containsSubstring(msg, permanentErrorSubstrings) @@ -97,6 +118,11 @@ func IsClientNetworkError(err error) bool { return containsSubstring(err.Error(), clientNetworkErrorSubstrings) } +// IsRetryableNetworkError checks if an error is a temporary network/gateway issue. +func IsRetryableNetworkError(msg string) bool { + return containsSubstring(msg, retryableNetworkErrorSubstrings) +} + // containsSubstring is a helper function to avoid code repetition. func containsSubstring(s string, substrings []string) bool { if s == "" { diff --git a/internal/handlers/proxy_handler.go b/internal/handlers/proxy_handler.go index f5bf442..1f7afcc 100644 --- a/internal/handlers/proxy_handler.go +++ b/internal/handlers/proxy_handler.go @@ -133,16 +133,25 @@ func (h *ProxyHandler) HandleProxy(c *gin.Context) { func (h *ProxyHandler) serveTransparentProxy(c *gin.Context, requestBody []byte, initialResources *service.RequestResources, finalOpConfig *models.KeyGroupSettings, modelName, groupName string, isPreciseRouting bool) { startTime := time.Now() correlationID := uuid.New().String() + // ✅ 检查是否是流式请求 + isStreamRequest := h.channel.IsStreamRequest(c, requestBody) + // ✅ 流式请求也支持重试 + if isStreamRequest { + h.serveStreamWithRetry(c, requestBody, initialResources, finalOpConfig, modelName, groupName, isPreciseRouting, correlationID, startTime) + return + } var finalRecorder *httptest.ResponseRecorder var lastUsedResources *service.RequestResources var finalProxyErr *errors.APIError var isSuccess bool var finalPromptTokens, finalCompletionTokens, actualRetries int - defer h.publishFinalLogEvent(c, startTime, correlationID, modelName, lastUsedResources, - finalRecorder, finalProxyErr, isSuccess, finalPromptTokens, finalCompletionTokens, - actualRetries, isPreciseRouting) + defer func() { + h.publishFinalLogEvent(c, startTime, correlationID, modelName, lastUsedResources, + finalRecorder, finalProxyErr, isSuccess, finalPromptTokens, finalCompletionTokens, + actualRetries, isPreciseRouting) + }() maxRetries := h.getMaxRetries(isPreciseRouting, finalOpConfig) totalAttempts := maxRetries + 1 @@ -158,6 +167,7 @@ func (h *ProxyHandler) serveTransparentProxy(c *gin.Context, requestBody []byte, resources, err := h.getResourcesForAttempt(c, attempt, initialResources, modelName, groupName, isPreciseRouting, correlationID) if err != nil { + h.logger.WithField("id", correlationID).Errorf("❌ getResourcesForAttempt failed: %v", err) if apiErr, ok := err.(*errors.APIError); ok { finalProxyErr = apiErr } else { @@ -165,7 +175,9 @@ func (h *ProxyHandler) serveTransparentProxy(c *gin.Context, requestBody []byte, } break } - lastUsedResources = resources + + h.logger.WithField("id", correlationID).Infof("✅ Got resources: KeyID=%d", resources.APIKey.ID) + // lastUsedResources = resources if attempt > 1 { actualRetries = attempt - 1 } @@ -176,8 +188,15 @@ func (h *ProxyHandler) serveTransparentProxy(c *gin.Context, requestBody []byte, c, correlationID, requestBody, resources, isPreciseRouting, groupName, &finalPromptTokens, &finalCompletionTokens, ) + h.logger.WithField("id", correlationID).Infof("✅ Before assignment: lastUsedResources=%v", lastUsedResources) finalRecorder, finalProxyErr, isSuccess = recorder, attemptErr, attemptSuccess + // ✅ 修正 isSuccess + if finalProxyErr != nil || (finalRecorder != nil && finalRecorder.Code >= 400) { + isSuccess = false + } + lastUsedResources = resources + h.logger.WithField("id", correlationID).Infof("✅ After assignment: lastUsedResources=%v", lastUsedResources) h.resourceService.ReportRequestResult(resources, isSuccess, finalProxyErr) if isSuccess { @@ -192,10 +211,307 @@ func (h *ProxyHandler) serveTransparentProxy(c *gin.Context, requestBody []byte, h.writeFinalResponse(c, correlationID, finalRecorder, finalProxyErr) } +// ✅ 修改 serveStreamWithRetry,添加 nil 检查 +func (h *ProxyHandler) serveStreamWithRetry(c *gin.Context, requestBody []byte, initialResources *service.RequestResources, finalOpConfig *models.KeyGroupSettings, modelName, groupName string, isPreciseRouting bool, correlationID string, startTime time.Time) { + initialResources.RequestConfig = h.buildFinalRequestConfig( + h.settingsManager.GetSettings(), + initialResources.RequestConfig, + ) + h.logger.WithField("id", correlationID).Info("🌊 Serving stream request with retry support") + var lastUsedResources *service.RequestResources + var finalProxyErr *errors.APIError + var isSuccess bool + var actualRetries int + defer func() { + h.publishFinalLogEvent(c, startTime, correlationID, modelName, lastUsedResources, + nil, finalProxyErr, isSuccess, 0, 0, actualRetries, isPreciseRouting) + }() + maxRetries := h.getMaxRetries(isPreciseRouting, finalOpConfig) + totalAttempts := maxRetries + 1 + for attempt := 1; attempt <= totalAttempts; attempt++ { + // ✅ 检查客户端是否断开连接 + if c.Request.Context().Err() != nil { + h.logger.WithField("id", correlationID).Info("Client disconnected, aborting retry loop.") + if finalProxyErr == nil { + finalProxyErr = errors.NewAPIError(errors.ErrBadRequest, "Client disconnected") + } + break + } + // ✅ 获取资源(第一次使用 initialResources,后续重试获取新资源) + resources, err := h.getResourcesForAttempt(c, attempt, initialResources, modelName, groupName, isPreciseRouting, correlationID) + if err != nil { + h.logger.WithField("id", correlationID).Errorf("❌ Failed to get resources: %v", err) + if apiErr, ok := err.(*errors.APIError); ok { + finalProxyErr = apiErr + } else { + finalProxyErr = errors.NewAPIError(errors.ErrInternalServer, "Failed to get resources for retry") + } + break + } + if attempt > 1 { + actualRetries = attempt - 1 + } + h.logger.WithField("id", correlationID).Infof("🔄 Stream attempt %d/%d (KeyID=%d, GroupID=%d)", + attempt, totalAttempts, resources.APIKey.ID, resources.KeyGroup.ID) + // ✅ 执行流式代理请求 + attemptErr, attemptSuccess := h.executeStreamAttempt( + c, correlationID, requestBody, resources, groupName, isPreciseRouting, + ) + finalProxyErr, isSuccess = attemptErr, attemptSuccess + lastUsedResources = resources + // ✅ 报告结果 + h.resourceService.ReportRequestResult(resources, isSuccess, finalProxyErr) + // ✅ 成功则退出 + if isSuccess { + h.logger.WithField("id", correlationID).Info("✅ Stream request succeeded") + break + } + // ✅ 判断是否应该停止重试 + if h.shouldStopRetrying(attempt, totalAttempts, finalProxyErr, correlationID) { + // ✅ 安全地记录错误信息(添加 nil 检查) + if finalProxyErr != nil { + h.logger.WithField("id", correlationID).Warnf("⛔ Stopping retry: %s", finalProxyErr.Message) + } else { + h.logger.WithField("id", correlationID).Warn("⛔ Stopping retry: unknown error") + } + break + } + // ✅ 发布重试日志事件 + h.publishStreamRetryLogEvent(c, startTime, correlationID, modelName, resources, attemptErr, actualRetries, isPreciseRouting) + // ✅ 简化重试日志 + if attempt < totalAttempts { + h.logger.WithField("id", correlationID).Infof("🔁 Retrying... (%d/%d)", attempt, totalAttempts-1) + } + } + // ✅ 如果所有尝试都失败,写入错误响应 + if !isSuccess && finalProxyErr != nil { + h.logger.WithField("id", correlationID).Warnf("❌ All stream attempts failed: %s (code=%s)", + finalProxyErr.Message, finalProxyErr.Code) + // ✅ 检查是否已经写入响应头 + if !c.Writer.Written() { + errToJSON(c, correlationID, finalProxyErr) + } else { + h.logger.WithField("id", correlationID).Warn("⚠️ Cannot write error, response already started") + } + } +} + +// 执行单次流式代理请求 +func (h *ProxyHandler) executeStreamAttempt( + c *gin.Context, + correlationID string, + requestBody []byte, + resources *service.RequestResources, + groupName string, + isPreciseRouting bool, +) (finalErr *errors.APIError, finalSuccess bool) { // ✅ 使用命名返回值 + + // ✅ 捕获 ReverseProxy 的 ErrAbortHandler panic + defer func() { + if r := recover(); r != nil { + // ✅ 如果是 http.ErrAbortHandler,说明流式响应已成功完成 + if r == http.ErrAbortHandler { + h.logger.WithField("id", correlationID).Debug("✅ Stream completed (ErrAbortHandler caught)") + // ✅ 修改命名返回值,确保返回成功状态 + finalErr = nil + finalSuccess = true + return + } + // ✅ 其他 panic 继续抛出 + h.logger.WithField("id", correlationID).Errorf("❌ Unexpected panic in stream: %v", r) + panic(r) + } + }() + var attemptErr *errors.APIError + var isSuccess bool + requestTimeout := time.Duration(h.settingsManager.GetSettings().RequestTimeoutSeconds) * time.Second + ctx, cancel := context.WithTimeout(c.Request.Context(), requestTimeout) + defer cancel() + attemptReq := c.Request.Clone(ctx) + attemptReq.Body = io.NopCloser(bytes.NewReader(requestBody)) + attemptReq.ContentLength = int64(len(requestBody)) + // ✅ 创建独立的 ReverseProxy + streamProxy := &httputil.ReverseProxy{ + Transport: h.transparentProxy.Transport, + BufferPool: h.transparentProxy.BufferPool, + } + streamProxy.Director = func(r *http.Request) { + targetURL, _ := url.Parse(resources.UpstreamEndpoint.URL) + r.URL.Scheme, r.URL.Host, r.Host = targetURL.Scheme, targetURL.Host, targetURL.Host + var pureClientPath string + if isPreciseRouting { + pureClientPath = strings.TrimPrefix(r.URL.Path, "/proxy/"+groupName) + } else { + pureClientPath = r.URL.Path + } + r.URL.Path = h.channel.RewritePath(targetURL.Path, pureClientPath) + r.Header.Del("Authorization") + h.channel.ModifyRequest(r, resources.APIKey) + r.Header.Set("X-Correlation-ID", correlationID) + + // ✅ 添加:应用自定义请求头 + if resources.RequestConfig != nil { + for k, v := range resources.RequestConfig.CustomHeaders { + if strVal, ok := v.(string); ok { + r.Header.Set(k, strVal) + } + } + } + } + // ✅ 配置 Transport + transport := streamProxy.Transport.(*http.Transport) + if resources.ProxyConfig != nil { + proxyURLStr := fmt.Sprintf("%s://%s", resources.ProxyConfig.Protocol, resources.ProxyConfig.Address) + if proxyURL, err := url.Parse(proxyURLStr); err == nil { + transportCopy := transport.Clone() + transportCopy.Proxy = http.ProxyURL(proxyURL) + streamProxy.Transport = transportCopy + h.logger.WithField("id", correlationID).Infof("🔀 Using proxy: %s", proxyURLStr) + } + } + // ✅ 配置 ModifyResponse + streamProxy.ModifyResponse = func(resp *http.Response) error { + h.logger.WithField("id", correlationID).Infof("📨 Stream response: status=%d, contentType=%s", + resp.StatusCode, resp.Header.Get("Content-Type")) + // ✅ 处理 gzip 解压 + if resp.Header.Get("Content-Encoding") == "gzip" { + gzReader, err := gzip.NewReader(resp.Body) + if err != nil { + h.logger.WithField("id", correlationID).WithError(err).Error("Failed to create gzip reader") + attemptErr = errors.NewAPIError(errors.ErrBadGateway, "Failed to decompress response") + isSuccess = false + return fmt.Errorf("gzip decompression failed: %w", err) + } + resp.Body = gzReader + resp.Header.Del("Content-Encoding") + } + // ✅ 成功响应:直接透传 + if resp.StatusCode < 400 { + isSuccess = true + h.logger.WithField("id", correlationID).Info("✅ Stream response marked as success") + return nil + } + // ✅ 错误响应:读取错误信息(用于重试判断) + isSuccess = false + // ✅ 读取错误响应体 + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + h.logger.WithField("id", correlationID).WithError(err).Error("Failed to read error response") + attemptErr = errors.NewAPIError(errors.ErrBadGateway, "Failed to read upstream error") + } else { + // ✅ 根据状态码决定是否输出详细错误信息 + shouldLogErrorBody := h.shouldLogErrorBody(resp.StatusCode) + if shouldLogErrorBody { + h.logger.WithField("id", correlationID).Errorf("❌ Stream error: status=%d, body=%s", + resp.StatusCode, string(bodyBytes)) + } else { + // ✅ 对于常见错误(429、403等),只记录简要信息 + errorSummary := h.extractErrorSummary(bodyBytes) + h.logger.WithField("id", correlationID).Warnf("⚠️ Stream error: status=%d, summary=%s", + resp.StatusCode, errorSummary) + } + attemptErr = errors.NewAPIErrorWithUpstream(resp.StatusCode, + fmt.Sprintf("UPSTREAM_%d", resp.StatusCode), bodyBytes) + } + // ✅ 返回错误,触发 ErrorHandler(但不写入响应,因为可能需要重试) + return fmt.Errorf("upstream error: status %d", resp.StatusCode) + } + // ✅ 配置 ErrorHandler + streamProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { + h.logger.WithField("id", correlationID).Debugf("Stream proxy error handler triggered: %v", err) + // ✅ 如果 attemptErr 未设置,根据错误类型创建 + if attemptErr == nil { + isSuccess = false + if err == context.DeadlineExceeded { + attemptErr = errors.NewAPIError(errors.ErrGatewayTimeout, "Request timeout") + } else if err == context.Canceled { + attemptErr = errors.NewAPIError(errors.ErrBadRequest, "Request canceled") + } else if errors.IsClientNetworkError(err) { + attemptErr = errors.NewAPIError(errors.ErrBadRequest, "Client connection closed") + } else { + attemptErr = errors.NewAPIError(errors.ErrBadGateway, err.Error()) + } + } + // ✅ 不在这里写入响应,让外层重试逻辑决定 + } + // ✅ 执行代理请求(可能抛出 ErrAbortHandler) + streamProxy.ServeHTTP(c.Writer, attemptReq) + // ✅ 正常返回(如果没有 panic) + return attemptErr, isSuccess +} + +// ✅ 新增:判断是否应该记录详细错误体 +func (h *ProxyHandler) shouldLogErrorBody(statusCode int) bool { + // ✅ 对于常见的客户端错误和限流错误,不记录详细错误体 + commonErrors := map[int]bool{ + 400: true, // Bad Request + 401: true, // Unauthorized + 403: true, // Forbidden + 404: true, // Not Found + 429: true, // Too Many Requests + } + + return !commonErrors[statusCode] +} + +// ✅ 新增:从错误响应中提取简要信息 +func (h *ProxyHandler) extractErrorSummary(bodyBytes []byte) string { + // ✅ 尝试解析 JSON 错误响应 + var errorResp struct { + Error struct { + Message string `json:"message"` + Code int `json:"code"` + Status string `json:"status"` + } `json:"error"` + } + if err := json.Unmarshal(bodyBytes, &errorResp); err == nil && errorResp.Error.Message != "" { + // ✅ 截取错误消息的前100个字符 + message := errorResp.Error.Message + if len(message) > 100 { + message = message[:100] + "..." + } + if errorResp.Error.Status != "" { + return fmt.Sprintf("%s: %s", errorResp.Error.Status, message) + } + return message + } + // ✅ 如果无法解析 JSON,返回前100个字符 + if len(bodyBytes) > 100 { + return string(bodyBytes[:100]) + "..." + } + return string(bodyBytes) +} + +// ✅ 新增:发布流式重试日志事件 +func (h *ProxyHandler) publishStreamRetryLogEvent(c *gin.Context, startTime time.Time, corrID, modelName string, res *service.RequestResources, attemptErr *errors.APIError, retries int, isPrecise bool) { + retryEvent := h.createLogEvent(c, startTime, corrID, modelName, res, models.LogTypeRetry, isPrecise) + retryEvent.RequestLog.LatencyMs = int(time.Since(startTime).Milliseconds()) + retryEvent.RequestLog.IsSuccess = false + retryEvent.RequestLog.Retries = retries + + if attemptErr != nil { + retryEvent.Error = attemptErr + retryEvent.RequestLog.ErrorCode = attemptErr.Code + retryEvent.RequestLog.ErrorMessage = attemptErr.Message + retryEvent.RequestLog.Status = attemptErr.Status + retryEvent.RequestLog.StatusCode = attemptErr.HTTPStatus + } + + eventData, err := json.Marshal(retryEvent) + if err != nil { + h.logger.WithField("id", corrID).WithError(err).Error("Failed to marshal stream retry log event") + return + } + + if err := h.store.Publish(context.Background(), models.TopicRequestFinished, eventData); err != nil { + h.logger.WithField("id", corrID).WithError(err).Error("Failed to publish stream retry log event") + } +} + func (h *ProxyHandler) executeProxyAttempt(c *gin.Context, corrID string, body []byte, res *service.RequestResources, isPrecise bool, groupName string, pTokens, cTokens *int) (*httptest.ResponseRecorder, *errors.APIError, bool) { recorder := httptest.NewRecorder() var attemptErr *errors.APIError - var isSuccess bool + isSuccess := false requestTimeout := time.Duration(h.settingsManager.GetSettings().RequestTimeoutSeconds) * time.Second ctx, cancel := context.WithTimeout(c.Request.Context(), requestTimeout) @@ -205,11 +521,26 @@ func (h *ProxyHandler) executeProxyAttempt(c *gin.Context, corrID string, body [ attemptReq.Body = io.NopCloser(bytes.NewReader(body)) attemptReq.ContentLength = int64(len(body)) + h.logger.WithField("id", corrID).Infof("🚀 Starting proxy attempt with KeyID=%d", res.APIKey.ID) + h.configureProxy(corrID, res, isPrecise, groupName, &attemptErr, &isSuccess, pTokens, cTokens) *attemptReq = *attemptReq.WithContext(context.WithValue(attemptReq.Context(), proxyErrorContextKey{}, &attemptErr)) h.transparentProxy.ServeHTTP(recorder, attemptReq) + h.logger.WithField("id", corrID).Infof("📥 Proxy returned: status=%d, bodyLen=%d, err=%v, success=%v", + recorder.Code, recorder.Body.Len(), attemptErr, isSuccess) + + // 调试检查 ✅ + if recorder.Code == 0 && attemptErr != nil { + h.logger.WithField("id", corrID).Warnf("⚠️ Fixing zero status code to %d", attemptErr.HTTPStatus) + recorder.Code = attemptErr.HTTPStatus + if recorder.Body.Len() == 0 { + errJSON, _ := json.Marshal(gin.H{"error": attemptErr}) + recorder.Body.Write(errJSON) + } + } + return recorder, attemptErr, isSuccess } @@ -248,21 +579,71 @@ func (h *ProxyHandler) configureProxy(corrID string, res *service.RequestResourc func (h *ProxyHandler) createModifyResponseFunc(attemptErr **errors.APIError, isSuccess *bool, pTokens, cTokens *int) func(*http.Response) error { return func(resp *http.Response) error { + corrID := resp.Request.Header.Get("X-Correlation-ID") + + h.logger.WithField("id", corrID).Infof("📨 Upstream response: status=%d, contentType=%s", + resp.StatusCode, resp.Header.Get("Content-Type")) + + // 检查是否是流式响应 + isStream := strings.Contains(resp.Header.Get("Content-Type"), "text/event-stream") + + // 处理 gzip 压缩 var reader io.ReadCloser = resp.Body if resp.Header.Get("Content-Encoding") == "gzip" { gzReader, err := gzip.NewReader(resp.Body) if err != nil { - h.logger.WithError(err).Error("Failed to create gzip reader") - } else { - reader = gzReader - resp.Header.Del("Content-Encoding") + h.logger.WithField("id", corrID).WithError(err).Error("Failed to create gzip reader") + *attemptErr = errors.NewAPIError(errors.ErrBadGateway, "Failed to decompress response") + *isSuccess = false + resp.Body = io.NopCloser(bytes.NewReader([]byte{})) + return nil + } + reader = gzReader + resp.Header.Del("Content-Encoding") + + // ✅ 对于流式响应,需要替换 resp.Body 为解压后的 reader + if isStream { + resp.Body = reader } } - defer reader.Close() + + if isStream { + h.logger.WithField("id", corrID).Info("📡 Processing stream response") + + if resp.StatusCode < 400 { + *isSuccess = true + h.logger.WithField("id", corrID).Info("✅ Stream response marked as success, passing through") + // 不关闭 reader,让它继续流式传输 + return nil + } else { + // 错误响应才读取完整内容 + bodyBytes, err := io.ReadAll(reader) + reader.Close() + if err != nil { + h.logger.WithField("id", corrID).WithError(err).Error("Failed to read error response") + *attemptErr = errors.NewAPIError(errors.ErrBadGateway, "Failed to read upstream error") + } else { + h.logger.WithField("id", corrID).Errorf("❌ Stream error: status=%d, body=%s", + resp.StatusCode, string(bodyBytes)) + *attemptErr = errors.NewAPIErrorWithUpstream(resp.StatusCode, + fmt.Sprintf("UPSTREAM_%d", resp.StatusCode), bodyBytes) + } + *isSuccess = false + resp.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + return nil + } + } + + // 非流式响应:读取完整内容 + h.logger.WithField("id", corrID).Info("📄 Processing non-stream response") bodyBytes, err := io.ReadAll(reader) + reader.Close() + if err != nil { - *attemptErr = errors.NewAPIErrorWithUpstream(http.StatusBadGateway, "UPSTREAM_GATEWAY_ERROR", nil) + h.logger.WithField("id", corrID).WithError(err).Error("Failed to read response body") + *attemptErr = errors.NewAPIError(errors.ErrBadGateway, "Failed to read upstream response") + *isSuccess = false resp.Body = io.NopCloser(bytes.NewReader([]byte{})) return nil } @@ -270,9 +651,16 @@ func (h *ProxyHandler) createModifyResponseFunc(attemptErr **errors.APIError, is if resp.StatusCode < 400 { *isSuccess = true *pTokens, *cTokens = extractUsage(bodyBytes) + h.logger.WithField("id", corrID).Infof("✅ Success: bytes=%d, pTokens=%d, cTokens=%d", + len(bodyBytes), *pTokens, *cTokens) } else { - *attemptErr = errors.NewAPIErrorWithUpstream(resp.StatusCode, fmt.Sprintf("UPSTREAM_%d", resp.StatusCode), bodyBytes) + h.logger.WithField("id", corrID).Errorf("❌ Error: status=%d, body=%s", + resp.StatusCode, string(bodyBytes)) + *attemptErr = errors.NewAPIErrorWithUpstream(resp.StatusCode, + fmt.Sprintf("UPSTREAM_%d", resp.StatusCode), bodyBytes) + *isSuccess = false } + resp.Body = io.NopCloser(bytes.NewReader(bodyBytes)) return nil } @@ -319,10 +707,34 @@ func (h *ProxyHandler) shouldStopRetrying(attempt, totalAttempts int, err *error if attempt >= totalAttempts { return true } - if err != nil && errors.IsUnretryableRequestError(err.Message) { - h.logger.WithField("id", correlationID).Warnf("Attempt failed with unretryable request error. Aborting retries. Message: %s", err.Message) + + if err == nil { + return false + } + + // ✅ 不可重试的请求错误:立即停止 + if errors.IsUnretryableRequestError(err.Message) { + h.logger.WithField("id", correlationID).Warnf("Unretryable request error, aborting: %s", err.Message) return true } + + // ✅ 永久性上游错误:立即停止(Key 已失效) + if errors.IsPermanentUpstreamError(err.Message) { + h.logger.WithField("id", correlationID).Warnf("Permanent upstream error, aborting: %s", err.Message) + return false + } + + // ✅ 可重试的网络错误:继续重试 + if errors.IsRetryableNetworkError(err.Message) { + return false + } + + // ✅ 临时性错误(配额等):继续重试 + if errors.IsTemporaryUpstreamError(err.Message) { + return false + } + + // ✅ 其他未分类错误:继续重试 return false } diff --git a/internal/handlers/websocket_handler.go b/internal/handlers/websocket_handler.go new file mode 100644 index 0000000..cab65c6 --- /dev/null +++ b/internal/handlers/websocket_handler.go @@ -0,0 +1,68 @@ +// Filename: internal/handlers/websocket_handler.go +package handlers + +import ( + "net/http" + "sync" + "time" + + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +type connWrapper struct { + conn *websocket.Conn + mu sync.Mutex +} + +type WebSocketHandler struct { + logger *logrus.Logger + clients sync.Map + upgrader websocket.Upgrader +} + +func NewWebSocketHandler(logger *logrus.Logger) *WebSocketHandler { + return &WebSocketHandler{ + logger: logger, + upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + }, + } +} + +func (h *WebSocketHandler) HandleSystemLogs(c *gin.Context) { + conn, err := h.upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + h.logger.WithError(err).Error("Failed to upgrade websocket") + return + } + defer conn.Close() + + clientID := time.Now().UnixNano() + h.clients.Store(clientID, &connWrapper{conn: conn}) + defer h.clients.Delete(clientID) + + for { + if _, _, err := conn.ReadMessage(); err != nil { + break + } + } +} + +func (h *WebSocketHandler) BroadcastLog(entry *logrus.Entry) { + msg := map[string]interface{}{ + "timestamp": entry.Time.Format(time.RFC3339), + "level": entry.Level.String(), + "message": entry.Message, + "fields": entry.Data, + } + + h.clients.Range(func(key, value interface{}) bool { + wrapper := value.(*connWrapper) + wrapper.mu.Lock() + wrapper.conn.WriteJSON(msg) + wrapper.mu.Unlock() + return true + }) +} diff --git a/internal/logging/websocket_hook.go b/internal/logging/websocket_hook.go new file mode 100644 index 0000000..728af92 --- /dev/null +++ b/internal/logging/websocket_hook.go @@ -0,0 +1,36 @@ +// Filename: internal/logging/websocket_hook.go +package logging + +import ( + "gemini-balancer/internal/config" + "gemini-balancer/internal/handlers" + + "github.com/sirupsen/logrus" +) + +type WebSocketHook struct { + broadcaster func(*logrus.Entry) +} + +func NewWebSocketHook(broadcaster func(*logrus.Entry)) *WebSocketHook { + return &WebSocketHook{broadcaster: broadcaster} +} + +func (h *WebSocketHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +func (h *WebSocketHook) Fire(entry *logrus.Entry) error { + if h.broadcaster != nil { + go h.broadcaster(entry) + } + return nil +} + +func NewLoggerWithWebSocket(cfg *config.Config) (*logrus.Logger, *handlers.WebSocketHandler) { + logger := NewLogger(cfg) + wsHandler := handlers.NewWebSocketHandler(logger) + hook := NewWebSocketHook(wsHandler.BroadcastLog) + logger.AddHook(hook) + return logger, wsHandler +} diff --git a/internal/router/router.go b/internal/router/router.go index 6c6d324..b598346 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -36,6 +36,7 @@ func NewRouter( settingHandler *handlers.SettingHandler, dashboardHandler *handlers.DashboardHandler, taskHandler *handlers.TaskHandler, + wsHandler *handlers.WebSocketHandler, // Web Page Handlers webAuthHandler *webhandlers.WebAuthHandler, pageHandler *webhandlers.PageHandler, @@ -75,6 +76,7 @@ func NewRouter( registerWebRoutes(router, webAdminAuth, webAuthHandler, pageHandler) registerAdminRoutes(router, apiAdminAuth, keyGroupHandler, tokensHandler, apiKeyHandler, logHandler, settingHandler, dashboardHandler, taskHandler, upstreamModule, proxyModule) + registerWebSocketRoutes(router, wsHandler) registerProxyRoutes(router, proxyHandler, securityService, logger) return router @@ -408,3 +410,7 @@ func registerPublicAPIRoutes( // publicAPI.POST("/forgot-password", ipBanMiddleware, apiAuthHandler.HandleForgotPassword) } } + +func registerWebSocketRoutes(router *gin.Engine, wsHandler *handlers.WebSocketHandler) { + router.GET("/ws/system-logs", wsHandler.HandleSystemLogs) +} diff --git a/internal/service/group_manager.go b/internal/service/group_manager.go index 4ec10eb..0ec0777 100644 --- a/internal/service/group_manager.go +++ b/internal/service/group_manager.go @@ -9,6 +9,7 @@ import ( "gemini-balancer/internal/pkg/reflectutil" "gemini-balancer/internal/repository" "gemini-balancer/internal/settings" + "gemini-balancer/internal/store" "gemini-balancer/internal/syncer" "gemini-balancer/internal/utils" "net/url" @@ -421,3 +422,13 @@ func uniqueStrings(slice []string) []string { } return result } + +// GroupManager配置Syncer +func NewGroupManagerSyncer( + loader syncer.LoaderFunc[GroupManagerCacheData], + store store.Store, + logger *logrus.Logger, +) (*syncer.CacheSyncer[GroupManagerCacheData], error) { + const groupUpdateChannel = "groups:cache_invalidation" + return syncer.NewCacheSyncer(loader, store, groupUpdateChannel, logger) +} diff --git a/web/static/css/output.css b/web/static/css/output.css index 32d5368..f2db32d 100644 --- a/web/static/css/output.css +++ b/web/static/css/output.css @@ -28,6 +28,7 @@ --color-amber-600: oklch(66.6% 0.179 58.318); --color-yellow-100: oklch(97.3% 0.071 103.193); --color-yellow-300: oklch(90.5% 0.182 98.111); + --color-yellow-400: oklch(85.2% 0.199 91.936); --color-yellow-500: oklch(79.5% 0.184 86.047); --color-yellow-600: oklch(68.1% 0.162 75.834); --color-yellow-700: oklch(55.4% 0.135 66.442); @@ -420,9 +421,6 @@ .bottom-6 { bottom: calc(var(--spacing) * 6); } - .bottom-full { - bottom: 100%; - } .left-0 { left: calc(var(--spacing) * 0); } @@ -450,9 +448,6 @@ .z-50 { z-index: 50; } - .z-90 { - z-index: 90; - } .z-\[100\] { z-index: 100; } @@ -501,9 +496,6 @@ .my-1\.5 { margin-block: calc(var(--spacing) * 1.5); } - .mt-0 { - margin-top: calc(var(--spacing) * 0); - } .mt-0\.5 { margin-top: calc(var(--spacing) * 0.5); } @@ -622,9 +614,6 @@ width: calc(var(--spacing) * 6); height: calc(var(--spacing) * 6); } - .h-0 { - height: calc(var(--spacing) * 0); - } .h-0\.5 { height: calc(var(--spacing) * 0.5); } @@ -709,9 +698,6 @@ .w-0 { width: calc(var(--spacing) * 0); } - .w-1 { - width: calc(var(--spacing) * 1); - } .w-1\/4 { width: calc(1/4 * 100%); } @@ -820,9 +806,6 @@ .flex-1 { flex: 1; } - .flex-shrink { - flex-shrink: 1; - } .shrink-0 { flex-shrink: 0; } @@ -835,9 +818,6 @@ .caption-bottom { caption-side: bottom; } - .border-collapse { - border-collapse: collapse; - } .origin-center { transform-origin: center; } @@ -864,10 +844,6 @@ --tw-translate-x: 100%; translate: var(--tw-translate-x) var(--tw-translate-y); } - .-translate-y-1 { - --tw-translate-y: calc(var(--spacing) * -1); - translate: var(--tw-translate-x) var(--tw-translate-y); - } .-translate-y-1\/2 { --tw-translate-y: calc(calc(1/2 * 100%) * -1); translate: var(--tw-translate-x) var(--tw-translate-y); @@ -1024,9 +1000,6 @@ margin-block-end: calc(calc(var(--spacing) * 8) * calc(1 - var(--tw-space-y-reverse))); } } - .gap-x-1 { - column-gap: calc(var(--spacing) * 1); - } .gap-x-1\.5 { column-gap: calc(var(--spacing) * 1.5); } @@ -1172,9 +1145,6 @@ --tw-border-style: none; border-style: none; } - .border-black { - border-color: var(--color-black); - } .border-black\/10 { border-color: color-mix(in srgb, #000 10%, transparent); @supports (color: color-mix(in lab, red, red)) { @@ -1202,9 +1172,6 @@ .border-green-200 { border-color: var(--color-green-200); } - .border-primary { - border-color: var(--color-primary); - } .border-primary\/20 { border-color: var(--color-primary); @supports (color: color-mix(in lab, red, red)) { @@ -1241,9 +1208,6 @@ .border-zinc-300 { border-color: var(--color-zinc-300); } - .border-zinc-700 { - border-color: var(--color-zinc-700); - } .border-zinc-700\/50 { border-color: color-mix(in srgb, oklch(37% 0.013 285.805) 50%, transparent); @supports (color: color-mix(in lab, red, red)) { @@ -1316,9 +1280,6 @@ .bg-gray-500 { background-color: var(--color-gray-500); } - .bg-gray-950 { - background-color: var(--color-gray-950); - } .bg-gray-950\/5 { background-color: color-mix(in srgb, oklch(13% 0.028 261.692) 5%, transparent); @supports (color: color-mix(in lab, red, red)) { @@ -1533,10 +1494,6 @@ --tw-gradient-position: to right in oklab; background-image: linear-gradient(var(--tw-gradient-stops)); } - .from-blue-500 { - --tw-gradient-from: var(--color-blue-500); - --tw-gradient-stops: var(--tw-gradient-via-stops, var(--tw-gradient-position), var(--tw-gradient-from) var(--tw-gradient-from-position), var(--tw-gradient-to) var(--tw-gradient-to-position)); - } .from-blue-500\/30 { --tw-gradient-from: color-mix(in srgb, oklch(62.3% 0.214 259.815) 30%, transparent); @supports (color: color-mix(in lab, red, red)) { @@ -1607,9 +1564,6 @@ .px-8 { padding-inline: calc(var(--spacing) * 8); } - .py-0 { - padding-block: calc(var(--spacing) * 0); - } .py-0\.5 { padding-block: calc(var(--spacing) * 0.5); } @@ -1658,9 +1612,6 @@ .pr-20 { padding-right: calc(var(--spacing) * 20); } - .pb-1 { - padding-bottom: calc(var(--spacing) * 1); - } .pb-1\.5 { padding-bottom: calc(var(--spacing) * 1.5); } @@ -1785,6 +1736,9 @@ .text-amber-600 { color: var(--color-amber-600); } + .text-blue-400 { + color: var(--color-blue-400); + } .text-blue-500 { color: var(--color-blue-500); } @@ -1899,6 +1853,9 @@ .text-white { color: var(--color-white); } + .text-yellow-400 { + color: var(--color-yellow-400); + } .text-yellow-500 { color: var(--color-yellow-500); } @@ -1938,9 +1895,6 @@ .italic { font-style: italic; } - .underline { - text-decoration-line: underline; - } .opacity-0 { opacity: 0%; } @@ -1998,10 +1952,6 @@ --tw-inset-shadow: inset 0 2px 4px var(--tw-inset-shadow-color, oklab(from rgb(0 0 0 / 0.05) l a b / 25%)); box-shadow: var(--tw-inset-shadow), var(--tw-inset-ring-shadow), var(--tw-ring-offset-shadow), var(--tw-ring-shadow), var(--tw-shadow); } - .inset-shadow-sm { - --tw-inset-shadow: inset 0 2px 4px var(--tw-inset-shadow-color, rgb(0 0 0 / 0.05)); - box-shadow: var(--tw-inset-shadow), var(--tw-inset-ring-shadow), var(--tw-ring-offset-shadow), var(--tw-ring-shadow), var(--tw-shadow); - } .ring-black { --tw-ring-color: var(--color-black); } @@ -2023,10 +1973,6 @@ --tw-ring-color: color-mix(in oklab, var(--color-black) 15%, transparent); } } - .outline { - outline-style: var(--tw-outline-style); - outline-width: 1px; - } .blur { --tw-blur: blur(8px); filter: var(--tw-blur,) var(--tw-brightness,) var(--tw-contrast,) var(--tw-grayscale,) var(--tw-hue-rotate,) var(--tw-invert,) var(--tw-saturate,) var(--tw-sepia,) var(--tw-drop-shadow,); @@ -5116,11 +5062,6 @@ inherits: false; initial-value: 0 0 #0000; } -@property --tw-outline-style { - syntax: "*"; - inherits: false; - initial-value: solid; -} @property --tw-blur { syntax: "*"; inherits: false; @@ -5233,6 +5174,11 @@ inherits: false; initial-value: 1; } +@property --tw-outline-style { + syntax: "*"; + inherits: false; + initial-value: solid; +} @keyframes spin { to { transform: rotate(360deg); @@ -5290,7 +5236,6 @@ --tw-ring-offset-width: 0px; --tw-ring-offset-color: #fff; --tw-ring-offset-shadow: 0 0 #0000; - --tw-outline-style: solid; --tw-blur: initial; --tw-brightness: initial; --tw-contrast: initial; @@ -5318,6 +5263,7 @@ --tw-scale-x: 1; --tw-scale-y: 1; --tw-scale-z: 1; + --tw-outline-style: solid; } } } diff --git a/web/static/js/logs-OFCAHOEI.js b/web/static/js/logs-43KF5HY3.js similarity index 85% rename from web/static/js/logs-OFCAHOEI.js rename to web/static/js/logs-43KF5HY3.js index ad4bf39..1484c45 100644 --- a/web/static/js/logs-OFCAHOEI.js +++ b/web/static/js/logs-43KF5HY3.js @@ -843,6 +843,140 @@ var FilterPopover = class { } }; +// frontend/js/pages/logs/systemLog.js +var SystemLogTerminal = class { + constructor(container, controlsContainer) { + this.container = container; + this.controlsContainer = controlsContainer; + this.ws = null; + this.isPaused = false; + this.shouldAutoScroll = true; + this.reconnectAttempts = 0; + this.maxReconnectAttempts = 5; + this.elements = { + output: this.container.querySelector("#log-terminal-output"), + statusIndicator: this.controlsContainer.querySelector("#terminal-status-indicator"), + clearBtn: this.controlsContainer.querySelector('[data-action="clear-terminal"]'), + pauseBtn: this.controlsContainer.querySelector('[data-action="toggle-pause-terminal"]'), + scrollBtn: this.controlsContainer.querySelector('[data-action="toggle-scroll-terminal"]'), + disconnectBtn: this.controlsContainer.querySelector('[data-action="disconnect-terminal"]') + }; + this._initEventListeners(); + } + _initEventListeners() { + this.elements.clearBtn.addEventListener("click", () => this.clear()); + this.elements.pauseBtn.addEventListener("click", () => this.togglePause()); + this.elements.scrollBtn.addEventListener("click", () => this.toggleAutoScroll()); + this.elements.disconnectBtn.addEventListener("click", () => this.disconnect()); + } + connect() { + this.clear(); + this._appendMessage("info", "\u6B63\u5728\u8FDE\u63A5\u5230\u5B9E\u65F6\u65E5\u5FD7\u6D41..."); + this._updateStatus("connecting", "\u8FDE\u63A5\u4E2D..."); + const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; + const wsUrl = `${protocol}//${window.location.host}/ws/system-logs`; + this.ws = new WebSocket(wsUrl); + this.ws.onopen = () => { + this._appendMessage("info", "\u2713 \u5DF2\u8FDE\u63A5\u5230\u7CFB\u7EDF\u65E5\u5FD7\u6D41"); + this._updateStatus("connected", "\u5DF2\u8FDE\u63A5"); + this.reconnectAttempts = 0; + }; + this.ws.onmessage = (event) => { + if (this.isPaused) return; + try { + const data = JSON.parse(event.data); + const levelColors = { + "error": "text-red-500", + "warning": "text-yellow-400", + "info": "text-blue-400", + "debug": "text-zinc-400" + }; + const color = levelColors[data.level] || "text-zinc-200"; + const timestamp = new Date(data.timestamp).toLocaleTimeString(); + const msg = `[${timestamp}] [${data.level.toUpperCase()}] ${data.message}`; + this._appendMessage(color, msg); + } catch (e) { + this._appendMessage("text-zinc-200", event.data); + } + }; + this.ws.onerror = (error) => { + this._appendMessage("error", `\u2717 WebSocket \u9519\u8BEF`); + this._updateStatus("error", "\u8FDE\u63A5\u9519\u8BEF"); + }; + this.ws.onclose = () => { + this._appendMessage("error", "\u2717 \u8FDE\u63A5\u5DF2\u65AD\u5F00"); + this._updateStatus("disconnected", "\u672A\u8FDE\u63A5"); + if (this.reconnectAttempts < this.maxReconnectAttempts) { + this.reconnectAttempts++; + setTimeout(() => { + this._appendMessage("info", `\u5C1D\u8BD5\u91CD\u65B0\u8FDE\u63A5 (${this.reconnectAttempts}/${this.maxReconnectAttempts})...`); + this.connect(); + }, 3e3); + } + }; + } + disconnect() { + if (this.ws) { + this.ws.close(); + this.ws = null; + } + this.reconnectAttempts = this.maxReconnectAttempts; + this._updateStatus("disconnected", "\u672A\u8FDE\u63A5"); + } + clear() { + if (this.elements.output) { + this.elements.output.innerHTML = ""; + } + } + togglePause() { + this.isPaused = !this.isPaused; + const span = this.elements.pauseBtn.querySelector("span"); + const icon = this.elements.pauseBtn.querySelector("i"); + if (this.isPaused) { + span.textContent = "\u7EE7\u7EED"; + icon.classList.replace("fa-pause", "fa-play"); + } else { + span.textContent = "\u6682\u505C"; + icon.classList.replace("fa-play", "fa-pause"); + } + } + toggleAutoScroll() { + this.shouldAutoScroll = !this.shouldAutoScroll; + const span = this.elements.scrollBtn.querySelector("span"); + if (this.shouldAutoScroll) { + span.textContent = "\u81EA\u52A8\u6EDA\u52A8"; + } else { + span.textContent = "\u624B\u52A8\u6EDA\u52A8"; + } + } + _appendMessage(colorClass, text) { + if (!this.elements.output) return; + const p = document.createElement("p"); + p.className = colorClass; + p.textContent = text; + this.elements.output.appendChild(p); + if (this.shouldAutoScroll) { + this.elements.output.scrollTop = this.elements.output.scrollHeight; + } + } + _updateStatus(status, text) { + const indicator = this.elements.statusIndicator.querySelector("span.relative"); + const statusText = this.elements.statusIndicator.childNodes[2]; + const colors = { + "connecting": "bg-yellow-500", + "connected": "bg-green-500", + "disconnected": "bg-zinc-500", + "error": "bg-red-500" + }; + indicator.querySelectorAll("span").forEach((span) => { + span.className = span.className.replace(/bg-\w+-\d+/g, colors[status] || colors.disconnected); + }); + if (statusText) { + statusText.textContent = ` ${text}`; + } + } +}; + // frontend/js/pages/logs/index.js var dataStore = { groups: /* @__PURE__ */ new Map(), @@ -854,7 +988,6 @@ var LogsPage = class { logs: [], pagination: { page: 1, pages: 1, total: 0, page_size: 20 }, isLoading: true, - // [优化] 统一将所有可筛选字段在此处初始化,便于管理 filters: { page: 1, page_size: 20, @@ -864,36 +997,109 @@ var LogsPage = class { error_types: /* @__PURE__ */ new Set(), status_codes: /* @__PURE__ */ new Set() }, - selectedLogIds: /* @__PURE__ */ new Set() + selectedLogIds: /* @__PURE__ */ new Set(), + currentView: "error" }; this.elements = { - tableBody: document.getElementById("logs-table-body"), - selectedCount: document.querySelector(".flex-1.text-sm span.font-semibold:nth-child(1)"), - totalCount: document.querySelector(".flex-1.text-sm span:last-child"), - pageSizeSelect: document.querySelector('[data-component="custom-select-v2"] select'), - pageInfo: document.querySelector(".flex.w-\\[100px\\]"), - paginationBtns: document.querySelectorAll("[data-pagination-controls] button"), - selectAllCheckbox: document.querySelector('thead .table-head-cell input[type="checkbox"]'), - searchInput: document.getElementById("log-search-input"), - errorTypeFilterBtn: document.getElementById("filter-error-type-btn"), - errorCodeFilterBtn: document.getElementById("filter-error-code-btn") + tabsContainer: document.querySelector("[data-sliding-tabs-container]"), + contentContainer: document.getElementById("log-content-container"), + errorFilters: document.getElementById("error-logs-filters"), + systemControls: document.getElementById("system-logs-controls"), + errorTemplate: document.getElementById("error-logs-template"), + systemTemplate: document.getElementById("system-logs-template") }; - this.initialized = !!this.elements.tableBody; + this.initialized = !!this.elements.contentContainer; if (this.initialized) { - this.logList = new logList_default(this.elements.tableBody, dataStore); - const selectContainer = document.querySelector('[data-component="custom-select-v2"]'); - if (selectContainer) { - new CustomSelectV2(selectContainer); - } + this.logList = null; + this.systemLogTerminal = null; this.debouncedLoadAndRender = debounce(() => this.loadAndRenderLogs(), 300); } } async init() { if (!this.initialized) return; + this._initPermanentEventListeners(); + await this.loadGroupsOnce(); + this.state.currentView = null; + this.switchToView("error"); + } + _initPermanentEventListeners() { + this.elements.tabsContainer.addEventListener("click", (event) => { + const tabItem = event.target.closest("[data-tab-target]"); + if (!tabItem) return; + event.preventDefault(); + const viewName = tabItem.dataset.tabTarget; + if (viewName) { + this.switchToView(viewName); + } + }); + } + switchToView(viewName) { + if (this.state.currentView === viewName && this.elements.contentContainer.innerHTML !== "") return; + if (this.systemLogTerminal) { + this.systemLogTerminal.disconnect(); + this.systemLogTerminal = null; + } + this.state.currentView = viewName; + this.elements.contentContainer.innerHTML = ""; + if (viewName === "error") { + this.elements.errorFilters.classList.remove("hidden"); + this.elements.systemControls.classList.add("hidden"); + const template = this.elements.errorTemplate.content.cloneNode(true); + this.elements.contentContainer.appendChild(template); + requestAnimationFrame(() => { + this._initErrorLogView(); + }); + } else if (viewName === "system") { + this.elements.errorFilters.classList.add("hidden"); + this.elements.systemControls.classList.remove("hidden"); + const template = this.elements.systemTemplate.content.cloneNode(true); + this.elements.contentContainer.appendChild(template); + requestAnimationFrame(() => { + this._initSystemLogView(); + }); + } + } + _initErrorLogView() { + this.elements.tableBody = document.getElementById("logs-table-body"); + this.elements.selectedCount = document.querySelector(".flex-1.text-sm span.font-semibold:nth-child(1)"); + this.elements.totalCount = document.querySelector(".flex-1.text-sm span:last-child"); + this.elements.pageSizeSelect = document.querySelector('[data-component="custom-select-v2"] select'); + this.elements.pageInfo = document.querySelector(".flex.w-\\[100px\\]"); + this.elements.paginationBtns = document.querySelectorAll("[data-pagination-controls] button"); + this.elements.selectAllCheckbox = document.querySelector('thead .table-head-cell input[type="checkbox"]'); + this.elements.searchInput = document.getElementById("log-search-input"); + this.elements.errorTypeFilterBtn = document.getElementById("filter-error-type-btn"); + this.elements.errorCodeFilterBtn = document.getElementById("filter-error-code-btn"); + this.logList = new logList_default(this.elements.tableBody, dataStore); + const selectContainer = document.querySelector('[data-component="custom-select-v2"]'); + if (selectContainer) { + new CustomSelectV2(selectContainer); + } this.initFilterPopovers(); this.initEventListeners(); - await this.loadGroupsOnce(); - await this.loadAndRenderLogs(); + this.loadAndRenderLogs(); + } + _initSystemLogView() { + this.systemLogTerminal = new SystemLogTerminal( + this.elements.contentContainer, + this.elements.systemControls + ); + Swal.fire({ + title: "\u5B9E\u65F6\u7CFB\u7EDF\u65E5\u5FD7", + text: "\u60A8\u5373\u5C06\u8FDE\u63A5\u5230\u5B9E\u65F6\u65E5\u5FD7\u6D41\u3002\u8FD9\u4F1A\u4E0E\u670D\u52A1\u5668\u5EFA\u7ACB\u4E00\u4E2A\u6301\u7EED\u7684\u8FDE\u63A5\u3002", + icon: "info", + confirmButtonText: "\u6211\u660E\u767D\u4E86\uFF0C\u5F00\u59CB\u8FDE\u63A5", + showCancelButton: true, + cancelButtonText: "\u53D6\u6D88", + target: "#main-content-wrapper" + }).then((result) => { + if (result.isConfirmed) { + this.systemLogTerminal.connect(); + } else { + const errorLogTab = Array.from(this.elements.tabsContainer.querySelectorAll('[data-tab-target="error"]'))[0]; + if (errorLogTab) errorLogTab.click(); + } + }); } initFilterPopovers() { const errorTypeOptions = [ diff --git a/web/static/js/main.js b/web/static/js/main.js index 8269c35..db50da9 100644 --- a/web/static/js/main.js +++ b/web/static/js/main.js @@ -182,7 +182,7 @@ var pageModules = { // esbuild 看到这个 import() 语法,就会自动将 dashboard.js 及其依赖打包成一个独立的 chunk 文件 "dashboard": () => import("./dashboard-XFUWX3IN.js"), "keys": () => import("./keys-HRP4JR7B.js"), - "logs": () => import("./logs-OFCAHOEI.js") + "logs": () => import("./logs-43KF5HY3.js") // 'settings': () => import('./pages/settings.js'), // 未来启用 settings 页面 // 未来新增的页面,只需在这里添加一行映射,esbuild会自动处理 }; diff --git a/web/templates/logs.html b/web/templates/logs.html index d735e83..5599927 100644 --- a/web/templates/logs.html +++ b/web/templates/logs.html @@ -26,8 +26,8 @@
- 错误日志 - 系统日志 + 错误日志 + 系统日志 保留标签 保留标签
@@ -35,9 +35,9 @@
- + -
+
@@ -63,7 +63,6 @@
-
- -
- + + + + +
+ +
+
+ + + + + + {% endblock %} {% block modals %}