diff --git a/tgapi/api.go b/tgapi/api.go index 0004e6c..fd2647a 100644 --- a/tgapi/api.go +++ b/tgapi/api.go @@ -102,10 +102,14 @@ type ApiResponse[R any] struct { type TelegramRequest[R, P any] struct { method string params P + chatId int64 } func NewRequest[R, P any](method string, params P) TelegramRequest[R, P] { - return TelegramRequest[R, P]{method: method, params: params} + return TelegramRequest[R, P]{method, params, 0} +} +func NewRequestWithChatID[R, P any](method string, params P, chatId int64) TelegramRequest[R, P] { + return TelegramRequest[R, P]{method, params, chatId} } func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, error) { var zero R @@ -165,7 +169,12 @@ func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, erro if responseData.Parameters.RetryAfter != nil { after = *responseData.Parameters.RetryAfter } - api.Limiter.SetGlobalLock(after) + if r.chatId > 0 { + api.Limiter.SetChatLock(r.chatId, after) + } else { + api.Limiter.SetGlobalLock(after) + } + time.Sleep(time.Duration(after) * time.Second) return r.doRequest(ctx, api) } return zero, ErrRateLimit diff --git a/tgapi/messages_methods.go b/tgapi/messages_methods.go index 2ef6bc2..2e8d0bb 100644 --- a/tgapi/messages_methods.go +++ b/tgapi/messages_methods.go @@ -21,7 +21,7 @@ type SendMessageP struct { } func (api *API) SendMessage(params SendMessageP) (Message, error) { - req := NewRequest[Message, SendMessageP]("sendMessage", params) + req := NewRequestWithChatID[Message, SendMessageP]("sendMessage", params, params.ChatID) return req.Do(api) } @@ -275,7 +275,7 @@ type SendMessageDraftP struct { } func (api *API) SendMessageDraft(params SendMessageDraftP) (bool, error) { - req := NewRequest[bool]("sendMessageDraft", params) + req := NewRequestWithChatID[bool]("sendMessageDraft", params, params.ChatID) return req.Do(api) } diff --git a/utils/limiter.go b/utils/limiter.go index 23ed999..50cf27a 100644 --- a/utils/limiter.go +++ b/utils/limiter.go @@ -20,9 +20,9 @@ type RateLimiter struct { func NewRateLimiter() *RateLimiter { return &RateLimiter{ - // 30 запросов в секунду (burst=30) - globalLimiter: rate.NewLimiter(rate.Limit(30), 30), + globalLimiter: rate.NewLimiter(30, 30), chatLimiters: make(map[int64]*rate.Limiter), + chatLocks: make(map[int64]time.Time), // инициализация! } } @@ -34,96 +34,122 @@ func (rl *RateLimiter) SetGlobalLock(retryAfter int) { defer rl.globalMu.Unlock() rl.globalLockUntil = time.Now().Add(time.Duration(retryAfter) * time.Second) } + func (rl *RateLimiter) SetChatLock(chatID int64, retryAfter int) { + if retryAfter <= 0 { + return + } rl.chatMu.Lock() defer rl.chatMu.Unlock() rl.chatLocks[chatID] = time.Now().Add(time.Duration(retryAfter) * time.Second) } +// GlobalWait блокирует до возможности сделать глобальный запрос. func (rl *RateLimiter) GlobalWait(ctx context.Context) error { - rl.globalMu.RLock() - until := rl.globalLockUntil - rl.globalMu.RUnlock() - - if !until.IsZero() { - if time.Now().Before(until) { - // Ждём до окончания блокировки или отмены контекста - select { - case <-time.After(time.Until(until)): - // блокировка снята - case <-ctx.Done(): - return ctx.Err() - } - } - } - // Теперь ждём разрешения rate limiter'а - return rl.globalLimiter.Wait(ctx) -} -func (rl *RateLimiter) Wait(ctx context.Context, chatID int64) error { - rl.chatMu.Lock() - until, ok := rl.chatLocks[chatID] - rl.chatMu.Unlock() - if ok && !until.IsZero() { - if time.Now().Before(until) { - select { - case <-time.After(time.Until(until)): - // блокировка снята - case <-ctx.Done(): - return ctx.Err() - } - } - } - - if err := rl.GlobalWait(ctx); err != nil { + // Ждём окончания глобальной блокировки, если она есть + if err := rl.waitForGlobalUnlock(ctx); err != nil { return err } - rl.chatMu.Lock() - chatLimiter, ok := rl.chatLimiters[chatID] - if !ok { - chatLimiter = rate.NewLimiter(rate.Limit(1), 1) - rl.chatLimiters[chatID] = chatLimiter - } - rl.chatMu.Unlock() - return chatLimiter.Wait(ctx) + // Ждём разрешения rate limiter'а + return rl.globalLimiter.Wait(ctx) } +// Wait блокирует до возможности сделать запрос для конкретного чата. +func (rl *RateLimiter) Wait(ctx context.Context, chatID int64) error { + // Ждём окончания блокировки чата + if err := rl.waitForChatUnlock(ctx, chatID); err != nil { + return err + } + // Затем глобальной блокировки + if err := rl.waitForGlobalUnlock(ctx); err != nil { + return err + } + // Получаем или создаём лимитер для чата + limiter := rl.getChatLimiter(chatID) + return limiter.Wait(ctx) +} + +// GlobalAllow неблокирующая проверка глобального запроса. func (rl *RateLimiter) GlobalAllow() bool { rl.globalMu.RLock() until := rl.globalLockUntil rl.globalMu.RUnlock() - if !until.IsZero() { - if time.Now().Before(until) { - // Ждём до окончания блокировки или отмены контекста - select { - case <-time.After(time.Until(until)): - rl.globalLimiter.Allow() - } - } + if !until.IsZero() && time.Now().Before(until) { + return false } return rl.globalLimiter.Allow() } + +// Allow неблокирующая проверка запроса для чата. func (rl *RateLimiter) Allow(chatID int64) bool { - rl.chatMu.Lock() - until, ok := rl.chatLocks[chatID] - rl.chatMu.Unlock() - if ok && !until.IsZero() { - if time.Now().Before(until) { - select { - case <-time.After(time.Until(until)): - } - } + // Проверяем глобальную блокировку + rl.globalMu.RLock() + globalUntil := rl.globalLockUntil + rl.globalMu.RUnlock() + if !globalUntil.IsZero() && time.Now().Before(globalUntil) { + return false } + // Проверяем блокировку чата + rl.chatMu.Lock() + chatUntil, ok := rl.chatLocks[chatID] + rl.chatMu.Unlock() + if ok && !chatUntil.IsZero() && time.Now().Before(chatUntil) { + return false + } + + // Проверяем глобальный лимитер if !rl.globalLimiter.Allow() { return false } - rl.chatMu.Lock() - chatLimiter, ok := rl.chatLimiters[chatID] - if !ok { - chatLimiter = rate.NewLimiter(rate.Limit(1), 1) - rl.chatLimiters[chatID] = chatLimiter - } - rl.chatMu.Unlock() - return chatLimiter.Allow() + + // Проверяем лимитер чата + limiter := rl.getChatLimiter(chatID) + return limiter.Allow() +} + +// Внутренние вспомогательные методы + +func (rl *RateLimiter) waitForGlobalUnlock(ctx context.Context) error { + rl.globalMu.RLock() + until := rl.globalLockUntil + rl.globalMu.RUnlock() + + if until.IsZero() || time.Now().After(until) { + return nil + } + select { + case <-time.After(time.Until(until)): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (rl *RateLimiter) waitForChatUnlock(ctx context.Context, chatID int64) error { + rl.chatMu.Lock() + until, ok := rl.chatLocks[chatID] + rl.chatMu.Unlock() + + if !ok || until.IsZero() || time.Now().After(until) { + return nil + } + select { + case <-time.After(time.Until(until)): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (rl *RateLimiter) getChatLimiter(chatID int64) *rate.Limiter { + rl.chatMu.Lock() + defer rl.chatMu.Unlock() + if lim, ok := rl.chatLimiters[chatID]; ok { + return lim + } + lim := rate.NewLimiter(1, 1) // 1 запрос/сек + rl.chatLimiters[chatID] = lim + return lim } diff --git a/utils/version.go b/utils/version.go index bf1a8b0..ab7dbf0 100644 --- a/utils/version.go +++ b/utils/version.go @@ -1,9 +1,9 @@ package utils const ( - VersionString = "1.0.0-beta.7" + VersionString = "1.0.0-beta.8" VersionMajor = 1 VersionMinor = 0 VersionPatch = 0 - Beta = 7 + Beta = 8 )