From de68e41725a85ed6a0022001d610d1c0deef9bfe Mon Sep 17 00:00:00 2001 From: Akulij Date: Mon, 2 Jun 2025 16:49:05 +0500 Subject: [PATCH] create notificator --- src/bot_manager.rs | 51 +++++++++++ src/botscript.rs | 221 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 267 insertions(+), 5 deletions(-) diff --git a/src/bot_manager.rs b/src/bot_manager.rs index 2806fd9..4377e9d 100644 --- a/src/bot_manager.rs +++ b/src/bot_manager.rs @@ -12,6 +12,7 @@ use teloxide::{ dispatching::dialogue::serializer::Json, dptree, prelude::{Dispatcher, Requester}, + types::{ChatId, UserId}, Bot, }; use tokio::runtime::Handle; @@ -19,6 +20,7 @@ use tokio::runtime::Handle; use crate::{ bot_handler::{script_handler, BotHandler}, db::{bots::BotInstance, DbError, DB}, + message_answerer::MessageAnswerer, mongodb_storage::MongodbStorage, BotController, BotError, BotResult, BotRuntime, }; @@ -26,9 +28,15 @@ use crate::{ pub struct BotRunner { controller: BotController, info: BotInfo, + notificator: NotificatorThread, thread: Option>>, } +pub enum NotificatorThread { + Running(Option>>), + Done, +} + #[derive(Clone)] pub struct BotInfo { pub name: String, @@ -149,6 +157,9 @@ where let thread = spawn_bot_thread(controller.bot.clone(), controller.db.clone(), handler).await?; + println!("Starting notificator"); + let notificator = spawn_notificator_thread(controller.clone()).await?; + let notificator = NotificatorThread::Running(Some(notificator)); let info = BotInfo { name: bi.name.clone(), @@ -156,6 +167,7 @@ where let runner = BotRunner { controller, info: info.clone(), + notificator, thread: Some(thread), }; @@ -207,3 +219,42 @@ pub async fn spawn_bot_thread( Ok(thread) } + +pub async fn spawn_notificator_thread( + mut c: BotController, +) -> BotResult>> { + let thread = std::thread::spawn(move || -> BotResult<()> { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + loop { + let r = c.runtime.lock().unwrap(); + let notifications = r.rc.get_nearest_notifications(); + drop(r); // unlocking mutex + + match notifications { + Some(n) => { + // waiting time to send notification + println!("Will send notification after {:?}", n.wait_for()); + println!("Notifications: {:#?}", n.notifications()); + tokio::time::sleep(n.wait_for()).await; + 'n: for n in n.notifications().into_iter() { + for user in n.get_users(&c.db).await?.into_iter() { + let text = match n.resolve_message(&c.db, &user).await? { + Some(text) => text, + None => continue 'n, + }; + + let ma = MessageAnswerer::new(&c.bot, &mut c.db, user.id); + ma.answer_text(text.clone(), None).await?; + } + } + } + None => break Ok(()), + } + } + }) + }); + + Ok(thread) +} diff --git a/src/botscript.rs b/src/botscript.rs index 9c0a9ab..ad745af 100644 --- a/src/botscript.rs +++ b/src/botscript.rs @@ -5,14 +5,14 @@ use std::sync::{Arc, Mutex, PoisonError}; use std::time::Duration; use crate::db::raw_calls::RawCallError; -use crate::db::{CallDB, DbError, DB}; +use crate::db::{CallDB, DbError, User, DB}; use crate::utils::parcelable::{ParcelType, Parcelable, ParcelableError, ParcelableResult}; -use chrono::{NaiveTime, ParseError, Timelike}; +use chrono::{DateTime, Days, NaiveTime, ParseError, TimeDelta, Timelike, Utc}; use db::attach_db_obj; use futures::future::join_all; use futures::lock::MutexGuard; use itertools::Itertools; -use quickjs_rusty::serde::from_js; +use quickjs_rusty::serde::{from_js, to_js}; use quickjs_rusty::utils::create_empty_object; use quickjs_rusty::utils::create_string; use quickjs_rusty::ContextError; @@ -274,6 +274,7 @@ pub struct BotConfig { /// relative to UTC, for e.g., /// timezone = 3 will be UTC+3, /// timezone =-2 will be UTC-2, + #[serde(default)] timezone: i8, } @@ -606,10 +607,56 @@ impl Parcelable for BotDialog { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(untagged)] pub enum NotificationTime { - Delta(isize), + Delta { + #[serde(default)] + delta_hours: u32, + #[serde(default)] + delta_minutes: u32, + }, Specific(SpecificTime), } +impl NotificationTime { + pub fn when_next(&self, start_time: &DateTime, now: &DateTime) -> DateTime { + let now = *now; + match self { + NotificationTime::Delta { + delta_hours, + delta_minutes, + } => { + let delta = TimeDelta::minutes((delta_minutes + delta_hours * 60).into()); + println!("Delta: {delta}"); + + let mut estimation = *start_time; + // super non-optimal, but fun :) + loop { + println!("Adding delta to estimation"); + if estimation < now + Duration::from_secs(1) { + estimation += delta; + } else { + break estimation; + } + } + } + NotificationTime::Specific(time) => { + let estimation = now; + let estimation = estimation.with_hour(time.hour.into()).unwrap_or(estimation); + let mut estimation = estimation + .with_minute(time.minutes.into()) + .unwrap_or(estimation); + // super non-optimal, but fun :) + loop { + if estimation < now { + estimation = estimation + Days::new(1); + } else { + break estimation; + } + } + } + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(try_from = "SpecificTimeFormat")] pub struct SpecificTime { @@ -652,11 +699,42 @@ pub enum NotificationFilter { #[serde(rename = "all")] All, /// Send to randomly selected N people - Random { random: u8 }, + Random { random: u32 }, /// Function that returns list of user id's who should get notification BotFunction(BotFunction), } +impl NotificationFilter { + pub async fn get_users(&self, db: &DB) -> ScriptResult> { + match self { + NotificationFilter::All => Ok(db.get_users().await?), + NotificationFilter::Random { random } => Ok(db.get_random_users(*random).await?), + NotificationFilter::BotFunction(f) => { + let users = f.call()?; + let users = from_js(f.context().unwrap(), &users)?; + Ok(users) + } + } + } +} + +impl Parcelable for NotificationFilter { + fn get_field(&mut self, name: &str) -> ParcelableResult> { + todo!() + } + + fn resolve(&mut self) -> ParcelableResult> + where + Self: Sized + 'static, + { + match self { + NotificationFilter::All => Ok(ParcelType::Other(())), + NotificationFilter::Random { .. } => Ok(ParcelType::Other(())), + NotificationFilter::BotFunction(f) => Ok(Parcelable::<_>::resolve(f)?), + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(untagged)] pub enum NotificationMessage { @@ -670,6 +748,38 @@ pub enum NotificationMessage { BotFunction(BotFunction), } +impl Parcelable for NotificationMessage { + fn get_field(&mut self, name: &str) -> ParcelableResult> { + todo!() + } + + fn resolve(&mut self) -> ParcelableResult> + where + Self: Sized + 'static, + { + match self { + NotificationMessage::Literal { .. } => Ok(ParcelType::Other(())), + NotificationMessage::Text { .. } => Ok(ParcelType::Other(())), + NotificationMessage::BotFunction(f) => Ok(f.resolve()?), + } + } +} + +impl NotificationMessage { + pub async fn resolve(&self, db: &DB, user: &User) -> ScriptResult> { + match self { + NotificationMessage::Literal { literal } => Ok(db.get_literal_value(literal).await?), + NotificationMessage::Text { text } => Ok(Some(text.to_string())), + NotificationMessage::BotFunction(f) => { + let jsuser = to_js(f.context().expect("Function is not js"), user).unwrap(); + let text = f.call_args(vec![jsuser])?; + let text = from_js(f.context().unwrap(), &text)?; + Ok(text) + } + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct BotNotification { time: NotificationTime, @@ -678,12 +788,75 @@ pub struct BotNotification { message: NotificationMessage, } +impl Parcelable for BotNotification { + fn get_field(&mut self, name: &str) -> ParcelableResult> { + match name { + "filter" => Ok(Parcelable::<_>::resolve(&mut self.filter)?), + "message" => Ok(Parcelable::::resolve(&mut self.message)?), + field => Err(ParcelableError::FieldError(format!( + "tried to get field {field}, but this field does not exists or private" + ))), + } + } +} + +impl BotNotification { + pub fn left_time(&self, start_time: &DateTime, now: &DateTime) -> Duration { + let next = self.time.when_next(start_time, now); + + // immidate notification if time to do it passed + let duration = (next - now).to_std().unwrap_or(Duration::from_secs(1)); + + println!("Left time: {duration:?}"); + // Rounding partitions of seconds + Duration::from_secs(duration.as_secs()) + } + + pub async fn get_users(&self, db: &DB) -> ScriptResult> { + self.filter.get_users(db).await + } + pub async fn resolve_message(&self, db: &DB, user: &User) -> ScriptResult> { + self.message.resolve(db, user).await + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct RunnerConfig { config: BotConfig, pub dialog: BotDialog, #[serde(default)] notifications: Vec, + #[serde(skip)] + created_at: ConfigCreatedAt, +} + +#[derive(Debug, Clone)] +struct ConfigCreatedAt { + at: DateTime, +} + +impl Default for ConfigCreatedAt { + fn default() -> Self { + Self { + at: chrono::offset::Utc::now(), + } + } +} + +#[derive(Debug, Clone)] +pub struct NotificationBlock { + wait_for: Duration, + notifications: Vec, +} + +impl NotificationBlock { + pub fn wait_for(&self) -> Duration { + self.wait_for + } + + pub fn notifications(&self) -> &[BotNotification] { + &self.notifications + } } impl RunnerConfig { @@ -699,12 +872,50 @@ impl RunnerConfig { bm.map(|bm| bm.fill_literal(callback.to_string())) } + + pub fn created_at(&self) -> DateTime { + self.created_at.at + TimeDelta::try_hours(self.config.timezone.into()).unwrap() + } + + /// if None is returned, then garanteed that later calls will also return None, + /// so, if you'll get None, no notifications will be provided later + pub fn get_nearest_notifications(&self) -> Option { + let start_time = self.created_at(); + let now = + chrono::offset::Utc::now() + TimeDelta::try_hours(self.config.timezone.into()).unwrap(); + + let ordered = self + .notifications + .iter() + .filter(|f| f.left_time(&start_time, &now) > Duration::from_secs(1)) + .sorted_by_key(|f| f.left_time(&start_time, &now)) + .collect::>(); + println!("Orederd: {:#?}", ordered); + + let left = match ordered.first() { + Some(notification) => notification.left_time(&start_time, &now), + // No notifications provided + None => return None, + }; + // get all that should be sent at the same time + let notifications = ordered + .into_iter() + .filter(|n| n.left_time(&start_time, &now) == left) + .cloned() + .collect::>(); + + Some(NotificationBlock { + wait_for: left, + notifications, + }) + } } impl Parcelable for RunnerConfig { fn get_field(&mut self, name: &str) -> Result, ParcelableError> { match name { "dialog" => Ok(ParcelType::Parcelable(&mut self.dialog)), + "notifications" => Ok(ParcelType::Parcelable(&mut self.notifications)), field => Err(ParcelableError::FieldError(format!( "tried to get field {field}, but this field does not exists or private" ))),