From f32f4472337b0c853a2c43699da44cd0f69699f5 Mon Sep 17 00:00:00 2001
From: Paul <paulmakles@gmail.com>
Date: Wed, 2 Jun 2021 10:27:51 +0100
Subject: [PATCH] Notifications: Subscribe users to new guild channels.

---
 src/notifications/events.rs | 57 ++++++++++++++++++++++++-------------
 1 file changed, 38 insertions(+), 19 deletions(-)

diff --git a/src/notifications/events.rs b/src/notifications/events.rs
index 3a118be..a40035e 100644
--- a/src/notifications/events.rs
+++ b/src/notifications/events.rs
@@ -1,12 +1,12 @@
-use std::collections::HashMap;
-
+use futures::StreamExt;
 use hive_pubsub::PubSub;
 use rauth::auth::Session;
-use rocket_contrib::json::JsonValue;
+use mongodb::bson::{Document, doc};
 use serde::{Deserialize, Serialize};
+use rocket_contrib::json::JsonValue;
 
 use super::hive::{get_hive, subscribe_if_exists};
-use crate::database::*;
+use crate::{database::*, util::result::{Error, Result}};
 
 #[derive(Serialize, Deserialize, Debug)]
 #[serde(tag = "error")]
@@ -137,27 +137,15 @@ pub enum ClientboundNotification {
 impl ClientboundNotification {
     pub fn publish(self, topic: String) {
         async_std::task::spawn(async move {
-            prehandle_hook(&self); // ! TODO: this should be moved to pubsub
+            prehandle_hook(&self).await.ok(); // ! FIXME: this should be moved to pubsub
             hive_pubsub::backend::mongo::publish(get_hive(), &topic, self)
                 .await
                 .ok();
         });
     }
-
-    pub fn publish_to(self, channel: &Channel) {
-        // ! FIXME: update all for channel
-        // ! FIXME: temporary solution for pushing to guilds
-        self.publish(
-            if let Channel::TextChannel { server, .. } = channel {
-                server.clone()
-            } else {
-                channel.id().to_string()
-            }
-        )
-    }
 }
 
-pub fn prehandle_hook(notification: &ClientboundNotification) {
+pub async fn prehandle_hook(notification: &ClientboundNotification) -> Result<()> {
     match &notification {
         ClientboundNotification::ChannelGroupJoin { id, user } => {
             subscribe_if_exists(user.clone(), id.clone()).ok();
@@ -173,7 +161,36 @@ pub fn prehandle_hook(notification: &ClientboundNotification) {
                         subscribe_if_exists(recipient.clone(), channel_id.to_string()).ok();
                     }
                 }
-                _ => {}
+                Channel::TextChannel { server, .. } => {
+                    // ! FIXME: write a better algorithm?
+                    let members = get_collection("server_members")
+                        .find(
+                            doc! {
+                                "_id.server": server
+                            },
+                            None,
+                        )
+                        .await
+                        .map_err(|_| Error::DatabaseError {
+                            operation: "find",
+                            with: "server_members",
+                        })?
+                        .filter_map(async move |s| s.ok())
+                        .collect::<Vec<Document>>()
+                        .await
+                        .into_iter()
+                        .filter_map(|x| {
+                            x.get_document("_id")
+                                .ok()
+                                .map(|i| i.get_str("user").ok().map(|x| x.to_string()))
+                        })
+                        .flatten()
+                        .collect::<Vec<String>>();
+
+                    for member in members {
+                        subscribe_if_exists(member.clone(), channel_id.to_string()).ok();
+                    }
+                }
             }
         }
         ClientboundNotification::ServerMemberJoin { id, user } => {
@@ -189,6 +206,8 @@ pub fn prehandle_hook(notification: &ClientboundNotification) {
         }
         _ => {}
     }
+
+    Ok(())
 }
 
 pub fn posthandle_hook(notification: &ClientboundNotification) {
-- 
GitLab