Skip to content
Snippets Groups Projects
Commit 8bb694a1 authored by insert's avatar insert
Browse files

Post-handle hook, partial updates and group delete.

parent c38977e0
Branches
Tags
No related merge requests found
Pipeline #487 passed with stage
in 2 minutes and 18 seconds
use crate::database::*; use crate::database::*;
use crate::notifications::events::ClientboundNotification; use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result}; use crate::util::result::{Error, Result};
use mongodb::bson::to_document; use mongodb::bson::{doc, to_document};
use rocket_contrib::json::JsonValue;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
...@@ -62,4 +63,50 @@ impl Channel { ...@@ -62,4 +63,50 @@ impl Channel {
Ok(()) Ok(())
} }
pub async fn publish_update(&self, partial: JsonValue) -> Result<()> {
let id = self.id().to_string();
ClientboundNotification::ChannelUpdate(partial)
.publish(id)
.await
.ok();
Ok(())
}
pub async fn delete(&self) -> Result<()> {
let id = self.id();
get_collection("messages")
.delete_many(
doc! {
"channel": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "delete_many",
with: "messages",
})?;
get_collection("channels")
.delete_one(
doc! {
"_id": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "delete_one",
with: "channel",
})?;
ClientboundNotification::ChannelDelete { id: id.to_string() }
.publish(id.to_string())
.await
.ok();
Ok(())
}
} }
...@@ -3,30 +3,11 @@ use crate::{ ...@@ -3,30 +3,11 @@ use crate::{
notifications::events::ClientboundNotification, notifications::events::ClientboundNotification,
util::result::{Error, Result}, util::result::{Error, Result},
}; };
use mongodb::bson::{to_bson, DateTime}; use mongodb::bson::{doc, to_bson, DateTime};
use rocket_contrib::json::JsonValue;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use ulid::Ulid; use ulid::Ulid;
/*#[derive(Serialize, Deserialize, Debug)]
pub struct PreviousEntry {
pub content: String,
pub time: DateTime,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Message {
#[serde(rename = "_id")]
pub id: String,
pub nonce: Option<String>,
pub channel: String,
pub author: String,
pub content: String,
pub edited: Option<DateTime>,
pub previous_content: Vec<PreviousEntry>,
}*/
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Message { pub struct Message {
#[serde(rename = "_id")] #[serde(rename = "_id")]
...@@ -72,9 +53,9 @@ impl Message { ...@@ -72,9 +53,9 @@ impl Message {
Ok(()) Ok(())
} }
pub async fn publish_edit(self) -> Result<()> { pub async fn publish_update(&self, partial: JsonValue) -> Result<()> {
let channel = self.channel.clone(); let channel = self.channel.clone();
ClientboundNotification::MessageEdit(self) ClientboundNotification::MessageUpdate(partial)
.publish(channel) .publish(channel)
.await .await
.ok(); .ok();
...@@ -82,9 +63,22 @@ impl Message { ...@@ -82,9 +63,22 @@ impl Message {
Ok(()) Ok(())
} }
pub async fn publish_delete(self) -> Result<()> { pub async fn delete(&self) -> Result<()> {
get_collection("messages")
.delete_one(
doc! {
"_id": &self.id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "delete_one",
with: "message",
})?;
let channel = self.channel.clone(); let channel = self.channel.clone();
ClientboundNotification::MessageDelete { id: self.id } ClientboundNotification::MessageDelete { id: self.id.clone() }
.publish(channel) .publish(channel)
.await .await
.ok(); .ok();
......
use hive_pubsub::PubSub; use hive_pubsub::PubSub;
use rauth::auth::Session; use rauth::auth::Session;
use rocket_contrib::json::JsonValue;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snafu::Snafu; use snafu::Snafu;
...@@ -38,12 +39,13 @@ pub enum ClientboundNotification { ...@@ -38,12 +39,13 @@ pub enum ClientboundNotification {
}, },
Message(Message), Message(Message),
MessageEdit(Message), MessageUpdate(JsonValue),
MessageDelete { MessageDelete {
id: String, id: String,
}, },
ChannelCreate(Channel), ChannelCreate(Channel),
ChannelUpdate(JsonValue),
ChannelGroupJoin { ChannelGroupJoin {
id: String, id: String,
user: String, user: String,
...@@ -52,6 +54,9 @@ pub enum ClientboundNotification { ...@@ -52,6 +54,9 @@ pub enum ClientboundNotification {
id: String, id: String,
user: String, user: String,
}, },
ChannelDelete {
id: String,
},
UserRelationship { UserRelationship {
id: String, id: String,
...@@ -95,17 +100,31 @@ pub fn prehandle_hook(notification: &ClientboundNotification) { ...@@ -95,17 +100,31 @@ pub fn prehandle_hook(notification: &ClientboundNotification) {
.unsubscribe(&user.to_string(), &id.to_string()) .unsubscribe(&user.to_string(), &id.to_string())
.ok(); .ok();
} }
ClientboundNotification::UserRelationship { id, user, status } => match status { ClientboundNotification::UserRelationship { id, user, status } => {
RelationshipStatus::None => { if status != &RelationshipStatus::None {
subscribe_if_exists(id.clone(), user.clone()).ok();
}
}
_ => {}
}
}
pub fn posthandle_hook(notification: &ClientboundNotification) {
match &notification {
ClientboundNotification::ChannelDelete { id } => {
get_hive()
.hive
.drop_topic(&id)
.ok();
}
ClientboundNotification::UserRelationship { id, user, status } => {
if status == &RelationshipStatus::None {
get_hive() get_hive()
.hive .hive
.unsubscribe(&id.to_string(), &user.to_string()) .unsubscribe(&id.to_string(), &user.to_string())
.ok(); .ok();
} }
_ => { }
subscribe_if_exists(id.clone(), user.clone()).ok();
}
},
_ => {} _ => {}
} }
} }
...@@ -14,6 +14,8 @@ static HIVE: OnceCell<Hive> = OnceCell::new(); ...@@ -14,6 +14,8 @@ static HIVE: OnceCell<Hive> = OnceCell::new();
pub async fn init_hive() { pub async fn init_hive() {
let hive = MongodbPubSub::new( let hive = MongodbPubSub::new(
|ids, notification| { |ids, notification| {
super::events::posthandle_hook(&notification);
if let Ok(data) = to_string(&notification) { if let Ok(data) = to_string(&notification) {
debug!("Pushing out notification. {}", data); debug!("Pushing out notification. {}", data);
websocket::publish(ids, notification); websocket::publish(ids, notification);
......
use crate::database::*;
use crate::util::result::{Error, Result}; use crate::util::result::{Error, Result};
use crate::{database::*, notifications::events::ClientboundNotification};
use mongodb::bson::doc; use mongodb::bson::doc;
...@@ -12,7 +12,7 @@ pub async fn req(user: User, target: Ref) -> Result<()> { ...@@ -12,7 +12,7 @@ pub async fn req(user: User, target: Ref) -> Result<()> {
Err(Error::LabelMe)? Err(Error::LabelMe)?
} }
match target { match &target {
Channel::SavedMessages { .. } => Err(Error::NoEffect), Channel::SavedMessages { .. } => Err(Error::NoEffect),
Channel::DirectMessage { .. } => { Channel::DirectMessage { .. } => {
get_collection("channels") get_collection("channels")
...@@ -35,6 +35,77 @@ pub async fn req(user: User, target: Ref) -> Result<()> { ...@@ -35,6 +35,77 @@ pub async fn req(user: User, target: Ref) -> Result<()> {
Ok(()) Ok(())
} }
_ => unimplemented!(), Channel::Group {
id,
owner,
recipients,
..
} => {
if &user.id == owner {
if let Some(new_owner) = recipients.iter().find(|x| *x != &user.id) {
get_collection("channels")
.update_one(
doc! {
"_id": &id
},
doc! {
"$set": {
"owner": new_owner
},
"$pull": {
"recipients": &user.id
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
target.publish_update(json!({ "owner": new_owner })).await?;
} else {
return target.delete().await
}
} else {
get_collection("channels")
.update_one(
doc! {
"_id": &id
},
doc! {
"$pull": {
"recipients": &user.id
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
}
ClientboundNotification::ChannelGroupLeave {
id: id.clone(),
user: user.id.clone(),
}
.publish(id.clone())
.await
.ok();
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
format!("<@{}> left the group.", user.id),
)
.publish()
.await
.ok();
Ok(())
}
} }
} }
...@@ -20,20 +20,5 @@ pub async fn req(user: User, target: Ref, msg: Ref) -> Result<()> { ...@@ -20,20 +20,5 @@ pub async fn req(user: User, target: Ref, msg: Ref) -> Result<()> {
} }
} }
get_collection("messages") message.delete().await
.delete_one(
doc! {
"_id": &message.id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "delete_one",
with: "message",
})?;
message.publish_delete().await?;
Ok(())
} }
...@@ -25,7 +25,7 @@ pub async fn req(user: User, target: Ref, msg: Ref, edit: Json<Data>) -> Result< ...@@ -25,7 +25,7 @@ pub async fn req(user: User, target: Ref, msg: Ref, edit: Json<Data>) -> Result<
Err(Error::LabelMe)? Err(Error::LabelMe)?
} }
let mut message = msg.fetch_message().await?; let message = msg.fetch_message().await?;
if message.author != user.id { if message.author != user.id {
Err(Error::CannotEditMessage)? Err(Error::CannotEditMessage)?
} }
...@@ -50,9 +50,5 @@ pub async fn req(user: User, target: Ref, msg: Ref, edit: Json<Data>) -> Result< ...@@ -50,9 +50,5 @@ pub async fn req(user: User, target: Ref, msg: Ref, edit: Json<Data>) -> Result<
with: "message", with: "message",
})?; })?;
message.content = edit.content.clone(); message.publish_update(json!({ "content": edit.content, "edited": DateTime(edited) })).await
message.edited = Some(DateTime(edited));
message.publish_edit().await?;
Ok(())
} }
...@@ -8,7 +8,7 @@ use rocket_contrib::json::JsonValue; ...@@ -8,7 +8,7 @@ use rocket_contrib::json::JsonValue;
#[get("/")] #[get("/")]
pub async fn root() -> JsonValue { pub async fn root() -> JsonValue {
json!({ json!({
"revolt": "0.3.1-alpha.0", "revolt": "0.3.1",
"features": { "features": {
"registration": !*DISABLE_REGISTRATION, "registration": !*DISABLE_REGISTRATION,
"captcha": { "captcha": {
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment