From 28ec2b7ca9ea91bd1d3f1ae4ba28b99be3db613e Mon Sep 17 00:00:00 2001 From: ScuroNeko Date: Thu, 26 Feb 2026 14:31:03 +0300 Subject: [PATCH] 0.8.0 beta 4 --- bot.go | 61 +++++++++++++++++++--------- go.mod | 2 + go.sum | 4 ++ tgapi/api.go | 67 +++++++++++++++++++++++++++++-- tgapi/pool.go | 92 +++++++++++++++++++++++++++++++++++++++++++ tgapi/uploader_api.go | 49 +++++++++++++++++++---- utils/version.go | 4 +- 7 files changed, 248 insertions(+), 31 deletions(-) create mode 100644 tgapi/pool.go diff --git a/bot.go b/bot.go index efb9296..8a1a8e6 100644 --- a/bot.go +++ b/bot.go @@ -4,38 +4,58 @@ import ( "fmt" "os" "sort" + "strconv" "strings" "time" "git.nix13.pw/scuroneko/extypes" "git.nix13.pw/scuroneko/laniakea/tgapi" "git.nix13.pw/scuroneko/slog" + "golang.org/x/time/rate" ) type BotOpts struct { - Token string - Debug bool - ErrorTemplate string - Prefixes []string - UpdateTypes []string + Token string + UpdateTypes []string + + Debug bool + ErrorTemplate string + Prefixes []string + LoggerBasePath string UseRequestLogger bool WriteToFile bool - UseTestServer bool - APIUrl string + + UseTestServer bool + APIUrl string + + RateLimit int + DropRLOverflow bool } +func NewOpts() *BotOpts { return new(BotOpts) } func LoadOptsFromEnv() *BotOpts { + rateLimit := 30 + if rl := os.Getenv("RATE_LIMIT"); rl != "" { + rateLimit, _ = strconv.Atoi(rl) + } + return &BotOpts{ - Token: os.Getenv("TG_TOKEN"), - Debug: os.Getenv("DEBUG") == "true", - ErrorTemplate: os.Getenv("ERROR_TEMPLATE"), - Prefixes: LoadPrefixesFromEnv(), - UpdateTypes: strings.Split(os.Getenv("UPDATE_TYPES"), ";"), + Token: os.Getenv("TG_TOKEN"), + UpdateTypes: strings.Split(os.Getenv("UPDATE_TYPES"), ";"), + + Debug: os.Getenv("DEBUG") == "true", + ErrorTemplate: os.Getenv("ERROR_TEMPLATE"), + Prefixes: LoadPrefixesFromEnv(), + UseRequestLogger: os.Getenv("USE_REQ_LOG") == "true", WriteToFile: os.Getenv("WRITE_TO_FILE") == "true", - UseTestServer: os.Getenv("USE_TEST_SERVER") == "true", - APIUrl: os.Getenv("API_URL"), + + UseTestServer: os.Getenv("USE_TEST_SERVER") == "true", + APIUrl: os.Getenv("API_URL"), + + RateLimit: rateLimit, + DropRLOverflow: os.Getenv("DROP_RL_OVERFLOW") == "true", } } func LoadPrefixesFromEnv() []string { @@ -55,6 +75,7 @@ type Bot[T DbContext] struct { logger *slog.Logger RequestLogger *slog.Logger + extraLoggers extypes.Slice[*slog.Logger] plugins []Plugin[T] middlewares []Middleware[T] @@ -66,8 +87,6 @@ type Bot[T DbContext] struct { dbContext *T l10n *L10n - extraLoggers extypes.Slice[*slog.Logger] - updateOffset int updateTypes []tgapi.UpdateType updateQueue *extypes.Queue[*tgapi.Update] @@ -76,7 +95,12 @@ type Bot[T DbContext] struct { func NewBot[T any](opts *BotOpts) *Bot[T] { updateQueue := extypes.CreateQueue[*tgapi.Update](512) - apiOpts := tgapi.NewAPIOpts(opts.Token).SetAPIUrl(opts.APIUrl).UseTestServer(opts.UseTestServer) + var limiter *rate.Limiter + if opts.RateLimit > 0 { + limiter = rate.NewLimiter(rate.Limit(opts.RateLimit), opts.RateLimit) + } + + apiOpts := tgapi.NewAPIOpts(opts.Token).SetAPIUrl(opts.APIUrl).UseTestServer(opts.UseTestServer).SetLimiter(limiter) api := tgapi.NewAPI(apiOpts) uploader := tgapi.NewUploader(api) @@ -108,8 +132,7 @@ func NewBot[T any](opts *BotOpts) *Bot[T] { u, err := api.GetMe() if err != nil { - _ = api.CloseApi() - _ = uploader.Close() + _ = bot.Close() bot.logger.Fatal(err) } bot.logger.Infof("Authorized as %s\n", u.FirstName) diff --git a/go.mod b/go.mod index 30d247f..f6a0218 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,11 @@ go 1.26 require ( git.nix13.pw/scuroneko/extypes v1.2.0 git.nix13.pw/scuroneko/slog v1.0.2 + golang.org/x/time v0.14.0 ) require ( + github.com/alitto/pond/v2 v2.6.2 // indirect github.com/fatih/color v1.18.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index 6e6f7ae..ac69590 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ git.nix13.pw/scuroneko/extypes v1.2.0 h1:2n2hD6KsMAted+6MGhAyeWyli2Qzc9G2y+pQNB7 git.nix13.pw/scuroneko/extypes v1.2.0/go.mod h1:uZVs8Yo3RrYAG9dMad6qR6lsYY67t+459D9c65QAYAw= git.nix13.pw/scuroneko/slog v1.0.2 h1:vZyUROygxC2d5FJHUQM/30xFEHY1JT/aweDZXA4rm2g= git.nix13.pw/scuroneko/slog v1.0.2/go.mod h1:3Qm2wzkR5KjwOponMfG7TcGSDjmYaFqRAmLvSPTuWJI= +github.com/alitto/pond/v2 v2.6.2 h1:Sphe40g0ILeM1pA2c2K+Th0DGU+pt0A/Kprr+WB24Pw= +github.com/alitto/pond/v2 v2.6.2/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= @@ -11,3 +13,5 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= diff --git a/tgapi/api.go b/tgapi/api.go index 78876b6..8422b20 100644 --- a/tgapi/api.go +++ b/tgapi/api.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -11,6 +12,7 @@ import ( "git.nix13.pw/scuroneko/laniakea/utils" "git.nix13.pw/scuroneko/slog" + "golang.org/x/time/rate" ) type APIOpts struct { @@ -18,8 +20,13 @@ type APIOpts struct { client *http.Client useTestServer bool apiUrl string + + limiter *rate.Limiter + dropOverflowLimit bool } +var ErrPoolUnexpected = errors.New("unexpected response from pool") + func NewAPIOpts(token string) *APIOpts { return &APIOpts{token: token, client: nil, useTestServer: false, apiUrl: "https://api.telegram.org"} } @@ -39,6 +46,14 @@ func (opts *APIOpts) SetAPIUrl(apiUrl string) *APIOpts { } return opts } +func (opts *APIOpts) SetLimiter(limiter *rate.Limiter) *APIOpts { + opts.limiter = limiter + return opts +} +func (opts *APIOpts) SetLimiterDrop(b bool) *APIOpts { + opts.dropOverflowLimit = b + return opts +} type API struct { token string @@ -46,6 +61,10 @@ type API struct { logger *slog.Logger useTestServer bool apiUrl string + + pool *WorkerPool + limiter *rate.Limiter + dropOverflowLimit bool } func NewAPI(opts *APIOpts) *API { @@ -55,9 +74,18 @@ func NewAPI(opts *APIOpts) *API { if client == nil { client = &http.Client{Timeout: time.Second * 45} } - return &API{opts.token, client, l, opts.useTestServer, opts.apiUrl} + pool := NewWorkerPool(16, 256) + pool.Start(context.Background()) + return &API{ + opts.token, client, l, + opts.useTestServer, opts.apiUrl, + pool, opts.limiter, opts.dropOverflowLimit, + } +} +func (api *API) CloseApi() error { + api.pool.Stop() + return api.logger.Close() } -func (api *API) CloseApi() error { return api.logger.Close() } func (api *API) GetLogger() *slog.Logger { return api.logger } type ApiResponse[R any] struct { @@ -74,8 +102,20 @@ type TelegramRequest[R, P any] struct { func NewRequest[R, P any](method string, params P) TelegramRequest[R, P] { return TelegramRequest[R, P]{method: method, params: params} } -func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, error) { +func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, error) { var zero R + if api.limiter != nil { + if api.dropOverflowLimit { + if !api.limiter.Allow() { + return zero, errors.New("rate limited") + } + } else { + if err := api.limiter.Wait(ctx); err != nil { + return zero, err + } + } + } + data, err := json.Marshal(r.params) if err != nil { return zero, err @@ -113,7 +153,28 @@ func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(data)) } return parseBody[R](data) +} +func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, error) { + var zero R + result, err := api.pool.Submit(ctx, func(ctx context.Context) (any, error) { + return r.doRequest(ctx, api) + }) + if err != nil { + return zero, err + } + select { + case <-ctx.Done(): + return zero, ctx.Err() + case res := <-result: + if res.Err != nil { + return zero, res.Err + } + if val, ok := res.Value.(R); ok { + return val, nil + } + return zero, ErrPoolUnexpected + } } func (r TelegramRequest[R, P]) Do(api *API) (R, error) { return r.DoWithContext(context.Background(), api) diff --git a/tgapi/pool.go b/tgapi/pool.go new file mode 100644 index 0000000..79a126e --- /dev/null +++ b/tgapi/pool.go @@ -0,0 +1,92 @@ +package tgapi + +import ( + "context" + "errors" + "sync" +) + +var ErrPoolQueueFull = errors.New("worker pool queue full") + +type RequestEnvelope struct { + DoFunc func(context.Context) (any, error) // функция, которая выполнит запрос и вернет any + ResultCh chan RequestResult // канал для результата +} +type RequestResult struct { + Value any + Err error +} + +// WorkerPool управляет воркерами и очередью +type WorkerPool struct { + taskCh chan RequestEnvelope + queueSize int + workers int + wg sync.WaitGroup + quit chan struct{} + started bool + startedMu sync.Mutex +} + +func NewWorkerPool(workers int, queueSize int) *WorkerPool { + return &WorkerPool{ + taskCh: make(chan RequestEnvelope, queueSize), + queueSize: queueSize, + workers: workers, + quit: make(chan struct{}), + } +} + +// Start запускает воркеров +func (p *WorkerPool) Start(ctx context.Context) { + p.startedMu.Lock() + defer p.startedMu.Unlock() + if p.started { + return + } + p.started = true + for i := 0; i < p.workers; i++ { + p.wg.Add(1) + go p.worker(ctx) + } +} + +// Stop останавливает пул (ждет завершения текущих задач) +func (p *WorkerPool) Stop() { + close(p.quit) + p.wg.Wait() +} + +// Submit отправляет задачу в очередь и возвращает канал для результата +func (p *WorkerPool) Submit(ctx context.Context, do func(context.Context) (any, error)) (<-chan RequestResult, error) { + if len(p.taskCh) >= p.queueSize { + return nil, ErrPoolQueueFull + } + + resultCh := make(chan RequestResult, 1) // буфер 1, чтобы не блокировать воркера + envelope := RequestEnvelope{do, resultCh} + select { + case <-ctx.Done(): + return nil, ctx.Err() + case p.taskCh <- envelope: + return resultCh, nil + default: + return nil, ErrPoolQueueFull + } +} + +// worker выполняет задачи +func (p *WorkerPool) worker(ctx context.Context) { + defer p.wg.Done() + for { + select { + case <-p.quit: + return + case envelope := <-p.taskCh: + // Выполняем задачу с переданным контекстом (или можно использовать свой) + val, err := envelope.DoFunc(ctx) + envelope.ResultCh <- RequestResult{Value: val, Err: err} + close(envelope.ResultCh) + } + } +} diff --git a/tgapi/uploader_api.go b/tgapi/uploader_api.go index 23424c0..1641162 100644 --- a/tgapi/uploader_api.go +++ b/tgapi/uploader_api.go @@ -3,6 +3,7 @@ package tgapi import ( "bytes" "context" + "errors" "fmt" "mime/multipart" "net/http" @@ -63,10 +64,21 @@ type UploaderRequest[R, P any] struct { func NewUploaderRequest[R, P any](method string, params P, files ...UploaderFile) UploaderRequest[R, P] { return UploaderRequest[R, P]{method, files, params} } -func (u UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) (R, error) { +func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R, error) { var zero R + if up.api.limiter != nil { + if up.api.dropOverflowLimit { + if !up.api.limiter.Allow() { + return zero, errors.New("rate limited") + } + } else { + if err := up.api.limiter.Wait(ctx); err != nil { + return zero, err + } + } + } - buf, contentType, err := prepareMultipart(u.files, u.params) + buf, contentType, err := prepareMultipart(r.files, r.params) if err != nil { return zero, err } @@ -75,7 +87,7 @@ func (u UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) if up.api.useTestServer { methodPrefix = "/test" } - url := fmt.Sprintf("%s/bot%s%s/%s", up.api.apiUrl, up.api.token, methodPrefix, u.method) + url := fmt.Sprintf("%s/bot%s%s/%s", up.api.apiUrl, up.api.token, methodPrefix, r.method) req, err := http.NewRequestWithContext(ctx, "POST", url, buf) if err != nil { return zero, err @@ -84,7 +96,7 @@ func (u UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) req.Header.Set("Accept", "application/json") req.Header.Set("User-Agent", fmt.Sprintf("Laniakea/%s", utils.VersionString)) - up.logger.Debugln("UPLOADER REQ", u.method) + up.logger.Debugln("UPLOADER REQ", r.method) res, err := up.api.client.Do(req) if err != nil { return zero, err @@ -92,15 +104,38 @@ func (u UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) defer res.Body.Close() body, err := readBody(res.Body) - up.logger.Debugln("UPLOADER RES", u.method, string(body)) + up.logger.Debugln("UPLOADER RES", r.method, string(body)) if res.StatusCode != http.StatusOK { return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(body)) } return parseBody[R](body) } -func (u UploaderRequest[R, P]) Do(up *Uploader) (R, error) { - return u.DoWithContext(context.Background(), up) +func (r UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) (R, error) { + var zero R + + result, err := up.api.pool.Submit(ctx, func(ctx context.Context) (any, error) { + return r.doRequest(ctx, up) + }) + if err != nil { + return zero, err + } + + select { + case <-ctx.Done(): + return zero, ctx.Err() + case res := <-result: + if res.Err != nil { + return zero, res.Err + } + if val, ok := res.Value.(R); ok { + return val, nil + } + return zero, ErrPoolUnexpected + } +} +func (r UploaderRequest[R, P]) Do(up *Uploader) (R, error) { + return r.DoWithContext(context.Background(), up) } func prepareMultipart[P any](files []UploaderFile, params P) (*bytes.Buffer, string, error) { diff --git a/utils/version.go b/utils/version.go index 0630841..fb99be2 100644 --- a/utils/version.go +++ b/utils/version.go @@ -1,9 +1,9 @@ package utils const ( - VersionString = "0.8.0-beta.3" + VersionString = "0.8.0-beta.4" VersionMajor = 0 VersionMinor = 8 VersionPatch = 0 - Beta = 3 + Beta = 4 )