diff --git a/src/notifications/events.rs b/src/notifications/events.rs index 3011da7592e7e8c139fc615baa40f1d5c77df9e2..9da5f816446488ff929a95ea97445235b8d5e485 100644 --- a/src/notifications/events.rs +++ b/src/notifications/events.rs @@ -1,7 +1,6 @@ use rauth::auth::Session; use serde::{Deserialize, Serialize}; use snafu::Snafu; -use hive_pubsub::PubSub; use crate::database::entities::RelationshipStatus; @@ -89,7 +88,6 @@ pub enum ClientboundNotification { GuildDelete { id: String, },*/ - UserRelationship { id: String, user: String, diff --git a/src/notifications/mod.rs b/src/notifications/mod.rs index c1dd5c6718b79984bdf2792816483a74330dca32..22f7bf4f379a353e24eef66fecca2f90a702b2df 100644 --- a/src/notifications/mod.rs +++ b/src/notifications/mod.rs @@ -1,4 +1,4 @@ pub mod events; pub mod hive; -pub mod websocket; pub mod subscriptions; +pub mod websocket; diff --git a/src/notifications/websocket.rs b/src/notifications/websocket.rs index 3fa3fa5f5e1e172358b3dc1537f87aba861f97f4..5e2bf6921fdffbafc5862df38e8cf0cf94944bfb 100644 --- a/src/notifications/websocket.rs +++ b/src/notifications/websocket.rs @@ -1,6 +1,6 @@ use crate::database::get_collection; -use crate::{database::entities::User, util::variables::WS_HOST}; use crate::database::guards::reference::Ref; +use crate::util::variables::WS_HOST; use super::subscriptions; @@ -10,15 +10,18 @@ use async_tungstenite::tungstenite::Message; use futures::channel::mpsc::{unbounded, UnboundedSender}; use futures::stream::TryStreamExt; use futures::{pin_mut, prelude::*}; +use hive_pubsub::PubSub; use log::{debug, info}; use many_to_many::ManyToMany; use rauth::auth::{Auth, Session}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex, RwLock}; -use hive_pubsub::PubSub; -use super::{events::{ClientboundNotification, ServerboundNotification, WebSocketError}, hive::get_hive}; +use super::{ + events::{ClientboundNotification, ServerboundNotification, WebSocketError}, + hive::get_hive, +}; type Tx = UnboundedSender<Message>; type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>; @@ -54,9 +57,7 @@ async fn accept(stream: TcpStream) { CONNECTIONS.lock().unwrap().insert(addr, tx.clone()); let send = |notification: ClientboundNotification| { - if let Ok(response) = serde_json::to_string( - ¬ification, - ) { + if let Ok(response) = serde_json::to_string(¬ification) { if let Err(_) = tx.unbounded_send(Message::Text(response)) { debug!("Failed unbounded_send to websocket stream."); } @@ -71,8 +72,10 @@ async fn accept(stream: TcpStream) { match notification { ServerboundNotification::Authenticate(new_session) => { if session.is_some() { - send(ClientboundNotification::Error(WebSocketError::AlreadyAuthenticated)); - return future::ok(()) + send(ClientboundNotification::Error( + WebSocketError::AlreadyAuthenticated, + )); + return future::ok(()); } match task::block_on( @@ -80,29 +83,41 @@ async fn accept(stream: TcpStream) { ) { Ok(validated_session) => { match task::block_on( - Ref { id: validated_session.user_id.clone() } - .fetch_user() + Ref { + id: validated_session.user_id.clone(), + } + .fetch_user(), ) { Ok(user) => { if let Ok(mut map) = USERS.write() { map.insert(validated_session.user_id.clone(), addr); session = Some(validated_session); - if let Ok(_) = task::block_on(subscriptions::generate_subscriptions(&user)) { + if let Ok(_) = task::block_on( + subscriptions::generate_subscriptions(&user), + ) { send(ClientboundNotification::Authenticated); } else { - send(ClientboundNotification::Error(WebSocketError::InternalError)); + send(ClientboundNotification::Error( + WebSocketError::InternalError, + )); } } else { - send(ClientboundNotification::Error(WebSocketError::InternalError)); + send(ClientboundNotification::Error( + WebSocketError::InternalError, + )); } - }, + } Err(_) => { - send(ClientboundNotification::Error(WebSocketError::OnboardingNotFinished)); + send(ClientboundNotification::Error( + WebSocketError::OnboardingNotFinished, + )); } } } Err(_) => { - send(ClientboundNotification::Error(WebSocketError::InvalidSession)); + send(ClientboundNotification::Error( + WebSocketError::InvalidSession, + )); } } } diff --git a/src/routes/users/add_friend.rs b/src/routes/users/add_friend.rs index 514c3ba004b9a4a81bf6c5a0692ed6c8125ff975..c031909e31cca6679e0a00a97756fc6f84457664 100644 --- a/src/routes/users/add_friend.rs +++ b/src/routes/users/add_friend.rs @@ -1,4 +1,3 @@ -use crate::{notifications::{events::ClientboundNotification, hive}, util::result::Result}; use crate::{ database::{ entities::{RelationshipStatus, User}, @@ -8,6 +7,10 @@ use crate::{ }, util::result::Error, }; +use crate::{ + notifications::{events::ClientboundNotification, hive}, + util::result::Result, +}; use futures::try_join; use mongodb::bson::doc; use rocket_contrib::json::JsonValue; @@ -55,16 +58,19 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { id: user.id.clone(), user: target.id.clone(), status: RelationshipStatus::Friend - }.publish(user.id.clone()), + } + .publish(user.id.clone()), ClientboundNotification::UserRelationship { id: target.id.clone(), user: user.id.clone(), status: RelationshipStatus::Friend - }.publish(target.id.clone()) - ).ok(); + } + .publish(target.id.clone()) + ) + .ok(); Ok(json!({ "status": "Friend" })) - }, + } Err(_) => Err(Error::DatabaseError { operation: "update_one", with: "user", @@ -108,19 +114,22 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { id: user.id.clone(), user: target.id.clone(), status: RelationshipStatus::Outgoing - }.publish(user.id.clone()), + } + .publish(user.id.clone()), ClientboundNotification::UserRelationship { id: target.id.clone(), user: user.id.clone(), status: RelationshipStatus::Incoming - }.publish(target.id.clone()) - ).ok(); + } + .publish(target.id.clone()) + ) + .ok(); hive::subscribe_if_exists(user.id.clone(), target.id.clone()).ok(); hive::subscribe_if_exists(target.id.clone(), user.id.clone()).ok(); Ok(json!({ "status": "Outgoing" })) - }, + } Err(_) => Err(Error::DatabaseError { operation: "update_one", with: "user", diff --git a/src/routes/users/block_user.rs b/src/routes/users/block_user.rs index 4f86bf9c914c68f1c411c6e75e16cf7656621ea4..99d8e1740805cbdc081d88dc63fbaf2951d17cc4 100644 --- a/src/routes/users/block_user.rs +++ b/src/routes/users/block_user.rs @@ -1,8 +1,11 @@ -use crate::{notifications::{events::ClientboundNotification, hive}, util::result::Result}; use crate::{ database::entities::RelationshipStatus, database::entities::User, database::get_collection, database::guards::reference::Ref, database::permissions::get_relationship, util::result::Error, }; +use crate::{ + notifications::{events::ClientboundNotification, hive}, + util::result::Result, +}; use futures::try_join; use mongodb::bson::doc; use rocket_contrib::json::JsonValue; @@ -35,8 +38,11 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { ClientboundNotification::UserRelationship { id: user.id.clone(), user: target.id.clone(), - status: RelationshipStatus::Blocked - }.publish(user.id.clone()).await.ok(); + status: RelationshipStatus::Blocked, + } + .publish(user.id.clone()) + .await + .ok(); Ok(json!({ "status": "Blocked" })) } @@ -77,19 +83,22 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { id: user.id.clone(), user: target.id.clone(), status: RelationshipStatus::Blocked - }.publish(user.id.clone()), + } + .publish(user.id.clone()), ClientboundNotification::UserRelationship { id: target.id.clone(), user: user.id.clone(), status: RelationshipStatus::BlockedOther - }.publish(target.id.clone()) - ).ok(); + } + .publish(target.id.clone()) + ) + .ok(); hive::subscribe_if_exists(user.id.clone(), target.id.clone()).ok(); hive::subscribe_if_exists(target.id.clone(), user.id.clone()).ok(); Ok(json!({ "status": "Blocked" })) - }, + } Err(_) => Err(Error::DatabaseError { operation: "update_one", with: "user", @@ -131,16 +140,19 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { id: user.id.clone(), user: target.id.clone(), status: RelationshipStatus::Blocked - }.publish(user.id.clone()), + } + .publish(user.id.clone()), ClientboundNotification::UserRelationship { id: target.id.clone(), user: user.id.clone(), status: RelationshipStatus::BlockedOther - }.publish(target.id.clone()) - ).ok(); - + } + .publish(target.id.clone()) + ) + .ok(); + Ok(json!({ "status": "Blocked" })) - }, + } Err(_) => Err(Error::DatabaseError { operation: "update_one", with: "user", diff --git a/src/routes/users/fetch_dms.rs b/src/routes/users/fetch_dms.rs index 218efaede2ae1dc53a80dc8666b3a38a8284091d..bd30d3231250ca34505d731295b161d1ca9d2a4c 100644 --- a/src/routes/users/fetch_dms.rs +++ b/src/routes/users/fetch_dms.rs @@ -1,8 +1,8 @@ -use crate::database::entities::{Channel, User}; +use crate::database::entities::User; use crate::database::get_collection; use crate::util::result::{Error, Result}; use futures::StreamExt; -use mongodb::bson::{doc, from_bson, Bson}; +use mongodb::bson::doc; use rocket_contrib::json::JsonValue; #[get("/dms")] diff --git a/src/routes/users/remove_friend.rs b/src/routes/users/remove_friend.rs index 97f6f20fffc985e04147f4200146ee0fdc3e215c..ebec7c6b9bc0c18439315ad75f84537fed011771 100644 --- a/src/routes/users/remove_friend.rs +++ b/src/routes/users/remove_friend.rs @@ -1,12 +1,15 @@ -use crate::{notifications::{hive, events::ClientboundNotification}, util::result::Result}; use crate::{ database::entities::RelationshipStatus, database::entities::User, database::get_collection, database::guards::reference::Ref, database::permissions::get_relationship, util::result::Error, }; +use crate::{ + notifications::{events::ClientboundNotification, hive}, + util::result::Result, +}; use futures::try_join; +use hive_pubsub::PubSub; use mongodb::bson::doc; use rocket_contrib::json::JsonValue; -use hive_pubsub::PubSub; #[delete("/<target>/friend")] pub async fn req(user: User, target: Ref) -> Result<JsonValue> { @@ -54,20 +57,23 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { id: user.id.clone(), user: target.id.clone(), status: RelationshipStatus::None - }.publish(user.id.clone()), + } + .publish(user.id.clone()), ClientboundNotification::UserRelationship { id: target.id.clone(), user: user.id.clone(), status: RelationshipStatus::None - }.publish(target.id.clone()) - ).ok(); - + } + .publish(target.id.clone()) + ) + .ok(); + let hive = hive::get_hive(); hive.unsubscribe(&user.id, &target.id).ok(); hive.unsubscribe(&target.id, &user.id).ok(); - + Ok(json!({ "status": "None" })) - }, + } Err(_) => Err(Error::DatabaseError { operation: "update_one", with: "user", diff --git a/src/routes/users/unblock_user.rs b/src/routes/users/unblock_user.rs index 68ebec22c466bb9a6fd4c29f978091f12121f3ad..2d7a13b6eee1b674ddd8b8fe53bbf7d52cf42dfc 100644 --- a/src/routes/users/unblock_user.rs +++ b/src/routes/users/unblock_user.rs @@ -1,9 +1,13 @@ -use crate::{notifications::{events::ClientboundNotification, hive}, util::result::Result}; use crate::{ database::entities::RelationshipStatus, database::entities::User, database::get_collection, database::guards::reference::Ref, database::permissions::get_relationship, util::result::Error, }; +use crate::{ + notifications::{events::ClientboundNotification, hive}, + util::result::Result, +}; use futures::try_join; +use hive_pubsub::PubSub; use mongodb::bson::doc; use rocket_contrib::json::JsonValue; @@ -42,8 +46,11 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { ClientboundNotification::UserRelationship { id: user.id.clone(), user: target.id.clone(), - status: RelationshipStatus::BlockedOther - }.publish(user.id.clone()).await.ok(); + status: RelationshipStatus::BlockedOther, + } + .publish(user.id.clone()) + .await + .ok(); Ok(json!({ "status": "BlockedOther" })) } @@ -82,20 +89,23 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { id: user.id.clone(), user: target.id.clone(), status: RelationshipStatus::None - }.publish(user.id.clone()), + } + .publish(user.id.clone()), ClientboundNotification::UserRelationship { id: target.id.clone(), user: user.id.clone(), status: RelationshipStatus::None - }.publish(target.id.clone()) - ).ok(); - + } + .publish(target.id.clone()) + ) + .ok(); + let hive = hive::get_hive(); hive.unsubscribe(&user.id, &target.id).ok(); hive.unsubscribe(&target.id, &user.id).ok(); - + Ok(json!({ "status": "None" })) - }, + } Err(_) => Err(Error::DatabaseError { operation: "update_one", with: "user",