From 0f793f84a267c622c86e992b886b67fa1b7afb58 Mon Sep 17 00:00:00 2001
From: Paul Makles <paulmakles@gmail.com>
Date: Mon, 13 Apr 2020 16:29:48 +0100
Subject: [PATCH] Fix serialization.

---
 src/database/message.rs             | 26 ++++++++--------
 src/main.rs                         |  1 +
 src/notifications/events/message.rs |  2 +-
 src/notifications/events/mod.rs     | 21 ++++++++++++-
 src/notifications/mod.rs            | 47 +++++++++++++++++++++++++++--
 src/notifications/pubsub.rs         |  8 ++---
 src/notifications/state.rs          |  9 ++----
 7 files changed, 83 insertions(+), 31 deletions(-)

diff --git a/src/database/message.rs b/src/database/message.rs
index f87bb54..c48c71a 100644
--- a/src/database/message.rs
+++ b/src/database/message.rs
@@ -1,9 +1,9 @@
 use super::get_collection;
 use crate::guards::channel::ChannelRef;
-use crate::routes::channel::ChannelType;
 use crate::notifications;
-use crate::notifications::events::Notification::MessageCreate;
 use crate::notifications::events::message::Create;
+use crate::notifications::events::Notification::MessageCreate;
+use crate::routes::channel::ChannelType;
 
 use bson::{doc, to_bson, UtcDateTime};
 use serde::{Deserialize, Serialize};
