Compare commits

...

7 Commits

Author SHA1 Message Date
Akulij
d3495c9a44 refactor bot manager
All checks were successful
Build && Deploy / cargo build (push) Successful in 1m2s
2025-06-08 18:12:16 +05:00
Akulij
3d60f31d99 optimize algorithm in NotificationTime 2025-06-08 17:52:18 +05:00
Akulij
f16554b764 fix: use separate db for tests 2025-06-08 15:38:58 +05:00
Akulij
e239ff1c44 fix test_get_random_users 2025-06-08 15:38:51 +05:00
Akulij
2fefe22846 fix test_drop_media 2025-06-08 15:35:50 +05:00
Akulij
e3e8a0cf79 refactor /src/botscript/application.rs 2025-06-08 15:05:41 +05:00
Akulij
b86a8f4a52 impl ScriptError::from for types BotError and PoisonError 2025-06-08 15:01:14 +05:00
5 changed files with 171 additions and 119 deletions

View File

@ -17,15 +17,18 @@ use crate::{
BotController, BotResult, BotRuntime,
};
pub type BotThread = JoinHandle<BotResult<()>>;
pub struct BotRunner {
controller: BotController,
info: BotInfo,
notificator: NotificatorThread,
thread: Option<JoinHandle<BotResult<()>>>,
thread: Option<BotThread>,
}
#[derive(Debug)]
pub enum NotificatorThread {
Running(Option<JoinHandle<BotResult<()>>>),
Running(Option<BotThread>),
Done,
}
@ -72,7 +75,7 @@ where
pub async fn dispatch(mut self, db: &mut DB) -> BotResult<()> {
loop {
'biter: for bi in (self.bi_getter)().await {
for bi in (self.bi_getter)().await {
// removing handler to force restart
// TODO: wait till all updates are processed in bot
// Temporarly disabling code, because it's free of js runtime
@ -88,26 +91,12 @@ where
let mut bot_runner = match self.bot_pool.remove(&bi.name) {
Some(br) => br,
None => {
let handlers = (self.h_mapper)(bi.clone()).await;
info!("NEW INSTANCE: Starting new instance! bot name: {}", bi.name);
self.start_bot(bi, db, handlers.collect()).await?;
continue 'biter;
self.create_bot_runner(&bi, db).await?
}
};
// checking if thread is not finished, otherwise clearing handler
bot_runner.thread = match bot_runner.thread {
Some(thread) => {
if thread.is_finished() {
let err = thread.join();
error!("Thread bot `{}` finished with error: {:?}", bi.name, err);
None
} else {
Some(thread)
}
}
None => None,
};
bot_runner.thread = clear_finished_thread(bot_runner.thread, &bi);
// checking if thread is running, otherwise start thread
bot_runner.thread = match bot_runner.thread {
@ -129,41 +118,84 @@ where
)
}
};
bot_runner.notificator = check_notificator_done(bot_runner.notificator);
bot_runner.notificator = match bot_runner.notificator {
NotificatorThread::Done => NotificatorThread::Done,
NotificatorThread::Running(thread) => {
NotificatorThread::Running(match thread {
Some(thread) => Some(thread),
None => {
let thread =
spawn_notificator_thread(bot_runner.controller.clone()).await?;
Some(thread)
}
})
}
};
self.bot_pool.insert(bi.name.clone(), bot_runner);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
pub async fn start_bot(
pub async fn create_bot_runner(
&mut self,
bi: BotInstance,
bi: &BotInstance,
db: &mut DB,
plug_handlers: Vec<BotHandler>,
) -> BotResult<BotInfo> {
) -> BotResult<BotRunner> {
let db = db.clone().with_name(bi.name.clone());
let controller = BotController::with_db(db.clone(), &bi.token, &bi.script).await?;
let handler = script_handler_gen(controller.runtime.clone(), plug_handlers).await;
let thread =
spawn_bot_thread(controller.bot.clone(), controller.db.clone(), handler).await?;
let notificator = spawn_notificator_thread(controller.clone()).await?;
let notificator = NotificatorThread::Running(Some(notificator));
let info = BotInfo {
name: bi.name.clone(),
};
let runner = BotRunner {
controller,
info: info.clone(),
notificator,
thread: Some(thread),
info,
notificator: NotificatorThread::Running(None),
thread: None,
};
self.bot_pool.insert(bi.name.clone(), runner);
Ok(runner)
}
}
Ok(info)
/// checking if thread is not finished, otherwise clearing handler
fn clear_finished_thread(thread: Option<BotThread>, bi: &BotInstance) -> Option<BotThread> {
thread.and_then(|thread| match thread.is_finished() {
false => Some(thread),
// if finished, join it (should return immidiatly), and print cause of stop
true => {
let err = thread.join();
error!("Thread bot `{}` finished with error: {:?}", bi.name, err);
None
}
})
}
// sets NotificatorThread to Done if running thread returned Ok(...)
fn check_notificator_done(n: NotificatorThread) -> NotificatorThread {
match n {
NotificatorThread::Running(Some(thread)) if thread.is_finished() => {
match thread.join() {
// if thread returns Ok(_), then do not run it again
Ok(result) if result.is_ok() => NotificatorThread::Done,
// but try to restart, if returned an error
Ok(result) => {
error!("Notificator thread returned error: {result:?}");
NotificatorThread::Running(None)
}
Err(panicerr) => {
error!("Notificator thread paniced: {panicerr:?}");
NotificatorThread::Running(None)
}
}
}
other => other,
}
}
@ -181,20 +213,13 @@ async fn script_handler_gen(
handler
}
pub async fn spawn_bot_thread(
bot: Bot,
mut db: DB,
handler: BotHandler,
) -> BotResult<JoinHandle<BotResult<()>>> {
pub async fn spawn_bot_thread(bot: Bot, mut db: DB, handler: BotHandler) -> BotResult<BotThread> {
let state_mgr = MongodbStorage::from_db(&mut db, Json)
.await
.map_err(DbError::from)?;
let thread = std::thread::spawn(move || -> BotResult<()> {
let state_mgr = state_mgr;
// let rt = tokio::runtime::Builder::new_current_thread()
// .enable_all()
// .build()?;
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(
@ -210,9 +235,7 @@ pub async fn spawn_bot_thread(
Ok(thread)
}
pub async fn spawn_notificator_thread(
mut c: BotController,
) -> BotResult<JoinHandle<BotResult<()>>> {
pub async fn spawn_notificator_thread(mut c: BotController) -> BotResult<BotThread> {
let thread = std::thread::spawn(move || -> BotResult<()> {
let rt = tokio::runtime::Runtime::new()?;

View File

@ -2,14 +2,14 @@ pub mod application;
pub mod db;
pub mod message_info;
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::{Mutex, PoisonError};
use std::time::Duration;
use crate::db::raw_calls::RawCallError;
use crate::db::{CallDB, DbError, User, DB};
use crate::message_answerer::MessageAnswererError;
use crate::notify_admin;
use crate::utils::parcelable::{ParcelType, Parcelable, ParcelableError, ParcelableResult};
use crate::{notify_admin, BotError};
use chrono::{DateTime, Days, NaiveTime, ParseError, TimeDelta, Timelike, Utc};
use db::attach_db_obj;
use futures::future::join_all;
@ -48,6 +48,23 @@ pub enum ScriptError {
MutexError(String),
#[error("can't send message to user to user: {0:?}")]
MAError(#[from] MessageAnswererError),
#[error("other script error: {0:?}")]
Other(String),
}
impl From<BotError> for ScriptError {
fn from(value: BotError) -> Self {
match value {
crate::BotError::DBError(db_error) => ScriptError::DBError(db_error),
error => ScriptError::Other(format!("BotError: {error}")),
}
}
}
impl<T> From<PoisonError<T>> for ScriptError {
fn from(value: PoisonError<T>) -> Self {
Self::MutexError(format!("Can't lock Mutex in script, err: {}", value))
}
}
#[derive(thiserror::Error, Debug)]
@ -671,8 +688,7 @@ pub enum NotificationTime {
}
impl NotificationTime {
pub fn when_next(&self, start_time: &DateTime<Utc>, now: &DateTime<Utc>) -> DateTime<Utc> {
let now = *now;
pub fn when_next(&self, start_time: DateTime<Utc>, now: DateTime<Utc>) -> DateTime<Utc> {
match self {
NotificationTime::Delta {
delta_hours,
@ -680,15 +696,15 @@ impl NotificationTime {
} => {
let delta = TimeDelta::minutes((delta_minutes + delta_hours * 60).into());
let mut estimation = *start_time;
// super non-optimal, but fun :)
loop {
if estimation < now + Duration::from_secs(1) {
estimation += delta;
} else {
break estimation;
}
}
let secs_period = delta.num_seconds();
if secs_period == 0 {
return now;
};
let diff = now - start_time;
let passed = diff.num_seconds().abs() % secs_period;
now - Duration::from_secs(passed as u64) + delta
}
NotificationTime::Specific(time) => {
let estimation = now;
@ -696,13 +712,11 @@ impl NotificationTime {
let mut estimation = estimation
.with_minute(time.minutes.into())
.unwrap_or(estimation);
// super non-optimal, but fun :)
loop {
if estimation < now {
estimation = estimation + Days::new(1);
} else {
break estimation;
}
if estimation < now {
estimation + Days::new(1)
} else {
estimation
}
}
}
@ -861,11 +875,11 @@ impl Parcelable<BotFunction> for BotNotification {
}
impl BotNotification {
pub fn left_time(&self, start_time: &DateTime<Utc>, now: &DateTime<Utc>) -> Duration {
pub fn left_time(&self, start_time: DateTime<Utc>, now: DateTime<Utc>) -> Duration {
let next = self.time.when_next(start_time, now);
// immidate notification if time to do it passed
let duration = (next - now).to_std().unwrap_or(Duration::from_secs(1));
let duration = (next - now).to_std().unwrap_or(Duration::from_secs(0));
// Rounding partitions of seconds
Duration::from_secs(duration.as_secs())
@ -972,19 +986,19 @@ impl RunnerConfig {
let ordered = self
.notifications
.iter()
.filter(|f| f.left_time(&start_time, &now) > Duration::from_secs(1))
.sorted_by_key(|f| f.left_time(&start_time, &now))
.filter(|f| f.left_time(start_time, now) > Duration::from_secs(1))
.sorted_by_key(|f| f.left_time(start_time, now))
.collect::<Vec<_>>();
let left = match ordered.first() {
Some(notification) => notification.left_time(&start_time, &now),
Some(notification) => notification.left_time(start_time, now),
// No notifications provided
None => return None,
};
// get all that should be sent at the same time
let notifications = ordered
.into_iter()
.filter(|n| n.left_time(&start_time, &now) == left)
.filter(|n| n.left_time(start_time, now) == left)
.cloned()
.collect::<Vec<_>>();
@ -1172,7 +1186,7 @@ mod tests {
let start_time = chrono::offset::Utc::now();
// let start_time = chrono::offset::Utc::now() + TimeDelta::try_hours(5).unwrap();
let start_time = start_time.with_hour(13).unwrap().with_minute(23).unwrap();
let left = n.left_time(&start_time, &start_time);
let left = n.left_time(start_time, start_time);
let secs = left.as_secs();
let minutes = secs / 60;
let hours = minutes / 60;
@ -1190,4 +1204,37 @@ mod tests {
assert_eq!(left, should_left)
}
#[test]
fn test_notification_time_nextday() {
let botn = json!({
"time": "11:00",
"filter": {"random": 2},
"message": {"text": "some"},
});
let n: BotNotification = serde_json::from_value(botn).unwrap();
println!("BotNotification: {n:#?}");
let start_time = chrono::offset::Utc::now();
// let start_time = chrono::offset::Utc::now() + TimeDelta::try_hours(5).unwrap();
let start_time = start_time.with_hour(13).unwrap().with_minute(23).unwrap();
let left = n.left_time(start_time, start_time);
let secs = left.as_secs();
let minutes = secs / 60;
let hours = minutes / 60;
let minutes = minutes % 60;
println!("Left: {hours}:{minutes}");
let when_should = chrono::offset::Utc::now()
.with_hour(11)
.unwrap()
.with_minute(00)
.unwrap();
let should_left = (when_should + TimeDelta::days(1) - start_time)
.to_std()
.unwrap();
let should_left = Duration::from_secs(should_left.as_secs());
assert_eq!(left, should_left)
}
}

View File

@ -1,8 +1,3 @@
// just keeping track locks in asychronous calls
#![allow(clippy::await_holding_lock)]
use std::sync::RwLock;
use log::info;
use quickjs_rusty::{context::Context, serde::from_js, OwnedJsObject};
use teloxide::Bot;
use tokio::runtime::Handle;
@ -18,58 +13,41 @@ use super::ScriptError;
pub fn attach_user_application(
c: &Context,
o: &mut OwnedJsObject,
db: &DB,
bot: &Bot,
db: DB,
bot: Bot,
) -> Result<(), ScriptError> {
let db: std::sync::Arc<RwLock<DB>> = std::sync::Arc::new(RwLock::new(db.clone()));
let bot: std::sync::Arc<RwLock<Bot>> = std::sync::Arc::new(RwLock::new(bot.clone()));
// To guarantee that closure is valid if thread panics
let db: std::sync::Mutex<DB> = std::sync::Mutex::new(db);
let bot: std::sync::Mutex<Bot> = std::sync::Mutex::new(bot);
let user_application =
c.create_callback(move |q: OwnedJsObject| -> Result<_, ScriptError> {
let bot1 = bot.clone();
let bot1 = bot1.read().expect("Can't read lock bot");
let bot2 = bot.read().expect("Can't read lock bot");
let mut db = { db.lock().map_err(ScriptError::from)?.clone() };
let bot = { bot.lock().map_err(ScriptError::from)?.clone() };
let user: teloxide::types::User = match from_js(q.context(), &q) {
Ok(q) => q,
Err(_) => todo!(),
};
let application = futures::executor::block_on(
Application::new(user.clone())
.store_db(&mut db.write().expect("Can't write lock db")),
)?;
let application =
futures::executor::block_on(Application::new(user.clone()).store_db(&mut db))?;
let db2 = db.clone();
let msg = tokio::task::block_in_place(move || {
Handle::current().block_on(async move {
send_application_to_chat(
&bot1,
&mut db2.write().expect("Can't write lock db"),
&application,
)
.await
})
let msg = tokio::task::block_in_place(|| {
Handle::current()
.block_on(async { send_application_to_chat(&bot, &mut db, &application).await })
});
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
info!("Got err: {err}");
return Err(ScriptError::MutexError("🤦‍♂️".to_string()));
}
};
let msg = msg.map_err(ScriptError::from)?;
let (chat_id, msg_id) = futures::executor::block_on(
MessageAnswerer::new(
&bot2,
&mut db.write().expect("Can't write lock db"),
user.id.0 as i64,
)
.answer("left_application_msg", None, None),
)?;
let (chat_id, msg_id) = tokio::task::block_in_place(|| {
Handle::current().block_on(async {
MessageAnswerer::new(&bot, &mut db, user.id.0 as i64)
.answer("left_application_msg", None, None)
.await
})
})?;
futures::executor::block_on(
MessageForward::new(msg.chat.id.0, msg.id.0, chat_id, msg_id, false)
.store_db(&mut db.write().expect("Can't write lock db")),
.store_db(&mut db),
)?;
let ret = true;

View File

@ -10,7 +10,7 @@ async fn setup_db() -> DB {
dotenvy::dotenv().unwrap();
let db_url = std::env::var("DATABASE_URL").unwrap();
DB::new(db_url, "gongbot".to_string()).await.unwrap()
DB::new(db_url, "tests".to_string()).await.unwrap()
}
#[tokio::test]
@ -72,6 +72,8 @@ async fn test_add_media() {
async fn test_drop_media() {
let mut db = setup_db().await;
let _result = db.drop_media("test_drop_media_literal").await.unwrap();
let _result = db
.add_media("test_drop_media_literal", "photo", "file_id_1", None)
.await
@ -177,7 +179,9 @@ async fn test_drop_media_except() {
#[tokio::test]
async fn test_get_random_users() {
let db = setup_db().await;
let mut db = setup_db().await;
let _ = db.get_or_init_user(1, "Nick").await;
let users = db.get_random_users(1).await.unwrap();
assert_eq!(users.len(), 1);

View File

@ -126,7 +126,7 @@ impl BotController {
let bot = Bot::new(token);
let mut runner = Runner::init_with_db(&mut db)?;
runner.call_attacher(|c, o| attach_user_application(c, o, &db, &bot))??;
runner.call_attacher(|c, o| attach_user_application(c, o, db.clone(), bot.clone()))??;
let rc = runner.init_config(script)?;
let runtime = Arc::new(Mutex::new(BotRuntime { rc, runner }));