Files
Laniakea/tgapi/pool.go
2026-03-06 11:59:17 +03:00

143 lines
7.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package tgapi
import (
"context"
"sync"
)
// workerPool — приватная структура, управляющая пулом воркеров.
// Внешний код не может создавать или напрямую взаимодействовать с этой структурой.
// Используется только через экспортируемые методы newWorkerPool, start, stop, submit.
type workerPool struct {
taskCh chan requestEnvelope // канал для принятия задач (буферизованный)
queueSize int // максимальный размер очереди
workers int // количество воркеров (горутин)
wg sync.WaitGroup // синхронизирует завершение всех воркеров при остановке
quit chan struct{} // канал для сигнала остановки
started bool // флаг, указывающий, запущен ли пул
startedMu sync.Mutex // мьютекс для безопасного доступа к started
}
// requestEnvelope — приватная структура, инкапсулирующая задачу и канал для результата.
// Используется только внутри пакета для передачи задач воркерам.
type requestEnvelope struct {
doFunc func(context.Context) (any, error) // функция, выполняющая запрос
resultCh chan requestResult // канал, через который воркер вернёт результат
}
// requestResult — приватная структура, представляющая результат выполнения задачи.
// Внешний код получает его через канал, но не знает структуры — только через <-chan requestResult.
type requestResult struct {
value any // значение, возвращённое задачей
err error // ошибка, если возникла
}
// newWorkerPool создаёт новый пул воркеров с заданным количеством горутин и размером очереди.
// Это единственный способ создать workerPool — внешний код не может создать его напрямую.
func newWorkerPool(workers int, queueSize int) *workerPool {
if workers <= 0 {
workers = 1 // защита от некорректных значений
}
if queueSize <= 0 {
queueSize = 100 // разумный дефолт
}
return &workerPool{
taskCh: make(chan requestEnvelope, queueSize),
queueSize: queueSize,
workers: workers,
quit: make(chan struct{}),
}
}
// start запускает воркеры (горутины), которые будут обрабатывать задачи из очереди.
// Метод идемпотентен: если пул уже запущен — ничего не делает.
// Должен вызываться перед первым вызовом submit.
func (p *workerPool) start(ctx context.Context) {
p.startedMu.Lock()
defer p.startedMu.Unlock()
if p.started {
return // уже запущен — ничего не делаем
}
p.started = true
// Запускаем воркеры — каждый будет обрабатывать задачи в бесконечном цикле
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go p.worker(ctx) // запускаем горутину с контекстом
}
}
// stop останавливает пул воркеров.
// Отправляет сигнал остановки через quit-канал и ждёт завершения всех активных задач.
// Безопасно вызывать многократно — после остановки повторные вызовы не имеют эффекта.
func (p *workerPool) stop() {
close(p.quit) // сигнал для всех воркеров — выйти из цикла
p.wg.Wait() // ждём, пока все воркеры завершатся
}
// submit отправляет задачу в очередь и возвращает канал, через который будет получен результат.
// Если очередь переполнена — возвращает ErrPoolQueueFull.
// Канал результата имеет буфер 1, чтобы не блокировать воркера при записи.
// Контекст используется для отмены задачи, если клиент отменил запрос до отправки.
func (p *workerPool) submit(ctx context.Context, do func(context.Context) (any, error)) (<-chan requestResult, error) {
// Проверяем, не превышена ли очередь
if len(p.taskCh) >= p.queueSize {
return nil, ErrPoolQueueFull
}
// Создаём канал для результата — буферизованный, чтобы не блокировать воркера
resultCh := make(chan requestResult, 1)
// Создаём обёртку задачи
envelope := requestEnvelope{
doFunc: do,
resultCh: resultCh,
}
// Пытаемся отправить задачу в очередь
select {
case <-ctx.Done():
// Клиент отменил операцию до отправки — возвращаем ошибку отмены
return nil, ctx.Err()
case p.taskCh <- envelope:
// Успешно отправлено — возвращаем канал для чтения результата
return resultCh, nil
default:
// Очередь переполнена — не должно происходить при проверке len(p.taskCh), но на всякий случай
return nil, ErrPoolQueueFull
}
}
// worker — приватная горутина, выполняющая задачи из очереди.
// Каждый воркер работает в бесконечном цикле, пока не получит сигнал остановки.
// При получении задачи:
// - вызывает doFunc с контекстом
// - записывает результат в resultCh
// - закрывает канал, чтобы клиент мог прочитать и завершить
//
// После закрытия quit-канала — воркер завершает работу.
func (p *workerPool) worker(ctx context.Context) {
defer p.wg.Done() // уменьшаем WaitGroup при завершении горутины
for {
select {
case <-p.quit:
// Получен сигнал остановки — выходим из цикла
return
case envelope := <-p.taskCh:
// Выполняем задачу с переданным контекстом (клиентский или общий)
value, err := envelope.doFunc(ctx)
// Записываем результат в канал — не блокируем, т.к. буфер 1
envelope.resultCh <- requestResult{
value: value,
err: err,
}
// Закрываем канал — клиент знает, что результат пришёл и больше не будет
close(envelope.resultCh)
}
}
}