diff --git a/mainbot.js b/mainbot.js index 1939cb5..ea6c7f2 100644 --- a/mainbot.js +++ b/mainbot.js @@ -76,8 +76,16 @@ print(JSON.stringify(dialog.buttons)) const config = { version: 1.1, + timezone: 3, } +const notifications = [ + // { + // time: "18:14", + // message: {literal: "show_projects"}, + // }, +] + // {config, dialog} -const c = { config: config, dialog: dialog } +const c = { config: config, dialog: dialog, notifications: notifications } c diff --git a/src/bot_manager.rs b/src/bot_manager.rs index 2806fd9..af45b65 100644 --- a/src/bot_manager.rs +++ b/src/bot_manager.rs @@ -12,6 +12,7 @@ use teloxide::{ dispatching::dialogue::serializer::Json, dptree, prelude::{Dispatcher, Requester}, + types::{ChatId, UserId}, Bot, }; use tokio::runtime::Handle; @@ -19,6 +20,7 @@ use tokio::runtime::Handle; use crate::{ bot_handler::{script_handler, BotHandler}, db::{bots::BotInstance, DbError, DB}, + message_answerer::MessageAnswerer, mongodb_storage::MongodbStorage, BotController, BotError, BotResult, BotRuntime, }; @@ -26,9 +28,15 @@ use crate::{ pub struct BotRunner { controller: BotController, info: BotInfo, + notificator: NotificatorThread, thread: Option>>, } +pub enum NotificatorThread { + Running(Option>>), + Done, +} + #[derive(Clone)] pub struct BotInfo { pub name: String, @@ -149,6 +157,8 @@ where let thread = spawn_bot_thread(controller.bot.clone(), controller.db.clone(), handler).await?; + let notificator = spawn_notificator_thread(controller.clone()).await?; + let notificator = NotificatorThread::Running(Some(notificator)); let info = BotInfo { name: bi.name.clone(), @@ -156,6 +166,7 @@ where let runner = BotRunner { controller, info: info.clone(), + notificator, thread: Some(thread), }; @@ -207,3 +218,40 @@ pub async fn spawn_bot_thread( Ok(thread) } + +pub async fn spawn_notificator_thread( + mut c: BotController, +) -> BotResult>> { + let thread = std::thread::spawn(move || -> BotResult<()> { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + loop { + let r = c.runtime.lock().unwrap(); + let notifications = r.rc.get_nearest_notifications(); + drop(r); // unlocking mutex + + match notifications { + Some(n) => { + // waiting time to send notification + tokio::time::sleep(n.wait_for()).await; + 'n: for n in n.notifications().into_iter() { + for user in n.get_users(&c.db).await?.into_iter() { + let text = match n.resolve_message(&c.db, &user).await? { + Some(text) => text, + None => continue 'n, + }; + + let ma = MessageAnswerer::new(&c.bot, &mut c.db, user.id); + ma.answer_text(text.clone(), None).await?; + } + } + } + None => break Ok(()), + } + } + }) + }); + + Ok(thread) +} diff --git a/src/botscript.rs b/src/botscript.rs index fff66c9..d52a44a 100644 --- a/src/botscript.rs +++ b/src/botscript.rs @@ -2,15 +2,17 @@ pub mod application; pub mod db; use std::collections::HashMap; use std::sync::{Arc, Mutex, PoisonError}; +use std::time::Duration; use crate::db::raw_calls::RawCallError; -use crate::db::{CallDB, DbError, DB}; +use crate::db::{CallDB, DbError, User, DB}; use crate::utils::parcelable::{ParcelType, Parcelable, ParcelableError, ParcelableResult}; +use chrono::{DateTime, Days, NaiveTime, ParseError, TimeDelta, Timelike, Utc}; use db::attach_db_obj; use futures::future::join_all; use futures::lock::MutexGuard; use itertools::Itertools; -use quickjs_rusty::serde::from_js; +use quickjs_rusty::serde::{from_js, to_js}; use quickjs_rusty::utils::create_empty_object; use quickjs_rusty::utils::create_string; use quickjs_rusty::ContextError; @@ -269,6 +271,11 @@ fn print(s: String) { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct BotConfig { version: f64, + /// relative to UTC, for e.g., + /// timezone = 3 will be UTC+3, + /// timezone =-2 will be UTC-2, + #[serde(default)] + timezone: i8, } pub trait ResolveValue { @@ -597,10 +604,256 @@ impl Parcelable for BotDialog { } } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum NotificationTime { + Delta { + #[serde(default)] + delta_hours: u32, + #[serde(default)] + delta_minutes: u32, + }, + Specific(SpecificTime), +} + +impl NotificationTime { + pub fn when_next(&self, start_time: &DateTime, now: &DateTime) -> DateTime { + let now = *now; + match self { + NotificationTime::Delta { + delta_hours, + delta_minutes, + } => { + let delta = TimeDelta::minutes((delta_minutes + delta_hours * 60).into()); + + let mut estimation = *start_time; + // super non-optimal, but fun :) + loop { + if estimation < now + Duration::from_secs(1) { + estimation += delta; + } else { + break estimation; + } + } + } + NotificationTime::Specific(time) => { + let estimation = now; + let estimation = estimation.with_hour(time.hour.into()).unwrap_or(estimation); + let mut estimation = estimation + .with_minute(time.minutes.into()) + .unwrap_or(estimation); + // super non-optimal, but fun :) + loop { + if estimation < now { + estimation = estimation + Days::new(1); + } else { + break estimation; + } + } + } + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(try_from = "SpecificTimeFormat")] +pub struct SpecificTime { + hour: u8, + minutes: u8, +} + +impl SpecificTime { + pub fn new(hour: u8, minutes: u8) -> Self { + Self { hour, minutes } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum SpecificTimeFormat { + String(String), + Verbose { hour: u8, minutes: u8 }, +} + +impl TryFrom for SpecificTime { + type Error = ParseError; + + fn try_from(stf: SpecificTimeFormat) -> Result { + match stf { + SpecificTimeFormat::Verbose { hour, minutes } => Ok(Self::new(hour, minutes)), + SpecificTimeFormat::String(timestring) => { + let time: NaiveTime = timestring.parse()?; + + Ok(Self::new(time.hour() as u8, time.minute() as u8)) + } + } + } +} + +#[derive(Serialize, Deserialize, Default, Debug, Clone)] +#[serde(untagged)] +pub enum NotificationFilter { + #[default] + #[serde(rename = "all")] + All, + /// Send to randomly selected N people + Random { random: u32 }, + /// Function that returns list of user id's who should get notification + BotFunction(BotFunction), +} + +impl NotificationFilter { + pub async fn get_users(&self, db: &DB) -> ScriptResult> { + match self { + NotificationFilter::All => Ok(db.get_users().await?), + NotificationFilter::Random { random } => Ok(db.get_random_users(*random).await?), + NotificationFilter::BotFunction(f) => { + let users = f.call()?; + let users = from_js(f.context().unwrap(), &users)?; + Ok(users) + } + } + } +} + +impl Parcelable for NotificationFilter { + fn get_field(&mut self, name: &str) -> ParcelableResult> { + todo!() + } + + fn resolve(&mut self) -> ParcelableResult> + where + Self: Sized + 'static, + { + match self { + NotificationFilter::All => Ok(ParcelType::Other(())), + NotificationFilter::Random { .. } => Ok(ParcelType::Other(())), + NotificationFilter::BotFunction(f) => Ok(Parcelable::<_>::resolve(f)?), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum NotificationMessage { + Literal { + literal: String, + }, + Text { + text: String, + }, + /// Function can accept user which will be notified and then return generated message + BotFunction(BotFunction), +} + +impl Parcelable for NotificationMessage { + fn get_field(&mut self, name: &str) -> ParcelableResult> { + todo!() + } + + fn resolve(&mut self) -> ParcelableResult> + where + Self: Sized + 'static, + { + match self { + NotificationMessage::Literal { .. } => Ok(ParcelType::Other(())), + NotificationMessage::Text { .. } => Ok(ParcelType::Other(())), + NotificationMessage::BotFunction(f) => Ok(f.resolve()?), + } + } +} + +impl NotificationMessage { + pub async fn resolve(&self, db: &DB, user: &User) -> ScriptResult> { + match self { + NotificationMessage::Literal { literal } => Ok(db.get_literal_value(literal).await?), + NotificationMessage::Text { text } => Ok(Some(text.to_string())), + NotificationMessage::BotFunction(f) => { + let jsuser = to_js(f.context().expect("Function is not js"), user).unwrap(); + let text = f.call_args(vec![jsuser])?; + let text = from_js(f.context().unwrap(), &text)?; + Ok(text) + } + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct BotNotification { + time: NotificationTime, + #[serde(default)] + filter: NotificationFilter, + message: NotificationMessage, +} + +impl Parcelable for BotNotification { + fn get_field(&mut self, name: &str) -> ParcelableResult> { + match name { + "filter" => Ok(Parcelable::<_>::resolve(&mut self.filter)?), + "message" => Ok(Parcelable::::resolve(&mut self.message)?), + field => Err(ParcelableError::FieldError(format!( + "tried to get field {field}, but this field does not exists or private" + ))), + } + } +} + +impl BotNotification { + pub fn left_time(&self, start_time: &DateTime, now: &DateTime) -> Duration { + let next = self.time.when_next(start_time, now); + + // immidate notification if time to do it passed + let duration = (next - now).to_std().unwrap_or(Duration::from_secs(1)); + + // Rounding partitions of seconds + Duration::from_secs(duration.as_secs()) + } + + pub async fn get_users(&self, db: &DB) -> ScriptResult> { + self.filter.get_users(db).await + } + pub async fn resolve_message(&self, db: &DB, user: &User) -> ScriptResult> { + self.message.resolve(db, user).await + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct RunnerConfig { config: BotConfig, pub dialog: BotDialog, + #[serde(default)] + notifications: Vec, + #[serde(skip)] + created_at: ConfigCreatedAt, +} + +#[derive(Debug, Clone)] +struct ConfigCreatedAt { + at: DateTime, +} + +impl Default for ConfigCreatedAt { + fn default() -> Self { + Self { + at: chrono::offset::Utc::now(), + } + } +} + +#[derive(Debug, Clone)] +pub struct NotificationBlock { + wait_for: Duration, + notifications: Vec, +} + +impl NotificationBlock { + pub fn wait_for(&self) -> Duration { + self.wait_for + } + + pub fn notifications(&self) -> &[BotNotification] { + &self.notifications + } } impl RunnerConfig { @@ -616,12 +869,49 @@ impl RunnerConfig { bm.map(|bm| bm.fill_literal(callback.to_string())) } + + pub fn created_at(&self) -> DateTime { + self.created_at.at + TimeDelta::try_hours(self.config.timezone.into()).unwrap() + } + + /// if None is returned, then garanteed that later calls will also return None, + /// so, if you'll get None, no notifications will be provided later + pub fn get_nearest_notifications(&self) -> Option { + let start_time = self.created_at(); + let now = + chrono::offset::Utc::now() + TimeDelta::try_hours(self.config.timezone.into()).unwrap(); + + let ordered = self + .notifications + .iter() + .filter(|f| f.left_time(&start_time, &now) > Duration::from_secs(1)) + .sorted_by_key(|f| f.left_time(&start_time, &now)) + .collect::>(); + + let left = match ordered.first() { + Some(notification) => notification.left_time(&start_time, &now), + // No notifications provided + None => return None, + }; + // get all that should be sent at the same time + let notifications = ordered + .into_iter() + .filter(|n| n.left_time(&start_time, &now) == left) + .cloned() + .collect::>(); + + Some(NotificationBlock { + wait_for: left, + notifications, + }) + } } impl Parcelable for RunnerConfig { fn get_field(&mut self, name: &str) -> Result, ParcelableError> { match name { "dialog" => Ok(ParcelType::Parcelable(&mut self.dialog)), + "notifications" => Ok(ParcelType::Parcelable(&mut self.notifications)), field => Err(ParcelableError::FieldError(format!( "tried to get field {field}, but this field does not exists or private" ))), @@ -707,6 +997,7 @@ impl Runner { #[allow(clippy::print_stdout)] mod tests { use quickjs_rusty::{serde::from_js, OwnedJsObject}; + use serde_json::json; use super::*; @@ -785,4 +1076,59 @@ mod tests { panic!("test returned an error, but the wrong one, {errstr}") } } + + #[test] + fn test_notification_struct() { + let botn = json!({ + "time": "18:00", + "filter": {"random": 2}, + "message": {"text": "some"}, + }); + let n: BotNotification = serde_json::from_value(botn).unwrap(); + println!("BotNotification: {n:#?}"); + assert!(matches!(n.time, NotificationTime::Specific(..))); + let time = if let NotificationTime::Specific(st) = n.time { + st + } else { + unreachable!() + }; + assert_eq!( + time, + SpecificTime { + hour: 18, + minutes: 00 + } + ); + } + + #[test] + fn test_notification_time() { + let botn = json!({ + "time": "18:00", + "filter": {"random": 2}, + "message": {"text": "some"}, + }); + let n: BotNotification = serde_json::from_value(botn).unwrap(); + println!("BotNotification: {n:#?}"); + let start_time = chrono::offset::Utc::now(); + // let start_time = chrono::offset::Utc::now() + TimeDelta::try_hours(5).unwrap(); + let start_time = start_time.with_hour(13).unwrap().with_minute(23).unwrap(); + let left = n.left_time(&start_time, &start_time); + let secs = left.as_secs(); + let minutes = secs / 60; + let hours = minutes / 60; + let minutes = minutes % 60; + println!("Left: {hours}:{minutes}"); + + let when_should = chrono::offset::Utc::now() + .with_hour(18) + .unwrap() + .with_minute(00) + .unwrap(); + + let should_left = (when_should - start_time).to_std().unwrap(); + let should_left = Duration::from_secs(should_left.as_secs()); + + assert_eq!(left, should_left) + } } diff --git a/src/db/mod.rs b/src/db/mod.rs index 1e5882f..508bdbe 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -11,6 +11,7 @@ use chrono::{DateTime, Utc}; use enum_stringify::EnumStringify; use futures::stream::TryStreamExt; +use futures::StreamExt; use mongodb::options::IndexOptions; use mongodb::{bson::doc, options::ClientOptions, Client}; use mongodb::{Collection, Database, IndexModel}; @@ -212,6 +213,10 @@ impl CallDB for DB { async fn get_database(&mut self) -> Database { self.client.database(&self.name) } + + async fn get_database_immut(&self) -> Database { + self.client.database(&self.name) + } } impl GetCollection for T { @@ -226,6 +231,8 @@ impl GetCollection for T { pub enum DbError { #[error("error while processing mongodb query: {0}")] MongodbError(#[from] mongodb::error::Error), + #[error("error while coverting values: {0}")] + SerdeJsonError(#[from] serde_json::error::Error), } pub type DbResult = Result; @@ -233,14 +240,35 @@ pub type DbResult = Result; pub trait CallDB { //type C; async fn get_database(&mut self) -> Database; + async fn get_database_immut(&self) -> Database; //async fn get_pool(&mut self) -> PooledConnection<'_, AsyncDieselConnectionManager>; - async fn get_users(&mut self) -> DbResult> { - let db = self.get_database().await; + async fn get_users(&self) -> DbResult> { + let db = self.get_database_immut().await; let users = db.collection::("users"); Ok(users.find(doc! {}).await?.try_collect().await?) } + async fn get_random_users(&self, n: u32) -> DbResult> { + let db = self.get_database_immut().await; + let users = db.collection::("users"); + + let random_users: Vec = users + .aggregate(vec![doc! {"$sample": {"size": n}}]) + .await? + .try_collect() + .await?; + let random_users = random_users + .into_iter() + .map(|d| match serde_json::to_value(d) { + Ok(value) => serde_json::from_value::(value), + Err(err) => Err(err), + }) + .collect::>()?; + + Ok(random_users) + } + async fn set_admin(&mut self, userid: i64, isadmin: bool) -> DbResult<()> { let db = self.get_database().await; let users = db.collection::("users"); @@ -350,8 +378,8 @@ pub trait CallDB { Ok(()) } - async fn get_literal(&mut self, literal: &str) -> DbResult> { - let db = self.get_database().await; + async fn get_literal(&self, literal: &str) -> DbResult> { + let db = self.get_database_immut().await; let messages = db.collection::("literals"); let literal = messages.find_one(doc! { "token": literal }).await?; @@ -359,7 +387,7 @@ pub trait CallDB { Ok(literal) } - async fn get_literal_value(&mut self, literal: &str) -> DbResult> { + async fn get_literal_value(&self, literal: &str) -> DbResult> { let literal = self.get_literal(literal).await?; Ok(literal.map(|l| l.value)) diff --git a/src/db/tests/mod.rs b/src/db/tests/mod.rs index 8da40ce..5dbc6bc 100644 --- a/src/db/tests/mod.rs +++ b/src/db/tests/mod.rs @@ -174,3 +174,11 @@ async fn test_drop_media_except() { let _ = db.drop_media(literal).await.unwrap(); } + +#[tokio::test] +async fn test_get_random_users() { + let mut db = setup_db().await; + + let users = db.get_random_users(1).await.unwrap(); + assert_eq!(users.len(), 1); +} diff --git a/src/message_answerer.rs b/src/message_answerer.rs index c9d10ce..924361c 100644 --- a/src/message_answerer.rs +++ b/src/message_answerer.rs @@ -86,6 +86,14 @@ impl<'a> MessageAnswerer<'a> { self.answer_inner(text, literal, variant, keyboard).await } + pub async fn answer_text( + self, + text: String, + keyboard: Option, + ) -> BotResult<(i64, i32)> { + self.send_message(text, keyboard).await + } + async fn answer_inner( mut self, text: String, diff --git a/src/utils.rs b/src/utils.rs index 04ff503..091cba2 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -50,7 +50,7 @@ pub async fn create_callback_button( ) -> BotResult where C: Serialize + for<'a> Deserialize<'a> + Send + Sync, - D: CallDB + Send, + D: CallDB + Send + Sync, { let text = db .get_literal_value(literal)