0.8.0 beta 4
This commit is contained in:
67
tgapi/api.go
67
tgapi/api.go
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -11,6 +12,7 @@ import (
|
||||
|
||||
"git.nix13.pw/scuroneko/laniakea/utils"
|
||||
"git.nix13.pw/scuroneko/slog"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type APIOpts struct {
|
||||
@@ -18,8 +20,13 @@ type APIOpts struct {
|
||||
client *http.Client
|
||||
useTestServer bool
|
||||
apiUrl string
|
||||
|
||||
limiter *rate.Limiter
|
||||
dropOverflowLimit bool
|
||||
}
|
||||
|
||||
var ErrPoolUnexpected = errors.New("unexpected response from pool")
|
||||
|
||||
func NewAPIOpts(token string) *APIOpts {
|
||||
return &APIOpts{token: token, client: nil, useTestServer: false, apiUrl: "https://api.telegram.org"}
|
||||
}
|
||||
@@ -39,6 +46,14 @@ func (opts *APIOpts) SetAPIUrl(apiUrl string) *APIOpts {
|
||||
}
|
||||
return opts
|
||||
}
|
||||
func (opts *APIOpts) SetLimiter(limiter *rate.Limiter) *APIOpts {
|
||||
opts.limiter = limiter
|
||||
return opts
|
||||
}
|
||||
func (opts *APIOpts) SetLimiterDrop(b bool) *APIOpts {
|
||||
opts.dropOverflowLimit = b
|
||||
return opts
|
||||
}
|
||||
|
||||
type API struct {
|
||||
token string
|
||||
@@ -46,6 +61,10 @@ type API struct {
|
||||
logger *slog.Logger
|
||||
useTestServer bool
|
||||
apiUrl string
|
||||
|
||||
pool *WorkerPool
|
||||
limiter *rate.Limiter
|
||||
dropOverflowLimit bool
|
||||
}
|
||||
|
||||
func NewAPI(opts *APIOpts) *API {
|
||||
@@ -55,9 +74,18 @@ func NewAPI(opts *APIOpts) *API {
|
||||
if client == nil {
|
||||
client = &http.Client{Timeout: time.Second * 45}
|
||||
}
|
||||
return &API{opts.token, client, l, opts.useTestServer, opts.apiUrl}
|
||||
pool := NewWorkerPool(16, 256)
|
||||
pool.Start(context.Background())
|
||||
return &API{
|
||||
opts.token, client, l,
|
||||
opts.useTestServer, opts.apiUrl,
|
||||
pool, opts.limiter, opts.dropOverflowLimit,
|
||||
}
|
||||
}
|
||||
func (api *API) CloseApi() error {
|
||||
api.pool.Stop()
|
||||
return api.logger.Close()
|
||||
}
|
||||
func (api *API) CloseApi() error { return api.logger.Close() }
|
||||
func (api *API) GetLogger() *slog.Logger { return api.logger }
|
||||
|
||||
type ApiResponse[R any] struct {
|
||||
@@ -74,8 +102,20 @@ type TelegramRequest[R, P any] struct {
|
||||
func NewRequest[R, P any](method string, params P) TelegramRequest[R, P] {
|
||||
return TelegramRequest[R, P]{method: method, params: params}
|
||||
}
|
||||
func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, error) {
|
||||
func (r TelegramRequest[R, P]) doRequest(ctx context.Context, api *API) (R, error) {
|
||||
var zero R
|
||||
if api.limiter != nil {
|
||||
if api.dropOverflowLimit {
|
||||
if !api.limiter.Allow() {
|
||||
return zero, errors.New("rate limited")
|
||||
}
|
||||
} else {
|
||||
if err := api.limiter.Wait(ctx); err != nil {
|
||||
return zero, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data, err := json.Marshal(r.params)
|
||||
if err != nil {
|
||||
return zero, err
|
||||
@@ -113,7 +153,28 @@ func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R,
|
||||
return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(data))
|
||||
}
|
||||
return parseBody[R](data)
|
||||
}
|
||||
func (r TelegramRequest[R, P]) DoWithContext(ctx context.Context, api *API) (R, error) {
|
||||
var zero R
|
||||
result, err := api.pool.Submit(ctx, func(ctx context.Context) (any, error) {
|
||||
return r.doRequest(ctx, api)
|
||||
})
|
||||
if err != nil {
|
||||
return zero, err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return zero, ctx.Err()
|
||||
case res := <-result:
|
||||
if res.Err != nil {
|
||||
return zero, res.Err
|
||||
}
|
||||
if val, ok := res.Value.(R); ok {
|
||||
return val, nil
|
||||
}
|
||||
return zero, ErrPoolUnexpected
|
||||
}
|
||||
}
|
||||
func (r TelegramRequest[R, P]) Do(api *API) (R, error) {
|
||||
return r.DoWithContext(context.Background(), api)
|
||||
|
||||
92
tgapi/pool.go
Normal file
92
tgapi/pool.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package tgapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var ErrPoolQueueFull = errors.New("worker pool queue full")
|
||||
|
||||
type RequestEnvelope struct {
|
||||
DoFunc func(context.Context) (any, error) // функция, которая выполнит запрос и вернет any
|
||||
ResultCh chan RequestResult // канал для результата
|
||||
}
|
||||
type RequestResult struct {
|
||||
Value any
|
||||
Err error
|
||||
}
|
||||
|
||||
// WorkerPool управляет воркерами и очередью
|
||||
type WorkerPool struct {
|
||||
taskCh chan RequestEnvelope
|
||||
queueSize int
|
||||
workers int
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
started bool
|
||||
startedMu sync.Mutex
|
||||
}
|
||||
|
||||
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
|
||||
return &WorkerPool{
|
||||
taskCh: make(chan RequestEnvelope, queueSize),
|
||||
queueSize: queueSize,
|
||||
workers: workers,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start запускает воркеров
|
||||
func (p *WorkerPool) Start(ctx context.Context) {
|
||||
p.startedMu.Lock()
|
||||
defer p.startedMu.Unlock()
|
||||
if p.started {
|
||||
return
|
||||
}
|
||||
p.started = true
|
||||
for i := 0; i < p.workers; i++ {
|
||||
p.wg.Add(1)
|
||||
go p.worker(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop останавливает пул (ждет завершения текущих задач)
|
||||
func (p *WorkerPool) Stop() {
|
||||
close(p.quit)
|
||||
p.wg.Wait()
|
||||
}
|
||||
|
||||
// Submit отправляет задачу в очередь и возвращает канал для результата
|
||||
func (p *WorkerPool) Submit(ctx context.Context, do func(context.Context) (any, error)) (<-chan RequestResult, error) {
|
||||
if len(p.taskCh) >= p.queueSize {
|
||||
return nil, ErrPoolQueueFull
|
||||
}
|
||||
|
||||
resultCh := make(chan RequestResult, 1) // буфер 1, чтобы не блокировать воркера
|
||||
envelope := RequestEnvelope{do, resultCh}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case p.taskCh <- envelope:
|
||||
return resultCh, nil
|
||||
default:
|
||||
return nil, ErrPoolQueueFull
|
||||
}
|
||||
}
|
||||
|
||||
// worker выполняет задачи
|
||||
func (p *WorkerPool) worker(ctx context.Context) {
|
||||
defer p.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-p.quit:
|
||||
return
|
||||
case envelope := <-p.taskCh:
|
||||
// Выполняем задачу с переданным контекстом (или можно использовать свой)
|
||||
val, err := envelope.DoFunc(ctx)
|
||||
envelope.ResultCh <- RequestResult{Value: val, Err: err}
|
||||
close(envelope.ResultCh)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package tgapi
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
@@ -63,10 +64,21 @@ type UploaderRequest[R, P any] struct {
|
||||
func NewUploaderRequest[R, P any](method string, params P, files ...UploaderFile) UploaderRequest[R, P] {
|
||||
return UploaderRequest[R, P]{method, files, params}
|
||||
}
|
||||
func (u UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) (R, error) {
|
||||
func (r UploaderRequest[R, P]) doRequest(ctx context.Context, up *Uploader) (R, error) {
|
||||
var zero R
|
||||
if up.api.limiter != nil {
|
||||
if up.api.dropOverflowLimit {
|
||||
if !up.api.limiter.Allow() {
|
||||
return zero, errors.New("rate limited")
|
||||
}
|
||||
} else {
|
||||
if err := up.api.limiter.Wait(ctx); err != nil {
|
||||
return zero, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
buf, contentType, err := prepareMultipart(u.files, u.params)
|
||||
buf, contentType, err := prepareMultipart(r.files, r.params)
|
||||
if err != nil {
|
||||
return zero, err
|
||||
}
|
||||
@@ -75,7 +87,7 @@ func (u UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader)
|
||||
if up.api.useTestServer {
|
||||
methodPrefix = "/test"
|
||||
}
|
||||
url := fmt.Sprintf("%s/bot%s%s/%s", up.api.apiUrl, up.api.token, methodPrefix, u.method)
|
||||
url := fmt.Sprintf("%s/bot%s%s/%s", up.api.apiUrl, up.api.token, methodPrefix, r.method)
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, buf)
|
||||
if err != nil {
|
||||
return zero, err
|
||||
@@ -84,7 +96,7 @@ func (u UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader)
|
||||
req.Header.Set("Accept", "application/json")
|
||||
req.Header.Set("User-Agent", fmt.Sprintf("Laniakea/%s", utils.VersionString))
|
||||
|
||||
up.logger.Debugln("UPLOADER REQ", u.method)
|
||||
up.logger.Debugln("UPLOADER REQ", r.method)
|
||||
res, err := up.api.client.Do(req)
|
||||
if err != nil {
|
||||
return zero, err
|
||||
@@ -92,15 +104,38 @@ func (u UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader)
|
||||
defer res.Body.Close()
|
||||
|
||||
body, err := readBody(res.Body)
|
||||
up.logger.Debugln("UPLOADER RES", u.method, string(body))
|
||||
up.logger.Debugln("UPLOADER RES", r.method, string(body))
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return zero, fmt.Errorf("unexpected status code: %d, %s", res.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return parseBody[R](body)
|
||||
}
|
||||
func (u UploaderRequest[R, P]) Do(up *Uploader) (R, error) {
|
||||
return u.DoWithContext(context.Background(), up)
|
||||
func (r UploaderRequest[R, P]) DoWithContext(ctx context.Context, up *Uploader) (R, error) {
|
||||
var zero R
|
||||
|
||||
result, err := up.api.pool.Submit(ctx, func(ctx context.Context) (any, error) {
|
||||
return r.doRequest(ctx, up)
|
||||
})
|
||||
if err != nil {
|
||||
return zero, err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return zero, ctx.Err()
|
||||
case res := <-result:
|
||||
if res.Err != nil {
|
||||
return zero, res.Err
|
||||
}
|
||||
if val, ok := res.Value.(R); ok {
|
||||
return val, nil
|
||||
}
|
||||
return zero, ErrPoolUnexpected
|
||||
}
|
||||
}
|
||||
func (r UploaderRequest[R, P]) Do(up *Uploader) (R, error) {
|
||||
return r.DoWithContext(context.Background(), up)
|
||||
}
|
||||
|
||||
func prepareMultipart[P any](files []UploaderFile, params P) (*bytes.Buffer, string, error) {
|
||||
|
||||
Reference in New Issue
Block a user