@@ -37,20 +37,18 @@ impl Message {
             .insert_one(to_bson(&self).unwrap().as_document().unwrap().clone(), None)
             .is_ok()
         {
-            let data = MessageCreate(
-                Create {
-                    id: self.id.clone(),
-                    nonce: self.nonce.clone(),
-                    channel: self.channel.clone(),
-                    author: self.author.clone(),
-                    content: self.content.clone(),
-                }
-            );
+            let data = MessageCreate(Create {
+                id: self.id.clone(),
+                nonce: self.nonce.clone(),
+                channel: self.channel.clone(),
+                author: self.author.clone(),
+                content: self.content.clone(),
+            });
 
             match target.channel_type {
-                0..=1 => notifications::send_message(target.recipients.clone(), None, data),
-                2 => notifications::send_message(target.recipients.clone(), None, data),
-                _ => unreachable!()
+                0..=1 => notifications::send_message_threaded(target.recipients.clone(), None, data),
+                2 => notifications::send_message_threaded(None, target.guild.clone(), data),
+                _ => unreachable!(),
             };
 
             let short_content: String = self.content.chars().take(24).collect();
diff --git a/src/main.rs b/src/main.rs
index 691f57d..13d7dd1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -20,6 +20,7 @@ use std::thread;
 fn main() {
     dotenv::dotenv().ok();
     database::connect();
+    notifications::start_worker();
 
     thread::spawn(|| {
         notifications::pubsub::launch_subscriber();
diff --git a/src/notifications/events/message.rs b/src/notifications/events/message.rs
index 5729488..b668150 100644
--- a/src/notifications/events/message.rs
+++ b/src/notifications/events/message.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct Create {
     pub id: String,
     pub nonce: Option<String>,
diff --git a/src/notifications/events/mod.rs b/src/notifications/events/mod.rs
index 2143edd..70aa92d 100644
--- a/src/notifications/events/mod.rs
+++ b/src/notifications/events/mod.rs
@@ -1,8 +1,27 @@
 use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
 
 pub mod message;
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub enum Notification {
     MessageCreate(message::Create),
 }
+
+impl Notification {
+    pub fn serialize(self) -> String {
+        if let Value::Object(obj) = json!(self) {
+            let (key, value) = obj.iter().next().unwrap();
+
+            if let Value::Object(data) = value {
+                let mut data = data.clone();
+                data.insert("type".to_string(), Value::String(key.to_string()));
+                json!(data).to_string()
+            } else {
+                unreachable!()
+            }
+        } else {
+            unreachable!()
+        }
+    }
+}
diff --git a/src/notifications/mod.rs b/src/notifications/mod.rs
index e5a04c4..4a2cb9a 100644
--- a/src/notifications/mod.rs
+++ b/src/notifications/mod.rs
@@ -1,3 +1,7 @@
+use once_cell::sync::OnceCell;
+use std::sync::mpsc::{channel, Sender};
+use std::thread;
+
 pub mod events;
 pub mod pubsub;
 pub mod state;
@@ -11,11 +15,50 @@ pub fn send_message<U: Into<Option<Vec<String>>>, G: Into<Option<String>>>(
     let users = users.into();
     let guild = guild.into();
 
-    if pubsub::send_message(users.clone(), guild.clone(), data) {
-        state::send_message(users, guild, "bruh".to_string());
+    if pubsub::send_message(users.clone(), guild.clone(), data.clone()) {
+        state::send_message(users, guild, data.serialize());
 
         true
     } else {
         false
     }
 }
+
+struct NotificationArguments {
+    users: Option<Vec<String>>,
+    guild: Option<String>,
+    data: events::Notification,
+}
+
+static mut SENDER: OnceCell<Sender<NotificationArguments>> = OnceCell::new();
+
+pub fn start_worker() {
+    let (sender, receiver) = channel();
+    unsafe {
+        SENDER.set(sender).unwrap();
+    }
+
+    thread::spawn(move || {
+        while let Ok(data) = receiver.recv() {
+            send_message(data.users, data.guild, data.data);
+        }
+    });
+}
+
+pub fn send_message_threaded<U: Into<Option<Vec<String>>>, G: Into<Option<String>>>(
+    users: U,
+    guild: G,
+    data: events::Notification,
+) -> bool {
+    unsafe {
+        SENDER
+            .get()
+            .unwrap()
+            .send(NotificationArguments {
+                users: users.into(),
+                guild: guild.into(),
+                data,
+            })
+            .is_ok()
+    }
+}
diff --git a/src/notifications/pubsub.rs b/src/notifications/pubsub.rs
index 757fa10..06d516b 100644
--- a/src/notifications/pubsub.rs
+++ b/src/notifications/pubsub.rs
@@ -23,11 +23,7 @@ pub struct PubSubMessage {
     data: Notification,
 }
 
-pub fn send_message(
-    users: Option<Vec<String>>,
-    guild: Option<String>,
-    data: Notification,
-) -> bool {
+pub fn send_message(users: Option<Vec<String>>, guild: Option<String>, data: Notification) -> bool {
     let message = PubSubMessage {
         id: Ulid::new().to_string(),
         source: SOURCEID.get().unwrap().to_string(),
@@ -92,7 +88,7 @@ pub fn launch_subscriber() {
                                 super::state::send_message(
                                     message.user_recipients,
                                     message.target_guild,
-                                    json!(message.data).to_string(),
+                                    message.data.serialize(),
                                 );
                             }
                         } else {
diff --git a/src/notifications/state.rs b/src/notifications/state.rs
index 8f4127f..217459d 100644
--- a/src/notifications/state.rs
+++ b/src/notifications/state.rs
@@ -1,4 +1,3 @@
-use super::events::Notification;
 use crate::database;
 use crate::util::vec_to_set;
 
@@ -174,14 +173,10 @@ pub fn init() {
     }
 }
 
-pub fn send_message(
-    users: Option<Vec<String>>,
-    guild: Option<String>,
-    data: String,
-) {
+pub fn send_message(users: Option<Vec<String>>, guild: Option<String>, data: String) {
     let state = unsafe { DATA.get().unwrap().read().unwrap() };
     let mut connections = HashSet::new();
-    
+
     let mut users = vec_to_set(&users.unwrap_or(vec![]));
     if let Some(guild) = guild {
         if let Some(entry) = state.guilds.get(&guild) {
-- 
GitLab