v1.0.0 beta 1
This commit is contained in:
91
bot.go
91
bot.go
@@ -1,16 +1,19 @@
|
||||
package laniakea
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"sync"
|
||||
|
||||
"git.nix13.pw/scuroneko/extypes"
|
||||
"git.nix13.pw/scuroneko/laniakea/tgapi"
|
||||
"git.nix13.pw/scuroneko/slog"
|
||||
"github.com/alitto/pond/v2"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
@@ -87,13 +90,14 @@ type Bot[T DbContext] struct {
|
||||
dbContext *T
|
||||
l10n *L10n
|
||||
|
||||
updateOffset int
|
||||
updateTypes []tgapi.UpdateType
|
||||
updateQueue *extypes.Queue[*tgapi.Update]
|
||||
updateOffsetMu sync.Mutex
|
||||
updateOffset int
|
||||
updateTypes []tgapi.UpdateType
|
||||
updateQueue chan *tgapi.Update
|
||||
}
|
||||
|
||||
func NewBot[T any](opts *BotOpts) *Bot[T] {
|
||||
updateQueue := extypes.CreateQueue[*tgapi.Update](512)
|
||||
updateQueue := make(chan *tgapi.Update, 512)
|
||||
|
||||
var limiter *rate.Limiter
|
||||
if opts.RateLimit > 0 {
|
||||
@@ -185,13 +189,20 @@ func (bot *Bot[T]) initLoggers(opts *BotOpts) {
|
||||
}
|
||||
}
|
||||
|
||||
func (bot *Bot[T]) GetUpdateOffset() int { return bot.updateOffset }
|
||||
func (bot *Bot[T]) SetUpdateOffset(offset int) { bot.updateOffset = offset }
|
||||
func (bot *Bot[T]) GetUpdateTypes() []tgapi.UpdateType { return bot.updateTypes }
|
||||
func (bot *Bot[T]) GetQueue() *extypes.Queue[*tgapi.Update] { return bot.updateQueue }
|
||||
func (bot *Bot[T]) GetLogger() *slog.Logger { return bot.logger }
|
||||
func (bot *Bot[T]) GetDBContext() *T { return bot.dbContext }
|
||||
func (bot *Bot[T]) L10n(lang, key string) string { return bot.l10n.Translate(lang, key) }
|
||||
func (bot *Bot[T]) GetUpdateOffset() int {
|
||||
bot.updateOffsetMu.Lock()
|
||||
defer bot.updateOffsetMu.Unlock()
|
||||
return bot.updateOffset
|
||||
}
|
||||
func (bot *Bot[T]) SetUpdateOffset(offset int) {
|
||||
bot.updateOffsetMu.Lock()
|
||||
defer bot.updateOffsetMu.Unlock()
|
||||
bot.updateOffset = offset
|
||||
}
|
||||
func (bot *Bot[T]) GetUpdateTypes() []tgapi.UpdateType { return bot.updateTypes }
|
||||
func (bot *Bot[T]) GetLogger() *slog.Logger { return bot.logger }
|
||||
func (bot *Bot[T]) GetDBContext() *T { return bot.dbContext }
|
||||
func (bot *Bot[T]) L10n(lang, key string) string { return bot.l10n.Translate(lang, key) }
|
||||
|
||||
type DbLogger[T DbContext] func(db *T) slog.LoggerWriter
|
||||
|
||||
@@ -235,7 +246,7 @@ func (bot *Bot[T]) Debug(debug bool) *Bot[T] {
|
||||
func (bot *Bot[T]) AddPlugins(plugin ...*Plugin[T]) *Bot[T] {
|
||||
for _, p := range plugin {
|
||||
bot.plugins = append(bot.plugins, *p)
|
||||
bot.logger.Debugln(fmt.Sprintf("plugins with name \"%s\" registered", p.Name))
|
||||
bot.logger.Debugln(fmt.Sprintf("plugins with name \"%s\" registered", p.name))
|
||||
}
|
||||
return bot
|
||||
}
|
||||
@@ -266,7 +277,15 @@ func (bot *Bot[T]) AddL10n(l *L10n) *Bot[T] {
|
||||
return bot
|
||||
}
|
||||
|
||||
func (bot *Bot[T]) Run() {
|
||||
func (bot *Bot[T]) enqueueUpdate(u *tgapi.Update) error {
|
||||
select {
|
||||
case bot.updateQueue <- u:
|
||||
return nil
|
||||
default:
|
||||
return extypes.QueueFullErr
|
||||
}
|
||||
}
|
||||
func (bot *Bot[T]) RunWithContext(ctx context.Context) {
|
||||
if len(bot.prefixes) == 0 {
|
||||
bot.logger.Fatalln("no prefixes defined")
|
||||
return
|
||||
@@ -282,26 +301,36 @@ func (bot *Bot[T]) Run() {
|
||||
bot.logger.Infoln("Bot running. Press CTRL+C to exit.")
|
||||
go func() {
|
||||
for {
|
||||
_, err := bot.Updates()
|
||||
if err != nil {
|
||||
bot.logger.Errorln(err)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
updates, err := bot.Updates()
|
||||
if err != nil {
|
||||
bot.logger.Errorln(err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, u := range updates {
|
||||
select {
|
||||
case bot.updateQueue <- new(u):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
queue := bot.updateQueue
|
||||
if queue.IsEmpty() {
|
||||
time.Sleep(time.Millisecond * 25)
|
||||
continue
|
||||
}
|
||||
|
||||
u := queue.Dequeue()
|
||||
if u == nil {
|
||||
bot.logger.Errorln("update is nil")
|
||||
continue
|
||||
}
|
||||
|
||||
bot.handle(u)
|
||||
pool := pond.NewPool(16)
|
||||
for update := range bot.updateQueue {
|
||||
update := update
|
||||
log.Println(update)
|
||||
pool.Submit(func() {
|
||||
bot.handle(update)
|
||||
})
|
||||
}
|
||||
}
|
||||
func (bot *Bot[T]) Run() {
|
||||
bot.RunWithContext(context.Background())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user