package laniakea import ( "context" "fmt" "sort" "strings" "sync" "time" "git.nix13.pw/scuroneko/extypes" "git.nix13.pw/scuroneko/laniakea/tgapi" "git.nix13.pw/scuroneko/laniakea/utils" "git.nix13.pw/scuroneko/slog" "github.com/alitto/pond/v2" ) // DbContext is an interface representing the application's database context. // It is injected into plugins and middleware via Bot.DatabaseContext(). // // Example: // // type MyDB struct { ... } // bot := NewBot[MyDB](opts).DatabaseContext(&myDB) // // Use NoDB if no database is needed. type DbContext any // NoDB is a placeholder type for bots that do not use a database. // Use Bot[NoDB] to indicate no dependency injection is required. type NoDB struct{ DbContext } // DbLogger is a function type that returns a slog.LoggerWriter for database logging. // Used to inject database-specific log output (e.g., SQL queries, ORM events). type DbLogger[T DbContext] func(db *T) slog.LoggerWriter // BotPayloadType defines the serialization format for callback data payloads. type BotPayloadType string var ( // BotPayloadBase64 encodes callback data as a Base64 string. BotPayloadBase64 BotPayloadType = "base64" // BotPayloadJson encodes callback data as a JSON string. BotPayloadJson BotPayloadType = "json" ) // Bot is the core Telegram bot instance. // // Manages: // - API communication via tgapi // - Update processing pipeline (middleware → plugins) // - Background runners // - Logging and rate limiting // - Localization and draft message support // // All methods are safe for concurrent use. Direct field access is not recommended. type Bot[T DbContext] struct { token string debug bool errorTemplate string username string payloadType BotPayloadType maxWorkers int logger *slog.Logger // Main bot logger (JSON stdout + optional file) RequestLogger *slog.Logger // Optional request-level API logging extraLoggers extypes.Slice[*slog.Logger] // API, Uploader, and custom loggers plugins []Plugin[T] // Command/event handlers middlewares []Middleware[T] // Pre-processing filters (sorted by order) prefixes []string // Command prefixes (e.g., "/", "!") runners []Runner[T] // Background tasks (e.g., cleanup, cron) api *tgapi.API // Telegram API client uploader *tgapi.Uploader // File uploader dbContext *T // Injected database context l10n *L10n // Localization manager draftProvider *DraftProvider // Draft message builder updateOffsetMu sync.Mutex updateOffset int // Last processed update ID updateTypes []tgapi.UpdateType // Types of updates to fetch updateQueue chan *tgapi.Update // Internal queue for processing updates } // NewBot creates and initializes a new Bot instance using the provided BotOpts. // // Automatically: // - Creates API and Uploader clients // - Initializes structured logging (JSON stdout + optional file) // - Fetches bot username via GetMe() // - Sets up DraftProvider with random IDs // - Adds API and Uploader loggers to extraLoggers // // Panics if: // - Token is empty // - GetMe() fails (invalid token or network error) func NewBot[T any](opts *BotOpts) *Bot[T] { if opts.Token == "" { panic("laniakea: BotOpts.Token is required") } updateQueue := make(chan *tgapi.Update, 512) //var limiter *utils.RateLimiter //if opts.RateLimit > 0 { // limiter = utils.NewRateLimiter() //} limiter := utils.NewRateLimiter() apiOpts := tgapi.NewAPIOpts(opts.Token). SetAPIUrl(opts.APIUrl). UseTestServer(opts.UseTestServer). SetLimiter(limiter) api := tgapi.NewAPI(apiOpts) uploader := tgapi.NewUploader(api) prefixes := opts.Prefixes if len(prefixes) == 0 { prefixes = []string{"/"} } workers := 32 if opts.MaxWorkers > 0 { workers = opts.MaxWorkers } bot := &Bot[T]{ updateOffset: 0, errorTemplate: "%s", payloadType: BotPayloadBase64, maxWorkers: workers, updateQueue: updateQueue, api: api, uploader: uploader, debug: opts.Debug, prefixes: prefixes, token: opts.Token, plugins: make([]Plugin[T], 0), updateTypes: make([]tgapi.UpdateType, 0), runners: make([]Runner[T], 0), extraLoggers: make([]*slog.Logger, 0), l10n: &L10n{}, draftProvider: NewRandomDraftProvider(api), } // Add API and Uploader loggers to extraLoggers for unified output bot.extraLoggers = bot.extraLoggers.Push(api.GetLogger()).Push(uploader.GetLogger()) if len(opts.ErrorTemplate) > 0 { bot.errorTemplate = opts.ErrorTemplate } if len(opts.LoggerBasePath) == 0 { opts.LoggerBasePath = "./" } bot.initLoggers(opts) // Fetch bot info to validate token and get username u, err := api.GetMe() if err != nil { _ = bot.Close() bot.logger.Fatal(err) } bot.username = Val(u.Username, "") if bot.username == "" { bot.logger.Warn("Can't get bot username. Named command handlers won't work!") } bot.logger.Infoln(fmt.Sprintf("Authorized as %s (@%s)", u.FirstName, Val(u.Username, "unknown"))) return bot } // Close gracefully shuts down the bot. // // Closes: // - Uploader (waits for pending uploads) // - API client // - RequestLogger (if enabled) // - Main logger // // Returns the first error encountered, if any. func (bot *Bot[T]) Close() error { if err := bot.uploader.Close(); err != nil { bot.logger.Errorln(err) } if err := bot.api.CloseApi(); err != nil { bot.logger.Errorln(err) } if bot.RequestLogger != nil { if err := bot.RequestLogger.Close(); err != nil { bot.logger.Errorln(err) } } if err := bot.logger.Close(); err != nil { return err } return nil } // initLoggers configures the main and optional request loggers. // // Uses DEBUG flag to set log level (DEBUG if true, FATAL otherwise). // Writes to stdout in JSON format by default. // If WriteToFile is true, writes to main.log and requests.log in LoggerBasePath. func (bot *Bot[T]) initLoggers(opts *BotOpts) { level := slog.FATAL if opts.Debug { level = slog.DEBUG } bot.logger = slog.CreateLogger().Level(level).Prefix("BOT") bot.logger.AddWriter(bot.logger.CreateJsonStdoutWriter()) if opts.WriteToFile { path := fmt.Sprintf("%s/main.log", strings.TrimRight(opts.LoggerBasePath, "/")) fileWriter, err := bot.logger.CreateTextFileWriter(path) if err != nil { bot.logger.Fatal(err) } bot.logger.AddWriter(fileWriter) } if opts.UseRequestLogger { bot.RequestLogger = slog.CreateLogger().Level(level).Prefix("REQUESTS") bot.RequestLogger.AddWriter(bot.RequestLogger.CreateJsonStdoutWriter()) if opts.WriteToFile { path := fmt.Sprintf("%s/requests.log", strings.TrimRight(opts.LoggerBasePath, "/")) fileWriter, err := bot.RequestLogger.CreateTextFileWriter(path) if err != nil { bot.logger.Fatal(err) } bot.RequestLogger.AddWriter(fileWriter) } } } // GetUpdateOffset returns the current update offset (thread-safe). func (bot *Bot[T]) GetUpdateOffset() int { bot.updateOffsetMu.Lock() defer bot.updateOffsetMu.Unlock() return bot.updateOffset } // SetUpdateOffset sets the update offset for next GetUpdates call (thread-safe). func (bot *Bot[T]) SetUpdateOffset(offset int) { bot.updateOffsetMu.Lock() defer bot.updateOffsetMu.Unlock() bot.updateOffset = offset } // GetUpdateTypes returns the list of update types the bot is configured to receive. func (bot *Bot[T]) GetUpdateTypes() []tgapi.UpdateType { return bot.updateTypes } // GetLogger returns the main bot logger. func (bot *Bot[T]) GetLogger() *slog.Logger { return bot.logger } // GetDBContext returns the injected database context. // Returns nil if not set via DatabaseContext(). func (bot *Bot[T]) GetDBContext() *T { return bot.dbContext } // L10n translates a key in the given language. // Returns empty string if translation not found. func (bot *Bot[T]) L10n(lang, key string) string { return bot.l10n.Translate(lang, key) } // SetDraftProvider replaces the default DraftProvider with a custom one. // Useful for using LinearDraftIdGenerator to persist draft IDs across restarts. func (bot *Bot[T]) SetDraftProvider(p *DraftProvider) *Bot[T] { bot.draftProvider = p return bot } // DatabaseContext injects a database context into the bot. // This context is accessible to plugins and middleware via GetDBContext(). func (bot *Bot[T]) DatabaseContext(ctx *T) *Bot[T] { bot.dbContext = ctx return bot } // UpdateTypes sets the list of update types the bot will request from Telegram. // Overwrites any previously set types. func (bot *Bot[T]) UpdateTypes(t ...tgapi.UpdateType) *Bot[T] { bot.updateTypes = make([]tgapi.UpdateType, 0) bot.updateTypes = append(bot.updateTypes, t...) return bot } // SetPayloadType sets the type, that bot will use for payload // json - string `{"cmd": "command", "args": [...]} // base64 - same json, but encoded in base64 string func (bot *Bot[T]) SetPayloadType(t BotPayloadType) *Bot[T] { bot.payloadType = t return bot } // AddUpdateType adds one or more update types to the list. // Does not overwrite existing types. func (bot *Bot[T]) AddUpdateType(t ...tgapi.UpdateType) *Bot[T] { bot.updateTypes = append(bot.updateTypes, t...) return bot } // AddPrefixes adds one or more command prefixes (e.g., "/", "!"). // Must have at least one prefix before Run(). func (bot *Bot[T]) AddPrefixes(prefixes ...string) *Bot[T] { bot.prefixes = append(bot.prefixes, prefixes...) return bot } // ErrorTemplate sets the format string for error messages sent to users. // Use "%s" to insert the error message. // Example: "❌ Error: %s" → "❌ Error: Command not found" func (bot *Bot[T]) ErrorTemplate(s string) *Bot[T] { bot.errorTemplate = s return bot } // Debug enables or disables debug logging. func (bot *Bot[T]) Debug(debug bool) *Bot[T] { bot.debug = debug return bot } // AddPlugins registers one or more plugins. // Plugins are executed in registration order unless filtered by middleware. func (bot *Bot[T]) AddPlugins(plugin ...*Plugin[T]) *Bot[T] { for _, p := range plugin { bot.plugins = append(bot.plugins, *p) bot.logger.Debugln(fmt.Sprintf("plugins with name \"%s\" registered", p.name)) } return bot } // AddMiddleware registers one or more middleware handlers. // // Middleware are executed in order of increasing .order value before plugins. // If two middleware have the same order, they are sorted lexicographically by name. // // Middleware can: // - Modify or reject updates before they reach plugins // - Inject context (e.g., user auth state, rate limit status) // - Log, validate, or transform incoming data // // Example: // // bot.AddMiddleware(&authMiddleware, &rateLimitMiddleware) // // Panics if any middleware has a nil name. func (bot *Bot[T]) AddMiddleware(middleware ...Middleware[T]) *Bot[T] { for _, m := range middleware { if m.name == "" { panic("laniakea: middleware must have a non-empty name") } bot.middlewares = append(bot.middlewares, m) bot.logger.Debugln(fmt.Sprintf("middleware with name \"%s\" registered", m.name)) } // Stable sort by order (ascending), then by name (lexicographic) sort.Slice(bot.middlewares, func(i, j int) bool { first := bot.middlewares[i] second := bot.middlewares[j] if first.order != second.order { return first.order < second.order } return first.name < second.name }) return bot } // AddRunner registers a background runner to execute concurrently with the bot. // // Runners are goroutines that run independently of update processing. // Common use cases: // - Periodic cleanup (e.g., expiring drafts, clearing temp files) // - Metrics collection or health checks // - Scheduled tasks (e.g., daily announcements) // // Runners are started immediately after Bot.Run() is called. // // Example: // // bot.AddRunner(&cleanupRunner) // // Panics if runner has a nil name. func (bot *Bot[T]) AddRunner(runner Runner[T]) *Bot[T] { if runner.name == "" { panic("laniakea: runner must have a non-empty name") } bot.runners = append(bot.runners, runner) bot.logger.Debugln(fmt.Sprintf("runner with name \"%s\" registered", runner.name)) return bot } // AddL10n sets the localization (i18n) provider for the bot. // // The L10n instance must be pre-populated with translations. // Translations are accessed via Bot.L10n(lang, key). // // Example: // // l10n := l10n.New() // l10n.Add("en", "hello", "Hello!") // l10n.Add("es", "hello", "¡Hola!") // bot.AddL10n(l10n) // // Replaces any previously set L10n instance. func (bot *Bot[T]) AddL10n(l *L10n) *Bot[T] { if l == nil { bot.logger.Warn("AddL10n called with nil L10n; localization will be disabled") } bot.l10n = l return bot } // AddDatabaseLoggerWriter adds a database logger writer to all loggers. // // The writer will receive logs from: // - Main bot logger // - Request logger (if enabled) // - API and Uploader loggers // // Example: // // bot.AddDatabaseLoggerWriter(func(db *MyDB) slog.LoggerWriter { // return db.QueryLogger() // }) func (bot *Bot[T]) AddDatabaseLoggerWriter(writer DbLogger[T]) *Bot[T] { w := writer(bot.dbContext) bot.logger.AddWriter(w) if bot.RequestLogger != nil { bot.RequestLogger.AddWriter(w) } for _, l := range bot.extraLoggers { l.AddWriter(w) } return bot } // RunWithContext starts the bot with a given context for graceful shutdown. // // This is the main entry point for bot execution. It: // - Validates required configuration (prefixes, plugins) // - Starts all registered runners as background goroutines // - Begins polling for updates via Telegram's GetUpdates API // - Processes updates concurrently using a worker pool with size configurable via BotOpts.MaxWorkers // // The context controls graceful shutdown. When canceled, the bot: // - Stops polling for new updates // - Finishes processing currently queued updates // - Closes all resources (API, uploader, loggers) // // Example: // // ctx, cancel := context.WithCancel(context.Background()) // go bot.RunWithContext(ctx) // // ... later ... // cancel() // triggers graceful shutdown func (bot *Bot[T]) RunWithContext(ctx context.Context) { if len(bot.prefixes) == 0 { bot.logger.Fatalln("no prefixes defined") return } if len(bot.plugins) == 0 { bot.logger.Fatalln("no plugins defined") return } bot.ExecRunners(ctx) bot.logger.Infoln("Bot running. Press CTRL+C to exit.") // Start update polling in a goroutine go func() { defer func() { if r := recover(); r != nil { bot.logger.Errorln(fmt.Sprintf("panic in update polling: %v", r)) } close(bot.updateQueue) }() for { select { case <-ctx.Done(): return default: updates, err := bot.Updates() if err != nil { bot.logger.Errorln("failed to fetch updates:", err) time.Sleep(2 * time.Second) // exponential backoff continue } for _, u := range updates { u := u // copy loop variable to avoid race condition select { case bot.updateQueue <- &u: case <-ctx.Done(): return } } } } }() // Start worker pool for concurrent update handling pool := pond.NewPool(bot.maxWorkers) for update := range bot.updateQueue { u := update // capture loop variable pool.Submit(func() { bot.handle(u) }) } pool.Stop() // Wait for all tasks to complete and stop the pool } // Run starts the bot using a background context. // // Equivalent to RunWithContext(context.Background()). // Use this for simple bots where graceful shutdown is not required. // // For production use, prefer RunWithContext to handle SIGINT/SIGTERM gracefully. func (bot *Bot[T]) Run() { bot.RunWithContext(context.Background()) }