Feature: BotNotificator #18
@ -12,6 +12,7 @@ use teloxide::{
|
||||
dispatching::dialogue::serializer::Json,
|
||||
dptree,
|
||||
prelude::{Dispatcher, Requester},
|
||||
types::{ChatId, UserId},
|
||||
Bot,
|
||||
};
|
||||
use tokio::runtime::Handle;
|
||||
@ -19,6 +20,7 @@ use tokio::runtime::Handle;
|
||||
use crate::{
|
||||
bot_handler::{script_handler, BotHandler},
|
||||
db::{bots::BotInstance, DbError, DB},
|
||||
message_answerer::MessageAnswerer,
|
||||
mongodb_storage::MongodbStorage,
|
||||
BotController, BotError, BotResult, BotRuntime,
|
||||
};
|
||||
@ -26,9 +28,15 @@ use crate::{
|
||||
pub struct BotRunner {
|
||||
controller: BotController,
|
||||
info: BotInfo,
|
||||
notificator: NotificatorThread,
|
||||
thread: Option<JoinHandle<BotResult<()>>>,
|
||||
}
|
||||
|
||||
pub enum NotificatorThread {
|
||||
Running(Option<JoinHandle<BotResult<()>>>),
|
||||
Done,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BotInfo {
|
||||
pub name: String,
|
||||
@ -149,6 +157,9 @@ where
|
||||
|
||||
let thread =
|
||||
spawn_bot_thread(controller.bot.clone(), controller.db.clone(), handler).await?;
|
||||
println!("Starting notificator");
|
||||
let notificator = spawn_notificator_thread(controller.clone()).await?;
|
||||
let notificator = NotificatorThread::Running(Some(notificator));
|
||||
|
||||
let info = BotInfo {
|
||||
name: bi.name.clone(),
|
||||
@ -156,6 +167,7 @@ where
|
||||
let runner = BotRunner {
|
||||
controller,
|
||||
info: info.clone(),
|
||||
notificator,
|
||||
thread: Some(thread),
|
||||
};
|
||||
|
||||
@ -207,3 +219,42 @@ pub async fn spawn_bot_thread(
|
||||
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
pub async fn spawn_notificator_thread(
|
||||
mut c: BotController,
|
||||
) -> BotResult<JoinHandle<BotResult<()>>> {
|
||||
let thread = std::thread::spawn(move || -> BotResult<()> {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
|
||||
rt.block_on(async {
|
||||
loop {
|
||||
let r = c.runtime.lock().unwrap();
|
||||
let notifications = r.rc.get_nearest_notifications();
|
||||
drop(r); // unlocking mutex
|
||||
|
||||
match notifications {
|
||||
Some(n) => {
|
||||
// waiting time to send notification
|
||||
println!("Will send notification after {:?}", n.wait_for());
|
||||
println!("Notifications: {:#?}", n.notifications());
|
||||
tokio::time::sleep(n.wait_for()).await;
|
||||
'n: for n in n.notifications().into_iter() {
|
||||
for user in n.get_users(&c.db).await?.into_iter() {
|
||||
let text = match n.resolve_message(&c.db, &user).await? {
|
||||
Some(text) => text,
|
||||
None => continue 'n,
|
||||
};
|
||||
|
||||
let ma = MessageAnswerer::new(&c.bot, &mut c.db, user.id);
|
||||
ma.answer_text(text.clone(), None).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
None => break Ok(()),
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
221
src/botscript.rs
221
src/botscript.rs
@ -5,14 +5,14 @@ use std::sync::{Arc, Mutex, PoisonError};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::db::raw_calls::RawCallError;
|
||||
use crate::db::{CallDB, DbError, DB};
|
||||
use crate::db::{CallDB, DbError, User, DB};
|
||||
use crate::utils::parcelable::{ParcelType, Parcelable, ParcelableError, ParcelableResult};
|
||||
use chrono::{NaiveTime, ParseError, Timelike};
|
||||
use chrono::{DateTime, Days, NaiveTime, ParseError, TimeDelta, Timelike, Utc};
|
||||
use db::attach_db_obj;
|
||||
use futures::future::join_all;
|
||||
use futures::lock::MutexGuard;
|
||||
use itertools::Itertools;
|
||||
use quickjs_rusty::serde::from_js;
|
||||
use quickjs_rusty::serde::{from_js, to_js};
|
||||
use quickjs_rusty::utils::create_empty_object;
|
||||
use quickjs_rusty::utils::create_string;
|
||||
use quickjs_rusty::ContextError;
|
||||
@ -274,6 +274,7 @@ pub struct BotConfig {
|
||||
/// relative to UTC, for e.g.,
|
||||
/// timezone = 3 will be UTC+3,
|
||||
/// timezone =-2 will be UTC-2,
|
||||
#[serde(default)]
|
||||
timezone: i8,
|
||||
}
|
||||
|
||||
@ -606,10 +607,56 @@ impl Parcelable<BotFunction> for BotDialog {
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(untagged)]
|
||||
pub enum NotificationTime {
|
||||
Delta(isize),
|
||||
Delta {
|
||||
#[serde(default)]
|
||||
delta_hours: u32,
|
||||
#[serde(default)]
|
||||
delta_minutes: u32,
|
||||
},
|
||||
Specific(SpecificTime),
|
||||
}
|
||||
|
||||
impl NotificationTime {
|
||||
pub fn when_next(&self, start_time: &DateTime<Utc>, now: &DateTime<Utc>) -> DateTime<Utc> {
|
||||
let now = *now;
|
||||
match self {
|
||||
NotificationTime::Delta {
|
||||
delta_hours,
|
||||
delta_minutes,
|
||||
} => {
|
||||
let delta = TimeDelta::minutes((delta_minutes + delta_hours * 60).into());
|
||||
println!("Delta: {delta}");
|
||||
|
||||
let mut estimation = *start_time;
|
||||
// super non-optimal, but fun :)
|
||||
loop {
|
||||
println!("Adding delta to estimation");
|
||||
if estimation < now + Duration::from_secs(1) {
|
||||
estimation += delta;
|
||||
} else {
|
||||
break estimation;
|
||||
}
|
||||
}
|
||||
}
|
||||
NotificationTime::Specific(time) => {
|
||||
let estimation = now;
|
||||
let estimation = estimation.with_hour(time.hour.into()).unwrap_or(estimation);
|
||||
let mut estimation = estimation
|
||||
.with_minute(time.minutes.into())
|
||||
.unwrap_or(estimation);
|
||||
// super non-optimal, but fun :)
|
||||
loop {
|
||||
if estimation < now {
|
||||
estimation = estimation + Days::new(1);
|
||||
} else {
|
||||
break estimation;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
#[serde(try_from = "SpecificTimeFormat")]
|
||||
pub struct SpecificTime {
|
||||
@ -652,11 +699,42 @@ pub enum NotificationFilter {
|
||||
#[serde(rename = "all")]
|
||||
All,
|
||||
/// Send to randomly selected N people
|
||||
Random { random: u8 },
|
||||
Random { random: u32 },
|
||||
/// Function that returns list of user id's who should get notification
|
||||
BotFunction(BotFunction),
|
||||
}
|
||||
|
||||
impl NotificationFilter {
|
||||
pub async fn get_users(&self, db: &DB) -> ScriptResult<Vec<User>> {
|
||||
match self {
|
||||
NotificationFilter::All => Ok(db.get_users().await?),
|
||||
NotificationFilter::Random { random } => Ok(db.get_random_users(*random).await?),
|
||||
NotificationFilter::BotFunction(f) => {
|
||||
let users = f.call()?;
|
||||
let users = from_js(f.context().unwrap(), &users)?;
|
||||
Ok(users)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Parcelable<BotFunction> for NotificationFilter {
|
||||
fn get_field(&mut self, name: &str) -> ParcelableResult<ParcelType<BotFunction>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn resolve(&mut self) -> ParcelableResult<ParcelType<BotFunction>>
|
||||
where
|
||||
Self: Sized + 'static,
|
||||
{
|
||||
match self {
|
||||
NotificationFilter::All => Ok(ParcelType::Other(())),
|
||||
NotificationFilter::Random { .. } => Ok(ParcelType::Other(())),
|
||||
NotificationFilter::BotFunction(f) => Ok(Parcelable::<_>::resolve(f)?),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(untagged)]
|
||||
pub enum NotificationMessage {
|
||||
@ -670,6 +748,38 @@ pub enum NotificationMessage {
|
||||
BotFunction(BotFunction),
|
||||
}
|
||||
|
||||
impl Parcelable<BotFunction> for NotificationMessage {
|
||||
fn get_field(&mut self, name: &str) -> ParcelableResult<ParcelType<BotFunction>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn resolve(&mut self) -> ParcelableResult<ParcelType<BotFunction>>
|
||||
where
|
||||
Self: Sized + 'static,
|
||||
{
|
||||
match self {
|
||||
NotificationMessage::Literal { .. } => Ok(ParcelType::Other(())),
|
||||
NotificationMessage::Text { .. } => Ok(ParcelType::Other(())),
|
||||
NotificationMessage::BotFunction(f) => Ok(f.resolve()?),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NotificationMessage {
|
||||
pub async fn resolve(&self, db: &DB, user: &User) -> ScriptResult<Option<String>> {
|
||||
match self {
|
||||
NotificationMessage::Literal { literal } => Ok(db.get_literal_value(literal).await?),
|
||||
NotificationMessage::Text { text } => Ok(Some(text.to_string())),
|
||||
NotificationMessage::BotFunction(f) => {
|
||||
let jsuser = to_js(f.context().expect("Function is not js"), user).unwrap();
|
||||
let text = f.call_args(vec![jsuser])?;
|
||||
let text = from_js(f.context().unwrap(), &text)?;
|
||||
Ok(text)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct BotNotification {
|
||||
time: NotificationTime,
|
||||
@ -678,12 +788,75 @@ pub struct BotNotification {
|
||||
message: NotificationMessage,
|
||||
}
|
||||
|
||||
impl Parcelable<BotFunction> for BotNotification {
|
||||
fn get_field(&mut self, name: &str) -> ParcelableResult<ParcelType<BotFunction>> {
|
||||
match name {
|
||||
"filter" => Ok(Parcelable::<_>::resolve(&mut self.filter)?),
|
||||
"message" => Ok(Parcelable::<BotFunction>::resolve(&mut self.message)?),
|
||||
field => Err(ParcelableError::FieldError(format!(
|
||||
"tried to get field {field}, but this field does not exists or private"
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BotNotification {
|
||||
pub fn left_time(&self, start_time: &DateTime<Utc>, now: &DateTime<Utc>) -> Duration {
|
||||
let next = self.time.when_next(start_time, now);
|
||||
|
||||
// immidate notification if time to do it passed
|
||||
let duration = (next - now).to_std().unwrap_or(Duration::from_secs(1));
|
||||
|
||||
println!("Left time: {duration:?}");
|
||||
// Rounding partitions of seconds
|
||||
Duration::from_secs(duration.as_secs())
|
||||
}
|
||||
|
||||
pub async fn get_users(&self, db: &DB) -> ScriptResult<Vec<User>> {
|
||||
self.filter.get_users(db).await
|
||||
}
|
||||
pub async fn resolve_message(&self, db: &DB, user: &User) -> ScriptResult<Option<String>> {
|
||||
self.message.resolve(db, user).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct RunnerConfig {
|
||||
config: BotConfig,
|
||||
pub dialog: BotDialog,
|
||||
#[serde(default)]
|
||||
notifications: Vec<BotNotification>,
|
||||
#[serde(skip)]
|
||||
created_at: ConfigCreatedAt,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct ConfigCreatedAt {
|
||||
at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl Default for ConfigCreatedAt {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
at: chrono::offset::Utc::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NotificationBlock {
|
||||
wait_for: Duration,
|
||||
notifications: Vec<BotNotification>,
|
||||
}
|
||||
|
||||
impl NotificationBlock {
|
||||
pub fn wait_for(&self) -> Duration {
|
||||
self.wait_for
|
||||
}
|
||||
|
||||
pub fn notifications(&self) -> &[BotNotification] {
|
||||
&self.notifications
|
||||
}
|
||||
}
|
||||
|
||||
impl RunnerConfig {
|
||||
@ -699,12 +872,50 @@ impl RunnerConfig {
|
||||
|
||||
bm.map(|bm| bm.fill_literal(callback.to_string()))
|
||||
}
|
||||
|
||||
pub fn created_at(&self) -> DateTime<Utc> {
|
||||
self.created_at.at + TimeDelta::try_hours(self.config.timezone.into()).unwrap()
|
||||
}
|
||||
|
||||
/// if None is returned, then garanteed that later calls will also return None,
|
||||
/// so, if you'll get None, no notifications will be provided later
|
||||
pub fn get_nearest_notifications(&self) -> Option<NotificationBlock> {
|
||||
let start_time = self.created_at();
|
||||
let now =
|
||||
chrono::offset::Utc::now() + TimeDelta::try_hours(self.config.timezone.into()).unwrap();
|
||||
|
||||
let ordered = self
|
||||
.notifications
|
||||
.iter()
|
||||
.filter(|f| f.left_time(&start_time, &now) > Duration::from_secs(1))
|
||||
.sorted_by_key(|f| f.left_time(&start_time, &now))
|
||||
.collect::<Vec<_>>();
|
||||
println!("Orederd: {:#?}", ordered);
|
||||
|
||||
let left = match ordered.first() {
|
||||
Some(notification) => notification.left_time(&start_time, &now),
|
||||
// No notifications provided
|
||||
None => return None,
|
||||
};
|
||||
// get all that should be sent at the same time
|
||||
let notifications = ordered
|
||||
.into_iter()
|
||||
.filter(|n| n.left_time(&start_time, &now) == left)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Some(NotificationBlock {
|
||||
wait_for: left,
|
||||
notifications,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Parcelable<BotFunction> for RunnerConfig {
|
||||
fn get_field(&mut self, name: &str) -> Result<ParcelType<BotFunction>, ParcelableError> {
|
||||
match name {
|
||||
"dialog" => Ok(ParcelType::Parcelable(&mut self.dialog)),
|
||||
"notifications" => Ok(ParcelType::Parcelable(&mut self.notifications)),
|
||||
field => Err(ParcelableError::FieldError(format!(
|
||||
"tried to get field {field}, but this field does not exists or private"
|
||||
))),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user