From 9895edf966ac08b7952f40feba6d3f2cb8e6c120 Mon Sep 17 00:00:00 2001 From: ScuroNeko Date: Fri, 6 Mar 2026 11:59:17 +0300 Subject: [PATCH] v1.0.0 beta 9 --- bot.go | 5 + drafts.go | 34 ++++- handler.go | 50 +++---- msg_context.go | 2 +- runners.go | 3 + tgapi/api.go | 249 +++++++++++++++++++++++++---------- tgapi/attachments_methods.go | 34 ++--- tgapi/chat_methods.go | 154 +++++++++++----------- tgapi/errors.go | 2 + tgapi/forum_methods.go | 38 +++--- tgapi/messages_methods.go | 164 +++++++++++------------ tgapi/pool.go | 132 +++++++++++++------ tgapi/stickers_methods.go | 4 +- tgapi/uploader_api.go | 100 +++++++++----- tgapi/uploader_methods.go | 18 +-- utils/limiter.go | 99 ++++++++++---- utils/multipart.go | 79 ++++++----- utils/utils.go | 16 ++- utils/version.go | 4 +- 19 files changed, 731 insertions(+), 456 deletions(-) diff --git a/bot.go b/bot.go index 6abec8f..f7439de 100644 --- a/bot.go +++ b/bot.go @@ -74,6 +74,7 @@ type Bot[T DbContext] struct { token string debug bool errorTemplate string + username string logger *slog.Logger RequestLogger *slog.Logger @@ -140,6 +141,10 @@ func NewBot[T any](opts *BotOpts) *Bot[T] { _ = bot.Close() bot.logger.Fatal(err) } + bot.username = Val(u.Username, "") + if bot.username == "" { + bot.logger.Warn("Can't get bot username. Named command wouldn't work!") + } bot.logger.Infof("Authorized as %s\n", u.FirstName) return bot diff --git a/drafts.go b/drafts.go index fb37088..5fa2384 100644 --- a/drafts.go +++ b/drafts.go @@ -1,7 +1,6 @@ package laniakea import ( - "math" "math/rand/v2" "sync/atomic" @@ -17,16 +16,16 @@ type RandomDraftIdGenerator struct { } func (g *RandomDraftIdGenerator) Next() uint64 { - return rand.Uint64N(math.MaxUint64) + return rand.Uint64() } type LinearDraftIdGenerator struct { draftIdGenerator - lastId uint64 + lastId atomic.Uint64 } func (g *LinearDraftIdGenerator) Next() uint64 { - return atomic.AddUint64(&g.lastId, 1) + return g.lastId.Add(1) } type DraftProvider struct { @@ -41,7 +40,8 @@ type DraftProvider struct { generator draftIdGenerator } type Draft struct { - api *tgapi.API + api *tgapi.API + provider *DraftProvider chatID int64 messageThreadID int @@ -59,15 +59,28 @@ func NewRandomDraftProvider(api *tgapi.API) *DraftProvider { } } func NewLinearDraftProvider(api *tgapi.API, startValue uint64) *DraftProvider { + g := &LinearDraftIdGenerator{} + g.lastId.Store(startValue) return &DraftProvider{ api: api, - generator: &LinearDraftIdGenerator{lastId: startValue}, + generator: g, drafts: make(map[uint64]*Draft), } } func (d *DraftProvider) NewDraft() *Draft { id := d.generator.Next() - draft := &Draft{d.api, d.chatID, d.messageThreadID, d.parseMode, d.entities, id, ""} + entitiesCopy := make([]tgapi.MessageEntity, 0) + copy(entitiesCopy, d.entities) + draft := &Draft{ + api: d.api, + provider: d, + chatID: d.chatID, + messageThreadID: d.messageThreadID, + parseMode: d.parseMode, + entities: entitiesCopy, + ID: id, + Message: "", + } d.drafts[id] = draft return draft } @@ -87,6 +100,9 @@ func (d *Draft) Push(newText string) error { _, err := d.api.SendMessageDraft(params) return err } +func (d *Draft) Clear() { + d.Message = "" +} func (d *Draft) Flush() error { if d.Message == "" { return nil @@ -102,5 +118,9 @@ func (d *Draft) Flush() error { params.MessageThreadID = d.messageThreadID } _, err := d.api.SendMessage(params) + if err == nil { + d.Clear() + delete(d.provider.drafts, d.ID) + } return err } diff --git a/handler.go b/handler.go index 27ba991..bdf0133 100644 --- a/handler.go +++ b/handler.go @@ -49,34 +49,34 @@ func (bot *Bot[T]) handleMessage(update *tgapi.Update, ctx *MsgContext) { ctx.From = update.Message.From ctx.Msg = update.Message + // Убираем префикс text = strings.TrimSpace(text[len(prefix):]) + // Извлекаем команду как первое слово + spaceIndex := strings.Index(text, " ") + var cmd string + var args string + + if spaceIndex == -1 { + cmd = text + args = "" + } else { + cmd = text[:spaceIndex] + args = strings.TrimSpace(text[spaceIndex:]) + } + + if strings.Contains(cmd, "@") { + botUsername := bot.username + if botUsername != "" && strings.HasSuffix(cmd, "@"+botUsername) { + cmd = cmd[:len(cmd)-len("@"+botUsername)] // убираем @botname + } + } + + // Ищем команду по точному совпадению for _, plugin := range bot.plugins { - for cmd := range plugin.commands { - if !strings.HasPrefix(text, cmd) { - continue - } - requestParts := strings.Split(text, " ") - cmdParts := strings.Split(cmd, " ") - isValid := true - for i, part := range cmdParts { - if part != requestParts[i] { - isValid = false - break - } - } - - if !isValid { - continue - } - - ctx.Text = strings.TrimSpace(text[len(cmd):]) - if ctx.Text == "" { - ctx.Args = []string{} - } else { - ctx.Args = strings.Split(ctx.Text, " ") - } - + if _, exists := plugin.commands[cmd]; exists { + ctx.Text = args + ctx.Args = strings.Fields(args) // Убирает лишние пробелы if !plugin.executeMiddlewares(ctx, bot.dbContext) { return } diff --git a/msg_context.go b/msg_context.go index 817a630..5ad0f40 100644 --- a/msg_context.go +++ b/msg_context.go @@ -103,7 +103,7 @@ func (ctx *MsgContext) answer(text string, keyboard *InlineKeyboard) *AnswerMess params := tgapi.SendMessageP{ ChatID: ctx.Msg.Chat.ID, Text: text, - ParseMode: tgapi.ParseMD, + ParseMode: tgapi.ParseMDV2, } if keyboard != nil { params.ReplyMarkup = keyboard.Get() diff --git a/runners.go b/runners.go index 6f273c7..29c0ed5 100644 --- a/runners.go +++ b/runners.go @@ -13,6 +13,9 @@ type Runner[T DbContext] struct { fn RunnerFn[T] } +// NewRunner creates a new Runner with async=true by default. +// Builder methods (Onetime, Async, Timeout) modify the Runner in-place. +// DO NOT call builder methods concurrently or after Execute(). func NewRunner[T DbContext](name string, fn RunnerFn[T]) *Runner[T] { return &Runner[T]{ name: name, fn: fn, async: true, diff --git a/tgapi/api.go b/tgapi/api.go index fd2647a..8b994ab 100644 --- a/tgapi/api.go +++ b/tgapi/api.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "net/http" @@ -14,6 +13,8 @@ import ( "git.nix13.pw/scuroneko/slog" ) +// APIOpts holds configuration options for initializing the Telegram API client. +// Use the provided setter methods to build options — do not construct directly. type APIOpts struct { token string client *http.Client @@ -24,36 +25,59 @@ type APIOpts struct { dropOverflowLimit bool } -var ErrPoolUnexpected = errors.New("unexpected response from pool") - +// NewAPIOpts creates a new APIOpts with default values. +// Use setter methods to customize behavior. func NewAPIOpts(token string) *APIOpts { - return &APIOpts{token: token, client: nil, useTestServer: false, apiUrl: "https://api.telegram.org"} + return &APIOpts{ + token: token, + client: nil, + useTestServer: false, + apiUrl: "https://api.telegram.org", + } } + +// SetHTTPClient sets a custom HTTP client. Use this for timeouts, proxies, or custom transport. +// If not set, a default client with 45s timeout is used. func (opts *APIOpts) SetHTTPClient(client *http.Client) *APIOpts { if client != nil { opts.client = client } return opts } + +// UseTestServer enables use of Telegram's test server (https://api.test.telegram.org). +// Only for development/testing. func (opts *APIOpts) UseTestServer(use bool) *APIOpts { opts.useTestServer = use return opts } + +// SetAPIUrl overrides the default Telegram API URL. +// Useful for self-hosted bots or proxies. func (opts *APIOpts) SetAPIUrl(apiUrl string) *APIOpts { if apiUrl != "" { opts.apiUrl = apiUrl } return opts } + +// SetLimiter sets a rate limiter to enforce Telegram's API limits. +// Recommended: use utils.NewRateLimiter() for correct per-chat and global throttling. func (opts *APIOpts) SetLimiter(limiter *utils.RateLimiter) *APIOpts { opts.limiter = limiter return opts } + +// SetLimiterDrop enables "drop mode" for rate limiting. +// If true, requests exceeding limits return ErrDropOverflow immediately. +// If false, requests block until capacity is available. func (opts *APIOpts) SetLimiterDrop(b bool) *APIOpts { opts.dropOverflowLimit = b return opts } +// API is the main Telegram Bot API client. +// It manages HTTP requests, rate limiting, retries, and connection pooling. type API struct { token string client *http.Client @@ -61,73 +85,116 @@ type API struct { useTestServer bool apiUrl string - pool *WorkerPool + pool *workerPool Limiter *utils.RateLimiter dropOverflowLimit bool } +// NewAPI creates a new API client from options. +// Always call CloseApi() when done to release resources. func NewAPI(opts *APIOpts) *API { l := slog.CreateLogger().Level(utils.GetLoggerLevel()).Prefix("API") l.AddWriter(l.CreateJsonStdoutWriter()) + client := opts.client if client == nil { client = &http.Client{Timeout: time.Second * 45} } - pool := NewWorkerPool(16, 256) - pool.Start(context.Background()) + + pool := newWorkerPool(16, 256) + pool.start(context.Background()) + return &API{ - opts.token, client, l, - opts.useTestServer, opts.apiUrl, - pool, opts.limiter, opts.dropOverflowLimit, + token: opts.token, + client: client, + logger: l, + useTestServer: opts.useTestServer, + apiUrl: opts.apiUrl, + pool: pool, + Limiter: opts.limiter, + dropOverflowLimit: opts.dropOverflowLimit, } } + +// CloseApi shuts down the internal worker pool and closes the logger. +// Must be called to avoid resource leaks. func (api *API) CloseApi() error { - api.pool.Stop() + api.pool.stop() return api.logger.Close() } -func (api *API) GetLogger() *slog.Logger { return api.logger } +// GetLogger returns the internal logger for custom logging. +func (api *API) GetLogger() *slog.Logger { + return api.logger +} + +// ResponseParameters contains Telegram API response metadata (e.g., retry_after, migrate_to_chat_id). type ResponseParameters struct { MigrateToChatID *int64 `json:"migrate_to_chat_id,omitempty"` RetryAfter *int `json:"retry_after,omitempty"` } -type ApiResponse[R any] struct { - Ok bool `json:"ok"` - Description string `json:"description,omitempty"` - Result R `json:"result,omitempty"` - ErrorCode int `json:"error_code,omitempty"` - Parameters *ResponseParameters `json:"parameters,omitempty"` +// ApiResponse is the standard Telegram Bot API response structure. +// Generic over Result type R. +type ApiResponse[R any] struct { + Ok bool `json:"ok"` + Description string `json:"description,omitempty"` + Result R `json:"result,omitempty"` + ErrorCode int `json:"error_code,omitempty"` + Parameters *ResponseParameters `json:"parameters,omitempty"` } + +// TelegramRequest is an internal helper struct. +// DO NOT USE NewRequest or NewRequestWithChatID — they are unsafe and discouraged. +// Instead, use explicit methods like SendMessage, GetUpdates, etc. +// +// Why? Because using generics with arbitrary types P and R leads to: +// - No compile-time validation of parameters +// - No IDE autocompletion +// - Runtime panics on malformed JSON +// - Hard-to-debug errors +// +// Recommended: Define specific methods for each Telegram method (see below). type TelegramRequest[R, P any] struct { method string params P chatId int64 } +// NewRequest and NewRequestWithChatID are DEPRECATED. +// They encourage unsafe, untyped usage and bypass Go's type safety. +// Instead, define explicit, type-safe methods for each Telegram API endpoint. +// +// Example: +// +// func (api *API) SendMessage(ctx context.Context, chatID int64, text string) (Message, error) { ... } +// +// This provides: +// +// ✅ Compile-time validation +// ✅ IDE autocompletion +// ✅ Clear API surface +// ✅ Better error messages +// +// DO NOT use these constructors in production code. +// This can be used ONLY for testing or if you NEED method, that wasn't added as function. func NewRequest[R, P any](method string, params P) TelegramRequest[R, P] { return TelegramRequest[R, P]{method, params, 0} } + func NewRequestWithChatID[R, P any](method string, params P, chatId int64) TelegramRequest[R, P] { return TelegramRequest[R, P]{method, params, chatId} } + +// doRequest performs a single HTTP request to Telegram API. +// Handles rate limiting, retries on 429, and parses responses. +// Must be called within a worker pool context if using DoWithContext. func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, error) { var zero R - if api.Limiter != nil { - if api.dropOverflowLimit { - if !api.Limiter.GlobalAllow() { - return zero, errors.New("rate limited") - } - } else { - if err := api.Limiter.GlobalWait(ctx); err != nil { - return zero, err - } - } - } data, err := json.Marshal(r.params) if err != nil { - return zero, err + return zero, fmt.Errorf("failed to marshal request: %w", err) } buf := bytes.NewBuffer(data) @@ -136,54 +203,82 @@ func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, erro methodPrefix = "/test" } url := fmt.Sprintf("%s/bot%s%s/%s", api.apiUrl, api.token, methodPrefix, r.method) + req, err := http.NewRequestWithContext(ctx, "POST", url, buf) if err != nil { - return zero, err + return zero, fmt.Errorf("failed to create request: %w", err) } + req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "application/json") req.Header.Set("User-Agent", fmt.Sprintf("Laniakea/%s", utils.VersionString)) + req.Header.Set("Accept-Encoding", "gzip") + req.ContentLength = int64(len(data)) - api.logger.Debugln("REQ", api.apiUrl, r.method, buf.String()) - res, err := api.client.Do(req) - if err != nil { - return zero, err - } - defer func(Body io.ReadCloser) { - _ = Body.Close() - }(res.Body) - - data, err = readBody(res.Body) - if err != nil { - return zero, err - } - api.logger.Debugln("RES", r.method, string(data)) - if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusTooManyRequests { - return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(data)) - } - - responseData, err := parseBody[R](data) - if errors.Is(err, ErrRateLimit) { - if responseData.Parameters != nil { - after := 0 - if responseData.Parameters.RetryAfter != nil { - after = *responseData.Parameters.RetryAfter + for { + // Apply rate limiting before making the request + if api.Limiter != nil { + if err := api.Limiter.Check(ctx, api.dropOverflowLimit, r.chatId); err != nil { + return zero, err } - if r.chatId > 0 { - api.Limiter.SetChatLock(r.chatId, after) - } else { - api.Limiter.SetGlobalLock(after) - } - time.Sleep(time.Duration(after) * time.Second) - return r.doRequest(ctx, api) } - return zero, ErrRateLimit + + api.logger.Debugln("REQ", url, string(data)) + + resp, err := api.client.Do(req) + if err != nil { + return zero, fmt.Errorf("HTTP request failed: %w", err) + } + + data, err = readBody(resp.Body) + _ = resp.Body.Close() // ensure body is closed + if err != nil { + return zero, fmt.Errorf("failed to read response body: %w", err) + } + + api.logger.Debugln("RES", r.method, string(data)) + + response, err := parseBody[R](data) + if err != nil { + return zero, fmt.Errorf("failed to parse response: %w", err) + } + + if !response.Ok { + // Handle rate limiting (429) + if response.ErrorCode == 429 && response.Parameters != nil && response.Parameters.RetryAfter != nil { + after := *response.Parameters.RetryAfter + api.logger.Warnf("Rate limited by Telegram, retry after %d seconds (chat: %d)", after, r.chatId) + + // Apply cooldown to global or chat-specific limiter + if r.chatId > 0 { + api.Limiter.SetChatLock(r.chatId, after) + } else { + api.Limiter.SetGlobalLock(after) + } + + // Wait and retry + select { + case <-ctx.Done(): + return zero, ctx.Err() + case <-time.After(time.Duration(after) * time.Second): + continue // retry request + } + } + + // Other API errors + return zero, fmt.Errorf("[%d] %s", response.ErrorCode, response.Description) + } + + return response.Result, nil } - return responseData.Result, err } + +// DoWithContext executes the request asynchronously via the worker pool. +// Returns result or error via channel. Respects context cancellation. func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, error) { var zero R - result, err := api.pool.Submit(ctx, func(ctx context.Context) (any, error) { + + resultChan, err := api.pool.submit(ctx, func(ctx context.Context) (any, error) { return r.doRequest(ctx, api) }) if err != nil { @@ -193,35 +288,45 @@ func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, select { case <-ctx.Done(): return zero, ctx.Err() - case res := <-result: - if res.Err != nil { - return zero, res.Err + case res := <-resultChan: + if res.err != nil { + return zero, res.err } - if val, ok := res.Value.(R); ok { + if val, ok := res.value.(R); ok { return val, nil } return zero, ErrPoolUnexpected } } + +// Do executes the request synchronously with a background context. +// Use only for simple, non-critical calls. func (r TelegramRequest[R, P]) Do(api *API) (R, error) { return r.DoWithContext(context.Background(), api) } +// readBody reads and limits response body to prevent memory exhaustion. +// Telegram responses are typically small (<1MB), but we cap at 10MB. func readBody(body io.ReadCloser) ([]byte, error) { - reader := io.LimitReader(body, 10<<20) + reader := io.LimitReader(body, 10<<20) // 10 MB return io.ReadAll(reader) } + +// parseBody unmarshals Telegram API response and returns structured result. +// Returns ErrRateLimit internally if error_code == 429 — caller must handle via response.Ok check. func parseBody[R any](data []byte) (ApiResponse[R], error) { var resp ApiResponse[R] err := json.Unmarshal(data, &resp) if err != nil { - return resp, err + return resp, fmt.Errorf("failed to unmarshal JSON: %w", err) } + if !resp.Ok { if resp.ErrorCode == 429 { - return resp, ErrRateLimit + return resp, ErrRateLimit // internal use only } return resp, fmt.Errorf("[%d] %s", resp.ErrorCode, resp.Description) } + return resp, nil } diff --git a/tgapi/attachments_methods.go b/tgapi/attachments_methods.go index 455cd8c..1a1bfb1 100644 --- a/tgapi/attachments_methods.go +++ b/tgapi/attachments_methods.go @@ -24,13 +24,13 @@ type SendPhotoP struct { } func (api *API) SendPhoto(params SendPhotoP) (Message, error) { - req := NewRequest[Message]("sendPhoto", params) + req := NewRequestWithChatID[Message]("sendPhoto", params, params.ChatID) return req.Do(api) } type SendAudioP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -53,13 +53,13 @@ type SendAudioP struct { } func (api *API) SendAudio(params SendAudioP) (Message, error) { - req := NewRequest[Message]("sendAudio", params) + req := NewRequestWithChatID[Message]("sendAudio", params, params.ChatID) return req.Do(api) } type SendDocumentP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -79,13 +79,13 @@ type SendDocumentP struct { } func (api *API) SendDocument(params SendDocumentP) (Message, error) { - req := NewRequest[Message]("sendDocument", params) + req := NewRequestWithChatID[Message]("sendDocument", params, params.ChatID) return req.Do(api) } type SendVideoP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -114,13 +114,13 @@ type SendVideoP struct { } func (api *API) SendVideo(params SendVideoP) (Message, error) { - req := NewRequest[Message]("sendVideo", params) + req := NewRequestWithChatID[Message]("sendVideo", params, params.ChatID) return req.Do(api) } type SendAnimationP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -145,13 +145,13 @@ type SendAnimationP struct { } func (api *API) SendAnimation(params SendAnimationP) (Message, error) { - req := NewRequest[Message]("sendAnimation", params) + req := NewRequestWithChatID[Message]("sendAnimation", params, params.ChatID) return req.Do(api) } type SendVoiceP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -171,13 +171,13 @@ type SendVoiceP struct { } func (api *API) SendVoice(params *SendVoiceP) (Message, error) { - req := NewRequest[Message]("sendVoice", params) + req := NewRequestWithChatID[Message]("sendVoice", params, params.ChatID) return req.Do(api) } type SendVideoNoteP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -195,13 +195,13 @@ type SendVideoNoteP struct { } func (api *API) SendVideoNote(params SendVideoNoteP) (Message, error) { - req := NewRequest[Message]("sendVideoNote", params) + req := NewRequestWithChatID[Message]("sendVideoNote", params, params.ChatID) return req.Do(api) } type SendPaidMediaP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` StarCount int `json:"star_count,omitempty"` @@ -222,13 +222,13 @@ type SendPaidMediaP struct { } func (api *API) SendPaidMedia(params SendPaidMediaP) (Message, error) { - req := NewRequest[Message]("sendPaidMedia", params) + req := NewRequestWithChatID[Message]("sendPaidMedia", params, params.ChatID) return req.Do(api) } type SendMediaGroupP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -241,6 +241,6 @@ type SendMediaGroupP struct { } func (api *API) SendMediaGroup(params SendMediaGroupP) (Message, error) { - req := NewRequest[Message]("sendMediaGroup", params) + req := NewRequestWithChatID[Message]("sendMediaGroup", params, params.ChatID) return req.Do(api) } diff --git a/tgapi/chat_methods.go b/tgapi/chat_methods.go index 8294258..ad1fc33 100644 --- a/tgapi/chat_methods.go +++ b/tgapi/chat_methods.go @@ -1,30 +1,30 @@ package tgapi type BanChatMemberP struct { - ChatID int `json:"chat_id"` - UserID int `json:"user_id"` - UntilDate int `json:"until_date,omitempty"` - RevokeMessages bool `json:"revoke_messages,omitempty"` + ChatID int64 `json:"chat_id"` + UserID int `json:"user_id"` + UntilDate int `json:"until_date,omitempty"` + RevokeMessages bool `json:"revoke_messages,omitempty"` } func (api *API) BanChatMember(params BanChatMemberP) (bool, error) { - req := NewRequest[bool]("banChatMember", params) + req := NewRequestWithChatID[bool]("banChatMember", params, params.ChatID) return req.Do(api) } type UnbanChatMemberP struct { - ChatID int `json:"chat_id"` - UserID int `json:"user_id"` - OnlyIfBanned bool `json:"only_if_banned"` + ChatID int64 `json:"chat_id"` + UserID int `json:"user_id"` + OnlyIfBanned bool `json:"only_if_banned"` } func (api *API) UnbanChatMember(params UnbanChatMemberP) (bool, error) { - req := NewRequest[bool]("unbanChatMember", params) + req := NewRequestWithChatID[bool]("unbanChatMember", params, params.ChatID) return req.Do(api) } type RestrictChatMemberP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` UserID int `json:"user_id"` Permissions ChatPermissions `json:"permissions"` UseIndependentChatPermissions bool `json:"use_independent_chat_permissions,omitempty"` @@ -32,14 +32,14 @@ type RestrictChatMemberP struct { } func (api *API) RestrictChatMember(params RestrictChatMemberP) (bool, error) { - req := NewRequest[bool]("restrictChatMember", params) + req := NewRequestWithChatID[bool]("restrictChatMember", params, params.ChatID) return req.Do(api) } type PromoteChatMember struct { - ChatID int `json:"chat_id"` - UserID int `json:"user_id"` - IsAnonymous bool `json:"is_anonymous,omitempty"` + ChatID int64 `json:"chat_id"` + UserID int `json:"user_id"` + IsAnonymous bool `json:"is_anonymous,omitempty"` CanManageChat bool `json:"can_manage_chat,omitempty"` CanDeleteMessages bool `json:"can_delete_messages,omitempty"` @@ -60,74 +60,74 @@ type PromoteChatMember struct { } func (api *API) PromoteChatMember(params PromoteChatMember) (bool, error) { - req := NewRequest[bool]("promoteChatMember", params) + req := NewRequestWithChatID[bool]("promoteChatMember", params, params.ChatID) return req.Do(api) } type SetChatAdministratorCustomTitleP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` UserID int `json:"user_id"` CustomTitle string `json:"custom_title"` } func (api *API) SetChatAdministratorCustomTitle(params SetChatAdministratorCustomTitleP) (bool, error) { - req := NewRequest[bool]("setChatAdministratorCustomTitle", params) + req := NewRequestWithChatID[bool]("setChatAdministratorCustomTitle", params, params.ChatID) return req.Do(api) } type SetChatMemberTagP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` UserID int `json:"user_id"` Tag string `json:"tag,omitempty"` } func (api *API) SetChatMemberTag(params SetChatMemberTagP) (bool, error) { - req := NewRequest[bool]("setChatMemberTag", params) + req := NewRequestWithChatID[bool]("setChatMemberTag", params, params.ChatID) return req.Do(api) } type BanChatSenderChatP struct { - ChatID int `json:"chat_id"` - SenderChatID int `json:"sender_chat_id"` + ChatID int64 `json:"chat_id"` + SenderChatID int64 `json:"sender_chat_id"` } func (api *API) BanChatSenderChat(params BanChatSenderChatP) (bool, error) { - req := NewRequest[bool]("banChatSenderChat", params) + req := NewRequestWithChatID[bool]("banChatSenderChat", params, params.ChatID) return req.Do(api) } type UnbanChatSenderChatP struct { - ChatID int `json:"chat_id"` - SenderChatID int `json:"sender_chat_id"` + ChatID int64 `json:"chat_id"` + SenderChatID int64 `json:"sender_chat_id"` } func (api *API) UnbanChatSenderChat(params BanChatSenderChatP) (bool, error) { - req := NewRequest[bool]("unbanChatSenderChat", params) + req := NewRequestWithChatID[bool]("unbanChatSenderChat", params, params.ChatID) return req.Do(api) } type SetChatPermissionsP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` Permissions ChatPermissions `json:"permissions"` UseIndependentChatPermissions bool `json:"use_independent_chat_permissions,omitempty"` } func (api *API) SetChatPermissions(params SetChatPermissionsP) (bool, error) { - req := NewRequest[bool]("setChatPermissions", params) + req := NewRequestWithChatID[bool]("setChatPermissions", params, params.ChatID) return req.Do(api) } type ExportChatInviteLinkP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (api *API) ExportChatInviteLink(params ExportChatInviteLinkP) (string, error) { - req := NewRequest[string]("exportChatInviteLink", params) + req := NewRequestWithChatID[string]("exportChatInviteLink", params, params.ChatID) return req.Do(api) } type CreateChatInviteLinkP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` Name *string `json:"name,omitempty"` ExpireDate int `json:"expire_date,omitempty"` MemberLimit int `json:"member_limit,omitempty"` @@ -135,12 +135,12 @@ type CreateChatInviteLinkP struct { } func (api *API) CreateChatInviteLink(params CreateChatInviteLinkP) (ChatInviteLink, error) { - req := NewRequest[ChatInviteLink]("createChatInviteLink", params) + req := NewRequestWithChatID[ChatInviteLink]("createChatInviteLink", params, params.ChatID) return req.Do(api) } type EditChatInviteLinkP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` InviteLink string `json:"invite_link"` Name string `json:"name,omitempty"` @@ -150,60 +150,60 @@ type EditChatInviteLinkP struct { } func (api *API) EditChatInviteLink(params EditChatInviteLinkP) (ChatInviteLink, error) { - req := NewRequest[ChatInviteLink]("editChatInviteLink", params) + req := NewRequestWithChatID[ChatInviteLink]("editChatInviteLink", params, params.ChatID) return req.Do(api) } type CreateChatSubscriptionInviteLinkP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` Name string `json:"name,omitempty"` SubscriptionPeriod int `json:"subscription_period,omitempty"` SubscriptionPrice int `json:"subscription_price,omitempty"` } func (api *API) CreateChatSubscriptionInviteLink(params CreateChatSubscriptionInviteLinkP) (ChatInviteLink, error) { - req := NewRequest[ChatInviteLink]("createChatSubscriptionInviteLink", params) + req := NewRequestWithChatID[ChatInviteLink]("createChatSubscriptionInviteLink", params, params.ChatID) return req.Do(api) } type EditChatSubscriptionInviteLinkP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` InviteLink string `json:"invite_link"` Name string `json:"name,omitempty"` } func (api *API) EditChatSubscriptionInviteLink(params EditChatSubscriptionInviteLinkP) (ChatInviteLink, error) { - req := NewRequest[ChatInviteLink]("editChatSubscriptionInviteLink", params) + req := NewRequestWithChatID[ChatInviteLink]("editChatSubscriptionInviteLink", params, params.ChatID) return req.Do(api) } type RevokeChatInviteLinkP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` InviteLink string `json:"invite_link"` } func (api *API) RevokeChatInviteLink(params RevokeChatInviteLinkP) (ChatInviteLink, error) { - req := NewRequest[ChatInviteLink]("revokeChatInviteLink", params) + req := NewRequestWithChatID[ChatInviteLink]("revokeChatInviteLink", params, params.ChatID) return req.Do(api) } type ApproveChatJoinRequestP struct { - ChatID int `json:"chat_id"` - UserID int `json:"user_id"` + ChatID int64 `json:"chat_id"` + UserID int `json:"user_id"` } func (api *API) ApproveChatJoinRequest(params ApproveChatJoinRequestP) (bool, error) { - req := NewRequest[bool]("approveChatJoinRequest", params) + req := NewRequestWithChatID[bool]("approveChatJoinRequest", params, params.ChatID) return req.Do(api) } type DeclineChatJoinRequestP struct { - ChatID int `json:"chat_id"` - UserID int `json:"user_id"` + ChatID int64 `json:"chat_id"` + UserID int `json:"user_id"` } func (api *API) DeclineChatJoinRequest(params DeclineChatJoinRequestP) (bool, error) { - req := NewRequest[bool]("declineChatJoinRequest", params) + req := NewRequestWithChatID[bool]("declineChatJoinRequest", params, params.ChatID) return req.Do(api) } @@ -213,143 +213,143 @@ func (api *API) SetChatPhoto() { } type DeleteChatPhotoP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (api *API) DeleteChatPhoto(params DeleteChatPhotoP) (bool, error) { - req := NewRequest[bool]("deleteChatPhoto", params) + req := NewRequestWithChatID[bool]("deleteChatPhoto", params, params.ChatID) return req.Do(api) } type SetChatTitleP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` Title string `json:"title"` } func (api *API) SetChatTitle(params SetChatTitleP) (bool, error) { - req := NewRequest[bool]("setChatTitle", params) + req := NewRequestWithChatID[bool]("setChatTitle", params, params.ChatID) return req.Do(api) } type SetChatDescriptionP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` Description string `json:"description"` } func (api *API) SetChatDescription(params SetChatDescriptionP) (bool, error) { - req := NewRequest[bool]("setChatDescription", params) + req := NewRequestWithChatID[bool]("setChatDescription", params, params.ChatID) return req.Do(api) } type PinChatMessageP struct { BusinessConnectionID *string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageID int `json:"message_id"` DisableNotification bool `json:"disable_notification,omitempty"` } func (api *API) PinChatMessage(params PinChatMessageP) (bool, error) { - req := NewRequest[bool]("pinChatMessage", params) + req := NewRequestWithChatID[bool]("pinChatMessage", params, params.ChatID) return req.Do(api) } type UnpinChatMessageP struct { BusinessConnectionID *string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageID int `json:"message_id"` } func (api *API) UnpinChatMessage(params UnpinChatMessageP) (bool, error) { - req := NewRequest[bool]("unpinChatMessage", params) + req := NewRequestWithChatID[bool]("unpinChatMessage", params, params.ChatID) return req.Do(api) } type UnpinAllChatMessagesP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (api *API) UnpinAllChatMessages(params UnpinAllChatMessagesP) (bool, error) { - req := NewRequest[bool]("unpinAllChatMessages", params) + req := NewRequestWithChatID[bool]("unpinAllChatMessages", params, params.ChatID) return req.Do(api) } type LeaveChatP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (api *API) LeaveChat(params LeaveChatP) (bool, error) { - req := NewRequest[bool]("leaveChatP", params) + req := NewRequestWithChatID[bool]("leaveChatP", params, params.ChatID) return req.Do(api) } type GetChatP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (api *API) GetChatP(params GetChatP) (ChatFullInfo, error) { - req := NewRequest[ChatFullInfo]("getChatP", params) + req := NewRequestWithChatID[ChatFullInfo]("getChatP", params, params.ChatID) return req.Do(api) } type GetChatAdministratorsP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (api *API) GetChatAdministrators(params GetChatAdministratorsP) ([]ChatMember, error) { - req := NewRequest[[]ChatMember]("getChatAdministrators", params) + req := NewRequestWithChatID[[]ChatMember]("getChatAdministrators", params, params.ChatID) return req.Do(api) } type GetChatMembersCountP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (api *API) GetChatMemberCount(params GetChatMembersCountP) (int, error) { - req := NewRequest[int]("getChatMemberCount", params) + req := NewRequestWithChatID[int]("getChatMemberCount", params, params.ChatID) return req.Do(api) } type GetChatMemberP struct { - ChatID int `json:"chat_id"` - UserID int `json:"user_id"` + ChatID int64 `json:"chat_id"` + UserID int `json:"user_id"` } func (api *API) GetChatMember(params GetChatMemberP) (ChatMember, error) { - req := NewRequest[ChatMember]("getChatMember", params) + req := NewRequestWithChatID[ChatMember]("getChatMember", params, params.ChatID) return req.Do(api) } type SetChatStickerSetP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` StickerSetName string `json:"sticker_set_name"` } func (api *API) SetChatStickerSet(params SetChatStickerSetP) (bool, error) { - req := NewRequest[bool]("setChatStickerSet", params) + req := NewRequestWithChatID[bool]("setChatStickerSet", params, params.ChatID) return req.Do(api) } type DeleteChatStickerSetP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (api *API) DeleteChatStickerSet(params DeleteChatStickerSetP) (bool, error) { - req := NewRequest[bool]("deleteChatStickerSet", params) + req := NewRequestWithChatID[bool]("deleteChatStickerSet", params, params.ChatID) return req.Do(api) } type GetUserChatBoostsP struct { - ChatID int `json:"chat_id"` - UserID int `json:"user_id"` + ChatID int64 `json:"chat_id"` + UserID int `json:"user_id"` } func (api *API) GetUserChatBoosts(params GetUserChatBoostsP) (UserChatBoosts, error) { - req := NewRequest[UserChatBoosts]("getUserChatBoosts", params) + req := NewRequestWithChatID[UserChatBoosts]("getUserChatBoosts", params, params.ChatID) return req.Do(api) } type GetChatGiftsP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` ExcludeUnsaved bool `json:"exclude_unsaved,omitempty"` ExcludeSaved bool `json:"exclude_saved,omitempty"` ExcludeUnlimited bool `json:"exclude_unlimited,omitempty"` @@ -363,6 +363,6 @@ type GetChatGiftsP struct { } func (api *API) GetChatGifts(params GetChatGiftsP) (OwnedGifts, error) { - req := NewRequest[OwnedGifts]("getChatGifts", params) + req := NewRequestWithChatID[OwnedGifts]("getChatGifts", params, params.ChatID) return req.Do(api) } diff --git a/tgapi/errors.go b/tgapi/errors.go index 48a368b..fa1d205 100644 --- a/tgapi/errors.go +++ b/tgapi/errors.go @@ -3,3 +3,5 @@ package tgapi import "errors" var ErrRateLimit = errors.New("rate limit exceeded") +var ErrPoolUnexpected = errors.New("unexpected response from pool") +var ErrPoolQueueFull = errors.New("worker pool queue full") diff --git a/tgapi/forum_methods.go b/tgapi/forum_methods.go index e098983..137f87d 100644 --- a/tgapi/forum_methods.go +++ b/tgapi/forum_methods.go @@ -1,24 +1,24 @@ package tgapi type BaseForumTopicP struct { - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id"` } -func (api *API) GetForumTopicIconSet() ([]Sticker, error) { - req := NewRequest[[]Sticker]("getForumTopicIconSet", NoParams) +func (api *API) GetForumTopicIconStickers() ([]Sticker, error) { + req := NewRequest[[]Sticker]("getForumTopicIconStickers", NoParams) return req.Do(api) } type CreateForumTopicP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` Name string `json:"name"` IconColor ForumTopicIconColor `json:"icon_color"` IconCustomEmojiID string `json:"icon_custom_emoji_id"` } func (api *API) CreateForumTopic(params CreateForumTopicP) (ForumTopic, error) { - req := NewRequest[ForumTopic]("createForumTopic", params) + req := NewRequestWithChatID[ForumTopic]("createForumTopic", params, params.ChatID) return req.Do(api) } @@ -29,58 +29,58 @@ type EditForumTopicP struct { } func (api *API) EditForumTopic(params EditForumTopicP) (bool, error) { - req := NewRequest[bool]("editForumTopic", params) + req := NewRequestWithChatID[bool]("editForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) CloseForumTopic(params BaseForumTopicP) (bool, error) { - req := NewRequest[bool]("closeForumTopic", params) + req := NewRequestWithChatID[bool]("closeForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) ReopenForumTopic(params BaseForumTopicP) (bool, error) { - req := NewRequest[bool]("reopenForumTopic", params) + req := NewRequestWithChatID[bool]("reopenForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) DeleteForumTopic(params BaseForumTopicP) (bool, error) { - req := NewRequest[bool]("deleteForumTopic", params) + req := NewRequestWithChatID[bool]("deleteForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) UnpinAllForumTopicMessages(params BaseForumTopicP) (bool, error) { - req := NewRequest[bool]("unpinAllForumTopicMessages", params) + req := NewRequestWithChatID[bool]("unpinAllForumTopicMessages", params, params.ChatID) return req.Do(api) } type BaseGeneralForumTopicP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } type EditGeneralForumTopicP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` Name string `json:"name"` } func (api *API) EditGeneralForumTopic(params EditGeneralForumTopicP) (bool, error) { - req := NewRequest[bool]("editGeneralForumTopic", params) + req := NewRequestWithChatID[bool]("editGeneralForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) CloseGeneralForumTopic(params BaseGeneralForumTopicP) (bool, error) { - req := NewRequest[bool]("closeGeneralForumTopic", params) + req := NewRequestWithChatID[bool]("closeGeneralForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) ReopenGeneralForumTopic(params BaseGeneralForumTopicP) (bool, error) { - req := NewRequest[bool]("reopenGeneralForumTopic", params) + req := NewRequestWithChatID[bool]("reopenGeneralForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) HideGeneralForumTopic(params BaseGeneralForumTopicP) (bool, error) { - req := NewRequest[bool]("hideGeneralForumTopic", params) + req := NewRequestWithChatID[bool]("hideGeneralForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) UnhideGeneralForumTopic(params BaseGeneralForumTopicP) (bool, error) { - req := NewRequest[bool]("unhideGeneralForumTopic", params) + req := NewRequestWithChatID[bool]("unhideGeneralForumTopic", params, params.ChatID) return req.Do(api) } func (api *API) UnpinAllGeneralForumTopicMessages(params BaseGeneralForumTopicP) (bool, error) { - req := NewRequest[bool]("unpinAllGeneralForumTopicMessages", params) + req := NewRequestWithChatID[bool]("unpinAllGeneralForumTopicMessages", params, params.ChatID) return req.Do(api) } diff --git a/tgapi/messages_methods.go b/tgapi/messages_methods.go index 2e8d0bb..cbcd7ce 100644 --- a/tgapi/messages_methods.go +++ b/tgapi/messages_methods.go @@ -26,47 +26,47 @@ func (api *API) SendMessage(params SendMessageP) (Message, error) { } type ForwardMessageP struct { - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` - DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` + DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` - MessageID int `json:"message_id,omitempty"` - FromChatID int `json:"from_chat_id,omitempty"` - VideoStartTimestamp int `json:"video_start_timestamp,omitempty"` - DisableNotification bool `json:"disable_notification,omitempty"` - ProtectContent bool `json:"protect_content,omitempty"` + MessageID int `json:"message_id,omitempty"` + FromChatID int64 `json:"from_chat_id,omitempty"` + VideoStartTimestamp int `json:"video_start_timestamp,omitempty"` + DisableNotification bool `json:"disable_notification,omitempty"` + ProtectContent bool `json:"protect_content,omitempty"` MessageEffectID string `json:"message_effect_id,omitempty"` SuggestedPostParameters *SuggestedPostParameters `json:"suggested_post_parameters,omitempty"` } func (api *API) ForwardMessage(params ForwardMessageP) (Message, error) { - req := NewRequest[Message]("forwardMessage", params) + req := NewRequestWithChatID[Message]("forwardMessage", params, params.ChatID) return req.Do(api) } type ForwardMessagesP struct { - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` - DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` + DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` - FromChatID int `json:"from_chat_id,omitempty"` + FromChatID int64 `json:"from_chat_id,omitempty"` MessageIDs []int `json:"message_ids,omitempty"` DisableNotification bool `json:"disable_notification,omitempty"` ProtectContent bool `json:"protect_content,omitempty"` } func (api *API) ForwardMessages(params ForwardMessagesP) ([]int, error) { - req := NewRequest[[]int]("forwardMessages", params) + req := NewRequestWithChatID[[]int]("forwardMessages", params, params.ChatID) return req.Do(api) } type CopyMessageP struct { - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` - DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` + DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` - FromChatID int `json:"from_chat_id"` + FromChatID int64 `json:"from_chat_id"` MessageID int `json:"message_id"` VideoStartTimestamp int `json:"video_start_timestamp,omitempty"` Caption string `json:"caption,omitempty"` @@ -85,16 +85,16 @@ type CopyMessageP struct { } func (api *API) CopyMessage(params CopyMessageP) (int, error) { - req := NewRequest[int]("copyMessage", params) + req := NewRequestWithChatID[int]("copyMessage", params, params.ChatID) return req.Do(api) } type CopyMessagesP struct { - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` - DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` + DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` - FromChatID int `json:"from_chat_id,omitempty"` + FromChatID int64 `json:"from_chat_id,omitempty"` MessageIDs []int `json:"message_ids,omitempty"` DisableNotification bool `json:"disable_notification,omitempty"` ProtectContent bool `json:"protect_content,omitempty"` @@ -102,15 +102,15 @@ type CopyMessagesP struct { } func (api *API) CopyMessages(params CopyMessagesP) ([]int, error) { - req := NewRequest[[]int]("copyMessages", params) + req := NewRequestWithChatID[[]int]("copyMessages", params, params.ChatID) return req.Do(api) } type SendLocationP struct { - BusinessConnectionID int `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` - DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` + BusinessConnectionID int `json:"business_connection_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` + DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` Latitude float64 `json:"latitude"` Longitude float64 `json:"longitude"` @@ -130,15 +130,15 @@ type SendLocationP struct { } func (api *API) SendLocation(params SendLocationP) (Message, error) { - req := NewRequest[Message]("sendLocation", params) + req := NewRequestWithChatID[Message]("sendLocation", params, params.ChatID) return req.Do(api) } type SendVenueP struct { - BusinessConnectionID int `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` - DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` + BusinessConnectionID int `json:"business_connection_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` + DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` Latitude float64 `json:"latitude"` Longitude float64 `json:"longitude"` @@ -160,15 +160,15 @@ type SendVenueP struct { } func (api *API) SendVenue(params SendVenueP) (Message, error) { - req := NewRequest[Message]("sendVenue", params) + req := NewRequestWithChatID[Message]("sendVenue", params, params.ChatID) return req.Do(api) } type SendContactP struct { - BusinessConnectionID int `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` - DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` + BusinessConnectionID int `json:"business_connection_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` + DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` PhoneNumber string `json:"phone_number"` FirstName string `json:"first_name"` @@ -186,14 +186,14 @@ type SendContactP struct { } func (api *API) SendContact(params SendContactP) (Message, error) { - req := NewRequest[Message]("sendContact", params) + req := NewRequestWithChatID[Message]("sendContact", params, params.ChatID) return req.Do(api) } type SendPollP struct { - BusinessConnectionID int `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` + BusinessConnectionID int `json:"business_connection_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` Question string `json:"question"` QuestionParseMode ParseMode `json:"question_mode,omitempty"` @@ -220,13 +220,13 @@ type SendPollP struct { } func (api *API) SendPoll(params SendPollP) (Message, error) { - req := NewRequest[Message]("sendPoll", params) + req := NewRequestWithChatID[Message]("sendPoll", params, params.ChatID) return req.Do(api) } type SendChecklistP struct { BusinessConnectionID int `json:"business_connection_id"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` Checklist InputChecklist `json:"checklist"` DisableNotification bool `json:"disable_notification,omitempty"` @@ -238,15 +238,15 @@ type SendChecklistP struct { } func (api *API) SendChecklist(params SendChecklistP) (Message, error) { - req := NewRequest[Message]("sendChecklist", params) + req := NewRequestWithChatID[Message]("sendChecklist", params, params.ChatID) return req.Do(api) } type SendDiceP struct { - BusinessConnectionID int `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` - MessageThreadID int `json:"message_thread_id,omitempty"` - DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` + BusinessConnectionID int `json:"business_connection_id,omitempty"` + ChatID int64 `json:"chat_id"` + MessageThreadID int `json:"message_thread_id,omitempty"` + DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` Emoji string `json:"emoji,omitempty"` @@ -261,7 +261,7 @@ type SendDiceP struct { } func (api *API) SendDice(params SendDiceP) (Message, error) { - req := NewRequest[Message]("sendDice", params) + req := NewRequestWithChatID[Message]("sendDice", params, params.ChatID) return req.Do(api) } @@ -287,19 +287,19 @@ type SendChatActionP struct { } func (api *API) SendChatAction(params SendChatActionP) (bool, error) { - req := NewRequest[bool]("sendChatAction", params) + req := NewRequestWithChatID[bool]("sendChatAction", params, params.ChatID) return req.Do(api) } type SetMessageReactionP struct { - ChatId int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageId int `json:"message_id"` Reaction []ReactionType `json:"reaction"` IsBig bool `json:"is_big,omitempty"` } func (api *API) SetMessageReaction(params SetMessageReactionP) (bool, error) { - req := NewRequest[bool]("setMessageReaction", params) + req := NewRequestWithChatID[bool]("setMessageReaction", params, params.ChatID) return req.Do(api) } @@ -320,11 +320,11 @@ type EditMessageTextP struct { func (api *API) EditMessageText(params EditMessageTextP) (Message, bool, error) { var zero Message if params.InlineMessageID != "" { - req := NewRequest[bool]("editMessageText", params) + req := NewRequestWithChatID[bool]("editMessageText", params, params.ChatID) res, err := req.Do(api) return zero, res, err } - req := NewRequest[Message]("editMessageText", params) + req := NewRequestWithChatID[Message]("editMessageText", params, params.ChatID) res, err := req.Do(api) return res, false, err } @@ -344,18 +344,18 @@ type EditMessageCaptionP struct { func (api *API) EditMessageCaption(params EditMessageCaptionP) (Message, bool, error) { var zero Message if params.InlineMessageID != "" { - req := NewRequest[bool]("editMessageCaption", params) + req := NewRequestWithChatID[bool]("editMessageCaption", params, params.ChatID) res, err := req.Do(api) return zero, res, err } - req := NewRequest[Message]("editMessageCaption", params) + req := NewRequestWithChatID[Message]("editMessageCaption", params, params.ChatID) res, err := req.Do(api) return res, false, err } type EditMessageMediaP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id,omitempty"` + ChatID int64 `json:"chat_id,omitempty"` MessageID int `json:"message_id,omitempty"` InlineMessageID string `json:"inline_message_id,omitempty"` Message InputMedia `json:"message"` @@ -367,18 +367,18 @@ type EditMessageMediaP struct { func (api *API) EditMessageMedia(params EditMessageMediaP) (Message, bool, error) { var zero Message if params.InlineMessageID != "" { - req := NewRequest[bool]("editMessageMedia", params) + req := NewRequestWithChatID[bool]("editMessageMedia", params, params.ChatID) res, err := req.Do(api) return zero, res, err } - req := NewRequest[Message]("editMessageMedia", params) + req := NewRequestWithChatID[Message]("editMessageMedia", params, params.ChatID) res, err := req.Do(api) return res, false, err } type EditMessageLiveLocationP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id,omitempty"` + ChatID int64 `json:"chat_id,omitempty"` MessageID int `json:"message_id,omitempty"` InlineMessageID string `json:"inline_message_id,omitempty"` @@ -396,18 +396,18 @@ type EditMessageLiveLocationP struct { func (api *API) EditMessageLiveLocation(params EditMessageLiveLocationP) (Message, bool, error) { var zero Message if params.InlineMessageID != "" { - req := NewRequest[bool]("editMessageLiveLocation", params) + req := NewRequestWithChatID[bool]("editMessageLiveLocation", params, params.ChatID) res, err := req.Do(api) return zero, res, err } - req := NewRequest[Message]("editMessageLiveLocation", params) + req := NewRequestWithChatID[Message]("editMessageLiveLocation", params, params.ChatID) res, err := req.Do(api) return res, false, err } type StopMessageLiveLocationP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id,omitempty"` + ChatID int64 `json:"chat_id,omitempty"` MessageID int `json:"message_id,omitempty"` InlineMessageID string `json:"inline_message_id,omitempty"` ReplyMarkup *InlineKeyboardMarkup `json:"reply_markup,omitempty"` @@ -418,31 +418,31 @@ type StopMessageLiveLocationP struct { func (api *API) StopMessageLiveLocation(params StopMessageLiveLocationP) (Message, bool, error) { var zero Message if params.InlineMessageID != "" { - req := NewRequest[bool]("stopMessageLiveLocation", params) + req := NewRequestWithChatID[bool]("stopMessageLiveLocation", params, params.ChatID) res, err := req.Do(api) return zero, res, err } - req := NewRequest[Message]("stopMessageLiveLocation", params) + req := NewRequestWithChatID[Message]("stopMessageLiveLocation", params, params.ChatID) res, err := req.Do(api) return res, false, err } type EditMessageChecklistP struct { BusinessConnectionID string `json:"business_connection_id"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageID int `json:"message_id"` Checklist InputChecklist `json:"checklist"` ReplyMarkup *InlineKeyboardMarkup `json:"reply_markup,omitempty"` } func (api *API) EditMessageChecklist(params EditMessageChecklistP) (Message, error) { - req := NewRequest[Message]("editMessageChecklist", params) + req := NewRequestWithChatID[Message]("editMessageChecklist", params, params.ChatID) return req.Do(api) } type EditMessageReplyMarkupP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id,omitempty"` + ChatID int64 `json:"chat_id,omitempty"` MessageID int `json:"message_id,omitempty"` InlineMessageID string `json:"inline_message_id,omitempty"` ReplyMarkup *InlineKeyboardMarkup `json:"reply_markup,omitempty"` @@ -451,46 +451,46 @@ type EditMessageReplyMarkupP struct { func (api *API) EditMessageReplyMarkup(params EditMessageReplyMarkupP) (Message, bool, error) { var zero Message if params.InlineMessageID != "" { - req := NewRequest[bool]("editMessageReplyMarkup", params) + req := NewRequestWithChatID[bool]("editMessageReplyMarkup", params, params.ChatID) res, err := req.Do(api) return zero, res, err } - req := NewRequest[Message]("editMessageReplyMarkup", params) + req := NewRequestWithChatID[Message]("editMessageReplyMarkup", params, params.ChatID) res, err := req.Do(api) return res, false, err } type StopPollP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageID int `json:"message_id"` InlineMessageID string `json:"inline_message_id,omitempty"` } func (api *API) StopPoll(params StopPollP) (Poll, error) { - req := NewRequest[Poll]("stopPoll", params) + req := NewRequestWithChatID[Poll]("stopPoll", params, params.ChatID) return req.Do(api) } type ApproveSuggestedPostP struct { - ChatID int `json:"chat_id"` - MessageID int `json:"message_id"` - SendDate int `json:"send_date,omitempty"` + ChatID int64 `json:"chat_id"` + MessageID int `json:"message_id"` + SendDate int `json:"send_date,omitempty"` } func (api *API) ApproveSuggestedPost(params ApproveSuggestedPostP) (bool, error) { - req := NewRequest[bool]("approveSuggestedPost", params) + req := NewRequestWithChatID[bool]("approveSuggestedPost", params, params.ChatID) return req.Do(api) } type DeclineSuggestedPostP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageID int `json:"message_id"` Comment string `json:"comment,omitempty"` } func (api *API) DeclineSuggestedPost(params DeclineSuggestedPostP) (bool, error) { - req := NewRequest[bool]("declineSuggestedPost", params) + req := NewRequestWithChatID[bool]("declineSuggestedPost", params, params.ChatID) return req.Do(api) } @@ -500,17 +500,17 @@ type DeleteMessageP struct { } func (api *API) DeleteMessage(params DeleteMessageP) (bool, error) { - req := NewRequest[bool]("deleteMessage", params) + req := NewRequestWithChatID[bool]("deleteMessage", params, params.ChatID) return req.Do(api) } type DeleteMessagesP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageIDs []int `json:"message_ids"` } func (api *API) DeleteMessages(params DeleteMessagesP) (bool, error) { - req := NewRequest[bool]("deleteMessages", params) + req := NewRequestWithChatID[bool]("deleteMessages", params, params.ChatID) return req.Do(api) } diff --git a/tgapi/pool.go b/tgapi/pool.go index 79a126e..774938a 100644 --- a/tgapi/pool.go +++ b/tgapi/pool.go @@ -2,91 +2,141 @@ package tgapi import ( "context" - "errors" "sync" ) -var ErrPoolQueueFull = errors.New("worker pool queue full") - -type RequestEnvelope struct { - DoFunc func(context.Context) (any, error) // функция, которая выполнит запрос и вернет any - ResultCh chan RequestResult // канал для результата -} -type RequestResult struct { - Value any - Err error +// workerPool — приватная структура, управляющая пулом воркеров. +// Внешний код не может создавать или напрямую взаимодействовать с этой структурой. +// Используется только через экспортируемые методы newWorkerPool, start, stop, submit. +type workerPool struct { + taskCh chan requestEnvelope // канал для принятия задач (буферизованный) + queueSize int // максимальный размер очереди + workers int // количество воркеров (горутин) + wg sync.WaitGroup // синхронизирует завершение всех воркеров при остановке + quit chan struct{} // канал для сигнала остановки + started bool // флаг, указывающий, запущен ли пул + startedMu sync.Mutex // мьютекс для безопасного доступа к started } -// WorkerPool управляет воркерами и очередью -type WorkerPool struct { - taskCh chan RequestEnvelope - queueSize int - workers int - wg sync.WaitGroup - quit chan struct{} - started bool - startedMu sync.Mutex +// requestEnvelope — приватная структура, инкапсулирующая задачу и канал для результата. +// Используется только внутри пакета для передачи задач воркерам. +type requestEnvelope struct { + doFunc func(context.Context) (any, error) // функция, выполняющая запрос + resultCh chan requestResult // канал, через который воркер вернёт результат } -func NewWorkerPool(workers int, queueSize int) *WorkerPool { - return &WorkerPool{ - taskCh: make(chan RequestEnvelope, queueSize), +// requestResult — приватная структура, представляющая результат выполнения задачи. +// Внешний код получает его через канал, но не знает структуры — только через <-chan requestResult. +type requestResult struct { + value any // значение, возвращённое задачей + err error // ошибка, если возникла +} + +// newWorkerPool создаёт новый пул воркеров с заданным количеством горутин и размером очереди. +// Это единственный способ создать workerPool — внешний код не может создать его напрямую. +func newWorkerPool(workers int, queueSize int) *workerPool { + if workers <= 0 { + workers = 1 // защита от некорректных значений + } + if queueSize <= 0 { + queueSize = 100 // разумный дефолт + } + + return &workerPool{ + taskCh: make(chan requestEnvelope, queueSize), queueSize: queueSize, workers: workers, quit: make(chan struct{}), } } -// Start запускает воркеров -func (p *WorkerPool) Start(ctx context.Context) { +// start запускает воркеры (горутины), которые будут обрабатывать задачи из очереди. +// Метод идемпотентен: если пул уже запущен — ничего не делает. +// Должен вызываться перед первым вызовом submit. +func (p *workerPool) start(ctx context.Context) { p.startedMu.Lock() defer p.startedMu.Unlock() if p.started { - return + return // уже запущен — ничего не делаем } p.started = true + + // Запускаем воркеры — каждый будет обрабатывать задачи в бесконечном цикле for i := 0; i < p.workers; i++ { p.wg.Add(1) - go p.worker(ctx) + go p.worker(ctx) // запускаем горутину с контекстом } } -// Stop останавливает пул (ждет завершения текущих задач) -func (p *WorkerPool) Stop() { - close(p.quit) - p.wg.Wait() +// stop останавливает пул воркеров. +// Отправляет сигнал остановки через quit-канал и ждёт завершения всех активных задач. +// Безопасно вызывать многократно — после остановки повторные вызовы не имеют эффекта. +func (p *workerPool) stop() { + close(p.quit) // сигнал для всех воркеров — выйти из цикла + p.wg.Wait() // ждём, пока все воркеры завершатся } -// Submit отправляет задачу в очередь и возвращает канал для результата -func (p *WorkerPool) Submit(ctx context.Context, do func(context.Context) (any, error)) (<-chan RequestResult, error) { +// submit отправляет задачу в очередь и возвращает канал, через который будет получен результат. +// Если очередь переполнена — возвращает ErrPoolQueueFull. +// Канал результата имеет буфер 1, чтобы не блокировать воркера при записи. +// Контекст используется для отмены задачи, если клиент отменил запрос до отправки. +func (p *workerPool) submit(ctx context.Context, do func(context.Context) (any, error)) (<-chan requestResult, error) { + // Проверяем, не превышена ли очередь if len(p.taskCh) >= p.queueSize { return nil, ErrPoolQueueFull } - resultCh := make(chan RequestResult, 1) // буфер 1, чтобы не блокировать воркера - envelope := RequestEnvelope{do, resultCh} + // Создаём канал для результата — буферизованный, чтобы не блокировать воркера + resultCh := make(chan requestResult, 1) + + // Создаём обёртку задачи + envelope := requestEnvelope{ + doFunc: do, + resultCh: resultCh, + } + + // Пытаемся отправить задачу в очередь select { case <-ctx.Done(): + // Клиент отменил операцию до отправки — возвращаем ошибку отмены return nil, ctx.Err() case p.taskCh <- envelope: + // Успешно отправлено — возвращаем канал для чтения результата return resultCh, nil default: + // Очередь переполнена — не должно происходить при проверке len(p.taskCh), но на всякий случай return nil, ErrPoolQueueFull } } -// worker выполняет задачи -func (p *WorkerPool) worker(ctx context.Context) { - defer p.wg.Done() +// worker — приватная горутина, выполняющая задачи из очереди. +// Каждый воркер работает в бесконечном цикле, пока не получит сигнал остановки. +// При получении задачи: +// - вызывает doFunc с контекстом +// - записывает результат в resultCh +// - закрывает канал, чтобы клиент мог прочитать и завершить +// +// После закрытия quit-канала — воркер завершает работу. +func (p *workerPool) worker(ctx context.Context) { + defer p.wg.Done() // уменьшаем WaitGroup при завершении горутины + for { select { case <-p.quit: + // Получен сигнал остановки — выходим из цикла return + case envelope := <-p.taskCh: - // Выполняем задачу с переданным контекстом (или можно использовать свой) - val, err := envelope.DoFunc(ctx) - envelope.ResultCh <- RequestResult{Value: val, Err: err} - close(envelope.ResultCh) + // Выполняем задачу с переданным контекстом (клиентский или общий) + value, err := envelope.doFunc(ctx) + + // Записываем результат в канал — не блокируем, т.к. буфер 1 + envelope.resultCh <- requestResult{ + value: value, + err: err, + } + // Закрываем канал — клиент знает, что результат пришёл и больше не будет + close(envelope.resultCh) } } } diff --git a/tgapi/stickers_methods.go b/tgapi/stickers_methods.go index abf6390..152bf56 100644 --- a/tgapi/stickers_methods.go +++ b/tgapi/stickers_methods.go @@ -2,7 +2,7 @@ package tgapi type SendStickerP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -15,7 +15,7 @@ type SendStickerP struct { } func (api *API) SendSticker(params SendStickerP) (Message, error) { - req := NewRequest[Message]("sendSticker", params) + req := NewRequestWithChatID[Message]("sendSticker", params, params.ChatID) return req.Do(api) } diff --git a/tgapi/uploader_api.go b/tgapi/uploader_api.go index 0b4d2a5..e7184ff 100644 --- a/tgapi/uploader_api.go +++ b/tgapi/uploader_api.go @@ -8,6 +8,7 @@ import ( "mime/multipart" "net/http" "path/filepath" + "time" "git.nix13.pw/scuroneko/laniakea/utils" "git.nix13.pw/scuroneko/slog" @@ -59,24 +60,17 @@ type UploaderRequest[R, P any] struct { method string files []UploaderFile params P + chatId int64 } func NewUploaderRequest[R, P any](method string, params P, files ...UploaderFile) UploaderRequest[R, P] { - return UploaderRequest[R, P]{method, files, params} + return UploaderRequest[R, P]{method: method, files: files, params: params, chatId: 0} +} +func NewUploaderRequestWithChatID[R, P any](method string, params P, chatId int64, files ...UploaderFile) UploaderRequest[R, P] { + return UploaderRequest[R, P]{method: method, files: files, params: params, chatId: chatId} } func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R, error) { var zero R - if up.api.Limiter != nil { - if up.api.dropOverflowLimit { - if !up.api.Limiter.GlobalAllow() { - return zero, errors.New("rate limited") - } - } else { - if err := up.api.Limiter.GlobalWait(ctx); err != nil { - return zero, err - } - } - } buf, contentType, err := prepareMultipart(r.files, r.params) if err != nil { @@ -95,25 +89,58 @@ func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R, req.Header.Set("Content-Type", contentType) req.Header.Set("Accept", "application/json") req.Header.Set("User-Agent", fmt.Sprintf("Laniakea/%s", utils.VersionString)) + req.Header.Set("Accept-Encoding", "gzip") + req.ContentLength = int64(buf.Len()) - up.logger.Debugln("UPLOADER REQ", r.method) - res, err := up.api.client.Do(req) - if err != nil { - return zero, err - } - defer res.Body.Close() + for { + if up.api.Limiter != nil { + if up.api.dropOverflowLimit { + if !up.api.Limiter.GlobalAllow() { + return zero, errors.New("rate limited") + } + } else { + if err := up.api.Limiter.GlobalWait(ctx); err != nil { + return zero, err + } + } + } - body, err := readBody(res.Body) - up.logger.Debugln("UPLOADER RES", r.method, string(body)) - if res.StatusCode != http.StatusOK { - return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(body)) - } + up.logger.Debugln("UPLOADER REQ", r.method) + resp, err := up.api.client.Do(req) + if err != nil { + return zero, err + } - respBody, err := parseBody[R](body) - if err != nil { - return zero, err + body, err := readBody(resp.Body) + _ = resp.Body.Close() + up.logger.Debugln("UPLOADER RES", r.method, string(body)) + + response, err := parseBody[R](body) + if err != nil { + return zero, err + } + + if !response.Ok { + if response.ErrorCode == 429 && response.Parameters != nil && response.Parameters.RetryAfter != nil { + after := *response.Parameters.RetryAfter + up.logger.Warnf("Rate limited, retry after %d seconds (chat: %d)", after, r.chatId) + if r.chatId > 0 { + up.api.Limiter.SetChatLock(r.chatId, after) + } else { + up.api.Limiter.SetGlobalLock(after) + } + + select { + case <-ctx.Done(): + return zero, ctx.Err() + case <-time.After(time.Duration(after) * time.Second): + continue // Повторяем запрос + } + } + return zero, fmt.Errorf("[%d] %s", response.ErrorCode, response.Description) + } + return response.Result, nil } - return respBody.Result, nil } func (r UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) (R, error) { var zero R @@ -149,24 +176,29 @@ func prepareMultipart[P any](files []UploaderFile, params P) (*bytes.Buffer, str for _, file := range files { fw, err := w.CreateFormFile(string(file.field), file.filename) if err != nil { - _ = w.Close() - return buf, w.FormDataContentType(), err + _ = w.Close() // Закрываем, чтобы не было утечки + return nil, "", err } _, err = fw.Write(file.data) if err != nil { _ = w.Close() - return buf, w.FormDataContentType(), err + return nil, "", err } } - err := utils.Encode(w, params) + err := utils.Encode(w, params) // Предполагается, что это записывает в w if err != nil { _ = w.Close() - return buf, w.FormDataContentType(), err + return nil, "", err } - err = w.Close() - return buf, w.FormDataContentType(), err + + err = w.Close() // ✅ ОБЯЗАТЕЛЬНО вызвать в конце — иначе запрос битый! + if err != nil { + return nil, "", err + } + + return buf, w.FormDataContentType(), nil } func uploaderTypeByExt(filename string) UploaderFileType { diff --git a/tgapi/uploader_methods.go b/tgapi/uploader_methods.go index 8a9520e..30931fd 100644 --- a/tgapi/uploader_methods.go +++ b/tgapi/uploader_methods.go @@ -23,13 +23,13 @@ type UploadPhotoP struct { } func (u *Uploader) UploadPhoto(params UploadPhotoP, file UploaderFile) (Message, error) { - req := NewUploaderRequest[Message]("sendPhoto", params, file) + req := NewUploaderRequestWithChatID[Message]("sendPhoto", params, params.ChatID, file) return req.Do(u) } type UploadAudioP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -52,13 +52,13 @@ type UploadAudioP struct { } func (u *Uploader) UploadAudio(params UploadAudioP, files ...UploaderFile) (Message, error) { - req := NewUploaderRequest[Message]("sendAudio", params, files...) + req := NewUploaderRequestWithChatID[Message]("sendAudio", params, params.ChatID, files...) return req.Do(u) } type UploadDocumentP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -84,7 +84,7 @@ func (u *Uploader) UploadDocument(params UploadDocumentP, files ...UploaderFile) type UploadVideoP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -117,7 +117,7 @@ func (u *Uploader) UploadVideo(params UploadVideoP, files ...UploaderFile) (Mess type UploadAnimationP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -148,7 +148,7 @@ func (u *Uploader) UploadAnimation(params UploadAnimationP, files ...UploaderFil type UploadVoiceP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -174,7 +174,7 @@ func (u *Uploader) UploadVoice(params UploadVoiceP, files ...UploaderFile) (Mess type UploadVideoNoteP struct { BusinessConnectionID string `json:"business_connection_id,omitempty"` - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` MessageThreadID int `json:"message_thread_id,omitempty"` DirectMessagesTopicID int `json:"direct_messages_topic_id,omitempty"` @@ -197,7 +197,7 @@ func (u *Uploader) UploadVideoNote(params UploadVideoNoteP, files ...UploaderFil } type UploadChatPhotoP struct { - ChatID int `json:"chat_id"` + ChatID int64 `json:"chat_id"` } func (u *Uploader) UploadChatPhoto(params UploadChatPhotoP, photo UploaderFile) (Message, error) { diff --git a/utils/limiter.go b/utils/limiter.go index 50cf27a..65ce2c6 100644 --- a/utils/limiter.go +++ b/utils/limiter.go @@ -2,30 +2,42 @@ package utils import ( "context" + "errors" "sync" "time" "golang.org/x/time/rate" ) -type RateLimiter struct { - globalLockUntil time.Time - globalLimiter *rate.Limiter - globalMu sync.RWMutex +var ErrDropOverflow = errors.New("drop overflow limit") - chatLocks map[int64]time.Time - chatLimiters map[int64]*rate.Limiter - chatMu sync.Mutex +// RateLimiter implements per-chat and global rate limiting with optional blocking. +// It supports two modes: +// - "drop" mode: immediately reject if limits are exceeded. +// - "wait" mode: block until capacity is available. +type RateLimiter struct { + globalLockUntil time.Time // global cooldown timestamp (set by API errors) + globalLimiter *rate.Limiter // global token bucket (30 req/sec) + globalMu sync.RWMutex // protects globalLockUntil and globalLimiter + + chatLocks map[int64]time.Time // per-chat cooldown timestamps + chatLimiters map[int64]*rate.Limiter // per-chat token buckets (1 req/sec) + chatMu sync.Mutex // protects chatLocks and chatLimiters } +// NewRateLimiter creates a new RateLimiter with default limits. +// Global: 30 requests per second, burst 30. +// Per-chat: 1 request per second, burst 1. func NewRateLimiter() *RateLimiter { return &RateLimiter{ globalLimiter: rate.NewLimiter(30, 30), chatLimiters: make(map[int64]*rate.Limiter), - chatLocks: make(map[int64]time.Time), // инициализация! + chatLocks: make(map[int64]time.Time), } } +// SetGlobalLock sets a global cooldown period (e.g., after receiving 429 from Telegram). +// If retryAfter <= 0, no lock is applied. func (rl *RateLimiter) SetGlobalLock(retryAfter int) { if retryAfter <= 0 { return @@ -35,6 +47,8 @@ func (rl *RateLimiter) SetGlobalLock(retryAfter int) { rl.globalLockUntil = time.Now().Add(time.Duration(retryAfter) * time.Second) } +// SetChatLock sets a cooldown for a specific chat (e.g., after 429 for that chat). +// If retryAfter <= 0, no lock is applied. func (rl *RateLimiter) SetChatLock(chatID int64, retryAfter int) { if retryAfter <= 0 { return @@ -44,32 +58,31 @@ func (rl *RateLimiter) SetChatLock(chatID int64, retryAfter int) { rl.chatLocks[chatID] = time.Now().Add(time.Duration(retryAfter) * time.Second) } -// GlobalWait блокирует до возможности сделать глобальный запрос. +// GlobalWait blocks until a global request can be made. +// Waits for both global cooldown and token bucket availability. func (rl *RateLimiter) GlobalWait(ctx context.Context) error { - // Ждём окончания глобальной блокировки, если она есть if err := rl.waitForGlobalUnlock(ctx); err != nil { return err } - // Ждём разрешения rate limiter'а return rl.globalLimiter.Wait(ctx) } -// Wait блокирует до возможности сделать запрос для конкретного чата. +// Wait blocks until a request for the given chat can be made. +// Waits for: chat cooldown → global cooldown → chat token bucket. +// Note: Global limit is checked *before* chat limit to avoid overloading upstream. func (rl *RateLimiter) Wait(ctx context.Context, chatID int64) error { - // Ждём окончания блокировки чата if err := rl.waitForChatUnlock(ctx, chatID); err != nil { return err } - // Затем глобальной блокировки if err := rl.waitForGlobalUnlock(ctx); err != nil { return err } - // Получаем или создаём лимитер для чата limiter := rl.getChatLimiter(chatID) return limiter.Wait(ctx) } -// GlobalAllow неблокирующая проверка глобального запроса. +// GlobalAllow checks if a global request can be made without blocking. +// Returns false if either global cooldown is active or token bucket is exhausted. func (rl *RateLimiter) GlobalAllow() bool { rl.globalMu.RLock() until := rl.globalLockUntil @@ -81,9 +94,11 @@ func (rl *RateLimiter) GlobalAllow() bool { return rl.globalLimiter.Allow() } -// Allow неблокирующая проверка запроса для чата. +// Allow checks if a request for the given chat can be made without blocking. +// Returns false if: global cooldown, chat cooldown, global limiter, or chat limiter denies. +// Note: Global limiter is checked before chat limiter — upstream limits take priority. func (rl *RateLimiter) Allow(chatID int64) bool { - // Проверяем глобальную блокировку + // Check global cooldown rl.globalMu.RLock() globalUntil := rl.globalLockUntil rl.globalMu.RUnlock() @@ -91,7 +106,7 @@ func (rl *RateLimiter) Allow(chatID int64) bool { return false } - // Проверяем блокировку чата + // Check chat cooldown rl.chatMu.Lock() chatUntil, ok := rl.chatLocks[chatID] rl.chatMu.Unlock() @@ -99,18 +114,47 @@ func (rl *RateLimiter) Allow(chatID int64) bool { return false } - // Проверяем глобальный лимитер + // Check global token bucket if !rl.globalLimiter.Allow() { return false } - // Проверяем лимитер чата + // Check chat token bucket limiter := rl.getChatLimiter(chatID) return limiter.Allow() } -// Внутренние вспомогательные методы +// Check applies rate limiting based on configuration. +// If dropOverflow is true: +// - Immediately returns ErrDropOverflow if either global or chat limit is exceeded. +// +// Else: +// - If chatID != 0: waits for chat-specific capacity (including global limit). +// - If chatID == 0: waits for global capacity only. +// +// chatID == 0 means no specific chat context (e.g., inline query, webhook without chat). +func (rl *RateLimiter) Check(ctx context.Context, dropOverflow bool, chatID int64) error { + if dropOverflow { + if chatID != 0 && !rl.Allow(chatID) { + return ErrDropOverflow + } + if !rl.GlobalAllow() { + return ErrDropOverflow + } + } else if chatID != 0 { + if err := rl.Wait(ctx, chatID); err != nil { + return err + } + } else { + if err := rl.GlobalWait(ctx); err != nil { + return err + } + } + return nil +} +// waitForGlobalUnlock blocks until global cooldown expires or context is done. +// Does not check token bucket — only cooldown. func (rl *RateLimiter) waitForGlobalUnlock(ctx context.Context) error { rl.globalMu.RLock() until := rl.globalLockUntil @@ -119,6 +163,7 @@ func (rl *RateLimiter) waitForGlobalUnlock(ctx context.Context) error { if until.IsZero() || time.Now().After(until) { return nil } + select { case <-time.After(time.Until(until)): return nil @@ -127,6 +172,8 @@ func (rl *RateLimiter) waitForGlobalUnlock(ctx context.Context) error { } } +// waitForChatUnlock blocks until the specified chat's cooldown expires or context is done. +// Does not check token bucket — only cooldown. func (rl *RateLimiter) waitForChatUnlock(ctx context.Context, chatID int64) error { rl.chatMu.Lock() until, ok := rl.chatLocks[chatID] @@ -135,6 +182,7 @@ func (rl *RateLimiter) waitForChatUnlock(ctx context.Context, chatID int64) erro if !ok || until.IsZero() || time.Now().After(until) { return nil } + select { case <-time.After(time.Until(until)): return nil @@ -143,13 +191,14 @@ func (rl *RateLimiter) waitForChatUnlock(ctx context.Context, chatID int64) erro } } +// getChatLimiter returns the rate limiter for the given chat, creating it if needed. +// Uses 1 request per second with burst of 1 — conservative for per-user limits. +// Must be called with rl.chatMu held. func (rl *RateLimiter) getChatLimiter(chatID int64) *rate.Limiter { - rl.chatMu.Lock() - defer rl.chatMu.Unlock() if lim, ok := rl.chatLimiters[chatID]; ok { return lim } - lim := rate.NewLimiter(1, 1) // 1 запрос/сек + lim := rate.NewLimiter(1, 1) rl.chatLimiters[chatID] = lim return lim } diff --git a/utils/multipart.go b/utils/multipart.go index 89d2fc7..0dfdac0 100644 --- a/utils/multipart.go +++ b/utils/multipart.go @@ -1,7 +1,6 @@ package utils import ( - "encoding/json" "fmt" "io" "mime/multipart" @@ -26,33 +25,31 @@ func Encode[T any](w *multipart.Writer, req T) error { field := v.Field(i) fieldType := t.Field(i) - formTags := strings.Split(fieldType.Tag.Get("json"), ",") - fieldName := "" - if len(formTags) == 0 { - formTags = strings.Split(fieldType.Tag.Get("json"), ",") + jsonTag := fieldType.Tag.Get("json") + if jsonTag == "" { + jsonTag = fieldType.Name } - if len(formTags) > 0 { - fieldName = formTags[0] - if fieldName == "-" { - continue - } - if slices.Index(formTags, "omitempty") >= 0 { - if field.IsZero() { - continue - } - } - } else { - fieldName = strings.ToLower(fieldType.Name) + parts := strings.Split(jsonTag, ",") + fieldName := parts[0] + if fieldName == "-" { + continue + } + + // Handle omitempty + isEmpty := field.IsZero() + if slices.Contains(parts, "omitempty") && isEmpty { + continue } var ( fw io.Writer err error ) + switch field.Kind() { case reflect.String: - if field.String() != "" { + if !isEmpty { fw, err = w.CreateFormField(fieldName) if err == nil { _, err = fw.Write([]byte(field.String())) @@ -80,45 +77,47 @@ func Encode[T any](w *multipart.Writer, req T) error { } case reflect.Slice: if field.Type().Elem().Kind() == reflect.Uint8 && !field.IsNil() { + // Handle []byte as file upload (e.g., thumbnail) filename := fieldType.Tag.Get("filename") if filename == "" { filename = fieldName } - - ext := "" - filename = filename + ext - fw, err = w.CreateFormFile(fieldName, filename) if err == nil { _, err = fw.Write(field.Bytes()) } } else if !field.IsNil() { - // Handle slice of primitive values (as multiple form fields with the same name) + // Handle []string, []int, etc. — send as multiple fields with same name for j := 0; j < field.Len(); j++ { elem := field.Index(j) fw, err = w.CreateFormField(fieldName) - if err == nil { - switch elem.Kind() { - case reflect.String: - _, err = fw.Write([]byte(elem.String())) - case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: - _, err = fw.Write([]byte(strconv.FormatInt(elem.Int(), 10))) - case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: - _, err = fw.Write([]byte(strconv.FormatUint(elem.Uint(), 10))) - } + if err != nil { + break + } + switch elem.Kind() { + case reflect.String: + _, err = fw.Write([]byte(elem.String())) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + _, err = fw.Write([]byte(strconv.FormatInt(elem.Int(), 10))) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + _, err = fw.Write([]byte(strconv.FormatUint(elem.Uint(), 10))) + case reflect.Bool: + _, err = fw.Write([]byte(strconv.FormatBool(elem.Bool()))) + case reflect.Float32, reflect.Float64: + _, err = fw.Write([]byte(strconv.FormatFloat(elem.Float(), 'f', -1, 64))) + } + if err != nil { + break } } } case reflect.Struct: - var jsonData []byte - jsonData, err = json.Marshal(field.Interface()) - if err == nil { - fw, err = w.CreateFormField(fieldName) - if err == nil { - _, err = fw.Write(jsonData) - } - } + // Don't serialize structs as JSON — flatten them! + // Telegram doesn't support nested JSON in form-data. + // If you need nested data, use separate fields (e.g., ParseMode, CaptionEntities) + // This is a design choice — you should avoid nested structs in params. + return fmt.Errorf("nested structs are not supported in params — use flat fields") } if err != nil { diff --git a/utils/utils.go b/utils/utils.go index 9d0cbac..2727c05 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -1,7 +1,6 @@ package utils import ( - "fmt" "os" "strings" @@ -16,6 +15,7 @@ func GetLoggerLevel() slog.LogLevel { return level } +// EscapeMarkdown Deprecated. Use MarkdownV2 func EscapeMarkdown(s string) string { s = strings.ReplaceAll(s, "_", `\_`) s = strings.ReplaceAll(s, "*", `\*`) @@ -23,10 +23,20 @@ func EscapeMarkdown(s string) string { return strings.ReplaceAll(s, "`", "\\`") } +// EscapeHTML escapes special characters for Telegram HTML parse mode. +func EscapeHTML(s string) string { + s = strings.ReplaceAll(s, "&", "&") + s = strings.ReplaceAll(s, "<", "<") + s = strings.ReplaceAll(s, ">", ">") + return s +} + +// EscapeMarkdownV2 escapes special characters for Telegram MarkdownV2. +// https://core.telegram.org/bots/api#markdownv2-style func EscapeMarkdownV2(s string) string { - symbols := []string{"_", "*", "[", "]", "(", ")", "~", "`", ">", "#", "+", "-", "=", "|", "{", "}", ".", "!"} + symbols := []string{"_", "*", "[", "]", "(", ")", "~", "`", ">", "#", "+", "-", "=", "|", "{", "}", ".", "!", "\\"} for _, symbol := range symbols { - s = strings.ReplaceAll(s, symbol, fmt.Sprintf("\\%s", symbol)) + s = strings.ReplaceAll(s, symbol, "\\"+symbol) } return s } diff --git a/utils/version.go b/utils/version.go index ab7dbf0..5ad969a 100644 --- a/utils/version.go +++ b/utils/version.go @@ -1,9 +1,9 @@ package utils const ( - VersionString = "1.0.0-beta.8" + VersionString = "1.0.0-beta.9" VersionMajor = 1 VersionMinor = 0 VersionPatch = 0 - Beta = 8 + Beta = 9 )