Compare commits

..

3 Commits

Author SHA1 Message Date
fa7a296a66 v1.0.0 beta 7; ratelimt war 2026-03-02 16:49:00 +03:00
7101aba548 v1.0.0 beta 6 2026-03-02 00:08:26 +03:00
2de46a27c8 v1.0.0 beta 5 2026-03-01 23:40:27 +03:00
13 changed files with 236 additions and 40 deletions

6
bot.go
View File

@@ -11,9 +11,9 @@ import (
"git.nix13.pw/scuroneko/extypes" "git.nix13.pw/scuroneko/extypes"
"git.nix13.pw/scuroneko/laniakea/tgapi" "git.nix13.pw/scuroneko/laniakea/tgapi"
"git.nix13.pw/scuroneko/laniakea/utils"
"git.nix13.pw/scuroneko/slog" "git.nix13.pw/scuroneko/slog"
"github.com/alitto/pond/v2" "github.com/alitto/pond/v2"
"golang.org/x/time/rate"
) )
type BotOpts struct { type BotOpts struct {
@@ -99,9 +99,9 @@ type Bot[T DbContext] struct {
func NewBot[T any](opts *BotOpts) *Bot[T] { func NewBot[T any](opts *BotOpts) *Bot[T] {
updateQueue := make(chan *tgapi.Update, 512) updateQueue := make(chan *tgapi.Update, 512)
var limiter *rate.Limiter var limiter *utils.RateLimiter
if opts.RateLimit > 0 { if opts.RateLimit > 0 {
limiter = rate.NewLimiter(rate.Limit(opts.RateLimit), opts.RateLimit) limiter = utils.NewRateLimiter()
} }
apiOpts := tgapi.NewAPIOpts(opts.Token).SetAPIUrl(opts.APIUrl).UseTestServer(opts.UseTestServer).SetLimiter(limiter) apiOpts := tgapi.NewAPIOpts(opts.Token).SetAPIUrl(opts.APIUrl).UseTestServer(opts.UseTestServer).SetLimiter(limiter)

View File

@@ -32,7 +32,7 @@ func (g *LinearDraftIdGenerator) Next() uint64 {
type DraftProvider struct { type DraftProvider struct {
api *tgapi.API api *tgapi.API
chatID int chatID int64
messageThreadID int messageThreadID int
parseMode tgapi.ParseMode parseMode tgapi.ParseMode
entities []tgapi.MessageEntity entities []tgapi.MessageEntity
@@ -43,7 +43,7 @@ type DraftProvider struct {
type Draft struct { type Draft struct {
api *tgapi.API api *tgapi.API
chatID int chatID int64
messageThreadID int messageThreadID int
parseMode tgapi.ParseMode parseMode tgapi.ParseMode
entities []tgapi.MessageEntity entities []tgapi.MessageEntity
@@ -87,3 +87,20 @@ func (d *Draft) Push(newText string) error {
_, err := d.api.SendMessageDraft(params) _, err := d.api.SendMessageDraft(params)
return err return err
} }
func (d *Draft) Flush() error {
if d.Message == "" {
return nil
}
params := tgapi.SendMessageP{
ChatID: d.chatID,
ParseMode: d.parseMode,
Entities: d.entities,
Text: d.Message,
}
if d.messageThreadID > 0 {
params.MessageThreadID = d.messageThreadID
}
_, err := d.api.SendMessage(params)
return err
}

View File

@@ -1,6 +1,7 @@
package laniakea package laniakea
import ( import (
"context"
"fmt" "fmt"
"git.nix13.pw/scuroneko/laniakea/tgapi" "git.nix13.pw/scuroneko/laniakea/tgapi"
@@ -110,7 +111,15 @@ func (ctx *MsgContext) answer(text string, keyboard *InlineKeyboard) *AnswerMess
if ctx.Msg.MessageThreadID > 0 { if ctx.Msg.MessageThreadID > 0 {
params.MessageThreadID = ctx.Msg.MessageThreadID params.MessageThreadID = ctx.Msg.MessageThreadID
} }
if ctx.Msg.DirectMessageTopic != nil {
params.DirectMessagesTopicID = ctx.Msg.DirectMessageTopic.TopicID
}
cont := context.Background()
if err := ctx.Api.Limiter.Wait(cont, ctx.Msg.Chat.ID); err != nil {
ctx.botLogger.Errorln(err)
return nil
}
msg, err := ctx.Api.SendMessage(params) msg, err := ctx.Api.SendMessage(params)
if err != nil { if err != nil {
ctx.botLogger.Errorln(err) ctx.botLogger.Errorln(err)
@@ -217,6 +226,12 @@ func (ctx *MsgContext) error(err error) {
func (ctx *MsgContext) Error(err error) { ctx.error(err) } func (ctx *MsgContext) Error(err error) { ctx.error(err) }
func (ctx *MsgContext) NewDraft() *Draft { func (ctx *MsgContext) NewDraft() *Draft {
c := context.Background()
if err := ctx.Api.Limiter.Wait(c, ctx.Msg.Chat.ID); err != nil {
ctx.botLogger.Errorln(err)
return nil
}
draft := ctx.draftProvider.NewDraft() draft := ctx.draftProvider.NewDraft()
draft.chatID = ctx.Msg.Chat.ID draft.chatID = ctx.Msg.Chat.ID
draft.messageThreadID = ctx.Msg.MessageThreadID draft.messageThreadID = ctx.Msg.MessageThreadID

View File

@@ -12,7 +12,6 @@ import (
"git.nix13.pw/scuroneko/laniakea/utils" "git.nix13.pw/scuroneko/laniakea/utils"
"git.nix13.pw/scuroneko/slog" "git.nix13.pw/scuroneko/slog"
"golang.org/x/time/rate"
) )
type APIOpts struct { type APIOpts struct {
@@ -21,7 +20,7 @@ type APIOpts struct {
useTestServer bool useTestServer bool
apiUrl string apiUrl string
limiter *rate.Limiter limiter *utils.RateLimiter
dropOverflowLimit bool dropOverflowLimit bool
} }
@@ -46,7 +45,7 @@ func (opts *APIOpts) SetAPIUrl(apiUrl string) *APIOpts {
} }
return opts return opts
} }
func (opts *APIOpts) SetLimiter(limiter *rate.Limiter) *APIOpts { func (opts *APIOpts) SetLimiter(limiter *utils.RateLimiter) *APIOpts {
opts.limiter = limiter opts.limiter = limiter
return opts return opts
} }
@@ -63,7 +62,7 @@ type API struct {
apiUrl string apiUrl string
pool *WorkerPool pool *WorkerPool
limiter *rate.Limiter Limiter *utils.RateLimiter
dropOverflowLimit bool dropOverflowLimit bool
} }
@@ -88,11 +87,17 @@ func (api *API) CloseApi() error {
} }
func (api *API) GetLogger() *slog.Logger { return api.logger } func (api *API) GetLogger() *slog.Logger { return api.logger }
type ResponseParameters struct {
MigrateToChatID *int64 `json:"migrate_to_chat_id,omitempty"`
RetryAfter *int `json:"retry_after,omitempty"`
}
type ApiResponse[R any] struct { type ApiResponse[R any] struct {
Ok bool `json:"ok"` Ok bool `json:"ok"`
Description string `json:"description,omitempty"` Description string `json:"description,omitempty"`
Result R `json:"result,omitempty"` Result R `json:"result,omitempty"`
ErrorCode int `json:"error_code,omitempty"` ErrorCode int `json:"error_code,omitempty"`
Parameters *ResponseParameters `json:"parameters,omitempty"`
} }
type TelegramRequest[R, P any] struct { type TelegramRequest[R, P any] struct {
method string method string
@@ -104,13 +109,13 @@ func NewRequest[R, P any](method string, params P) TelegramRequest[R, P] {
} }
func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, error) { func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, error) {
var zero R var zero R
if api.limiter != nil { if api.Limiter != nil {
if api.dropOverflowLimit { if api.dropOverflowLimit {
if !api.limiter.Allow() { if !api.Limiter.GlobalAllow() {
return zero, errors.New("rate limited") return zero, errors.New("rate limited")
} }
} else { } else {
if err := api.limiter.Wait(ctx); err != nil { if err := api.Limiter.GlobalWait(ctx); err != nil {
return zero, err return zero, err
} }
} }
@@ -149,10 +154,23 @@ func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, erro
return zero, err return zero, err
} }
api.logger.Debugln("RES", r.method, string(data)) api.logger.Debugln("RES", r.method, string(data))
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusTooManyRequests {
return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(data)) return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(data))
} }
return parseBody[R](data)
responseData, err := parseBody[R](data)
if errors.Is(err, ErrRateLimit) {
if responseData.Parameters != nil {
after := 0
if responseData.Parameters.RetryAfter != nil {
after = *responseData.Parameters.RetryAfter
}
api.Limiter.SetGlobalLock(after)
return r.doRequest(ctx, api)
}
return zero, ErrRateLimit
}
return responseData.Result, err
} }
func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, error) { func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, error) {
var zero R var zero R
@@ -184,15 +202,17 @@ func readBody(body io.ReadCloser) ([]byte, error) {
reader := io.LimitReader(body, 10<<20) reader := io.LimitReader(body, 10<<20)
return io.ReadAll(reader) return io.ReadAll(reader)
} }
func parseBody[R any](data []byte) (R, error) { func parseBody[R any](data []byte) (ApiResponse[R], error) {
var zero R
var resp ApiResponse[R] var resp ApiResponse[R]
err := json.Unmarshal(data, &resp) err := json.Unmarshal(data, &resp)
if err != nil { if err != nil {
return zero, err return resp, err
} }
if !resp.Ok { if !resp.Ok {
return zero, fmt.Errorf("[%d] %s", resp.ErrorCode, resp.Description) if resp.ErrorCode == 429 {
return resp, ErrRateLimit
} }
return resp.Result, nil return resp, fmt.Errorf("[%d] %s", resp.ErrorCode, resp.Description)
}
return resp, nil
} }

View File

@@ -2,7 +2,7 @@ package tgapi
type SendPhotoP struct { type SendPhotoP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"` BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id"` ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"` MessageThreadID int `json:"message_thread_id,omitempty"`
DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"`

View File

@@ -1,7 +1,7 @@
package tgapi package tgapi
type Chat struct { type Chat struct {
ID int `json:"id"` ID int64 `json:"id"`
Type string `json:"type"` Type string `json:"type"`
Title *string `json:"title,omitempty"` Title *string `json:"title,omitempty"`
Username *string `json:"username,omitempty"` Username *string `json:"username,omitempty"`

5
tgapi/errors.go Normal file
View File

@@ -0,0 +1,5 @@
package tgapi
import "errors"
var ErrRateLimit = errors.New("rate limit exceeded")

View File

@@ -2,9 +2,9 @@ package tgapi
type SendMessageP struct { type SendMessageP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"` BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id"` ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"` MessageThreadID int `json:"message_thread_id,omitempty"`
DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` DirectMessagesTopicID int64 `json:"direct_messages_topic_id,omitempty"`
Text string `json:"text"` Text string `json:"text"`
ParseMode ParseMode `json:"parse_mode,omitempty"` ParseMode ParseMode `json:"parse_mode,omitempty"`
@@ -266,7 +266,7 @@ func (api *API) SendDice(params SendDiceP) (Message, error) {
} }
type SendMessageDraftP struct { type SendMessageDraftP struct {
ChatID int `json:"chat_id"` ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"` MessageThreadID int `json:"message_thread_id,omitempty"`
DraftID uint64 `json:"draft_id"` DraftID uint64 `json:"draft_id"`
Text string `json:"text"` Text string `json:"text"`
@@ -281,7 +281,7 @@ func (api *API) SendMessageDraft(params SendMessageDraftP) (bool, error) {
type SendChatActionP struct { type SendChatActionP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"` BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id"` ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"` MessageThreadID int `json:"message_thread_id,omitempty"`
Action ChatActionType `json:"action"` Action ChatActionType `json:"action"`
} }
@@ -307,7 +307,7 @@ func (api *API) SetMessageReaction(params SetMessageReactionP) (bool, error) {
type EditMessageTextP struct { type EditMessageTextP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"` BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id,omitempty"` ChatID int64 `json:"chat_id,omitempty"`
MessageID int `json:"message_id,omitempty"` MessageID int `json:"message_id,omitempty"`
InlineMessageID string `json:"inline_message_id,omitempty"` InlineMessageID string `json:"inline_message_id,omitempty"`
Text string `json:"text"` Text string `json:"text"`
@@ -331,7 +331,7 @@ func (api *API) EditMessageText(params EditMessageTextP) (Message, bool, error)
type EditMessageCaptionP struct { type EditMessageCaptionP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"` BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id,omitempty"` ChatID int64 `json:"chat_id,omitempty"`
MessageID int `json:"message_id,omitempty"` MessageID int `json:"message_id,omitempty"`
InlineMessageID string `json:"inline_message_id,omitempty"` InlineMessageID string `json:"inline_message_id,omitempty"`
Caption string `json:"caption"` Caption string `json:"caption"`
@@ -495,7 +495,7 @@ func (api *API) DeclineSuggestedPost(params DeclineSuggestedPostP) (bool, error)
} }
type DeleteMessageP struct { type DeleteMessageP struct {
ChatID int `json:"chat_id"` ChatID int64 `json:"chat_id"`
MessageID int `json:"message_id"` MessageID int `json:"message_id"`
} }

View File

@@ -6,9 +6,15 @@ type MessageReplyMarkup struct {
InlineKeyboard [][]InlineKeyboardButton `json:"inline_keyboard"` InlineKeyboard [][]InlineKeyboardButton `json:"inline_keyboard"`
} }
type DirectMessageTopic struct {
TopicID int64 `json:"topic_id"`
User *User `json:"user,omitempty"`
}
type Message struct { type Message struct {
MessageID int `json:"message_id"` MessageID int `json:"message_id"`
MessageThreadID int `json:"message_thread_id,omitempty"` MessageThreadID int `json:"message_thread_id,omitempty"`
DirectMessageTopic *DirectMessageTopic `json:"direct_message_topic,omitempty"`
BusinessConnectionId string `json:"business_connection_id,omitempty"` BusinessConnectionId string `json:"business_connection_id,omitempty"`
From *User `json:"from,omitempty"` From *User `json:"from,omitempty"`

View File

@@ -66,13 +66,13 @@ func NewUploaderRequest[R, P any](method string, params P, files ...UploaderFile
} }
func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R, error) { func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R, error) {
var zero R var zero R
if up.api.limiter != nil { if up.api.Limiter != nil {
if up.api.dropOverflowLimit { if up.api.dropOverflowLimit {
if !up.api.limiter.Allow() { if !up.api.Limiter.GlobalAllow() {
return zero, errors.New("rate limited") return zero, errors.New("rate limited")
} }
} else { } else {
if err := up.api.limiter.Wait(ctx); err != nil { if err := up.api.Limiter.GlobalWait(ctx); err != nil {
return zero, err return zero, err
} }
} }
@@ -109,7 +109,11 @@ func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R,
return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(body)) return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(body))
} }
return parseBody[R](body) respBody, err := parseBody[R](body)
if err != nil {
return zero, err
}
return respBody.Result, nil
} }
func (r UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) (R, error) { func (r UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) (R, error) {
var zero R var zero R

View File

@@ -2,7 +2,7 @@ package tgapi
type UploadPhotoP struct { type UploadPhotoP struct {
BusinessConnectionID string `json:"business_connection_id,omitempty"` BusinessConnectionID string `json:"business_connection_id,omitempty"`
ChatID int `json:"chat_id"` ChatID int64 `json:"chat_id"`
MessageThreadID int `json:"message_thread_id,omitempty"` MessageThreadID int `json:"message_thread_id,omitempty"`
DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"`

129
utils/limiter.go Normal file
View File

@@ -0,0 +1,129 @@
package utils
import (
"context"
"sync"
"time"
"golang.org/x/time/rate"
)
type RateLimiter struct {
globalLockUntil time.Time
globalLimiter *rate.Limiter
globalMu sync.RWMutex
chatLocks map[int64]time.Time
chatLimiters map[int64]*rate.Limiter
chatMu sync.Mutex
}
func NewRateLimiter() *RateLimiter {
return &RateLimiter{
// 30 запросов в секунду (burst=30)
globalLimiter: rate.NewLimiter(rate.Limit(30), 30),
chatLimiters: make(map[int64]*rate.Limiter),
}
}
func (rl *RateLimiter) SetGlobalLock(retryAfter int) {
if retryAfter <= 0 {
return
}
rl.globalMu.Lock()
defer rl.globalMu.Unlock()
rl.globalLockUntil = time.Now().Add(time.Duration(retryAfter) * time.Second)
}
func (rl *RateLimiter) SetChatLock(chatID int64, retryAfter int) {
rl.chatMu.Lock()
defer rl.chatMu.Unlock()
rl.chatLocks[chatID] = time.Now().Add(time.Duration(retryAfter) * time.Second)
}
func (rl *RateLimiter) GlobalWait(ctx context.Context) error {
rl.globalMu.RLock()
until := rl.globalLockUntil
rl.globalMu.RUnlock()
if !until.IsZero() {
if time.Now().Before(until) {
// Ждём до окончания блокировки или отмены контекста
select {
case <-time.After(time.Until(until)):
// блокировка снята
case <-ctx.Done():
return ctx.Err()
}
}
}
// Теперь ждём разрешения rate limiter'а
return rl.globalLimiter.Wait(ctx)
}
func (rl *RateLimiter) Wait(ctx context.Context, chatID int64) error {
rl.chatMu.Lock()
until, ok := rl.chatLocks[chatID]
rl.chatMu.Unlock()
if ok && !until.IsZero() {
if time.Now().Before(until) {
select {
case <-time.After(time.Until(until)):
// блокировка снята
case <-ctx.Done():
return ctx.Err()
}
}
}
if err := rl.GlobalWait(ctx); err != nil {
return err
}
rl.chatMu.Lock()
chatLimiter, ok := rl.chatLimiters[chatID]
if !ok {
chatLimiter = rate.NewLimiter(rate.Limit(1), 1)
rl.chatLimiters[chatID] = chatLimiter
}
rl.chatMu.Unlock()
return chatLimiter.Wait(ctx)
}
func (rl *RateLimiter) GlobalAllow() bool {
rl.globalMu.RLock()
until := rl.globalLockUntil
rl.globalMu.RUnlock()
if !until.IsZero() {
if time.Now().Before(until) {
// Ждём до окончания блокировки или отмены контекста
select {
case <-time.After(time.Until(until)):
rl.globalLimiter.Allow()
}
}
}
return rl.globalLimiter.Allow()
}
func (rl *RateLimiter) Allow(chatID int64) bool {
rl.chatMu.Lock()
until, ok := rl.chatLocks[chatID]
rl.chatMu.Unlock()
if ok && !until.IsZero() {
if time.Now().Before(until) {
select {
case <-time.After(time.Until(until)):
}
}
}
if !rl.globalLimiter.Allow() {
return false
}
rl.chatMu.Lock()
chatLimiter, ok := rl.chatLimiters[chatID]
if !ok {
chatLimiter = rate.NewLimiter(rate.Limit(1), 1)
rl.chatLimiters[chatID] = chatLimiter
}
rl.chatMu.Unlock()
return chatLimiter.Allow()
}

View File

@@ -1,9 +1,9 @@
package utils package utils
const ( const (
VersionString = "1.0.0-beta.4" VersionString = "1.0.0-beta.7"
VersionMajor = 1 VersionMajor = 1
VersionMinor = 0 VersionMinor = 0
VersionPatch = 0 VersionPatch = 0
Beta = 4 Beta = 7
) )