Compare commits

...

4 Commits

Author SHA1 Message Date
6cf3355a36 v1.0.0 beta 8; ratelimt war 2026-03-02 17:06:13 +03:00
fa7a296a66 v1.0.0 beta 7; ratelimt war 2026-03-02 16:49:00 +03:00
7101aba548 v1.0.0 beta 6 2026-03-02 00:08:26 +03:00
2de46a27c8 v1.0.0 beta 5 2026-03-01 23:40:27 +03:00
13 changed files with 274 additions and 43 deletions

6
bot.go
View File

@@ -11,9 +11,9 @@ import (
"git.nix13.pw/scuroneko/extypes"
"git.nix13.pw/scuroneko/laniakea/tgapi"
"git.nix13.pw/scuroneko/laniakea/utils"
"git.nix13.pw/scuroneko/slog"
"github.com/alitto/pond/v2"
"golang.org/x/time/rate"
)
type BotOpts struct {
@@ -99,9 +99,9 @@ type Bot[T DbContext] struct {
func NewBot[T any](opts *BotOpts) *Bot[T] {
updateQueue := make(chan *tgapi.Update, 512)
var limiter *rate.Limiter
var limiter *utils.RateLimiter
if opts.RateLimit > 0 {
limiter = rate.NewLimiter(rate.Limit(opts.RateLimit), opts.RateLimit)
limiter = utils.NewRateLimiter()
}
apiOpts := tgapi.NewAPIOpts(opts.Token).SetAPIUrl(opts.APIUrl).UseTestServer(opts.UseTestServer).SetLimiter(limiter)

View File

@@ -32,7 +32,7 @@ func (g *LinearDraftIdGenerator) Next() uint64 {
type DraftProvider struct {
api *tgapi.API
chatID int
chatID int64
messageThreadID int
parseMode tgapi.ParseMode
entities []tgapi.MessageEntity
@@ -43,7 +43,7 @@ type DraftProvider struct {
type Draft struct {
api *tgapi.API
chatID int
chatID int64
messageThreadID int
parseMode tgapi.ParseMode
entities []tgapi.MessageEntity
@@ -87,3 +87,20 @@ func (d *Draft) Push(newText string) error {
_, err := d.api.SendMessageDraft(params)
return err
}
func (d *Draft) Flush() error {
if d.Message == "" {
return nil
}
params := tgapi.SendMessageP{
ChatID: d.chatID,
ParseMode: d.parseMode,
Entities: d.entities,
Text: d.Message,
}
if d.messageThreadID > 0 {
params.MessageThreadID = d.messageThreadID
}
_, err := d.api.SendMessage(params)
return err
}

View File

@@ -1,6 +1,7 @@
package laniakea
import (
"context"
"fmt"
"git.nix13.pw/scuroneko/laniakea/tgapi"
@@ -110,7 +111,15 @@ func (ctx *MsgContext) answer(text string, keyboard *InlineKeyboard) *AnswerMess
if ctx.Msg.MessageThreadID > 0 {
params.MessageThreadID = ctx.Msg.MessageThreadID
}
if ctx.Msg.DirectMessageTopic != nil {
params.DirectMessagesTopicID = ctx.Msg.DirectMessageTopic.TopicID
}
cont := context.Background()
if err := ctx.Api.Limiter.Wait(cont, ctx.Msg.Chat.ID); err != nil {
ctx.botLogger.Errorln(err)
return nil
}
msg, err := ctx.Api.SendMessage(params)
if err != nil {
ctx.botLogger.Errorln(err)
@@ -217,6 +226,12 @@ func (ctx *MsgContext) error(err error) {
func (ctx *MsgContext) Error(err error) { ctx.error(err) }
func (ctx *MsgContext) NewDraft() *Draft {
c := context.Background()
if err := ctx.Api.Limiter.Wait(c, ctx.Msg.Chat.ID); err != nil {
ctx.botLogger.Errorln(err)
return nil
}
draft := ctx.draftProvider.NewDraft()
draft.chatID = ctx.Msg.Chat.ID
draft.messageThreadID = ctx.Msg.MessageThreadID

View File

@@ -12,7 +12,6 @@ import (
"git.nix13.pw/scuroneko/laniakea/utils"
"git.nix13.pw/scuroneko/slog"
"golang.org/x/time/rate"
)
type APIOpts struct {
@@ -21,7 +20,7 @@ type APIOpts struct {
useTestServer bool
apiUrl string
limiter *rate.Limiter
limiter *utils.RateLimiter
dropOverflowLimit bool
}
@@ -46,7 +45,7 @@ func (opts *APIOpts) SetAPIUrl(apiUrl string) *APIOpts {
}
return opts
}
func (opts *APIOpts) SetLimiter(limiter *rate.Limiter) *APIOpts {
func (opts *APIOpts) SetLimiter(limiter *utils.RateLimiter) *APIOpts {
opts.limiter = limiter
return opts
}
@@ -63,7 +62,7 @@ type API struct {
apiUrl string
pool *WorkerPool
limiter *rate.Limiter
Limiter *utils.RateLimiter
dropOverflowLimit bool
}
@@ -88,29 +87,39 @@ func (api *API) CloseApi() error {
}
func (api *API) GetLogger() *slog.Logger { return api.logger }
type ResponseParameters struct {
MigrateToChatID *int64 `json:"migrate_to_chat_id,omitempty"`
RetryAfter *int `json:"retry_after,omitempty"`
}
type ApiResponse[R any] struct {
Ok bool `json:"ok"`
Description string `json:"description,omitempty"`
Result R `json:"result,omitempty"`
ErrorCode int `json:"error_code,omitempty"`
Parameters *ResponseParameters `json:"parameters,omitempty"`
}
type TelegramRequest[R, P any] struct {
method string
params P
chatId int64
}
func NewRequest[R, P any](method string, params P) TelegramRequest[R, P] {
return TelegramRequest[R, P]{method: method, params: params}
return TelegramRequest[R, P]{method, params, 0}
}
func NewRequestWithChatID[R, P any](method string, params P, chatId int64) TelegramRequest[R, P] {
return TelegramRequest[R, P]{method, params, chatId}
}
func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, error) {
var zero R
if api.limiter != nil {
if api.Limiter != nil {
if api.dropOverflowLimit {
if !api.limiter.Allow() {
if !api.Limiter.GlobalAllow() {
return zero, errors.New("rate limited")
}
} else {
if err := api.limiter.Wait(ctx); err != nil {
if err := api.Limiter.GlobalWait(ctx); err != nil {
return zero, err
}
}
@@ -149,10 +158,28 @@ func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, erro
return zero, err
}
api.logger.Debugln("RES", r.method, string(data))
if res.StatusCode != http.StatusOK {
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusTooManyRequests {
return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(data))
}
return parseBody[R](data)
responseData, err := parseBody[R](data)
if errors.Is(err, ErrRateLimit) {
if responseData.Parameters != nil {
after := 0
if responseData.Parameters.RetryAfter != nil {
after = *responseData.Parameters.RetryAfter
}
if r.chatId > 0 {
api.Limiter.SetChatLock(r.chatId, after)
} else {
api.Limiter.SetGlobalLock(after)
}
time.Sleep(time.Duration(after) * time.Second)
return r.doRequest(ctx, api)
}
return zero, ErrRateLimit
}
return responseData.Result, err
}
func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, error) {
var zero R
@@ -184,15 +211,17 @@ func readBody(body io.ReadCloser) ([]byte, error) {
reader := io.LimitReader(body, 10<<20)
return io.ReadAll(reader)
}
func parseBody[R any](data []byte) (R, error) {
var zero R
func parseBody[R any](data []byte) (ApiResponse[R], error) {
var resp ApiResponse[R]
err := json.Unmarshal(data, &resp)
if err != nil {
return zero, err
return resp, err
}
if !resp.Ok {
return zero, fmt.Errorf("[%d] %s", resp.ErrorCode, resp.Description)
if resp.ErrorCode == 429 {
return resp, ErrRateLimit
}
return resp.Result, nil
return resp, fmt.Errorf("[%d] %s", resp.ErrorCode, resp.Description)
}
return resp, nil
}

View File

@@ -2,7 +2,7 @@ package tgapi
type SendPhotoP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id"`
ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"`
DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"`

View File

@@ -1,7 +1,7 @@
package tgapi
type Chat struct {
ID int `json:"id"`
ID int64 `json:"id"`
Type string `json:"type"`
Title *string `json:"title,omitempty"`
Username *string `json:"username,omitempty"`

5
tgapi/errors.go Normal file
View File

@@ -0,0 +1,5 @@
package tgapi
import "errors"
var ErrRateLimit = errors.New("rate limit exceeded")

View File

@@ -2,9 +2,9 @@ package tgapi
type SendMessageP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id"`
ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"`
DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"`
DirectMessagesTopicID int64 `json:"direct_messages_topic_id,omitempty"`
Text string `json:"text"`
ParseMode ParseMode `json:"parse_mode,omitempty"`
@@ -21,7 +21,7 @@ type SendMessageP struct {
}
func (api *API) SendMessage(params SendMessageP) (Message, error) {
req := NewRequest[Message, SendMessageP]("sendMessage", params)
req := NewRequestWithChatID[Message, SendMessageP]("sendMessage", params, params.ChatID)
return req.Do(api)
}
@@ -266,7 +266,7 @@ func (api *API) SendDice(params SendDiceP) (Message, error) {
}
type SendMessageDraftP struct {
ChatID int `json:"chat_id"`
ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"`
DraftID uint64 `json:"draft_id"`
Text string `json:"text"`
@@ -275,13 +275,13 @@ type SendMessageDraftP struct {
}
func (api *API) SendMessageDraft(params SendMessageDraftP) (bool, error) {
req := NewRequest[bool]("sendMessageDraft", params)
req := NewRequestWithChatID[bool]("sendMessageDraft", params, params.ChatID)
return req.Do(api)
}
type SendChatActionP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id"`
ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"`
Action ChatActionType `json:"action"`
}
@@ -307,7 +307,7 @@ func (api *API) SetMessageReaction(params SetMessageReactionP) (bool, error) {
type EditMessageTextP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id,omitempty"`
ChatID int64 `json:"chat_id,omitempty"`
MessageID int `json:"message_id,omitempty"`
InlineMessageID string `json:"inline_message_id,omitempty"`
Text string `json:"text"`
@@ -331,7 +331,7 @@ func (api *API) EditMessageText(params EditMessageTextP) (Message, bool, error)
type EditMessageCaptionP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id,omitempty"`
ChatID int64 `json:"chat_id,omitempty"`
MessageID int `json:"message_id,omitempty"`
InlineMessageID string `json:"inline_message_id,omitempty"`
Caption string `json:"caption"`
@@ -495,7 +495,7 @@ func (api *API) DeclineSuggestedPost(params DeclineSuggestedPostP) (bool, error)
}
type DeleteMessageP struct {
ChatID int `json:"chat_id"`
ChatID int64 `json:"chat_id"`
MessageID int `json:"message_id"`
}

View File

@@ -6,9 +6,15 @@ type MessageReplyMarkup struct {
InlineKeyboard [][]InlineKeyboardButton `json:"inline_keyboard"`
}
type DirectMessageTopic struct {
TopicID int64 `json:"topic_id"`
User *User `json:"user,omitempty"`
}
type Message struct {
MessageID int `json:"message_id"`
MessageThreadID int `json:"message_thread_id,omitempty"`
DirectMessageTopic *DirectMessageTopic `json:"direct_message_topic,omitempty"`
BusinessConnectionId string `json:"business_connection_id,omitempty"`
From *User `json:"from,omitempty"`

View File

@@ -66,13 +66,13 @@ func NewUploaderRequest[R, P any](method string, params P, files ...UploaderFile
}
func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R, error) {
var zero R
if up.api.limiter != nil {
if up.api.Limiter != nil {
if up.api.dropOverflowLimit {
if !up.api.limiter.Allow() {
if !up.api.Limiter.GlobalAllow() {
return zero, errors.New("rate limited")
}
} else {
if err := up.api.limiter.Wait(ctx); err != nil {
if err := up.api.Limiter.GlobalWait(ctx); err != nil {
return zero, err
}
}
@@ -109,7 +109,11 @@ func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R,
return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(body))
}
return parseBody[R](body)
respBody, err := parseBody[R](body)
if err != nil {
return zero, err
}
return respBody.Result, nil
}
func (r UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) (R, error) {
var zero R

View File

@@ -2,7 +2,7 @@ package tgapi
type UploadPhotoP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id"`
ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"`
DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"`

155
utils/limiter.go Normal file
View File

@@ -0,0 +1,155 @@
package utils
import (
"context"
"sync"
"time"
"golang.org/x/time/rate"
)
type RateLimiter struct {
globalLockUntil time.Time
globalLimiter *rate.Limiter
globalMu sync.RWMutex
chatLocks map[int64]time.Time
chatLimiters map[int64]*rate.Limiter
chatMu sync.Mutex
}
func NewRateLimiter() *RateLimiter {
return &RateLimiter{
globalLimiter: rate.NewLimiter(30, 30),
chatLimiters: make(map[int64]*rate.Limiter),
chatLocks: make(map[int64]time.Time), // инициализация!
}
}
func (rl *RateLimiter) SetGlobalLock(retryAfter int) {
if retryAfter <= 0 {
return
}
rl.globalMu.Lock()
defer rl.globalMu.Unlock()
rl.globalLockUntil = time.Now().Add(time.Duration(retryAfter) * time.Second)
}
func (rl *RateLimiter) SetChatLock(chatID int64, retryAfter int) {
if retryAfter <= 0 {
return
}
rl.chatMu.Lock()
defer rl.chatMu.Unlock()
rl.chatLocks[chatID] = time.Now().Add(time.Duration(retryAfter) * time.Second)
}
// GlobalWait блокирует до возможности сделать глобальный запрос.
func (rl *RateLimiter) GlobalWait(ctx context.Context) error {
// Ждём окончания глобальной блокировки, если она есть
if err := rl.waitForGlobalUnlock(ctx); err != nil {
return err
}
// Ждём разрешения rate limiter'а
return rl.globalLimiter.Wait(ctx)
}
// Wait блокирует до возможности сделать запрос для конкретного чата.
func (rl *RateLimiter) Wait(ctx context.Context, chatID int64) error {
// Ждём окончания блокировки чата
if err := rl.waitForChatUnlock(ctx, chatID); err != nil {
return err
}
// Затем глобальной блокировки
if err := rl.waitForGlobalUnlock(ctx); err != nil {
return err
}
// Получаем или создаём лимитер для чата
limiter := rl.getChatLimiter(chatID)
return limiter.Wait(ctx)
}
// GlobalAllow неблокирующая проверка глобального запроса.
func (rl *RateLimiter) GlobalAllow() bool {
rl.globalMu.RLock()
until := rl.globalLockUntil
rl.globalMu.RUnlock()
if !until.IsZero() && time.Now().Before(until) {
return false
}
return rl.globalLimiter.Allow()
}
// Allow неблокирующая проверка запроса для чата.
func (rl *RateLimiter) Allow(chatID int64) bool {
// Проверяем глобальную блокировку
rl.globalMu.RLock()
globalUntil := rl.globalLockUntil
rl.globalMu.RUnlock()
if !globalUntil.IsZero() && time.Now().Before(globalUntil) {
return false
}
// Проверяем блокировку чата
rl.chatMu.Lock()
chatUntil, ok := rl.chatLocks[chatID]
rl.chatMu.Unlock()
if ok && !chatUntil.IsZero() && time.Now().Before(chatUntil) {
return false
}
// Проверяем глобальный лимитер
if !rl.globalLimiter.Allow() {
return false
}
// Проверяем лимитер чата
limiter := rl.getChatLimiter(chatID)
return limiter.Allow()
}
// Внутренние вспомогательные методы
func (rl *RateLimiter) waitForGlobalUnlock(ctx context.Context) error {
rl.globalMu.RLock()
until := rl.globalLockUntil
rl.globalMu.RUnlock()
if until.IsZero() || time.Now().After(until) {
return nil
}
select {
case <-time.After(time.Until(until)):
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (rl *RateLimiter) waitForChatUnlock(ctx context.Context, chatID int64) error {
rl.chatMu.Lock()
until, ok := rl.chatLocks[chatID]
rl.chatMu.Unlock()
if !ok || until.IsZero() || time.Now().After(until) {
return nil
}
select {
case <-time.After(time.Until(until)):
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (rl *RateLimiter) getChatLimiter(chatID int64) *rate.Limiter {
rl.chatMu.Lock()
defer rl.chatMu.Unlock()
if lim, ok := rl.chatLimiters[chatID]; ok {
return lim
}
lim := rate.NewLimiter(1, 1) // 1 запрос/сек
rl.chatLimiters[chatID] = lim
return lim
}

View File

@@ -1,9 +1,9 @@
package utils
const (
VersionString = "1.0.0-beta.4"
VersionString = "1.0.0-beta.8"
VersionMajor = 1
VersionMinor = 0
VersionPatch = 0
Beta = 4
Beta = 8
)