Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 1134 additions and 169 deletions
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
pub type UserSettings = HashMap<String, (i64, String)>;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChannelCompositeKey {
pub channel: String,
pub user: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChannelUnread {
#[serde(rename = "_id")]
pub id: ChannelCompositeKey,
pub last_id: Option<String>,
pub mentions: Option<Vec<String>>,
}
use mongodb::bson::doc;
use futures::StreamExt;
use mongodb::bson::Document;
use mongodb::options::{Collation, FindOneOptions};
use mongodb::{
bson::{doc, from_document},
options::FindOptions,
};
use num_enum::TryFromPrimitive;
use serde::{Deserialize, Serialize};
use std::ops;
use ulid::Ulid;
use validator::Validate;
use crate::database::permissions::user::UserPermissions;
use crate::database::*;
use crate::notifications::websocket::is_online;
use crate::util::result::{Error, Result};
use validator::Validate;
use crate::util::variables::EARLY_ADOPTER_BADGE;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum RelationshipStatus {
......@@ -19,21 +28,14 @@ pub enum RelationshipStatus {
BlockedOther,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Relationship {
#[serde(rename = "_id")]
pub id: String,
pub status: RelationshipStatus,
}
/*
pub enum Badge {
Developer = 1,
Translator = 2,
}
*/
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Presence {
Online,
Idle,
......@@ -41,7 +43,7 @@ pub enum Presence {
Invisible,
}
#[derive(Validate, Serialize, Deserialize, Debug)]
#[derive(Validate, Serialize, Deserialize, Debug, Clone)]
pub struct UserStatus {
#[validate(length(min = 1, max = 128))]
#[serde(skip_serializing_if = "Option::is_none")]
......@@ -50,7 +52,7 @@ pub struct UserStatus {
pub presence: Option<Presence>,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserProfile {
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
......@@ -58,8 +60,21 @@ pub struct UserProfile {
pub background: Option<File>,
}
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(i32)]
pub enum Badges {
Developer = 1,
Translator = 2,
Supporter = 4,
ResponsibleDisclosure = 8,
RevoltTeam = 16,
EarlyAdopter = 256,
}
impl_op_ex_commutative!(+ |a: &i32, b: &Badges| -> i32 { *a | *b as i32 });
// When changing this struct, update notifications/payload.rs#80
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct User {
#[serde(rename = "_id")]
pub id: String,
......@@ -106,6 +121,15 @@ impl User {
/// Mutate the user object to appear as seen by user.
pub fn with(mut self, permissions: UserPermissions<[u32; 1]>) -> User {
let mut badges = self.badges.unwrap_or_else(|| 0);
if let Ok(id) = Ulid::from_string(&self.id) {
if id.datetime().timestamp_millis() < *EARLY_ADOPTER_BADGE {
badges = badges + Badges::EarlyAdopter;
}
}
self.badges = Some(badges);
if permissions.get_view_profile() {
self.online = Some(is_online(&self.id));
} else {
......@@ -135,7 +159,7 @@ impl User {
/// Utility function for checking claimed usernames.
pub async fn is_username_taken(username: &str) -> Result<bool> {
if username.to_lowercase() == "revolt" && username.to_lowercase() == "admin" {
if username.to_lowercase() == "revolt" || username.to_lowercase() == "admin" || username.to_lowercase() == "system" {
return Ok(true);
}
......@@ -160,4 +184,117 @@ impl User {
Ok(false)
}
}
/// Utility function for fetching multiple users from the perspective of one.
/// Assumes user has a mutual connection with others.
pub async fn fetch_multiple_users(&self, user_ids: Vec<String>) -> Result<Vec<User>> {
let mut users = vec![];
let mut cursor = get_collection("users")
.find(
doc! {
"_id": {
"$in": user_ids
}
},
FindOptions::builder()
.projection(
doc! { "_id": 1, "username": 1, "avatar": 1, "badges": 1, "status": 1 },
)
.build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "users",
})?;
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
let other: User = from_document(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "user",
})?;
let permissions = PermissionCalculator::new(&self)
.with_mutual_connection()
.with_user(&other)
.for_user_given()
.await?;
users.push(other.from(&self).with(permissions));
}
}
Ok(users)
}
/// Utility function to get all of a user's memberships.
pub async fn fetch_memberships(id: &str) -> Result<Vec<Member>> {
Ok(get_collection("server_members")
.find(
doc! {
"_id.user": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "server_members",
})?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
from_document(x).ok()
})
.collect::<Vec<Member>>())
}
/// Utility function to get all the server IDs the user is in.
pub async fn fetch_server_ids(id: &str) -> Result<Vec<String>> {
Ok(get_collection("server_members")
.find(
doc! {
"_id.user": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "server_members",
})?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
x.get_document("_id")
.ok()
.map(|i| i.get_str("server").ok().map(|x| x.to_string()))
})
.flatten()
.collect::<Vec<String>>())
}
/// Utility function to fetch unread objects for user.
pub async fn fetch_unreads(id: &str) -> Result<Vec<Document>> {
Ok(get_collection("channel_unreads")
.find(
doc! {
"_id.user": id
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "user_settings",
})?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await)
}
}
......@@ -9,16 +9,23 @@ use validator::Validate;
#[derive(Validate, Serialize, Deserialize)]
pub struct Ref {
#[validate(length(min = 26, max = 26))]
#[validate(length(min = 1, max = 26))]
pub id: String,
}
impl Ref {
pub fn from_unchecked(id: String) -> Ref {
Ref { id }
}
pub fn from(id: String) -> Result<Ref> {
Ok(Ref { id })
let r = Ref { id };
r.validate()
.map_err(|error| Error::FailedValidation { error })?;
Ok(r)
}
pub async fn fetch<T: DeserializeOwned>(&self, collection: &'static str) -> Result<T> {
async fn fetch<T: DeserializeOwned>(&self, collection: &'static str) -> Result<T> {
let doc = get_collection(&collection)
.find_one(
doc! {
......@@ -31,7 +38,7 @@ impl Ref {
operation: "find_one",
with: &collection,
})?
.ok_or_else(|| Error::UnknownUser)?;
.ok_or_else(|| Error::NotFound)?;
Ok(from_document::<T>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
......@@ -47,6 +54,60 @@ impl Ref {
self.fetch("channels").await
}
pub async fn fetch_server(&self) -> Result<Server> {
self.fetch("servers").await
}
pub async fn fetch_invite(&self) -> Result<Invite> {
self.fetch("channel_invites").await
}
pub async fn fetch_member(&self, server: &str) -> Result<Member> {
let doc = get_collection("server_members")
.find_one(
doc! {
"_id.user": &self.id,
"_id.server": server
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_member",
})?
.ok_or_else(|| Error::NotFound)?;
Ok(
from_document::<Member>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server_member",
})?,
)
}
pub async fn fetch_ban(&self, server: &str) -> Result<Ban> {
let doc = get_collection("server_bans")
.find_one(
doc! {
"_id.user": &self.id,
"_id.server": server
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_ban",
})?
.ok_or_else(|| Error::NotFound)?;
Ok(from_document::<Ban>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server_ban",
})?)
}
pub async fn fetch_message(&self, channel: &Channel) -> Result<Message> {
let message: Message = self.fetch("messages").await?;
if &message.channel != channel.id() {
......
......@@ -25,6 +25,26 @@ pub async fn create_database() {
.await
.expect("Failed to create messages collection.");
db.create_collection("servers", None)
.await
.expect("Failed to create servers collection.");
db.create_collection("server_members", None)
.await
.expect("Failed to create server_members collection.");
db.create_collection("server_bans", None)
.await
.expect("Failed to create server_bans collection.");
db.create_collection("channel_invites", None)
.await
.expect("Failed to create channel_invites collection.");
db.create_collection("channel_unreads", None)
.await
.expect("Failed to create channel_unreads collection.");
db.create_collection("migrations", None)
.await
.expect("Failed to create migrations collection.");
......@@ -33,6 +53,10 @@ pub async fn create_database() {
.await
.expect("Failed to create attachments collection.");
db.create_collection("user_settings", None)
.await
.expect("Failed to create user_settings collection.");
db.create_collection(
"pubsub",
CreateCollectionOptions::builder()
......@@ -98,6 +122,23 @@ pub async fn create_database() {
.await
.expect("Failed to create username index.");
db.run_command(
doc! {
"createIndexes": "messages",
"indexes": [
{
"key": {
"content": "text"
},
"name": "content"
}
]
},
None,
)
.await
.expect("Failed to create message index.");
db.collection("migrations")
.insert_one(
doc! {
......
use crate::database::get_collection;
use crate::database::{permissions, get_collection, get_db, PermissionTuple};
use futures::StreamExt;
use log::info;
use mongodb::bson::{doc, from_document};
use mongodb::{bson::{doc, from_document, to_document}, options::FindOptions};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
......@@ -10,7 +11,7 @@ struct MigrationInfo {
revision: i32,
}
pub const LATEST_REVISION: i32 = 2;
pub const LATEST_REVISION: i32 = 7;
pub async fn migrate_database() {
let migrations = get_collection("migrations");
......@@ -78,6 +79,130 @@ pub async fn run_migrations(revision: i32) -> i32 {
.expect("Failed to update attachments.");
}
if revision <= 2 {
info!("Running migration [revision 2 / 2021-05-08]: Add servers collection.");
get_db()
.create_collection("servers", None)
.await
.expect("Failed to create servers collection.");
}
if revision <= 3 {
info!("Running migration [revision 3 / 2021-05-25]: Support multiple file uploads, add channel_unreads and user_settings.");
let messages = get_collection("messages");
let mut cursor = messages
.find(
doc! {
"attachment": {
"$exists": 1
}
},
FindOptions::builder()
.projection(doc! {
"_id": 1,
"attachments": [ "$attachment" ]
})
.build(),
)
.await
.expect("Failed to fetch messages.");
while let Some(result) = cursor.next().await {
let doc = result.unwrap();
let id = doc.get_str("_id").unwrap();
let attachments = doc.get_array("attachments").unwrap();
messages
.update_one(
doc! { "_id": id },
doc! { "$unset": { "attachment": 1 }, "$set": { "attachments": attachments } },
None,
)
.await
.unwrap();
}
get_db()
.create_collection("channel_unreads", None)
.await
.expect("Failed to create channel_unreads collection.");
get_db()
.create_collection("user_settings", None)
.await
.expect("Failed to create user_settings collection.");
}
if revision <= 4 {
info!("Running migration [revision 4 / 2021-06-01]: Add more server collections.");
get_db()
.create_collection("server_members", None)
.await
.expect("Failed to create server_members collection.");
get_db()
.create_collection("server_bans", None)
.await
.expect("Failed to create server_bans collection.");
get_db()
.create_collection("channel_invites", None)
.await
.expect("Failed to create channel_invites collection.");
}
if revision <= 5 {
info!("Running migration [revision 5 / 2021-06-26]: Add permissions.");
#[derive(Serialize)]
struct Server {
pub default_permissions: PermissionTuple,
}
let server = Server {
default_permissions: (
*permissions::server::DEFAULT_PERMISSION as i32,
*permissions::channel::DEFAULT_PERMISSION_SERVER as i32
)
};
get_collection("servers")
.update_many(
doc! { },
doc! {
"$set": to_document(&server).unwrap()
},
None
)
.await
.expect("Failed to migrate servers.");
}
if revision <= 6 {
info!("Running migration [revision 6 / 2021-07-09]: Add message text index.");
get_db()
.run_command(
doc! {
"createIndexes": "messages",
"indexes": [
{
"key": {
"content": "text"
},
"name": "content"
}
]
},
None,
)
.await
.expect("Failed to create message index.");
}
// Reminder to update LATEST_REVISION when adding new migrations.
LATEST_REVISION
}
use crate::database::*;
use crate::util::result::Result;
use crate::util::result::{Error, Result};
use super::PermissionCalculator;
......@@ -9,13 +9,38 @@ use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)]
pub enum ChannelPermission {
View = 1,
SendMessage = 2,
ManageMessages = 4,
ManageChannel = 8,
VoiceCall = 16,
View = 0b00000000000000000000000000000001, // 1
SendMessage = 0b00000000000000000000000000000010, // 2
ManageMessages = 0b00000000000000000000000000000100, // 4
ManageChannel = 0b00000000000000000000000000001000, // 8
VoiceCall = 0b00000000000000000000000000010000, // 16
InviteOthers = 0b00000000000000000000000000100000, // 32
EmbedLinks = 0b00000000000000000000000001000000, // 64
UploadFiles = 0b00000000000000000000000010000000, // 128
}
lazy_static! {
pub static ref DEFAULT_PERMISSION_DM: u32 =
ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::ManageChannel
+ ChannelPermission::VoiceCall
+ ChannelPermission::InviteOthers
+ ChannelPermission::EmbedLinks
+ ChannelPermission::UploadFiles;
pub static ref DEFAULT_PERMISSION_SERVER: u32 =
ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::VoiceCall
+ ChannelPermission::InviteOthers
+ ChannelPermission::EmbedLinks
+ ChannelPermission::UploadFiles;
}
impl_op_ex!(+ |a: &ChannelPermission, b: &ChannelPermission| -> u32 { *a as u32 | *b as u32 });
impl_op_ex_commutative!(+ |a: &u32, b: &ChannelPermission| -> u32 { *a | *b as u32 });
bitfield! {
pub struct ChannelPermissions(MSB0 [u32]);
u32;
......@@ -24,11 +49,11 @@ bitfield! {
pub get_manage_messages, _: 29;
pub get_manage_channel, _: 28;
pub get_voice_call, _: 27;
pub get_invite_others, _: 26;
pub get_embed_links, _: 25;
pub get_upload_files, _: 24;
}
impl_op_ex!(+ |a: &ChannelPermission, b: &ChannelPermission| -> u32 { *a as u32 | *b as u32 });
impl_op_ex_commutative!(+ |a: &u32, b: &ChannelPermission| -> u32 { *a | *b as u32 });
impl<'a> PermissionCalculator<'a> {
pub async fn calculate_channel(self) -> Result<u32> {
let channel = if let Some(channel) = self.channel {
......@@ -56,9 +81,7 @@ impl<'a> PermissionCalculator<'a> {
let perms = self.for_user(recipient).await?;
if perms.get_send_message() {
return Ok(ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::VoiceCall);
return Ok(*DEFAULT_PERMISSION_DM);
}
return Ok(ChannelPermission::View as u32);
......@@ -67,20 +90,63 @@ impl<'a> PermissionCalculator<'a> {
Ok(0)
}
Channel::Group { recipients, .. } => {
Channel::Group { recipients, permissions, owner, .. } => {
if &self.perspective.id == owner {
return Ok(*DEFAULT_PERMISSION_DM)
}
if recipients
.iter()
.find(|x| *x == &self.perspective.id)
.is_some()
{
Ok(ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::ManageChannel
+ ChannelPermission::VoiceCall)
if let Some(permissions) = permissions {
Ok(permissions.clone() as u32)
} else {
Ok(*DEFAULT_PERMISSION_DM)
}
} else {
Ok(0)
}
}
Channel::TextChannel { server, default_permissions, role_permissions, .. }
| Channel::VoiceChannel { server, default_permissions, role_permissions, .. } => {
let server = Ref::from_unchecked(server.clone()).fetch_server().await?;
if self.perspective.id == server.owner {
Ok(u32::MAX)
} else {
match Ref::from_unchecked(self.perspective.id.clone()).fetch_member(&server.id).await {
Ok(member) => {
let mut perm = if let Some(permission) = default_permissions {
*permission as u32
} else {
server.default_permissions.1 as u32
};
if let Some(roles) = member.roles {
for role in roles {
if let Some(permission) = role_permissions.get(&role) {
perm |= *permission as u32;
}
if let Some(server_role) = server.roles.get(&role) {
perm |= server_role.permissions.1 as u32;
}
}
}
Ok(perm)
}
Err(error) => {
match &error {
Error::NotFound => Ok(0),
_ => Err(error)
}
}
}
}
}
}
}
......
pub use crate::database::*;
pub mod channel;
pub mod server;
pub mod user;
pub use user::get_relationship;
......@@ -11,6 +12,8 @@ pub struct PermissionCalculator<'a> {
user: Option<&'a User>,
relationship: Option<&'a RelationshipStatus>,
channel: Option<&'a Channel>,
server: Option<&'a Server>,
// member: Option<&'a Member>,
has_mutual_connection: bool,
}
......@@ -23,6 +26,8 @@ impl<'a> PermissionCalculator<'a> {
user: None,
relationship: None,
channel: None,
server: None,
// member: None,
has_mutual_connection: false,
}
......@@ -49,6 +54,20 @@ impl<'a> PermissionCalculator<'a> {
}
}
pub fn with_server(self, server: &'a Server) -> PermissionCalculator {
PermissionCalculator {
server: Some(&server),
..self
}
}
/* pub fn with_member(self, member: &'a Member) -> PermissionCalculator {
PermissionCalculator {
member: Some(&member),
..self
}
} */
pub fn with_mutual_connection(self) -> PermissionCalculator<'a> {
PermissionCalculator {
has_mutual_connection: true,
......
use crate::util::result::{Error, Result};
use super::PermissionCalculator;
use super::Ref;
use num_enum::TryFromPrimitive;
use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)]
pub enum ServerPermission {
View = 0b00000000000000000000000000000001, // 1
ManageRoles = 0b00000000000000000000000000000010, // 2
ManageChannels = 0b00000000000000000000000000000100, // 4
ManageServer = 0b00000000000000000000000000001000, // 8
KickMembers = 0b00000000000000000000000000010000, // 16
BanMembers = 0b00000000000000000000000000100000, // 32
// 6 bits of space
ChangeNickname = 0b00000000000000000001000000000000, // 4096
ManageNicknames = 0b00000000000000000010000000000000, // 8192
ChangeAvatar = 0b00000000000000000100000000000000, // 16382
RemoveAvatars = 0b00000000000000001000000000000000, // 32768
// 16 bits of space
}
lazy_static! {
pub static ref DEFAULT_PERMISSION: u32 =
ServerPermission::View
+ ServerPermission::ChangeNickname
+ ServerPermission::ChangeAvatar;
}
impl_op_ex!(+ |a: &ServerPermission, b: &ServerPermission| -> u32 { *a as u32 | *b as u32 });
impl_op_ex_commutative!(+ |a: &u32, b: &ServerPermission| -> u32 { *a | *b as u32 });
bitfield! {
pub struct ServerPermissions(MSB0 [u32]);
u32;
pub get_view, _: 31;
pub get_manage_roles, _: 30;
pub get_manage_channels, _: 29;
pub get_manage_server, _: 28;
pub get_kick_members, _: 27;
pub get_ban_members, _: 26;
pub get_change_nickname, _: 19;
pub get_manage_nicknames, _: 18;
pub get_change_avatar, _: 17;
pub get_remove_avatars, _: 16;
}
impl<'a> PermissionCalculator<'a> {
pub async fn calculate_server(self) -> Result<u32> {
let server = if let Some(server) = self.server {
server
} else {
unreachable!()
};
if self.perspective.id == server.owner {
Ok(u32::MAX)
} else {
match Ref::from_unchecked(self.perspective.id.clone()).fetch_member(&server.id).await {
Ok(member) => {
let mut perm = server.default_permissions.0 as u32;
if let Some(roles) = member.roles {
for role in roles {
if let Some(server_role) = server.roles.get(&role) {
perm |= server_role.permissions.0 as u32;
}
}
}
Ok(perm)
}
Err(error) => {
match &error {
Error::NotFound => Ok(0),
_ => Err(error)
}
}
}
}
}
pub async fn for_server(self) -> Result<ServerPermissions<[u32; 1]>> {
Ok(ServerPermissions([self.calculate_server().await?]))
}
}
......@@ -10,10 +10,10 @@ use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)]
pub enum UserPermission {
Access = 1,
ViewProfile = 2,
SendMessage = 4,
Invite = 8,
Access = 0b00000000000000000000000000000001, // 1
ViewProfile = 0b00000000000000000000000000000010, // 2
SendMessage = 0b00000000000000000000000000000100, // 4
Invite = 0b00000000000000000000000000001000, // 8
}
bitfield! {
......@@ -30,7 +30,7 @@ impl_op_ex_commutative!(+ |a: &u32, b: &UserPermission| -> u32 { *a | *b as u32
pub fn get_relationship(a: &User, b: &str) -> RelationshipStatus {
if a.id == b {
return RelationshipStatus::Friend;
return RelationshipStatus::User;
}
if let Some(relations) = &a.relations {
......@@ -55,7 +55,7 @@ impl<'a> PermissionCalculator<'a> {
.map(|v| v.to_owned())
.unwrap_or_else(|| get_relationship(&self.perspective, &target))
{
RelationshipStatus::Friend => return Ok(u32::MAX),
RelationshipStatus::Friend | RelationshipStatus::User => return Ok(u32::MAX),
RelationshipStatus::Blocked | RelationshipStatus::BlockedOther => {
return Ok(UserPermission::Access as u32)
}
......@@ -64,12 +64,36 @@ impl<'a> PermissionCalculator<'a> {
// ! INFO: if we add boolean switch for permission to
// ! message people who have mutual, we need to get
// ! rid of this return statement.
return Ok(permissions);
// return Ok(permissions);
}
_ => {}
}
let check_server_overlap = async || {
let server_ids = User::fetch_server_ids(&self.perspective.id).await?;
Ok(
get_collection("server_members")
.find_one(
doc! {
"_id.user": &target,
"_id.server": {
"$in": server_ids
}
},
None
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_members",
})?
.is_some()
)
};
if self.has_mutual_connection
|| check_server_overlap().await?
|| get_collection("channels")
.find_one(
doc! {
......@@ -84,7 +108,7 @@ impl<'a> PermissionCalculator<'a> {
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
operation: "find_one",
with: "channels",
})?
.is_some()
......
......@@ -17,6 +17,7 @@ pub mod database;
pub mod notifications;
pub mod routes;
pub mod util;
pub mod version;
use async_std::task;
use chrono::Duration;
......@@ -39,7 +40,10 @@ async fn main() {
dotenv::dotenv().ok();
env_logger::init_from_env(env_logger::Env::default().filter_or("RUST_LOG", "info"));
info!("Starting REVOLT server.");
info!(
"Starting REVOLT server [version {}].",
crate::version::VERSION
);
util::variables::preflight_checks();
database::connect().await;
......@@ -82,14 +86,19 @@ async fn launch_web() {
templates: Templates {
verify_email: Template {
title: "Verify your REVOLT account.",
text: "Verify your email here: {{url}}",
html: include_str!("../assets/templates/verify.html"),
title: "Verify your Revolt account.",
text: "You're almost there!
If you did not perform this action you can safely ignore this email.
Please verify your account here: {{url}}",
html: None,
},
reset_password: Template {
title: "Reset your REVOLT password.",
text: "Reset your password here: {{url}}",
html: include_str!("../assets/templates/reset.html"),
title: "Reset your Revolt password.",
text: "You requested a password reset, if you did not perform this action you can safely ignore this email.
Reset your password here: {{url}}",
html: None,
},
welcome: None,
},
......
use hive_pubsub::PubSub;
use mongodb::bson::doc;
use rauth::auth::Session;
use rocket_contrib::json::JsonValue;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use super::hive::{get_hive, subscribe_if_exists};
use crate::database::*;
use crate::{database::*, util::result::Result};
#[derive(Serialize, Deserialize, Debug, Snafu)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "error")]
pub enum WebSocketError {
#[snafu(display("This error has not been labelled."))]
LabelMe,
#[snafu(display("Internal server error."))]
InternalError { at: String },
#[snafu(display("Invalid session."))]
InvalidSession,
#[snafu(display("User hasn't completed onboarding."))]
OnboardingNotFinished,
#[snafu(display("Already authenticated with server."))]
AlreadyAuthenticated,
}
......@@ -30,29 +25,70 @@ pub enum ServerboundNotification {
EndTyping { channel: String },
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveUserField {
ProfileContent,
ProfileBackground,
StatusText,
Avatar,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveChannelField {
Icon,
Description
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveServerField {
Icon,
Banner,
Description,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveRoleField {
Colour,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveMemberField {
Nickname,
Avatar,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
pub enum ClientboundNotification {
Error(WebSocketError),
Authenticated,
Ready {
users: Vec<User>,
servers: Vec<Server>,
channels: Vec<Channel>,
members: Vec<Member>
},
Message(Message),
MessageUpdate {
id: String,
channel: String,
data: JsonValue,
},
MessageDelete {
id: String,
channel: String,
},
ChannelCreate(Channel),
ChannelUpdate {
id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveChannelField>,
},
ChannelDelete {
id: String,
},
ChannelGroupJoin {
id: String,
......@@ -62,9 +98,6 @@ pub enum ClientboundNotification {
id: String,
user: String,
},
ChannelDelete {
id: String,
},
ChannelStartTyping {
id: String,
user: String,
......@@ -73,30 +106,88 @@ pub enum ClientboundNotification {
id: String,
user: String,
},
ChannelAck {
id: String,
user: String,
message_id: String,
},
ServerUpdate {
id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveServerField>,
},
ServerDelete {
id: String,
},
ServerMemberUpdate {
id: MemberCompositeKey,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveMemberField>,
},
ServerMemberJoin {
id: String,
user: String,
},
ServerMemberLeave {
id: String,
user: String,
},
ServerRoleUpdate {
id: String,
role_id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveRoleField>
},
ServerRoleDelete {
id: String,
role_id: String
},
UserUpdate {
id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveUserField>,
},
UserRelationship {
id: String,
user: User,
status: RelationshipStatus,
},
UserPresence {
UserSettingsUpdate {
id: String,
online: bool,
update: JsonValue,
},
}
impl ClientboundNotification {
pub async fn publish(self, topic: String) -> Result<(), String> {
prehandle_hook(&self); // ! TODO: this should be moved to pubsub
hive_pubsub::backend::mongo::publish(get_hive(), &topic, self).await
pub fn publish(self, topic: String) {
async_std::task::spawn(async move {
prehandle_hook(&self).await.ok(); // ! FIXME: this should be moved to pubsub
hive_pubsub::backend::mongo::publish(get_hive(), &topic, self)
.await
.ok();
});
}
pub fn publish_as_user(self, user: String) {
self.clone().publish(user.clone());
async_std::task::spawn(async move {
if let Ok(server_ids) = User::fetch_server_ids(&user).await {
for server in server_ids {
self.clone().publish(server.clone());
}
}
});
}
}
pub fn prehandle_hook(notification: &ClientboundNotification) {
pub async fn prehandle_hook(notification: &ClientboundNotification) -> Result<()> {
match &notification {
ClientboundNotification::ChannelGroupJoin { id, user } => {
subscribe_if_exists(user.clone(), id.clone()).ok();
......@@ -112,6 +203,23 @@ pub fn prehandle_hook(notification: &ClientboundNotification) {
subscribe_if_exists(recipient.clone(), channel_id.to_string()).ok();
}
}
Channel::TextChannel { server, .. }
| Channel::VoiceChannel { server, .. } => {
// ! FIXME: write a better algorithm?
let members = Server::fetch_member_ids(server).await?;
for member in members {
subscribe_if_exists(member.clone(), channel_id.to_string()).ok();
}
}
}
}
ClientboundNotification::ServerMemberJoin { id, user } => {
let server = Ref::from_unchecked(id.clone()).fetch_server().await?;
subscribe_if_exists(user.clone(), id.clone()).ok();
for channel in server.channels {
subscribe_if_exists(user.clone(), channel).ok();
}
}
ClientboundNotification::UserRelationship { id, user, status } => {
......@@ -121,27 +229,35 @@ pub fn prehandle_hook(notification: &ClientboundNotification) {
}
_ => {}
}
Ok(())
}
pub fn posthandle_hook(notification: &ClientboundNotification) {
pub async fn posthandle_hook(notification: &ClientboundNotification) {
match &notification {
ClientboundNotification::ChannelDelete { id } => {
get_hive().hive.drop_topic(&id).ok();
}
ClientboundNotification::ChannelGroupLeave { id, user } => {
get_hive().hive.unsubscribe(user, id).ok();
}
ClientboundNotification::ServerDelete { id } => {
get_hive().hive.drop_topic(&id).ok();
}
ClientboundNotification::ServerMemberLeave { id, user } => {
get_hive().hive.unsubscribe(user, id).ok();
if let Ok(server) = Ref::from_unchecked(id.clone()).fetch_server().await {
for channel in server.channels {
get_hive().hive.unsubscribe(user, &channel).ok();
}
}
}
ClientboundNotification::UserRelationship { id, user, status } => {
if status == &RelationshipStatus::None {
get_hive()
.hive
.unsubscribe(&id.to_string(), &user.id.to_string())
.ok();
get_hive().hive.unsubscribe(id, &user.id).ok();
}
}
ClientboundNotification::ChannelGroupLeave { id, user } => {
get_hive()
.hive
.unsubscribe(&user.to_string(), &id.to_string())
.ok();
}
_ => {}
}
}
......@@ -13,8 +13,11 @@ static HIVE: OnceCell<Hive> = OnceCell::new();
pub async fn init_hive() {
let hive = MongodbPubSub::new(
|ids, notification| {
super::events::posthandle_hook(&notification);
|ids, notification: ClientboundNotification| {
let notif = notification.clone();
async_std::task::spawn(async move {
super::events::posthandle_hook(&notif).await;
});
if let Ok(data) = to_string(&notification) {
debug!("Pushing out notification. {}", data);
......
......@@ -6,13 +6,9 @@ use crate::{
util::result::{Error, Result},
};
use futures::StreamExt;
use mongodb::{
bson::{doc, from_document},
options::FindOptions,
};
use mongodb::bson::{doc, from_document};
pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
let mut users = vec![];
let mut user_ids: HashSet<String> = HashSet::new();
if let Some(relationships) = &user.relations {
......@@ -23,10 +19,49 @@ pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
);
}
let members = User::fetch_memberships(&user.id).await?;
let server_ids: Vec<String> = members.iter()
.map(|x| x.id.server.clone())
.collect();
let mut cursor = get_collection("servers")
.find(
doc! {
"_id": {
"$in": server_ids
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "servers",
})?;
let mut servers = vec![];
let mut channel_ids = vec![];
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
let server: Server = from_document(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server",
})?;
channel_ids.extend(server.channels.iter().cloned());
servers.push(server);
}
}
let mut cursor = get_collection("channels")
.find(
doc! {
"$or": [
{
"_id": {
"$in": channel_ids
}
},
{
"channel_type": "SavedMessages",
"user": &user.id
......@@ -68,45 +103,21 @@ pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
}
user_ids.remove(&user.id);
if user_ids.len() > 0 {
let mut cursor = get_collection("users")
.find(
doc! {
"_id": {
"$in": user_ids.into_iter().collect::<Vec<String>>()
}
},
FindOptions::builder()
.projection(doc! { "_id": 1, "username": 1, "avatar": 1, "badges": 1, "status": 1 })
.build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "users",
})?;
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
let other: User = from_document(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "user",
})?;
let permissions = PermissionCalculator::new(&user)
.with_mutual_connection()
.with_user(&other)
.for_user_given()
.await?;
users.push(other.from(&user).with(permissions));
}
}
}
let mut users = if user_ids.len() > 0 {
user.fetch_multiple_users(user_ids.into_iter().collect::<Vec<String>>())
.await?
} else {
vec![]
};
user.relationship = Some(RelationshipStatus::User);
user.online = Some(true);
users.push(user);
Ok(ClientboundNotification::Ready { users, channels })
Ok(ClientboundNotification::Ready {
users,
servers,
channels,
members
})
}
......@@ -4,6 +4,7 @@ use super::hive::get_hive;
use futures::StreamExt;
use hive_pubsub::PubSub;
use mongodb::bson::doc;
use mongodb::bson::Document;
use mongodb::options::FindOptions;
pub async fn generate_subscriptions(user: &User) -> Result<(), String> {
......@@ -16,6 +17,43 @@ pub async fn generate_subscriptions(user: &User) -> Result<(), String> {
}
}
let server_ids = User::fetch_server_ids(&user.id)
.await
.map_err(|_| "Failed to fetch memberships.".to_string())?;
let channel_ids = get_collection("servers")
.find(
doc! {
"_id": {
"$in": &server_ids
}
},
None,
)
.await
.map_err(|_| "Failed to fetch servers.".to_string())?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
x.get_array("channels").ok().map(|v| {
v.into_iter()
.filter_map(|x| x.as_str().map(|x| x.to_string()))
.collect::<Vec<String>>()
})
})
.flatten()
.collect::<Vec<String>>();
for id in server_ids {
hive.subscribe(user.id.clone(), id)?;
}
for id in channel_ids {
hive.subscribe(user.id.clone(), id)?;
}
let mut cursor = get_collection("channels")
.find(
doc! {
......
......@@ -129,13 +129,14 @@ async fn accept(stream: TcpStream) {
send(payload);
if !was_online {
ClientboundNotification::UserPresence {
ClientboundNotification::UserUpdate {
id: id.clone(),
online: true,
data: json!({
"online": true
}),
clear: None
}
.publish(id)
.await
.ok();
.publish_as_user(id);
}
}
Err(_) => {
......@@ -173,9 +174,7 @@ async fn accept(stream: TcpStream) {
id: channel.clone(),
user,
}
.publish(channel)
.await
.ok();
.publish(channel);
} else {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
......@@ -196,9 +195,7 @@ async fn accept(stream: TcpStream) {
id: channel.clone(),
user,
}
.publish(channel)
.await
.ok();
.publish(channel);
} else {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
......@@ -234,13 +231,14 @@ async fn accept(stream: TcpStream) {
}
if let Some(id) = offline {
ClientboundNotification::UserPresence {
ClientboundNotification::UserUpdate {
id: id.clone(),
online: false,
data: json!({
"online": false
}),
clear: None
}
.publish(id)
.await
.ok();
.publish_as_user(id);
}
}
......@@ -250,10 +248,15 @@ pub fn publish(ids: Vec<String>, notification: ClientboundNotification) {
let users = USERS.read().unwrap();
for id in ids {
// Block certain notifications from reaching users that aren't meant to see them.
if let ClientboundNotification::UserRelationship { id: user_id, .. } = &notification {
if &id != user_id {
continue;
match &notification {
ClientboundNotification::UserRelationship { id: user_id, .. }
| ClientboundNotification::UserSettingsUpdate { id: user_id, .. }
| ClientboundNotification::ChannelAck { user: user_id, .. } => {
if &id != user_id {
continue;
}
}
_ => {}
}
if let Some(mut arr) = users.get_left(&id) {
......
use crate::database::*;
use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result};
use mongodb::bson::doc;
use mongodb::options::UpdateOptions;
#[put("/<target>/ack/<message>")]
pub async fn req(user: User, target: Ref, message: Ref) -> Result<()> {
let target = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
let id = target.id();
get_collection("channel_unreads")
.update_one(
doc! {
"_id.channel": id,
"_id.user": &user.id
},
doc! {
"$unset": {
"mentions": 1
},
"$set": {
"last_id": &message.id
}
},
UpdateOptions::builder().upsert(true).build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel_unreads",
})?;
ClientboundNotification::ChannelAck {
id: id.to_string(),
user: user.id.clone(),
message_id: message.id,
}
.publish(user.id);
Ok(())
}
......@@ -11,6 +11,7 @@ pub async fn req(user: User, target: Ref) -> Result<()> {
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
......@@ -95,20 +96,22 @@ pub async fn req(user: User, target: Ref) -> Result<()> {
id: id.clone(),
user: user.id.clone(),
}
.publish(id.clone())
.await
.ok();
.publish(id.clone());
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
Content::SystemMessage(SystemMessage::UserLeft { id: user.id }),
)
.publish(&target)
.await
.ok();
Content::SystemMessage(SystemMessage::UserLeft { id: user.id })
.send_as_system(&target)
.await
.ok();
Ok(())
}
Channel::TextChannel { .. } |
Channel::VoiceChannel { .. } => {
if perm.get_manage_channel() {
target.delete().await
} else {
Err(Error::MissingPermission)
}
}
}
}
use crate::database::*;
use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result};
use crate::{database::*, notifications::events::RemoveChannelField};
use mongodb::bson::{doc, to_document};
use rocket_contrib::json::Json;
......@@ -17,15 +17,20 @@ pub struct Data {
description: Option<String>,
#[validate(length(min = 1, max = 128))]
icon: Option<String>,
remove: Option<RemoveChannelField>,
}
#[patch("/<target>", data = "<info>")]
pub async fn req(user: User, target: Ref, info: Json<Data>) -> Result<()> {
let info = info.into_inner();
info.validate()
#[patch("/<target>", data = "<data>")]
pub async fn req(user: User, target: Ref, data: Json<Data>) -> Result<()> {
let data = data.into_inner();
data.validate()
.map_err(|error| Error::FailedValidation { error })?;
if info.name.is_none() && info.description.is_none() && info.icon.is_none() {
if data.name.is_none()
&& data.description.is_none()
&& data.icon.is_none()
&& data.remove.is_none()
{
return Ok(());
}
......@@ -40,19 +45,36 @@ pub async fn req(user: User, target: Ref, info: Json<Data>) -> Result<()> {
}
match &target {
Channel::Group { id, icon, .. } => {
Channel::Group { id, icon, .. }
| Channel::TextChannel { id, icon, .. }
| Channel::VoiceChannel { id, icon, .. } => {
let mut set = doc! {};
if let Some(name) = &info.name {
let mut unset = doc! {};
let mut remove_icon = false;
if let Some(remove) = &data.remove {
match remove {
RemoveChannelField::Icon => {
unset.insert("icon", 1);
remove_icon = true;
}
RemoveChannelField::Description => {
unset.insert("description", 1);
}
}
}
if let Some(name) = &data.name {
set.insert("name", name);
}
if let Some(description) = info.description {
if let Some(description) = &data.description {
set.insert("description", description);
}
let mut remove_icon = false;
if let Some(attachment_id) = info.icon {
let attachment = File::find_and_use(&attachment_id, "icons", "object", &user.id).await?;
if let Some(attachment_id) = &data.icon {
let attachment =
File::find_and_use(&attachment_id, "icons", "object", target.id()).await?;
set.insert(
"icon",
to_document(&attachment).map_err(|_| Error::DatabaseError {
......@@ -60,39 +82,62 @@ pub async fn req(user: User, target: Ref, info: Json<Data>) -> Result<()> {
with: "attachment",
})?,
);
remove_icon = true;
}
get_collection("channels")
.update_one(
doc! { "_id": &id },
doc! { "$set": &set },
None
)
.await
.map_err(|_| Error::DatabaseError { operation: "update_one", with: "channel" })?;
let mut operations = doc! {};
if set.len() > 0 {
operations.insert("$set", &set);
}
if unset.len() > 0 {
operations.insert("$unset", unset);
}
if operations.len() > 0 {
get_collection("channels")
.update_one(doc! { "_id": &id }, operations, None)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
}
ClientboundNotification::ChannelUpdate {
id: id.clone(),
data: json!(set),
clear: data.remove,
}
.publish(id.clone())
.await
.ok();
if let Some(name) = info.name {
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
.publish(id.clone());
if let Channel::Group { .. } = &target {
if let Some(name) = data.name {
Content::SystemMessage(SystemMessage::ChannelRenamed {
name,
by: user.id,
}),
)
.publish(&target)
.await
.ok();
by: user.id.clone(),
})
.send_as_system(&target)
.await
.ok();
}
if let Some(_) = data.description {
Content::SystemMessage(SystemMessage::ChannelDescriptionChanged {
by: user.id.clone(),
})
.send_as_system(&target)
.await
.ok();
}
if let Some(_) = data.icon {
Content::SystemMessage(SystemMessage::ChannelIconChanged { by: user.id })
.send_as_system(&target)
.await
.ok();
}
}
if remove_icon {
......
......@@ -11,6 +11,7 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> {
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
......
......@@ -15,7 +15,8 @@ pub async fn req(user: User, target: Ref, member: Ref) -> Result<()> {
.with_channel(&channel)
.for_channel()
.await?;
if !perm.get_view() {
if !perm.get_invite_others() {
Err(Error::MissingPermission)?
}
......@@ -52,19 +53,13 @@ pub async fn req(user: User, target: Ref, member: Ref) -> Result<()> {
id: id.clone(),
user: member.id.clone(),
}
.publish(id.clone())
.await
.ok();
.publish(id.clone());
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
Content::SystemMessage(SystemMessage::UserAdded {
id: member.id,
by: user.id,
}),
)
.publish(&channel)
Content::SystemMessage(SystemMessage::UserAdded {
id: member.id,
by: user.id,
})
.send_as_system(&channel)
.await
.ok();
......