dev #25

Merged
akulij merged 38 commits from dev into main 2025-06-18 17:10:44 +00:00
Showing only changes of commit d3495c9a44 - Show all commits

View File

@ -17,15 +17,18 @@ use crate::{
BotController, BotResult, BotRuntime,
};
pub type BotThread = JoinHandle<BotResult<()>>;
pub struct BotRunner {
controller: BotController,
info: BotInfo,
notificator: NotificatorThread,
thread: Option<JoinHandle<BotResult<()>>>,
thread: Option<BotThread>,
}
#[derive(Debug)]
pub enum NotificatorThread {
Running(Option<JoinHandle<BotResult<()>>>),
Running(Option<BotThread>),
Done,
}
@ -72,7 +75,7 @@ where
pub async fn dispatch(mut self, db: &mut DB) -> BotResult<()> {
loop {
'biter: for bi in (self.bi_getter)().await {
for bi in (self.bi_getter)().await {
// removing handler to force restart
// TODO: wait till all updates are processed in bot
// Temporarly disabling code, because it's free of js runtime
@ -88,26 +91,12 @@ where
let mut bot_runner = match self.bot_pool.remove(&bi.name) {
Some(br) => br,
None => {
let handlers = (self.h_mapper)(bi.clone()).await;
info!("NEW INSTANCE: Starting new instance! bot name: {}", bi.name);
self.start_bot(bi, db, handlers.collect()).await?;
continue 'biter;
self.create_bot_runner(&bi, db).await?
}
};
// checking if thread is not finished, otherwise clearing handler
bot_runner.thread = match bot_runner.thread {
Some(thread) => {
if thread.is_finished() {
let err = thread.join();
error!("Thread bot `{}` finished with error: {:?}", bi.name, err);
None
} else {
Some(thread)
}
}
None => None,
};
bot_runner.thread = clear_finished_thread(bot_runner.thread, &bi);
// checking if thread is running, otherwise start thread
bot_runner.thread = match bot_runner.thread {
@ -129,41 +118,84 @@ where
)
}
};
bot_runner.notificator = check_notificator_done(bot_runner.notificator);
bot_runner.notificator = match bot_runner.notificator {
NotificatorThread::Done => NotificatorThread::Done,
NotificatorThread::Running(thread) => {
NotificatorThread::Running(match thread {
Some(thread) => Some(thread),
None => {
let thread =
spawn_notificator_thread(bot_runner.controller.clone()).await?;
Some(thread)
}
})
}
};
self.bot_pool.insert(bi.name.clone(), bot_runner);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
pub async fn start_bot(
pub async fn create_bot_runner(
&mut self,
bi: BotInstance,
bi: &BotInstance,
db: &mut DB,
plug_handlers: Vec<BotHandler>,
) -> BotResult<BotInfo> {
) -> BotResult<BotRunner> {
let db = db.clone().with_name(bi.name.clone());
let controller = BotController::with_db(db.clone(), &bi.token, &bi.script).await?;
let handler = script_handler_gen(controller.runtime.clone(), plug_handlers).await;
let thread =
spawn_bot_thread(controller.bot.clone(), controller.db.clone(), handler).await?;
let notificator = spawn_notificator_thread(controller.clone()).await?;
let notificator = NotificatorThread::Running(Some(notificator));
let info = BotInfo {
name: bi.name.clone(),
};
let runner = BotRunner {
controller,
info: info.clone(),
notificator,
thread: Some(thread),
info,
notificator: NotificatorThread::Running(None),
thread: None,
};
self.bot_pool.insert(bi.name.clone(), runner);
Ok(runner)
}
}
Ok(info)
/// checking if thread is not finished, otherwise clearing handler
fn clear_finished_thread(thread: Option<BotThread>, bi: &BotInstance) -> Option<BotThread> {
thread.and_then(|thread| match thread.is_finished() {
false => Some(thread),
// if finished, join it (should return immidiatly), and print cause of stop
true => {
let err = thread.join();
error!("Thread bot `{}` finished with error: {:?}", bi.name, err);
None
}
})
}
// sets NotificatorThread to Done if running thread returned Ok(...)
fn check_notificator_done(n: NotificatorThread) -> NotificatorThread {
match n {
NotificatorThread::Running(Some(thread)) if thread.is_finished() => {
match thread.join() {
// if thread returns Ok(_), then do not run it again
Ok(result) if result.is_ok() => NotificatorThread::Done,
// but try to restart, if returned an error
Ok(result) => {
error!("Notificator thread returned error: {result:?}");
NotificatorThread::Running(None)
}
Err(panicerr) => {
error!("Notificator thread paniced: {panicerr:?}");
NotificatorThread::Running(None)
}
}
}
other => other,
}
}
@ -181,20 +213,13 @@ async fn script_handler_gen(
handler
}
pub async fn spawn_bot_thread(
bot: Bot,
mut db: DB,
handler: BotHandler,
) -> BotResult<JoinHandle<BotResult<()>>> {
pub async fn spawn_bot_thread(bot: Bot, mut db: DB, handler: BotHandler) -> BotResult<BotThread> {
let state_mgr = MongodbStorage::from_db(&mut db, Json)
.await
.map_err(DbError::from)?;
let thread = std::thread::spawn(move || -> BotResult<()> {
let state_mgr = state_mgr;
// let rt = tokio::runtime::Builder::new_current_thread()
// .enable_all()
// .build()?;
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(
@ -210,9 +235,7 @@ pub async fn spawn_bot_thread(
Ok(thread)
}
pub async fn spawn_notificator_thread(
mut c: BotController,
) -> BotResult<JoinHandle<BotResult<()>>> {
pub async fn spawn_notificator_thread(mut c: BotController) -> BotResult<BotThread> {
let thread = std::thread::spawn(move || -> BotResult<()> {
let rt = tokio::runtime::Runtime::new()?;