From d3495c9a44d04668cc45fb17f3130724e549647b Mon Sep 17 00:00:00 2001 From: Akulij Date: Sun, 8 Jun 2025 18:12:16 +0500 Subject: [PATCH] refactor bot manager --- src/bot_manager.rs | 115 +++++++++++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 46 deletions(-) diff --git a/src/bot_manager.rs b/src/bot_manager.rs index 5d91685..7b7595e 100644 --- a/src/bot_manager.rs +++ b/src/bot_manager.rs @@ -17,15 +17,18 @@ use crate::{ BotController, BotResult, BotRuntime, }; +pub type BotThread = JoinHandle>; + pub struct BotRunner { controller: BotController, info: BotInfo, notificator: NotificatorThread, - thread: Option>>, + thread: Option, } +#[derive(Debug)] pub enum NotificatorThread { - Running(Option>>), + Running(Option), Done, } @@ -72,7 +75,7 @@ where pub async fn dispatch(mut self, db: &mut DB) -> BotResult<()> { loop { - 'biter: for bi in (self.bi_getter)().await { + for bi in (self.bi_getter)().await { // removing handler to force restart // TODO: wait till all updates are processed in bot // Temporarly disabling code, because it's free of js runtime @@ -88,26 +91,12 @@ where let mut bot_runner = match self.bot_pool.remove(&bi.name) { Some(br) => br, None => { - let handlers = (self.h_mapper)(bi.clone()).await; info!("NEW INSTANCE: Starting new instance! bot name: {}", bi.name); - self.start_bot(bi, db, handlers.collect()).await?; - continue 'biter; + self.create_bot_runner(&bi, db).await? } }; - // checking if thread is not finished, otherwise clearing handler - bot_runner.thread = match bot_runner.thread { - Some(thread) => { - if thread.is_finished() { - let err = thread.join(); - error!("Thread bot `{}` finished with error: {:?}", bi.name, err); - None - } else { - Some(thread) - } - } - None => None, - }; + bot_runner.thread = clear_finished_thread(bot_runner.thread, &bi); // checking if thread is running, otherwise start thread bot_runner.thread = match bot_runner.thread { @@ -129,41 +118,84 @@ where ) } }; + + bot_runner.notificator = check_notificator_done(bot_runner.notificator); + + bot_runner.notificator = match bot_runner.notificator { + NotificatorThread::Done => NotificatorThread::Done, + NotificatorThread::Running(thread) => { + NotificatorThread::Running(match thread { + Some(thread) => Some(thread), + None => { + let thread = + spawn_notificator_thread(bot_runner.controller.clone()).await?; + Some(thread) + } + }) + } + }; + self.bot_pool.insert(bi.name.clone(), bot_runner); } tokio::time::sleep(Duration::from_secs(1)).await; } } - pub async fn start_bot( + pub async fn create_bot_runner( &mut self, - bi: BotInstance, + bi: &BotInstance, db: &mut DB, - plug_handlers: Vec, - ) -> BotResult { + ) -> BotResult { let db = db.clone().with_name(bi.name.clone()); let controller = BotController::with_db(db.clone(), &bi.token, &bi.script).await?; - let handler = script_handler_gen(controller.runtime.clone(), plug_handlers).await; - - let thread = - spawn_bot_thread(controller.bot.clone(), controller.db.clone(), handler).await?; - let notificator = spawn_notificator_thread(controller.clone()).await?; - let notificator = NotificatorThread::Running(Some(notificator)); - let info = BotInfo { name: bi.name.clone(), }; let runner = BotRunner { controller, - info: info.clone(), - notificator, - thread: Some(thread), + info, + notificator: NotificatorThread::Running(None), + thread: None, }; - self.bot_pool.insert(bi.name.clone(), runner); + Ok(runner) + } +} - Ok(info) +/// checking if thread is not finished, otherwise clearing handler +fn clear_finished_thread(thread: Option, bi: &BotInstance) -> Option { + thread.and_then(|thread| match thread.is_finished() { + false => Some(thread), + // if finished, join it (should return immidiatly), and print cause of stop + true => { + let err = thread.join(); + error!("Thread bot `{}` finished with error: {:?}", bi.name, err); + None + } + }) +} + +// sets NotificatorThread to Done if running thread returned Ok(...) +fn check_notificator_done(n: NotificatorThread) -> NotificatorThread { + match n { + NotificatorThread::Running(Some(thread)) if thread.is_finished() => { + match thread.join() { + // if thread returns Ok(_), then do not run it again + Ok(result) if result.is_ok() => NotificatorThread::Done, + + // but try to restart, if returned an error + Ok(result) => { + error!("Notificator thread returned error: {result:?}"); + NotificatorThread::Running(None) + } + Err(panicerr) => { + error!("Notificator thread paniced: {panicerr:?}"); + NotificatorThread::Running(None) + } + } + } + other => other, } } @@ -181,20 +213,13 @@ async fn script_handler_gen( handler } -pub async fn spawn_bot_thread( - bot: Bot, - mut db: DB, - handler: BotHandler, -) -> BotResult>> { +pub async fn spawn_bot_thread(bot: Bot, mut db: DB, handler: BotHandler) -> BotResult { let state_mgr = MongodbStorage::from_db(&mut db, Json) .await .map_err(DbError::from)?; let thread = std::thread::spawn(move || -> BotResult<()> { let state_mgr = state_mgr; - // let rt = tokio::runtime::Builder::new_current_thread() - // .enable_all() - // .build()?; let rt = tokio::runtime::Runtime::new()?; rt.block_on( @@ -210,9 +235,7 @@ pub async fn spawn_bot_thread( Ok(thread) } -pub async fn spawn_notificator_thread( - mut c: BotController, -) -> BotResult>> { +pub async fn spawn_notificator_thread(mut c: BotController) -> BotResult { let thread = std::thread::spawn(move || -> BotResult<()> { let rt = tokio::runtime::Runtime::new()?;