Feature: BotNotificator #18

Merged
akulij merged 13 commits from dev into main 2025-06-02 11:54:14 +00:00
7 changed files with 455 additions and 9 deletions

View File

@ -76,8 +76,16 @@ print(JSON.stringify(dialog.buttons))
const config = { const config = {
version: 1.1, version: 1.1,
timezone: 3,
} }
const notifications = [
// {
// time: "18:14",
// message: {literal: "show_projects"},
// },
]
// {config, dialog} // {config, dialog}
const c = { config: config, dialog: dialog } const c = { config: config, dialog: dialog, notifications: notifications }
c c

View File

@ -12,6 +12,7 @@ use teloxide::{
dispatching::dialogue::serializer::Json, dispatching::dialogue::serializer::Json,
dptree, dptree,
prelude::{Dispatcher, Requester}, prelude::{Dispatcher, Requester},
types::{ChatId, UserId},
Bot, Bot,
}; };
use tokio::runtime::Handle; use tokio::runtime::Handle;
@ -19,6 +20,7 @@ use tokio::runtime::Handle;
use crate::{ use crate::{
bot_handler::{script_handler, BotHandler}, bot_handler::{script_handler, BotHandler},
db::{bots::BotInstance, DbError, DB}, db::{bots::BotInstance, DbError, DB},
message_answerer::MessageAnswerer,
mongodb_storage::MongodbStorage, mongodb_storage::MongodbStorage,
BotController, BotError, BotResult, BotRuntime, BotController, BotError, BotResult, BotRuntime,
}; };
@ -26,9 +28,15 @@ use crate::{
pub struct BotRunner { pub struct BotRunner {
controller: BotController, controller: BotController,
info: BotInfo, info: BotInfo,
notificator: NotificatorThread,
thread: Option<JoinHandle<BotResult<()>>>, thread: Option<JoinHandle<BotResult<()>>>,
} }
pub enum NotificatorThread {
Running(Option<JoinHandle<BotResult<()>>>),
Done,
}
#[derive(Clone)] #[derive(Clone)]
pub struct BotInfo { pub struct BotInfo {
pub name: String, pub name: String,
@ -149,6 +157,8 @@ where
let thread = let thread =
spawn_bot_thread(controller.bot.clone(), controller.db.clone(), handler).await?; spawn_bot_thread(controller.bot.clone(), controller.db.clone(), handler).await?;
let notificator = spawn_notificator_thread(controller.clone()).await?;
let notificator = NotificatorThread::Running(Some(notificator));
let info = BotInfo { let info = BotInfo {
name: bi.name.clone(), name: bi.name.clone(),
@ -156,6 +166,7 @@ where
let runner = BotRunner { let runner = BotRunner {
controller, controller,
info: info.clone(), info: info.clone(),
notificator,
thread: Some(thread), thread: Some(thread),
}; };
@ -207,3 +218,40 @@ pub async fn spawn_bot_thread(
Ok(thread) Ok(thread)
} }
pub async fn spawn_notificator_thread(
mut c: BotController,
) -> BotResult<JoinHandle<BotResult<()>>> {
let thread = std::thread::spawn(move || -> BotResult<()> {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
loop {
let r = c.runtime.lock().unwrap();
let notifications = r.rc.get_nearest_notifications();
drop(r); // unlocking mutex
match notifications {
Some(n) => {
// waiting time to send notification
tokio::time::sleep(n.wait_for()).await;
'n: for n in n.notifications().into_iter() {
for user in n.get_users(&c.db).await?.into_iter() {
let text = match n.resolve_message(&c.db, &user).await? {
Some(text) => text,
None => continue 'n,
};
let ma = MessageAnswerer::new(&c.bot, &mut c.db, user.id);
ma.answer_text(text.clone(), None).await?;
}
}
}
None => break Ok(()),
}
}
})
});
Ok(thread)
}

View File

@ -2,15 +2,17 @@ pub mod application;
pub mod db; pub mod db;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex, PoisonError}; use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;
use crate::db::raw_calls::RawCallError; use crate::db::raw_calls::RawCallError;
use crate::db::{CallDB, DbError, DB}; use crate::db::{CallDB, DbError, User, DB};
use crate::utils::parcelable::{ParcelType, Parcelable, ParcelableError, ParcelableResult}; use crate::utils::parcelable::{ParcelType, Parcelable, ParcelableError, ParcelableResult};
use chrono::{DateTime, Days, NaiveTime, ParseError, TimeDelta, Timelike, Utc};
use db::attach_db_obj; use db::attach_db_obj;
use futures::future::join_all; use futures::future::join_all;
use futures::lock::MutexGuard; use futures::lock::MutexGuard;
use itertools::Itertools; use itertools::Itertools;
use quickjs_rusty::serde::from_js; use quickjs_rusty::serde::{from_js, to_js};
use quickjs_rusty::utils::create_empty_object; use quickjs_rusty::utils::create_empty_object;
use quickjs_rusty::utils::create_string; use quickjs_rusty::utils::create_string;
use quickjs_rusty::ContextError; use quickjs_rusty::ContextError;
@ -269,6 +271,11 @@ fn print(s: String) {
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BotConfig { pub struct BotConfig {
version: f64, version: f64,
/// relative to UTC, for e.g.,
/// timezone = 3 will be UTC+3,
/// timezone =-2 will be UTC-2,
#[serde(default)]
timezone: i8,
} }
pub trait ResolveValue { pub trait ResolveValue {
@ -597,10 +604,256 @@ impl Parcelable<BotFunction> for BotDialog {
} }
} }
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum NotificationTime {
Delta {
#[serde(default)]
delta_hours: u32,
#[serde(default)]
delta_minutes: u32,
},
Specific(SpecificTime),
}
impl NotificationTime {
pub fn when_next(&self, start_time: &DateTime<Utc>, now: &DateTime<Utc>) -> DateTime<Utc> {
let now = *now;
match self {
NotificationTime::Delta {
delta_hours,
delta_minutes,
} => {
let delta = TimeDelta::minutes((delta_minutes + delta_hours * 60).into());
let mut estimation = *start_time;
// super non-optimal, but fun :)
loop {
if estimation < now + Duration::from_secs(1) {
estimation += delta;
} else {
break estimation;
}
}
}
NotificationTime::Specific(time) => {
let estimation = now;
let estimation = estimation.with_hour(time.hour.into()).unwrap_or(estimation);
let mut estimation = estimation
.with_minute(time.minutes.into())
.unwrap_or(estimation);
// super non-optimal, but fun :)
loop {
if estimation < now {
estimation = estimation + Days::new(1);
} else {
break estimation;
}
}
}
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(try_from = "SpecificTimeFormat")]
pub struct SpecificTime {
hour: u8,
minutes: u8,
}
impl SpecificTime {
pub fn new(hour: u8, minutes: u8) -> Self {
Self { hour, minutes }
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum SpecificTimeFormat {
String(String),
Verbose { hour: u8, minutes: u8 },
}
impl TryFrom<SpecificTimeFormat> for SpecificTime {
type Error = ParseError;
fn try_from(stf: SpecificTimeFormat) -> Result<Self, Self::Error> {
match stf {
SpecificTimeFormat::Verbose { hour, minutes } => Ok(Self::new(hour, minutes)),
SpecificTimeFormat::String(timestring) => {
let time: NaiveTime = timestring.parse()?;
Ok(Self::new(time.hour() as u8, time.minute() as u8))
}
}
}
}
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
#[serde(untagged)]
pub enum NotificationFilter {
#[default]
#[serde(rename = "all")]
All,
/// Send to randomly selected N people
Random { random: u32 },
/// Function that returns list of user id's who should get notification
BotFunction(BotFunction),
}
impl NotificationFilter {
pub async fn get_users(&self, db: &DB) -> ScriptResult<Vec<User>> {
match self {
NotificationFilter::All => Ok(db.get_users().await?),
NotificationFilter::Random { random } => Ok(db.get_random_users(*random).await?),
NotificationFilter::BotFunction(f) => {
let users = f.call()?;
let users = from_js(f.context().unwrap(), &users)?;
Ok(users)
}
}
}
}
impl Parcelable<BotFunction> for NotificationFilter {
fn get_field(&mut self, name: &str) -> ParcelableResult<ParcelType<BotFunction>> {
todo!()
}
fn resolve(&mut self) -> ParcelableResult<ParcelType<BotFunction>>
where
Self: Sized + 'static,
{
match self {
NotificationFilter::All => Ok(ParcelType::Other(())),
NotificationFilter::Random { .. } => Ok(ParcelType::Other(())),
NotificationFilter::BotFunction(f) => Ok(Parcelable::<_>::resolve(f)?),
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(untagged)]
pub enum NotificationMessage {
Literal {
literal: String,
},
Text {
text: String,
},
/// Function can accept user which will be notified and then return generated message
BotFunction(BotFunction),
}
impl Parcelable<BotFunction> for NotificationMessage {
fn get_field(&mut self, name: &str) -> ParcelableResult<ParcelType<BotFunction>> {
todo!()
}
fn resolve(&mut self) -> ParcelableResult<ParcelType<BotFunction>>
where
Self: Sized + 'static,
{
match self {
NotificationMessage::Literal { .. } => Ok(ParcelType::Other(())),
NotificationMessage::Text { .. } => Ok(ParcelType::Other(())),
NotificationMessage::BotFunction(f) => Ok(f.resolve()?),
}
}
}
impl NotificationMessage {
pub async fn resolve(&self, db: &DB, user: &User) -> ScriptResult<Option<String>> {
match self {
NotificationMessage::Literal { literal } => Ok(db.get_literal_value(literal).await?),
NotificationMessage::Text { text } => Ok(Some(text.to_string())),
NotificationMessage::BotFunction(f) => {
let jsuser = to_js(f.context().expect("Function is not js"), user).unwrap();
let text = f.call_args(vec![jsuser])?;
let text = from_js(f.context().unwrap(), &text)?;
Ok(text)
}
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BotNotification {
time: NotificationTime,
#[serde(default)]
filter: NotificationFilter,
message: NotificationMessage,
}
impl Parcelable<BotFunction> for BotNotification {
fn get_field(&mut self, name: &str) -> ParcelableResult<ParcelType<BotFunction>> {
match name {
"filter" => Ok(Parcelable::<_>::resolve(&mut self.filter)?),
"message" => Ok(Parcelable::<BotFunction>::resolve(&mut self.message)?),
field => Err(ParcelableError::FieldError(format!(
"tried to get field {field}, but this field does not exists or private"
))),
}
}
}
impl BotNotification {
pub fn left_time(&self, start_time: &DateTime<Utc>, now: &DateTime<Utc>) -> Duration {
let next = self.time.when_next(start_time, now);
// immidate notification if time to do it passed
let duration = (next - now).to_std().unwrap_or(Duration::from_secs(1));
// Rounding partitions of seconds
Duration::from_secs(duration.as_secs())
}
pub async fn get_users(&self, db: &DB) -> ScriptResult<Vec<User>> {
self.filter.get_users(db).await
}
pub async fn resolve_message(&self, db: &DB, user: &User) -> ScriptResult<Option<String>> {
self.message.resolve(db, user).await
}
}
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RunnerConfig { pub struct RunnerConfig {
config: BotConfig, config: BotConfig,
pub dialog: BotDialog, pub dialog: BotDialog,
#[serde(default)]
notifications: Vec<BotNotification>,
#[serde(skip)]
created_at: ConfigCreatedAt,
}
#[derive(Debug, Clone)]
struct ConfigCreatedAt {
at: DateTime<Utc>,
}
impl Default for ConfigCreatedAt {
fn default() -> Self {
Self {
at: chrono::offset::Utc::now(),
}
}
}
#[derive(Debug, Clone)]
pub struct NotificationBlock {
wait_for: Duration,
notifications: Vec<BotNotification>,
}
impl NotificationBlock {
pub fn wait_for(&self) -> Duration {
self.wait_for
}
pub fn notifications(&self) -> &[BotNotification] {
&self.notifications
}
} }
impl RunnerConfig { impl RunnerConfig {
@ -616,12 +869,49 @@ impl RunnerConfig {
bm.map(|bm| bm.fill_literal(callback.to_string())) bm.map(|bm| bm.fill_literal(callback.to_string()))
} }
pub fn created_at(&self) -> DateTime<Utc> {
self.created_at.at + TimeDelta::try_hours(self.config.timezone.into()).unwrap()
}
/// if None is returned, then garanteed that later calls will also return None,
/// so, if you'll get None, no notifications will be provided later
pub fn get_nearest_notifications(&self) -> Option<NotificationBlock> {
let start_time = self.created_at();
let now =
chrono::offset::Utc::now() + TimeDelta::try_hours(self.config.timezone.into()).unwrap();
let ordered = self
.notifications
.iter()
.filter(|f| f.left_time(&start_time, &now) > Duration::from_secs(1))
.sorted_by_key(|f| f.left_time(&start_time, &now))
.collect::<Vec<_>>();
let left = match ordered.first() {
Some(notification) => notification.left_time(&start_time, &now),
// No notifications provided
None => return None,
};
// get all that should be sent at the same time
let notifications = ordered
.into_iter()
.filter(|n| n.left_time(&start_time, &now) == left)
.cloned()
.collect::<Vec<_>>();
Some(NotificationBlock {
wait_for: left,
notifications,
})
}
} }
impl Parcelable<BotFunction> for RunnerConfig { impl Parcelable<BotFunction> for RunnerConfig {
fn get_field(&mut self, name: &str) -> Result<ParcelType<BotFunction>, ParcelableError> { fn get_field(&mut self, name: &str) -> Result<ParcelType<BotFunction>, ParcelableError> {
match name { match name {
"dialog" => Ok(ParcelType::Parcelable(&mut self.dialog)), "dialog" => Ok(ParcelType::Parcelable(&mut self.dialog)),
"notifications" => Ok(ParcelType::Parcelable(&mut self.notifications)),
field => Err(ParcelableError::FieldError(format!( field => Err(ParcelableError::FieldError(format!(
"tried to get field {field}, but this field does not exists or private" "tried to get field {field}, but this field does not exists or private"
))), ))),
@ -707,6 +997,7 @@ impl Runner {
#[allow(clippy::print_stdout)] #[allow(clippy::print_stdout)]
mod tests { mod tests {
use quickjs_rusty::{serde::from_js, OwnedJsObject}; use quickjs_rusty::{serde::from_js, OwnedJsObject};
use serde_json::json;
use super::*; use super::*;
@ -785,4 +1076,59 @@ mod tests {
panic!("test returned an error, but the wrong one, {errstr}") panic!("test returned an error, but the wrong one, {errstr}")
} }
} }
#[test]
fn test_notification_struct() {
let botn = json!({
"time": "18:00",
"filter": {"random": 2},
"message": {"text": "some"},
});
let n: BotNotification = serde_json::from_value(botn).unwrap();
println!("BotNotification: {n:#?}");
assert!(matches!(n.time, NotificationTime::Specific(..)));
let time = if let NotificationTime::Specific(st) = n.time {
st
} else {
unreachable!()
};
assert_eq!(
time,
SpecificTime {
hour: 18,
minutes: 00
}
);
}
#[test]
fn test_notification_time() {
let botn = json!({
"time": "18:00",
"filter": {"random": 2},
"message": {"text": "some"},
});
let n: BotNotification = serde_json::from_value(botn).unwrap();
println!("BotNotification: {n:#?}");
let start_time = chrono::offset::Utc::now();
// let start_time = chrono::offset::Utc::now() + TimeDelta::try_hours(5).unwrap();
let start_time = start_time.with_hour(13).unwrap().with_minute(23).unwrap();
let left = n.left_time(&start_time, &start_time);
let secs = left.as_secs();
let minutes = secs / 60;
let hours = minutes / 60;
let minutes = minutes % 60;
println!("Left: {hours}:{minutes}");
let when_should = chrono::offset::Utc::now()
.with_hour(18)
.unwrap()
.with_minute(00)
.unwrap();
let should_left = (when_should - start_time).to_std().unwrap();
let should_left = Duration::from_secs(should_left.as_secs());
assert_eq!(left, should_left)
}
} }

View File

@ -11,6 +11,7 @@ use chrono::{DateTime, Utc};
use enum_stringify::EnumStringify; use enum_stringify::EnumStringify;
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
use futures::StreamExt;
use mongodb::options::IndexOptions; use mongodb::options::IndexOptions;
use mongodb::{bson::doc, options::ClientOptions, Client}; use mongodb::{bson::doc, options::ClientOptions, Client};
use mongodb::{Collection, Database, IndexModel}; use mongodb::{Collection, Database, IndexModel};
@ -212,6 +213,10 @@ impl CallDB for DB {
async fn get_database(&mut self) -> Database { async fn get_database(&mut self) -> Database {
self.client.database(&self.name) self.client.database(&self.name)
} }
async fn get_database_immut(&self) -> Database {
self.client.database(&self.name)
}
} }
impl<T: CallDB> GetCollection for T { impl<T: CallDB> GetCollection for T {
@ -226,6 +231,8 @@ impl<T: CallDB> GetCollection for T {
pub enum DbError { pub enum DbError {
#[error("error while processing mongodb query: {0}")] #[error("error while processing mongodb query: {0}")]
MongodbError(#[from] mongodb::error::Error), MongodbError(#[from] mongodb::error::Error),
#[error("error while coverting values: {0}")]
SerdeJsonError(#[from] serde_json::error::Error),
} }
pub type DbResult<T> = Result<T, DbError>; pub type DbResult<T> = Result<T, DbError>;
@ -233,14 +240,35 @@ pub type DbResult<T> = Result<T, DbError>;
pub trait CallDB { pub trait CallDB {
//type C; //type C;
async fn get_database(&mut self) -> Database; async fn get_database(&mut self) -> Database;
async fn get_database_immut(&self) -> Database;
//async fn get_pool(&mut self) -> PooledConnection<'_, AsyncDieselConnectionManager<C>>; //async fn get_pool(&mut self) -> PooledConnection<'_, AsyncDieselConnectionManager<C>>;
async fn get_users(&mut self) -> DbResult<Vec<User>> { async fn get_users(&self) -> DbResult<Vec<User>> {
let db = self.get_database().await; let db = self.get_database_immut().await;
let users = db.collection::<User>("users"); let users = db.collection::<User>("users");
Ok(users.find(doc! {}).await?.try_collect().await?) Ok(users.find(doc! {}).await?.try_collect().await?)
} }
async fn get_random_users(&self, n: u32) -> DbResult<Vec<User>> {
let db = self.get_database_immut().await;
let users = db.collection::<User>("users");
let random_users: Vec<bson::Document> = users
.aggregate(vec![doc! {"$sample": {"size": n}}])
.await?
.try_collect()
.await?;
let random_users = random_users
.into_iter()
.map(|d| match serde_json::to_value(d) {
Ok(value) => serde_json::from_value::<User>(value),
Err(err) => Err(err),
})
.collect::<Result<_, _>>()?;
Ok(random_users)
}
async fn set_admin(&mut self, userid: i64, isadmin: bool) -> DbResult<()> { async fn set_admin(&mut self, userid: i64, isadmin: bool) -> DbResult<()> {
let db = self.get_database().await; let db = self.get_database().await;
let users = db.collection::<User>("users"); let users = db.collection::<User>("users");
@ -350,8 +378,8 @@ pub trait CallDB {
Ok(()) Ok(())
} }
async fn get_literal(&mut self, literal: &str) -> DbResult<Option<Literal>> { async fn get_literal(&self, literal: &str) -> DbResult<Option<Literal>> {
let db = self.get_database().await; let db = self.get_database_immut().await;
let messages = db.collection::<Literal>("literals"); let messages = db.collection::<Literal>("literals");
let literal = messages.find_one(doc! { "token": literal }).await?; let literal = messages.find_one(doc! { "token": literal }).await?;
@ -359,7 +387,7 @@ pub trait CallDB {
Ok(literal) Ok(literal)
} }
async fn get_literal_value(&mut self, literal: &str) -> DbResult<Option<String>> { async fn get_literal_value(&self, literal: &str) -> DbResult<Option<String>> {
let literal = self.get_literal(literal).await?; let literal = self.get_literal(literal).await?;
Ok(literal.map(|l| l.value)) Ok(literal.map(|l| l.value))

View File

@ -174,3 +174,11 @@ async fn test_drop_media_except() {
let _ = db.drop_media(literal).await.unwrap(); let _ = db.drop_media(literal).await.unwrap();
} }
#[tokio::test]
async fn test_get_random_users() {
let mut db = setup_db().await;
let users = db.get_random_users(1).await.unwrap();
assert_eq!(users.len(), 1);
}

View File

@ -86,6 +86,14 @@ impl<'a> MessageAnswerer<'a> {
self.answer_inner(text, literal, variant, keyboard).await self.answer_inner(text, literal, variant, keyboard).await
} }
pub async fn answer_text(
self,
text: String,
keyboard: Option<InlineKeyboardMarkup>,
) -> BotResult<(i64, i32)> {
self.send_message(text, keyboard).await
}
async fn answer_inner( async fn answer_inner(
mut self, mut self,
text: String, text: String,

View File

@ -50,7 +50,7 @@ pub async fn create_callback_button<C, D>(
) -> BotResult<InlineKeyboardButton> ) -> BotResult<InlineKeyboardButton>
where where
C: Serialize + for<'a> Deserialize<'a> + Send + Sync, C: Serialize + for<'a> Deserialize<'a> + Send + Sync,
D: CallDB + Send, D: CallDB + Send + Sync,
{ {
let text = db let text = db
.get_literal_value(literal) .get_literal_value(literal)