Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 1741 additions and 158 deletions
mod autumn;
mod channel; mod channel;
mod guild; mod invites;
mod message; mod message;
mod microservice;
mod server;
mod sync;
mod user; mod user;
use microservice::*;
pub use autumn::*; pub use autumn::*;
pub use channel::*; pub use channel::*;
pub use guild::*; pub use invites::*;
pub use january::*;
pub use message::*; pub use message::*;
pub use server::*;
pub use sync::*;
pub use user::*; pub use user::*;
use std::collections::HashMap;
use crate::database::*;
use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result};
use futures::StreamExt;
use mongodb::bson::{Bson, doc};
use mongodb::bson::from_document;
use mongodb::bson::to_document;
use mongodb::bson::Document;
use rocket_contrib::json::JsonValue;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MemberCompositeKey {
pub server: String,
pub user: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Member {
#[serde(rename = "_id")]
pub id: MemberCompositeKey,
#[serde(skip_serializing_if = "Option::is_none")]
pub nickname: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub avatar: Option<File>,
#[serde(skip_serializing_if = "Option::is_none")]
pub roles: Option<Vec<String>>
}
pub type PermissionTuple = (
i32, // server permission
i32 // channel permission
);
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Role {
pub name: String,
pub permissions: PermissionTuple,
#[serde(skip_serializing_if = "Option::is_none")]
pub colour: Option<String>
// Bri'ish API conventions
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Category {
pub id: String,
pub title: String,
pub channels: Vec<String>
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Ban {
#[serde(rename = "_id")]
pub id: MemberCompositeKey,
pub reason: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SystemMessageChannels {
pub user_joined: Option<String>,
pub user_left: Option<String>,
pub user_kicked: Option<String>,
pub user_banned: Option<String>,
}
pub enum RemoveMember {
Leave,
Kick,
Ban,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Server {
#[serde(rename = "_id")]
pub id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub nonce: Option<String>,
pub owner: String,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
pub channels: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub categories: Option<Vec<Category>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub system_messages: Option<SystemMessageChannels>,
#[serde(default = "HashMap::new", skip_serializing_if = "HashMap::is_empty")]
pub roles: HashMap<String, Role>,
pub default_permissions: PermissionTuple,
#[serde(skip_serializing_if = "Option::is_none")]
pub icon: Option<File>,
#[serde(skip_serializing_if = "Option::is_none")]
pub banner: Option<File>,
}
impl Server {
pub async fn create(self) -> Result<()> {
get_collection("servers")
.insert_one(
to_document(&self).map_err(|_| Error::DatabaseError {
operation: "to_bson",
with: "channel",
})?,
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "insert_one",
with: "server",
})?;
Ok(())
}
pub async fn publish_update(&self, data: JsonValue) -> Result<()> {
ClientboundNotification::ServerUpdate {
id: self.id.clone(),
data,
clear: None,
}
.publish(self.id.clone());
Ok(())
}
pub async fn delete(&self) -> Result<()> {
// Check if there are any attachments we need to delete.
Channel::delete_messages(Bson::Document(doc! { "$in": &self.channels })).await?;
// Delete all channels.
get_collection("channels")
.delete_many(
doc! {
"server": &self.id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "delete_many",
with: "channels",
})?;
// Delete any associated objects, e.g. unreads and invites.
Channel::delete_associated_objects(Bson::Document(doc! { "$in": &self.channels })).await?;
// Delete members and bans.
for with in &["server_members", "server_bans"] {
get_collection(with)
.delete_many(
doc! {
"_id.server": &self.id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "delete_many",
with,
})?;
}
// Delete server icon / banner.
if let Some(attachment) = &self.icon {
attachment.delete().await?;
}
if let Some(attachment) = &self.banner {
attachment.delete().await?;
}
// Delete the server
get_collection("servers")
.delete_one(
doc! {
"_id": &self.id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "delete_one",
with: "server",
})?;
ClientboundNotification::ServerDelete {
id: self.id.clone(),
}
.publish(self.id.clone());
Ok(())
}
pub async fn fetch_members(id: &str) -> Result<Vec<Member>> {
Ok(get_collection("server_members")
.find(
doc! {
"_id.server": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "server_members",
})?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| from_document(x).ok())
.collect::<Vec<Member>>())
}
pub async fn fetch_member_ids(id: &str) -> Result<Vec<String>> {
Ok(get_collection("server_members")
.find(
doc! {
"_id.server": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "server_members",
})?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
x.get_document("_id")
.ok()
.map(|i| i.get_str("user").ok().map(|x| x.to_string()))
})
.flatten()
.collect::<Vec<String>>())
}
pub async fn join_member(&self, id: &str) -> Result<()> {
if get_collection("server_bans")
.find_one(
doc! {
"_id.server": &self.id,
"_id.user": &id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_bans",
})?
.is_some()
{
return Err(Error::Banned);
}
get_collection("server_members")
.insert_one(
doc! {
"_id": {
"server": &self.id,
"user": &id
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "insert_one",
with: "server_members",
})?;
ClientboundNotification::ServerMemberJoin {
id: self.id.clone(),
user: id.to_string(),
}
.publish(self.id.clone());
if let Some(channels) = &self.system_messages {
if let Some(cid) = &channels.user_joined {
let channel = Ref::from_unchecked(cid.clone()).fetch_channel().await?;
Content::SystemMessage(SystemMessage::UserJoined { id: id.to_string() })
.send_as_system(&channel)
.await?;
}
}
Ok(())
}
pub async fn remove_member(&self, id: &str, removal: RemoveMember) -> Result<()> {
let result = get_collection("server_members")
.delete_one(
doc! {
"_id": {
"server": &self.id,
"user": &id
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "delete_one",
with: "server_members",
})?;
if result.deleted_count > 0 {
ClientboundNotification::ServerMemberLeave {
id: self.id.clone(),
user: id.to_string(),
}
.publish(self.id.clone());
if let Some(channels) = &self.system_messages {
let message = match removal {
RemoveMember::Leave => {
if let Some(cid) = &channels.user_left {
Some((cid.clone(), SystemMessage::UserLeft { id: id.to_string() }))
} else {
None
}
}
RemoveMember::Kick => {
if let Some(cid) = &channels.user_kicked {
Some((
cid.clone(),
SystemMessage::UserKicked { id: id.to_string() },
))
} else {
None
}
}
RemoveMember::Ban => {
if let Some(cid) = &channels.user_banned {
Some((
cid.clone(),
SystemMessage::UserBanned { id: id.to_string() },
))
} else {
None
}
}
};
if let Some((cid, message)) = message {
let channel = Ref::from_unchecked(cid).fetch_channel().await?;
Content::SystemMessage(message)
.send_as_system(&channel)
.await?;
}
}
}
Ok(())
}
pub async fn get_member_count(id: &str) -> Result<i64> {
Ok(get_collection("server_members")
.count_documents(
doc! {
"_id.server": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "count_documents",
with: "server_members",
})?)
}
}
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub type UserSettings = HashMap<String, (i64, String)>;
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")] pub struct ChannelCompositeKey {
enum Metadata { pub channel: String,
File, pub user: String,
Image { width: isize, height: isize },
Video { width: isize, height: isize },
Audio,
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct File { pub struct ChannelUnread {
#[serde(rename = "_id")] #[serde(rename = "_id")]
pub id: String, pub id: ChannelCompositeKey,
filename: String,
metadata: Metadata,
content_type: String,
message_id: Option<String>, pub last_id: Option<String>,
pub mentions: Option<Vec<String>>,
} }
use futures::StreamExt;
use mongodb::bson::Document;
use mongodb::options::{Collation, FindOneOptions};
use mongodb::{
bson::{doc, from_document},
options::FindOptions,
};
use num_enum::TryFromPrimitive;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::ops;
use ulid::Ulid;
use validator::Validate;
use crate::{database::permissions::user::UserPermissions, notifications::websocket::is_online}; use crate::database::permissions::user::UserPermissions;
use crate::database::*;
use crate::notifications::websocket::is_online;
use crate::util::result::{Error, Result};
use crate::util::variables::EARLY_ADOPTER_BADGE;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum RelationshipStatus { pub enum RelationshipStatus {
...@@ -13,21 +28,69 @@ pub enum RelationshipStatus { ...@@ -13,21 +28,69 @@ pub enum RelationshipStatus {
BlockedOther, BlockedOther,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Relationship { pub struct Relationship {
#[serde(rename = "_id")] #[serde(rename = "_id")]
pub id: String, pub id: String,
pub status: RelationshipStatus, pub status: RelationshipStatus,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Presence {
Online,
Idle,
Busy,
Invisible,
}
#[derive(Validate, Serialize, Deserialize, Debug, Clone)]
pub struct UserStatus {
#[validate(length(min = 1, max = 128))]
#[serde(skip_serializing_if = "Option::is_none")]
pub text: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub presence: Option<Presence>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserProfile {
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub background: Option<File>,
}
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(i32)]
pub enum Badges {
Developer = 1,
Translator = 2,
Supporter = 4,
ResponsibleDisclosure = 8,
RevoltTeam = 16,
EarlyAdopter = 256,
}
impl_op_ex_commutative!(+ |a: &i32, b: &Badges| -> i32 { *a | *b as i32 });
// When changing this struct, update notifications/payload.rs#80
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct User { pub struct User {
#[serde(rename = "_id")] #[serde(rename = "_id")]
pub id: String, pub id: String,
pub username: String, pub username: String,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub avatar: Option<File>,
#[serde(skip_serializing_if = "Option::is_none")]
pub relations: Option<Vec<Relationship>>, pub relations: Option<Vec<Relationship>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub badges: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<UserStatus>,
#[serde(skip_serializing_if = "Option::is_none")]
pub profile: Option<UserProfile>,
// ? This should never be pushed to the collection. // ? This should never be pushed to the collection.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub relationship: Option<RelationshipStatus>, pub relationship: Option<RelationshipStatus>,
...@@ -38,6 +101,8 @@ pub struct User { ...@@ -38,6 +101,8 @@ pub struct User {
impl User { impl User {
/// Mutate the user object to include relationship as seen by user. /// Mutate the user object to include relationship as seen by user.
pub fn from(mut self, user: &User) -> User { pub fn from(mut self, user: &User) -> User {
self.relationship = Some(RelationshipStatus::None);
if self.id == user.id { if self.id == user.id {
self.relationship = Some(RelationshipStatus::User); self.relationship = Some(RelationshipStatus::User);
return self; return self;
...@@ -56,10 +121,180 @@ impl User { ...@@ -56,10 +121,180 @@ impl User {
/// Mutate the user object to appear as seen by user. /// Mutate the user object to appear as seen by user.
pub fn with(mut self, permissions: UserPermissions<[u32; 1]>) -> User { pub fn with(mut self, permissions: UserPermissions<[u32; 1]>) -> User {
let mut badges = self.badges.unwrap_or_else(|| 0);
if let Ok(id) = Ulid::from_string(&self.id) {
if id.datetime().timestamp_millis() < *EARLY_ADOPTER_BADGE {
badges = badges + Badges::EarlyAdopter;
}
}
self.badges = Some(badges);
if permissions.get_view_profile() { if permissions.get_view_profile() {
self.online = Some(is_online(&self.id)); self.online = Some(is_online(&self.id));
} else {
self.status = None;
} }
self.profile = None;
self self
} }
/// Mutate the user object to appear as seen by user.
/// Also overrides the relationship status.
pub async fn from_override(
mut self,
user: &User,
relationship: RelationshipStatus,
) -> Result<User> {
let permissions = PermissionCalculator::new(&user)
.with_relationship(&relationship)
.for_user(&self.id)
.await?;
self.relations = None;
self.relationship = Some(relationship);
Ok(self.with(permissions))
}
/// Utility function for checking claimed usernames.
pub async fn is_username_taken(username: &str) -> Result<bool> {
if username.to_lowercase() == "revolt" || username.to_lowercase() == "admin" || username.to_lowercase() == "system" {
return Ok(true);
}
if get_collection("users")
.find_one(
doc! {
"username": username
},
FindOneOptions::builder()
.collation(Collation::builder().locale("en").strength(2).build())
.build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "user",
})?
.is_some()
{
Ok(true)
} else {
Ok(false)
}
}
/// Utility function for fetching multiple users from the perspective of one.
/// Assumes user has a mutual connection with others.
pub async fn fetch_multiple_users(&self, user_ids: Vec<String>) -> Result<Vec<User>> {
let mut users = vec![];
let mut cursor = get_collection("users")
.find(
doc! {
"_id": {
"$in": user_ids
}
},
FindOptions::builder()
.projection(
doc! { "_id": 1, "username": 1, "avatar": 1, "badges": 1, "status": 1 },
)
.build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "users",
})?;
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
let other: User = from_document(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "user",
})?;
let permissions = PermissionCalculator::new(&self)
.with_mutual_connection()
.with_user(&other)
.for_user_given()
.await?;
users.push(other.from(&self).with(permissions));
}
}
Ok(users)
}
/// Utility function to get all of a user's memberships.
pub async fn fetch_memberships(id: &str) -> Result<Vec<Member>> {
Ok(get_collection("server_members")
.find(
doc! {
"_id.user": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "server_members",
})?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
from_document(x).ok()
})
.collect::<Vec<Member>>())
}
/// Utility function to get all the server IDs the user is in.
pub async fn fetch_server_ids(id: &str) -> Result<Vec<String>> {
Ok(get_collection("server_members")
.find(
doc! {
"_id.user": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "server_members",
})?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
x.get_document("_id")
.ok()
.map(|i| i.get_str("server").ok().map(|x| x.to_string()))
})
.flatten()
.collect::<Vec<String>>())
}
/// Utility function to fetch unread objects for user.
pub async fn fetch_unreads(id: &str) -> Result<Vec<Document>> {
Ok(get_collection("channel_unreads")
.find(
doc! {
"_id.user": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "user_settings",
})?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await)
}
} }
...@@ -9,16 +9,23 @@ use validator::Validate; ...@@ -9,16 +9,23 @@ use validator::Validate;
#[derive(Validate, Serialize, Deserialize)] #[derive(Validate, Serialize, Deserialize)]
pub struct Ref { pub struct Ref {
#[validate(length(min = 26, max = 26))] #[validate(length(min = 1, max = 26))]
pub id: String, pub id: String,
} }
impl Ref { impl Ref {
pub fn from_unchecked(id: String) -> Ref {
Ref { id }
}
pub fn from(id: String) -> Result<Ref> { pub fn from(id: String) -> Result<Ref> {
Ok(Ref { id }) let r = Ref { id };
r.validate()
.map_err(|error| Error::FailedValidation { error })?;
Ok(r)
} }
pub async fn fetch<T: DeserializeOwned>(&self, collection: &'static str) -> Result<T> { async fn fetch<T: DeserializeOwned>(&self, collection: &'static str) -> Result<T> {
let doc = get_collection(&collection) let doc = get_collection(&collection)
.find_one( .find_one(
doc! { doc! {
...@@ -31,7 +38,7 @@ impl Ref { ...@@ -31,7 +38,7 @@ impl Ref {
operation: "find_one", operation: "find_one",
with: &collection, with: &collection,
})? })?
.ok_or_else(|| Error::UnknownUser)?; .ok_or_else(|| Error::NotFound)?;
Ok(from_document::<T>(doc).map_err(|_| Error::DatabaseError { Ok(from_document::<T>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document", operation: "from_document",
...@@ -47,6 +54,60 @@ impl Ref { ...@@ -47,6 +54,60 @@ impl Ref {
self.fetch("channels").await self.fetch("channels").await
} }
pub async fn fetch_server(&self) -> Result<Server> {
self.fetch("servers").await
}
pub async fn fetch_invite(&self) -> Result<Invite> {
self.fetch("channel_invites").await
}
pub async fn fetch_member(&self, server: &str) -> Result<Member> {
let doc = get_collection("server_members")
.find_one(
doc! {
"_id.user": &self.id,
"_id.server": server
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_member",
})?
.ok_or_else(|| Error::NotFound)?;
Ok(
from_document::<Member>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server_member",
})?,
)
}
pub async fn fetch_ban(&self, server: &str) -> Result<Ban> {
let doc = get_collection("server_bans")
.find_one(
doc! {
"_id.user": &self.id,
"_id.server": server
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_ban",
})?
.ok_or_else(|| Error::NotFound)?;
Ok(from_document::<Ban>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server_ban",
})?)
}
pub async fn fetch_message(&self, channel: &Channel) -> Result<Message> { pub async fn fetch_message(&self, channel: &Channel) -> Result<Message> {
let message: Message = self.fetch("messages").await?; let message: Message = self.fetch("messages").await?;
if &message.channel != channel.id() { if &message.channel != channel.id() {
......
...@@ -25,10 +25,38 @@ pub async fn create_database() { ...@@ -25,10 +25,38 @@ pub async fn create_database() {
.await .await
.expect("Failed to create messages collection."); .expect("Failed to create messages collection.");
db.create_collection("servers", None)
.await
.expect("Failed to create servers collection.");
db.create_collection("server_members", None)
.await
.expect("Failed to create server_members collection.");
db.create_collection("server_bans", None)
.await
.expect("Failed to create server_bans collection.");
db.create_collection("channel_invites", None)
.await
.expect("Failed to create channel_invites collection.");
db.create_collection("channel_unreads", None)
.await
.expect("Failed to create channel_unreads collection.");
db.create_collection("migrations", None) db.create_collection("migrations", None)
.await .await
.expect("Failed to create migrations collection."); .expect("Failed to create migrations collection.");
db.create_collection("attachments", None)
.await
.expect("Failed to create attachments collection.");
db.create_collection("user_settings", None)
.await
.expect("Failed to create user_settings collection.");
db.create_collection( db.create_collection(
"pubsub", "pubsub",
CreateCollectionOptions::builder() CreateCollectionOptions::builder()
...@@ -94,6 +122,23 @@ pub async fn create_database() { ...@@ -94,6 +122,23 @@ pub async fn create_database() {
.await .await
.expect("Failed to create username index."); .expect("Failed to create username index.");
db.run_command(
doc! {
"createIndexes": "messages",
"indexes": [
{
"key": {
"content": "text"
},
"name": "content"
}
]
},
None,
)
.await
.expect("Failed to create message index.");
db.collection("migrations") db.collection("migrations")
.insert_one( .insert_one(
doc! { doc! {
......
use crate::database::get_collection; use crate::database::{permissions, get_collection, get_db, PermissionTuple};
use futures::StreamExt;
use log::info; use log::info;
use mongodb::bson::{doc, from_document}; use mongodb::{bson::{doc, from_document, to_document}, options::FindOptions};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
...@@ -10,7 +11,7 @@ struct MigrationInfo { ...@@ -10,7 +11,7 @@ struct MigrationInfo {
revision: i32, revision: i32,
} }
pub const LATEST_REVISION: i32 = 0; pub const LATEST_REVISION: i32 = 7;
pub async fn migrate_database() { pub async fn migrate_database() {
let migrations = get_collection("migrations"); let migrations = get_collection("migrations");
...@@ -53,6 +54,155 @@ pub async fn run_migrations(revision: i32) -> i32 { ...@@ -53,6 +54,155 @@ pub async fn run_migrations(revision: i32) -> i32 {
info!("Running migration [revision 0]: Test migration system."); info!("Running migration [revision 0]: Test migration system.");
} }
if revision <= 1 {
info!("Running migration [revision 1 / 2021-04-24]: Migrate to Autumn v1.0.0.");
let messages = get_collection("messages");
let attachments = get_collection("attachments");
messages
.update_many(
doc! { "attachment": { "$exists": 1 } },
doc! { "$set": { "attachment.tag": "attachments", "attachment.size": 0 } },
None,
)
.await
.expect("Failed to update messages.");
attachments
.update_many(
doc! {},
doc! { "$set": { "tag": "attachments", "size": 0 } },
None,
)
.await
.expect("Failed to update attachments.");
}
if revision <= 2 {
info!("Running migration [revision 2 / 2021-05-08]: Add servers collection.");
get_db()
.create_collection("servers", None)
.await
.expect("Failed to create servers collection.");
}
if revision <= 3 {
info!("Running migration [revision 3 / 2021-05-25]: Support multiple file uploads, add channel_unreads and user_settings.");
let messages = get_collection("messages");
let mut cursor = messages
.find(
doc! {
"attachment": {
"$exists": 1
}
},
FindOptions::builder()
.projection(doc! {
"_id": 1,
"attachments": [ "$attachment" ]
})
.build(),
)
.await
.expect("Failed to fetch messages.");
while let Some(result) = cursor.next().await {
let doc = result.unwrap();
let id = doc.get_str("_id").unwrap();
let attachments = doc.get_array("attachments").unwrap();
messages
.update_one(
doc! { "_id": id },
doc! { "$unset": { "attachment": 1 }, "$set": { "attachments": attachments } },
None,
)
.await
.unwrap();
}
get_db()
.create_collection("channel_unreads", None)
.await
.expect("Failed to create channel_unreads collection.");
get_db()
.create_collection("user_settings", None)
.await
.expect("Failed to create user_settings collection.");
}
if revision <= 4 {
info!("Running migration [revision 4 / 2021-06-01]: Add more server collections.");
get_db()
.create_collection("server_members", None)
.await
.expect("Failed to create server_members collection.");
get_db()
.create_collection("server_bans", None)
.await
.expect("Failed to create server_bans collection.");
get_db()
.create_collection("channel_invites", None)
.await
.expect("Failed to create channel_invites collection.");
}
if revision <= 5 {
info!("Running migration [revision 5 / 2021-06-26]: Add permissions.");
#[derive(Serialize)]
struct Server {
pub default_permissions: PermissionTuple,
}
let server = Server {
default_permissions: (
*permissions::server::DEFAULT_PERMISSION as i32,
*permissions::channel::DEFAULT_PERMISSION_SERVER as i32
)
};
get_collection("servers")
.update_many(
doc! { },
doc! {
"$set": to_document(&server).unwrap()
},
None
)
.await
.expect("Failed to migrate servers.");
}
if revision <= 6 {
info!("Running migration [revision 6 / 2021-07-09]: Add message text index.");
get_db()
.run_command(
doc! {
"createIndexes": "messages",
"indexes": [
{
"key": {
"content": "text"
},
"name": "content"
}
]
},
None,
)
.await
.expect("Failed to create message index.");
}
// Reminder to update LATEST_REVISION when adding new migrations. // Reminder to update LATEST_REVISION when adding new migrations.
LATEST_REVISION LATEST_REVISION
} }
use crate::database::*; use crate::database::*;
use crate::util::result::Result; use crate::util::result::{Error, Result};
use super::PermissionCalculator; use super::PermissionCalculator;
...@@ -9,22 +9,51 @@ use std::ops; ...@@ -9,22 +9,51 @@ use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)] #[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)] #[repr(u32)]
pub enum ChannelPermission { pub enum ChannelPermission {
View = 1, View = 0b00000000000000000000000000000001, // 1
SendMessage = 2, SendMessage = 0b00000000000000000000000000000010, // 2
ManageMessages = 4, ManageMessages = 0b00000000000000000000000000000100, // 4
ManageChannel = 0b00000000000000000000000000001000, // 8
VoiceCall = 0b00000000000000000000000000010000, // 16
InviteOthers = 0b00000000000000000000000000100000, // 32
EmbedLinks = 0b00000000000000000000000001000000, // 64
UploadFiles = 0b00000000000000000000000010000000, // 128
} }
lazy_static! {
pub static ref DEFAULT_PERMISSION_DM: u32 =
ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::ManageChannel
+ ChannelPermission::VoiceCall
+ ChannelPermission::InviteOthers
+ ChannelPermission::EmbedLinks
+ ChannelPermission::UploadFiles;
pub static ref DEFAULT_PERMISSION_SERVER: u32 =
ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::VoiceCall
+ ChannelPermission::InviteOthers
+ ChannelPermission::EmbedLinks
+ ChannelPermission::UploadFiles;
}
impl_op_ex!(+ |a: &ChannelPermission, b: &ChannelPermission| -> u32 { *a as u32 | *b as u32 });
impl_op_ex_commutative!(+ |a: &u32, b: &ChannelPermission| -> u32 { *a | *b as u32 });
bitfield! { bitfield! {
pub struct ChannelPermissions(MSB0 [u32]); pub struct ChannelPermissions(MSB0 [u32]);
u32; u32;
pub get_view, _: 31; pub get_view, _: 31;
pub get_send_message, _: 30; pub get_send_message, _: 30;
pub get_manage_messages, _: 29; pub get_manage_messages, _: 29;
pub get_manage_channel, _: 28;
pub get_voice_call, _: 27;
pub get_invite_others, _: 26;
pub get_embed_links, _: 25;
pub get_upload_files, _: 24;
} }
impl_op_ex!(+ |a: &ChannelPermission, b: &ChannelPermission| -> u32 { *a as u32 | *b as u32 });
impl_op_ex_commutative!(+ |a: &u32, b: &ChannelPermission| -> u32 { *a | *b as u32 });
impl<'a> PermissionCalculator<'a> { impl<'a> PermissionCalculator<'a> {
pub async fn calculate_channel(self) -> Result<u32> { pub async fn calculate_channel(self) -> Result<u32> {
let channel = if let Some(channel) = self.channel { let channel = if let Some(channel) = self.channel {
...@@ -36,9 +65,7 @@ impl<'a> PermissionCalculator<'a> { ...@@ -36,9 +65,7 @@ impl<'a> PermissionCalculator<'a> {
match channel { match channel {
Channel::SavedMessages { user: owner, .. } => { Channel::SavedMessages { user: owner, .. } => {
if &self.perspective.id == owner { if &self.perspective.id == owner {
Ok(ChannelPermission::View Ok(u32::MAX)
+ ChannelPermission::SendMessage
+ ChannelPermission::ManageMessages)
} else { } else {
Ok(0) Ok(0)
} }
...@@ -54,7 +81,7 @@ impl<'a> PermissionCalculator<'a> { ...@@ -54,7 +81,7 @@ impl<'a> PermissionCalculator<'a> {
let perms = self.for_user(recipient).await?; let perms = self.for_user(recipient).await?;
if perms.get_send_message() { if perms.get_send_message() {
return Ok(ChannelPermission::View + ChannelPermission::SendMessage); return Ok(*DEFAULT_PERMISSION_DM);
} }
return Ok(ChannelPermission::View as u32); return Ok(ChannelPermission::View as u32);
...@@ -63,17 +90,63 @@ impl<'a> PermissionCalculator<'a> { ...@@ -63,17 +90,63 @@ impl<'a> PermissionCalculator<'a> {
Ok(0) Ok(0)
} }
Channel::Group { recipients, .. } => { Channel::Group { recipients, permissions, owner, .. } => {
if &self.perspective.id == owner {
return Ok(*DEFAULT_PERMISSION_DM)
}
if recipients if recipients
.iter() .iter()
.find(|x| *x == &self.perspective.id) .find(|x| *x == &self.perspective.id)
.is_some() .is_some()
{ {
Ok(ChannelPermission::View + ChannelPermission::SendMessage) if let Some(permissions) = permissions {
Ok(permissions.clone() as u32)
} else {
Ok(*DEFAULT_PERMISSION_DM)
}
} else { } else {
Ok(0) Ok(0)
} }
} }
Channel::TextChannel { server, default_permissions, role_permissions, .. }
| Channel::VoiceChannel { server, default_permissions, role_permissions, .. } => {
let server = Ref::from_unchecked(server.clone()).fetch_server().await?;
if self.perspective.id == server.owner {
Ok(u32::MAX)
} else {
match Ref::from_unchecked(self.perspective.id.clone()).fetch_member(&server.id).await {
Ok(member) => {
let mut perm = if let Some(permission) = default_permissions {
*permission as u32
} else {
server.default_permissions.1 as u32
};
if let Some(roles) = member.roles {
for role in roles {
if let Some(permission) = role_permissions.get(&role) {
perm |= *permission as u32;
}
if let Some(server_role) = server.roles.get(&role) {
perm |= server_role.permissions.1 as u32;
}
}
}
Ok(perm)
}
Err(error) => {
match &error {
Error::NotFound => Ok(0),
_ => Err(error)
}
}
}
}
}
} }
} }
......
pub use crate::database::*; pub use crate::database::*;
pub mod channel; pub mod channel;
pub mod server;
pub mod user; pub mod user;
pub use user::get_relationship; pub use user::get_relationship;
...@@ -9,9 +10,12 @@ pub struct PermissionCalculator<'a> { ...@@ -9,9 +10,12 @@ pub struct PermissionCalculator<'a> {
perspective: &'a User, perspective: &'a User,
user: Option<&'a User>, user: Option<&'a User>,
relationship: Option<&'a RelationshipStatus>,
channel: Option<&'a Channel>, channel: Option<&'a Channel>,
server: Option<&'a Server>,
// member: Option<&'a Member>,
has_mutual_conncetion: bool, has_mutual_connection: bool,
} }
impl<'a> PermissionCalculator<'a> { impl<'a> PermissionCalculator<'a> {
...@@ -20,9 +24,12 @@ impl<'a> PermissionCalculator<'a> { ...@@ -20,9 +24,12 @@ impl<'a> PermissionCalculator<'a> {
perspective, perspective,
user: None, user: None,
relationship: None,
channel: None, channel: None,
server: None,
// member: None,
has_mutual_conncetion: false, has_mutual_connection: false,
} }
} }
...@@ -33,6 +40,13 @@ impl<'a> PermissionCalculator<'a> { ...@@ -33,6 +40,13 @@ impl<'a> PermissionCalculator<'a> {
} }
} }
pub fn with_relationship(self, relationship: &'a RelationshipStatus) -> PermissionCalculator {
PermissionCalculator {
relationship: Some(&relationship),
..self
}
}
pub fn with_channel(self, channel: &'a Channel) -> PermissionCalculator { pub fn with_channel(self, channel: &'a Channel) -> PermissionCalculator {
PermissionCalculator { PermissionCalculator {
channel: Some(&channel), channel: Some(&channel),
...@@ -40,9 +54,23 @@ impl<'a> PermissionCalculator<'a> { ...@@ -40,9 +54,23 @@ impl<'a> PermissionCalculator<'a> {
} }
} }
pub fn with_server(self, server: &'a Server) -> PermissionCalculator {
PermissionCalculator {
server: Some(&server),
..self
}
}
/* pub fn with_member(self, member: &'a Member) -> PermissionCalculator {
PermissionCalculator {
member: Some(&member),
..self
}
} */
pub fn with_mutual_connection(self) -> PermissionCalculator<'a> { pub fn with_mutual_connection(self) -> PermissionCalculator<'a> {
PermissionCalculator { PermissionCalculator {
has_mutual_conncetion: true, has_mutual_connection: true,
..self ..self
} }
} }
......
use crate::util::result::{Error, Result};
use super::PermissionCalculator;
use super::Ref;
use num_enum::TryFromPrimitive;
use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)]
pub enum ServerPermission {
View = 0b00000000000000000000000000000001, // 1
ManageRoles = 0b00000000000000000000000000000010, // 2
ManageChannels = 0b00000000000000000000000000000100, // 4
ManageServer = 0b00000000000000000000000000001000, // 8
KickMembers = 0b00000000000000000000000000010000, // 16
BanMembers = 0b00000000000000000000000000100000, // 32
// 6 bits of space
ChangeNickname = 0b00000000000000000001000000000000, // 4096
ManageNicknames = 0b00000000000000000010000000000000, // 8192
ChangeAvatar = 0b00000000000000000100000000000000, // 16382
RemoveAvatars = 0b00000000000000001000000000000000, // 32768
// 16 bits of space
}
lazy_static! {
pub static ref DEFAULT_PERMISSION: u32 =
ServerPermission::View
+ ServerPermission::ChangeNickname
+ ServerPermission::ChangeAvatar;
}
impl_op_ex!(+ |a: &ServerPermission, b: &ServerPermission| -> u32 { *a as u32 | *b as u32 });
impl_op_ex_commutative!(+ |a: &u32, b: &ServerPermission| -> u32 { *a | *b as u32 });
bitfield! {
pub struct ServerPermissions(MSB0 [u32]);
u32;
pub get_view, _: 31;
pub get_manage_roles, _: 30;
pub get_manage_channels, _: 29;
pub get_manage_server, _: 28;
pub get_kick_members, _: 27;
pub get_ban_members, _: 26;
pub get_change_nickname, _: 19;
pub get_manage_nicknames, _: 18;
pub get_change_avatar, _: 17;
pub get_remove_avatars, _: 16;
}
impl<'a> PermissionCalculator<'a> {
pub async fn calculate_server(self) -> Result<u32> {
let server = if let Some(server) = self.server {
server
} else {
unreachable!()
};
if self.perspective.id == server.owner {
Ok(u32::MAX)
} else {
match Ref::from_unchecked(self.perspective.id.clone()).fetch_member(&server.id).await {
Ok(member) => {
let mut perm = server.default_permissions.0 as u32;
if let Some(roles) = member.roles {
for role in roles {
if let Some(server_role) = server.roles.get(&role) {
perm |= server_role.permissions.0 as u32;
}
}
}
Ok(perm)
}
Err(error) => {
match &error {
Error::NotFound => Ok(0),
_ => Err(error)
}
}
}
}
}
pub async fn for_server(self) -> Result<ServerPermissions<[u32; 1]>> {
Ok(ServerPermissions([self.calculate_server().await?]))
}
}
...@@ -10,10 +10,10 @@ use std::ops; ...@@ -10,10 +10,10 @@ use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)] #[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)] #[repr(u32)]
pub enum UserPermission { pub enum UserPermission {
Access = 1, Access = 0b00000000000000000000000000000001, // 1
ViewProfile = 2, ViewProfile = 0b00000000000000000000000000000010, // 2
SendMessage = 4, SendMessage = 0b00000000000000000000000000000100, // 4
Invite = 8, Invite = 0b00000000000000000000000000001000, // 8
} }
bitfield! { bitfield! {
...@@ -30,7 +30,7 @@ impl_op_ex_commutative!(+ |a: &u32, b: &UserPermission| -> u32 { *a | *b as u32 ...@@ -30,7 +30,7 @@ impl_op_ex_commutative!(+ |a: &u32, b: &UserPermission| -> u32 { *a | *b as u32
pub fn get_relationship(a: &User, b: &str) -> RelationshipStatus { pub fn get_relationship(a: &User, b: &str) -> RelationshipStatus {
if a.id == b { if a.id == b {
return RelationshipStatus::Friend; return RelationshipStatus::User;
} }
if let Some(relations) = &a.relations { if let Some(relations) = &a.relations {
...@@ -49,8 +49,13 @@ impl<'a> PermissionCalculator<'a> { ...@@ -49,8 +49,13 @@ impl<'a> PermissionCalculator<'a> {
} }
let mut permissions: u32 = 0; let mut permissions: u32 = 0;
match get_relationship(&self.perspective, &target) { match self
RelationshipStatus::Friend => return Ok(u32::MAX), .relationship
.clone()
.map(|v| v.to_owned())
.unwrap_or_else(|| get_relationship(&self.perspective, &target))
{
RelationshipStatus::Friend | RelationshipStatus::User => return Ok(u32::MAX),
RelationshipStatus::Blocked | RelationshipStatus::BlockedOther => { RelationshipStatus::Blocked | RelationshipStatus::BlockedOther => {
return Ok(UserPermission::Access as u32) return Ok(UserPermission::Access as u32)
} }
...@@ -59,34 +64,57 @@ impl<'a> PermissionCalculator<'a> { ...@@ -59,34 +64,57 @@ impl<'a> PermissionCalculator<'a> {
// ! INFO: if we add boolean switch for permission to // ! INFO: if we add boolean switch for permission to
// ! message people who have mutual, we need to get // ! message people who have mutual, we need to get
// ! rid of this return statement. // ! rid of this return statement.
return Ok(permissions); // return Ok(permissions);
} }
_ => {} _ => {}
} }
if self.has_mutual_conncetion let check_server_overlap = async || {
let server_ids = User::fetch_server_ids(&self.perspective.id).await?;
Ok(
get_collection("server_members")
.find_one(
doc! {
"_id.user": &target,
"_id.server": {
"$in": server_ids
}
},
None
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_members",
})?
.is_some()
)
};
if self.has_mutual_connection
|| check_server_overlap().await?
|| get_collection("channels") || get_collection("channels")
.find_one( .find_one(
doc! { doc! {
"$or": [ "channel_type": {
{ "type": "Group" }, "$in": ["Group", "DirectMessage"]
{ "type": "DirectMessage" }, },
], "recipients": {
"$and": [ "$all": [ &self.perspective.id, target ]
{ "recipients": &self.perspective.id }, }
{ "recipients": target }
]
}, },
None, None,
) )
.await .await
.map_err(|_| Error::DatabaseError { .map_err(|_| Error::DatabaseError {
operation: "find", operation: "find_one",
with: "channels", with: "channels",
})? })?
.is_some() .is_some()
{ {
return Ok(UserPermission::Access as u32); // ! FIXME: add privacy settings
return Ok(UserPermission::Access + UserPermission::ViewProfile);
} }
Ok(permissions) Ok(permissions)
......
...@@ -17,7 +17,9 @@ pub mod database; ...@@ -17,7 +17,9 @@ pub mod database;
pub mod notifications; pub mod notifications;
pub mod routes; pub mod routes;
pub mod util; pub mod util;
pub mod version;
use async_std::task;
use chrono::Duration; use chrono::Duration;
use futures::join; use futures::join;
use log::info; use log::info;
...@@ -38,7 +40,10 @@ async fn main() { ...@@ -38,7 +40,10 @@ async fn main() {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
env_logger::init_from_env(env_logger::Env::default().filter_or("RUST_LOG", "info")); env_logger::init_from_env(env_logger::Env::default().filter_or("RUST_LOG", "info"));
info!("Starting REVOLT server."); info!(
"Starting REVOLT server [version {}].",
crate::version::VERSION
);
util::variables::preflight_checks(); util::variables::preflight_checks();
database::connect().await; database::connect().await;
...@@ -50,10 +55,13 @@ async fn main() { ...@@ -50,10 +55,13 @@ async fn main() {
}) })
.expect("Error setting Ctrl-C handler"); .expect("Error setting Ctrl-C handler");
let web_task = task::spawn(launch_web());
let hive_task = task::spawn(notifications::hive::listen());
join!( join!(
launch_web(), web_task,
notifications::websocket::launch_server(), hive_task,
notifications::hive::listen(), notifications::websocket::launch_server()
); );
} }
...@@ -78,14 +86,19 @@ async fn launch_web() { ...@@ -78,14 +86,19 @@ async fn launch_web() {
templates: Templates { templates: Templates {
verify_email: Template { verify_email: Template {
title: "Verify your REVOLT account.", title: "Verify your Revolt account.",
text: "Verify your email here: {{url}}", text: "You're almost there!
html: include_str!("../assets/templates/verify.html"), If you did not perform this action you can safely ignore this email.
Please verify your account here: {{url}}",
html: None,
}, },
reset_password: Template { reset_password: Template {
title: "Reset your REVOLT password.", title: "Reset your Revolt password.",
text: "Reset your password here: {{url}}", text: "You requested a password reset, if you did not perform this action you can safely ignore this email.
html: include_str!("../assets/templates/reset.html"),
Reset your password here: {{url}}",
html: None,
}, },
welcome: None, welcome: None,
}, },
......
use hive_pubsub::PubSub; use hive_pubsub::PubSub;
use mongodb::bson::doc;
use rauth::auth::Session; use rauth::auth::Session;
use rocket_contrib::json::JsonValue; use rocket_contrib::json::JsonValue;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snafu::Snafu;
use super::hive::{get_hive, subscribe_if_exists}; use super::hive::{get_hive, subscribe_if_exists};
use crate::database::*; use crate::{database::*, util::result::Result};
#[derive(Serialize, Deserialize, Debug, Snafu)] #[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "error")] #[serde(tag = "error")]
pub enum WebSocketError { pub enum WebSocketError {
#[snafu(display("This error has not been labelled."))]
LabelMe, LabelMe,
#[snafu(display("Internal server error."))]
InternalError { at: String }, InternalError { at: String },
#[snafu(display("Invalid session."))]
InvalidSession, InvalidSession,
#[snafu(display("User hasn't completed onboarding."))]
OnboardingNotFinished, OnboardingNotFinished,
#[snafu(display("Already authenticated with server."))]
AlreadyAuthenticated, AlreadyAuthenticated,
} }
...@@ -26,31 +21,74 @@ pub enum WebSocketError { ...@@ -26,31 +21,74 @@ pub enum WebSocketError {
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum ServerboundNotification { pub enum ServerboundNotification {
Authenticate(Session), Authenticate(Session),
BeginTyping { channel: String },
EndTyping { channel: String },
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveUserField {
ProfileContent,
ProfileBackground,
StatusText,
Avatar,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveChannelField {
Icon,
Description
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveServerField {
Icon,
Banner,
Description,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveRoleField {
Colour,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveMemberField {
Nickname,
Avatar,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum ClientboundNotification { pub enum ClientboundNotification {
Error(WebSocketError), Error(WebSocketError),
Authenticated, Authenticated,
Ready { Ready {
users: Vec<User>, users: Vec<User>,
servers: Vec<Server>,
channels: Vec<Channel>, channels: Vec<Channel>,
members: Vec<Member>
}, },
Message(Message), Message(Message),
MessageUpdate { MessageUpdate {
id: String, id: String,
channel: String,
data: JsonValue, data: JsonValue,
}, },
MessageDelete { MessageDelete {
id: String, id: String,
channel: String,
}, },
ChannelCreate(Channel), ChannelCreate(Channel),
ChannelUpdate { ChannelUpdate {
id: String, id: String,
data: JsonValue, data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveChannelField>,
},
ChannelDelete {
id: String,
}, },
ChannelGroupJoin { ChannelGroupJoin {
id: String, id: String,
...@@ -60,29 +98,96 @@ pub enum ClientboundNotification { ...@@ -60,29 +98,96 @@ pub enum ClientboundNotification {
id: String, id: String,
user: String, user: String,
}, },
ChannelDelete { ChannelStartTyping {
id: String, id: String,
user: String,
},
ChannelStopTyping {
id: String,
user: String,
},
ChannelAck {
id: String,
user: String,
message_id: String,
}, },
UserRelationship { ServerUpdate {
id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveServerField>,
},
ServerDelete {
id: String,
},
ServerMemberUpdate {
id: MemberCompositeKey,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveMemberField>,
},
ServerMemberJoin {
id: String,
user: String,
},
ServerMemberLeave {
id: String, id: String,
user: String, user: String,
},
ServerRoleUpdate {
id: String,
role_id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveRoleField>
},
ServerRoleDelete {
id: String,
role_id: String
},
UserUpdate {
id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveUserField>,
},
UserRelationship {
id: String,
user: User,
status: RelationshipStatus, status: RelationshipStatus,
}, },
UserPresence { UserSettingsUpdate {
id: String, id: String,
online: bool, update: JsonValue,
}, },
} }
impl ClientboundNotification { impl ClientboundNotification {
pub async fn publish(self, topic: String) -> Result<(), String> { pub fn publish(self, topic: String) {
prehandle_hook(&self); // ! TODO: this should be moved to pubsub async_std::task::spawn(async move {
hive_pubsub::backend::mongo::publish(get_hive(), &topic, self).await prehandle_hook(&self).await.ok(); // ! FIXME: this should be moved to pubsub
hive_pubsub::backend::mongo::publish(get_hive(), &topic, self)
.await
.ok();
});
}
pub fn publish_as_user(self, user: String) {
self.clone().publish(user.clone());
async_std::task::spawn(async move {
if let Ok(server_ids) = User::fetch_server_ids(&user).await {
for server in server_ids {
self.clone().publish(server.clone());
}
}
});
} }
} }
pub fn prehandle_hook(notification: &ClientboundNotification) { pub async fn prehandle_hook(notification: &ClientboundNotification) -> Result<()> {
match &notification { match &notification {
ClientboundNotification::ChannelGroupJoin { id, user } => { ClientboundNotification::ChannelGroupJoin { id, user } => {
subscribe_if_exists(user.clone(), id.clone()).ok(); subscribe_if_exists(user.clone(), id.clone()).ok();
...@@ -98,36 +203,61 @@ pub fn prehandle_hook(notification: &ClientboundNotification) { ...@@ -98,36 +203,61 @@ pub fn prehandle_hook(notification: &ClientboundNotification) {
subscribe_if_exists(recipient.clone(), channel_id.to_string()).ok(); subscribe_if_exists(recipient.clone(), channel_id.to_string()).ok();
} }
} }
Channel::TextChannel { server, .. }
| Channel::VoiceChannel { server, .. } => {
// ! FIXME: write a better algorithm?
let members = Server::fetch_member_ids(server).await?;
for member in members {
subscribe_if_exists(member.clone(), channel_id.to_string()).ok();
}
}
}
}
ClientboundNotification::ServerMemberJoin { id, user } => {
let server = Ref::from_unchecked(id.clone()).fetch_server().await?;
subscribe_if_exists(user.clone(), id.clone()).ok();
for channel in server.channels {
subscribe_if_exists(user.clone(), channel).ok();
} }
} }
ClientboundNotification::UserRelationship { id, user, status } => { ClientboundNotification::UserRelationship { id, user, status } => {
if status != &RelationshipStatus::None { if status != &RelationshipStatus::None {
subscribe_if_exists(id.clone(), user.clone()).ok(); subscribe_if_exists(id.clone(), user.id.clone()).ok();
} }
} }
_ => {} _ => {}
} }
Ok(())
} }
pub fn posthandle_hook(notification: &ClientboundNotification) { pub async fn posthandle_hook(notification: &ClientboundNotification) {
match &notification { match &notification {
ClientboundNotification::ChannelDelete { id } => { ClientboundNotification::ChannelDelete { id } => {
get_hive().hive.drop_topic(&id).ok(); get_hive().hive.drop_topic(&id).ok();
} }
ClientboundNotification::ChannelGroupLeave { id, user } => {
get_hive().hive.unsubscribe(user, id).ok();
}
ClientboundNotification::ServerDelete { id } => {
get_hive().hive.drop_topic(&id).ok();
}
ClientboundNotification::ServerMemberLeave { id, user } => {
get_hive().hive.unsubscribe(user, id).ok();
if let Ok(server) = Ref::from_unchecked(id.clone()).fetch_server().await {
for channel in server.channels {
get_hive().hive.unsubscribe(user, &channel).ok();
}
}
}
ClientboundNotification::UserRelationship { id, user, status } => { ClientboundNotification::UserRelationship { id, user, status } => {
if status == &RelationshipStatus::None { if status == &RelationshipStatus::None {
get_hive() get_hive().hive.unsubscribe(id, &user.id).ok();
.hive
.unsubscribe(&id.to_string(), &user.to_string())
.ok();
} }
} }
ClientboundNotification::ChannelGroupLeave { id, user } => {
get_hive()
.hive
.unsubscribe(&user.to_string(), &id.to_string())
.ok();
}
_ => {} _ => {}
} }
} }
...@@ -13,8 +13,11 @@ static HIVE: OnceCell<Hive> = OnceCell::new(); ...@@ -13,8 +13,11 @@ static HIVE: OnceCell<Hive> = OnceCell::new();
pub async fn init_hive() { pub async fn init_hive() {
let hive = MongodbPubSub::new( let hive = MongodbPubSub::new(
|ids, notification| { |ids, notification: ClientboundNotification| {
super::events::posthandle_hook(&notification); let notif = notification.clone();
async_std::task::spawn(async move {
super::events::posthandle_hook(&notif).await;
});
if let Ok(data) = to_string(&notification) { if let Ok(data) = to_string(&notification) {
debug!("Pushing out notification. {}", data); debug!("Pushing out notification. {}", data);
......
...@@ -6,13 +6,9 @@ use crate::{ ...@@ -6,13 +6,9 @@ use crate::{
util::result::{Error, Result}, util::result::{Error, Result},
}; };
use futures::StreamExt; use futures::StreamExt;
use mongodb::{ use mongodb::bson::{doc, from_document};
bson::{doc, from_document},
options::FindOptions,
};
pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> { pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
let mut users = vec![];
let mut user_ids: HashSet<String> = HashSet::new(); let mut user_ids: HashSet<String> = HashSet::new();
if let Some(relationships) = &user.relations { if let Some(relationships) = &user.relations {
...@@ -23,10 +19,49 @@ pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> { ...@@ -23,10 +19,49 @@ pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
); );
} }
let members = User::fetch_memberships(&user.id).await?;
let server_ids: Vec<String> = members.iter()
.map(|x| x.id.server.clone())
.collect();
let mut cursor = get_collection("servers")
.find(
doc! {
"_id": {
"$in": server_ids
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "servers",
})?;
let mut servers = vec![];
let mut channel_ids = vec![];
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
let server: Server = from_document(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server",
})?;
channel_ids.extend(server.channels.iter().cloned());
servers.push(server);
}
}
let mut cursor = get_collection("channels") let mut cursor = get_collection("channels")
.find( .find(
doc! { doc! {
"$or": [ "$or": [
{
"_id": {
"$in": channel_ids
}
},
{ {
"channel_type": "SavedMessages", "channel_type": "SavedMessages",
"user": &user.id "user": &user.id
...@@ -68,45 +103,21 @@ pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> { ...@@ -68,45 +103,21 @@ pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
} }
user_ids.remove(&user.id); user_ids.remove(&user.id);
if user_ids.len() > 0 { let mut users = if user_ids.len() > 0 {
let mut cursor = get_collection("users") user.fetch_multiple_users(user_ids.into_iter().collect::<Vec<String>>())
.find( .await?
doc! { } else {
"_id": { vec![]
"$in": user_ids.into_iter().collect::<Vec<String>>() };
}
},
FindOptions::builder()
.projection(doc! { "_id": 1, "username": 1 })
.build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "users",
})?;
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
let other: User = from_document(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "user",
})?;
let permissions = PermissionCalculator::new(&user)
.with_mutual_connection()
.with_user(&other)
.for_user_given()
.await?;
users.push(other.from(&user).with(permissions));
}
}
}
user.relationship = Some(RelationshipStatus::User); user.relationship = Some(RelationshipStatus::User);
user.online = Some(true); user.online = Some(true);
users.push(user); users.push(user);
Ok(ClientboundNotification::Ready { users, channels }) Ok(ClientboundNotification::Ready {
users,
servers,
channels,
members
})
} }
...@@ -4,6 +4,7 @@ use super::hive::get_hive; ...@@ -4,6 +4,7 @@ use super::hive::get_hive;
use futures::StreamExt; use futures::StreamExt;
use hive_pubsub::PubSub; use hive_pubsub::PubSub;
use mongodb::bson::doc; use mongodb::bson::doc;
use mongodb::bson::Document;
use mongodb::options::FindOptions; use mongodb::options::FindOptions;
pub async fn generate_subscriptions(user: &User) -> Result<(), String> { pub async fn generate_subscriptions(user: &User) -> Result<(), String> {
...@@ -16,6 +17,43 @@ pub async fn generate_subscriptions(user: &User) -> Result<(), String> { ...@@ -16,6 +17,43 @@ pub async fn generate_subscriptions(user: &User) -> Result<(), String> {
} }
} }
let server_ids = User::fetch_server_ids(&user.id)
.await
.map_err(|_| "Failed to fetch memberships.".to_string())?;
let channel_ids = get_collection("servers")
.find(
doc! {
"_id": {
"$in": &server_ids
}
},
None,
)
.await
.map_err(|_| "Failed to fetch servers.".to_string())?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
x.get_array("channels").ok().map(|v| {
v.into_iter()
.filter_map(|x| x.as_str().map(|x| x.to_string()))
.collect::<Vec<String>>()
})
})
.flatten()
.collect::<Vec<String>>();
for id in server_ids {
hive.subscribe(user.id.clone(), id)?;
}
for id in channel_ids {
hive.subscribe(user.id.clone(), id)?;
}
let mut cursor = get_collection("channels") let mut cursor = get_collection("channels")
.find( .find(
doc! { doc! {
......
...@@ -71,8 +71,6 @@ async fn accept(stream: TcpStream) { ...@@ -71,8 +71,6 @@ async fn accept(stream: TcpStream) {
let fwd = rx.map(Ok).forward(write); let fwd = rx.map(Ok).forward(write);
let incoming = read.try_for_each(async move |msg| { let incoming = read.try_for_each(async move |msg| {
let mutex = mutex_generator(); let mutex = mutex_generator();
//dbg!(&mutex.lock().unwrap());
if let Message::Text(text) = msg { if let Message::Text(text) = msg {
if let Ok(notification) = serde_json::from_str::<ServerboundNotification>(&text) { if let Ok(notification) = serde_json::from_str::<ServerboundNotification>(&text) {
match notification { match notification {
...@@ -131,13 +129,14 @@ async fn accept(stream: TcpStream) { ...@@ -131,13 +129,14 @@ async fn accept(stream: TcpStream) {
send(payload); send(payload);
if !was_online { if !was_online {
ClientboundNotification::UserPresence { ClientboundNotification::UserUpdate {
id: id.clone(), id: id.clone(),
online: true, data: json!({
"online": true
}),
clear: None
} }
.publish(id) .publish_as_user(id);
.await
.ok();
} }
} }
Err(_) => { Err(_) => {
...@@ -161,6 +160,50 @@ async fn accept(stream: TcpStream) { ...@@ -161,6 +160,50 @@ async fn accept(stream: TcpStream) {
)); ));
} }
} }
// ! TEMP: verify user part of channel
// ! Could just run permission check here.
ServerboundNotification::BeginTyping { channel } => {
if mutex.lock().unwrap().is_some() {
let user = {
let mutex = mutex.lock().unwrap();
let session = mutex.as_ref().unwrap();
session.user_id.clone()
};
ClientboundNotification::ChannelStartTyping {
id: channel.clone(),
user,
}
.publish(channel);
} else {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
));
return Ok(());
}
}
ServerboundNotification::EndTyping { channel } => {
if mutex.lock().unwrap().is_some() {
let user = {
let mutex = mutex.lock().unwrap();
let session = mutex.as_ref().unwrap();
session.user_id.clone()
};
ClientboundNotification::ChannelStopTyping {
id: channel.clone(),
user,
}
.publish(channel);
} else {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
));
return Ok(());
}
}
} }
} }
} }
...@@ -174,13 +217,28 @@ async fn accept(stream: TcpStream) { ...@@ -174,13 +217,28 @@ async fn accept(stream: TcpStream) {
info!("User {} disconnected.", &addr); info!("User {} disconnected.", &addr);
CONNECTIONS.lock().unwrap().remove(&addr); CONNECTIONS.lock().unwrap().remove(&addr);
let session = session.lock().unwrap(); let mut offline = None;
if let Some(session) = session.as_ref() { {
let mut users = USERS.write().unwrap(); let session = session.lock().unwrap();
users.remove(&session.user_id, &addr); if let Some(session) = session.as_ref() {
if users.get_left(&session.user_id).is_none() { let mut users = USERS.write().unwrap();
get_hive().drop_client(&session.user_id).unwrap(); users.remove(&session.user_id, &addr);
if users.get_left(&session.user_id).is_none() {
get_hive().drop_client(&session.user_id).unwrap();
offline = Some(session.user_id.clone());
}
}
}
if let Some(id) = offline {
ClientboundNotification::UserUpdate {
id: id.clone(),
data: json!({
"online": false
}),
clear: None
} }
.publish_as_user(id);
} }
} }
...@@ -190,10 +248,15 @@ pub fn publish(ids: Vec<String>, notification: ClientboundNotification) { ...@@ -190,10 +248,15 @@ pub fn publish(ids: Vec<String>, notification: ClientboundNotification) {
let users = USERS.read().unwrap(); let users = USERS.read().unwrap();
for id in ids { for id in ids {
// Block certain notifications from reaching users that aren't meant to see them. // Block certain notifications from reaching users that aren't meant to see them.
if let ClientboundNotification::UserRelationship { id: user_id, .. } = &notification { match &notification {
if &id != user_id { ClientboundNotification::UserRelationship { id: user_id, .. }
continue; | ClientboundNotification::UserSettingsUpdate { id: user_id, .. }
| ClientboundNotification::ChannelAck { user: user_id, .. } => {
if &id != user_id {
continue;
}
} }
_ => {}
} }
if let Some(mut arr) = users.get_left(&id) { if let Some(mut arr) = users.get_left(&id) {
......
use crate::database::*;
use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result};
use mongodb::bson::doc;
use mongodb::options::UpdateOptions;
#[put("/<target>/ack/<message>")]
pub async fn req(user: User, target: Ref, message: Ref) -> Result<()> {
let target = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
let id = target.id();
get_collection("channel_unreads")
.update_one(
doc! {
"_id.channel": id,
"_id.user": &user.id
},
doc! {
"$unset": {
"mentions": 1
},
"$set": {
"last_id": &message.id
}
},
UpdateOptions::builder().upsert(true).build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel_unreads",
})?;
ClientboundNotification::ChannelAck {
id: id.to_string(),
user: user.id.clone(),
message_id: message.id,
}
.publish(user.id);
Ok(())
}
...@@ -11,6 +11,7 @@ pub async fn req(user: User, target: Ref) -> Result<()> { ...@@ -11,6 +11,7 @@ pub async fn req(user: User, target: Ref) -> Result<()> {
.with_channel(&target) .with_channel(&target)
.for_channel() .for_channel()
.await?; .await?;
if !perm.get_view() { if !perm.get_view() {
Err(Error::MissingPermission)? Err(Error::MissingPermission)?
} }
...@@ -95,21 +96,22 @@ pub async fn req(user: User, target: Ref) -> Result<()> { ...@@ -95,21 +96,22 @@ pub async fn req(user: User, target: Ref) -> Result<()> {
id: id.clone(), id: id.clone(),
user: user.id.clone(), user: user.id.clone(),
} }
.publish(id.clone()) .publish(id.clone());
.await
.ok();
Message::create( Content::SystemMessage(SystemMessage::UserLeft { id: user.id })
"00000000000000000000000000".to_string(), .send_as_system(&target)
id.clone(), .await
// ! FIXME: make a schema for this .ok();
format!("{{\"type\":\"user_left\",\"id\":\"{}\"}}", user.id),
)
.publish(&target)
.await
.ok();
Ok(()) Ok(())
} }
Channel::TextChannel { .. } |
Channel::VoiceChannel { .. } => {
if perm.get_manage_channel() {
target.delete().await
} else {
Err(Error::MissingPermission)
}
}
} }
} }
use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result};
use crate::{database::*, notifications::events::RemoveChannelField};
use mongodb::bson::{doc, to_document};
use rocket_contrib::json::Json;
use serde::{Deserialize, Serialize};
use validator::Validate;
#[derive(Validate, Serialize, Deserialize)]
pub struct Data {
#[validate(length(min = 1, max = 32))]
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>,
#[validate(length(min = 0, max = 1024))]
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<String>,
#[validate(length(min = 1, max = 128))]
icon: Option<String>,
remove: Option<RemoveChannelField>,
}
#[patch("/<target>", data = "<data>")]
pub async fn req(user: User, target: Ref, data: Json<Data>) -> Result<()> {
let data = data.into_inner();
data.validate()
.map_err(|error| Error::FailedValidation { error })?;
if data.name.is_none()
&& data.description.is_none()
&& data.icon.is_none()
&& data.remove.is_none()
{
return Ok(());
}
let target = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_manage_channel() {
Err(Error::MissingPermission)?
}
match &target {
Channel::Group { id, icon, .. }
| Channel::TextChannel { id, icon, .. }
| Channel::VoiceChannel { id, icon, .. } => {
let mut set = doc! {};
let mut unset = doc! {};
let mut remove_icon = false;
if let Some(remove) = &data.remove {
match remove {
RemoveChannelField::Icon => {
unset.insert("icon", 1);
remove_icon = true;
}
RemoveChannelField::Description => {
unset.insert("description", 1);
}
}
}
if let Some(name) = &data.name {
set.insert("name", name);
}
if let Some(description) = &data.description {
set.insert("description", description);
}
if let Some(attachment_id) = &data.icon {
let attachment =
File::find_and_use(&attachment_id, "icons", "object", target.id()).await?;
set.insert(
"icon",
to_document(&attachment).map_err(|_| Error::DatabaseError {
operation: "to_document",
with: "attachment",
})?,
);
remove_icon = true;
}
let mut operations = doc! {};
if set.len() > 0 {
operations.insert("$set", &set);
}
if unset.len() > 0 {
operations.insert("$unset", unset);
}
if operations.len() > 0 {
get_collection("channels")
.update_one(doc! { "_id": &id }, operations, None)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
}
ClientboundNotification::ChannelUpdate {
id: id.clone(),
data: json!(set),
clear: data.remove,
}
.publish(id.clone());
if let Channel::Group { .. } = &target {
if let Some(name) = data.name {
Content::SystemMessage(SystemMessage::ChannelRenamed {
name,
by: user.id.clone(),
})
.send_as_system(&target)
.await
.ok();
}
if let Some(_) = data.description {
Content::SystemMessage(SystemMessage::ChannelDescriptionChanged {
by: user.id.clone(),
})
.send_as_system(&target)
.await
.ok();
}
if let Some(_) = data.icon {
Content::SystemMessage(SystemMessage::ChannelIconChanged { by: user.id })
.send_as_system(&target)
.await
.ok();
}
}
if remove_icon {
if let Some(old_icon) = icon {
old_icon.delete().await?;
}
}
Ok(())
}
_ => Err(Error::InvalidOperation),
}
}