Massively improve low-level packet stream interface.
Also, now PacketStream (whose previous version was Connection) should now, in theory (untested) support client connections to servers.master
parent
5aa93b97d0
commit
389bafe89d
|
@ -1165,9 +1165,9 @@ checksum = "502d53007c02d7605a05df1c1a73ee436952781653da5d0bf57ad608f66932c1"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "1.0.35"
|
version = "1.0.36"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "fb7f4c519df8c117855e19dd8cc851e89eb746fe7a73f0157e0d95fdec5369b0"
|
checksum = "4cdb98bcb1f9d81d07b536179c269ea15999b5d14ea958196413869445bb5250"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -1186,6 +1186,12 @@ dependencies = [
|
||||||
"unicode-xid",
|
"unicode-xid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "take_mut"
|
||||||
|
version = "0.2.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tempfile"
|
name = "tempfile"
|
||||||
version = "3.1.0"
|
version = "3.1.0"
|
||||||
|
@ -1270,6 +1276,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sha-1",
|
"sha-1",
|
||||||
|
"take_mut",
|
||||||
"tokio",
|
"tokio",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
|
@ -32,6 +32,7 @@ async-trait = "0.1.36"
|
||||||
clap = { version = "2.33.1", features = ["yaml"] }
|
clap = { version = "2.33.1", features = ["yaml"] }
|
||||||
serde = { version = "1.0.114", features = ["derive"] }
|
serde = { version = "1.0.114", features = ["derive"] }
|
||||||
serde_json = "1.0.56"
|
serde_json = "1.0.56"
|
||||||
|
take_mut = "0.2.2"
|
||||||
tokio = { version = "0.2.22", features = ["io-util", "macros", "net", "tcp", "rt-threaded"] }
|
tokio = { version = "0.2.22", features = ["io-util", "macros", "net", "tcp", "rt-threaded"] }
|
||||||
uuid = { version = "0.8", features = ["serde"] }
|
uuid = { version = "0.8", features = ["serde"] }
|
||||||
|
|
||||||
|
|
|
@ -5,4 +5,3 @@ newline_style = "Native"
|
||||||
normalize_comments = true
|
normalize_comments = true
|
||||||
normalize_doc_attributes = true
|
normalize_doc_attributes = true
|
||||||
use_try_shorthand = true
|
use_try_shorthand = true
|
||||||
wrap_comments = true
|
|
||||||
|
|
42
src/main.rs
42
src/main.rs
|
@ -5,7 +5,7 @@
|
||||||
mod net;
|
mod net;
|
||||||
|
|
||||||
use crate::net::chat::Chat;
|
use crate::net::chat::Chat;
|
||||||
use crate::net::connection::Connection;
|
use crate::net::packet_stream::{Client, PacketStream, PacketStreamMaps};
|
||||||
use crate::net::protocol::state::handshake::Handshake;
|
use crate::net::protocol::state::handshake::Handshake;
|
||||||
use crate::net::protocol::state::login::Login;
|
use crate::net::protocol::state::login::Login;
|
||||||
use crate::net::protocol::state::play::Play;
|
use crate::net::protocol::state::play::Play;
|
||||||
|
@ -55,7 +55,7 @@ async fn listen(mut listener: TcpListener) {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn accept_connection(socket: TcpStream) {
|
async fn accept_connection(socket: TcpStream) {
|
||||||
let con = Connection::new(socket);
|
let con = PacketStream::new(Box::new(socket));
|
||||||
|
|
||||||
eprintln!("Client connected.");
|
eprintln!("Client connected.");
|
||||||
match interact_handshake(con).await {
|
match interact_handshake(con).await {
|
||||||
|
@ -72,10 +72,10 @@ fn mk_err<A, S: std::borrow::Borrow<str>>(str: S) -> io::Result<A> {
|
||||||
Err(io::Error::new(io::ErrorKind::Other, str.borrow().to_string()))
|
Err(io::Error::new(io::ErrorKind::Other, str.borrow().to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interact_handshake(mut con: Connection<Handshake>) -> io::Result<()> {
|
async fn interact_handshake(mut con: PacketStream<Client, Handshake>) -> io::Result<()> {
|
||||||
use crate::net::protocol::state::handshake::*;
|
use crate::net::protocol::state::handshake::*;
|
||||||
|
|
||||||
match con.read().await? {
|
match con.recieve().await? {
|
||||||
Serverbound::HandshakePkt(handshake) => {
|
Serverbound::HandshakePkt(handshake) => {
|
||||||
use HandshakeNextState::*;
|
use HandshakeNextState::*;
|
||||||
|
|
||||||
|
@ -87,13 +87,13 @@ async fn interact_handshake(mut con: Connection<Handshake>) -> io::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interact_status(mut con: Connection<Status>) -> io::Result<()> {
|
async fn interact_status(mut con: PacketStream<Client, Status>) -> io::Result<()> {
|
||||||
use crate::net::protocol::state::status::*;
|
use crate::net::protocol::state::status::*;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match con.read().await? {
|
match con.recieve().await? {
|
||||||
Serverbound::Request(Request {}) => {
|
Serverbound::Request(Request {}) => {
|
||||||
con.write(&Clientbound::Response(Response {
|
con.send(&Clientbound::Response(Response {
|
||||||
data: ResponseData {
|
data: ResponseData {
|
||||||
version: ResponseVersion {
|
version: ResponseVersion {
|
||||||
name: "1.16.1".to_string(),
|
name: "1.16.1".to_string(),
|
||||||
|
@ -113,7 +113,7 @@ async fn interact_status(mut con: Connection<Status>) -> io::Result<()> {
|
||||||
.await?;
|
.await?;
|
||||||
},
|
},
|
||||||
Serverbound::Ping(ping) => {
|
Serverbound::Ping(ping) => {
|
||||||
con.write(&Clientbound::Pong(Pong { payload: ping.payload })).await?;
|
con.send(&Clientbound::Pong(Pong { payload: ping.payload })).await?;
|
||||||
|
|
||||||
// The status ping is now over so the server ends the connection.
|
// The status ping is now over so the server ends the connection.
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -122,13 +122,13 @@ async fn interact_status(mut con: Connection<Status>) -> io::Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interact_login(mut con: Connection<Login>) -> io::Result<()> {
|
async fn interact_login(mut con: PacketStream<Client, Login>) -> io::Result<()> {
|
||||||
use crate::net::protocol::state::login::*;
|
use crate::net::protocol::state::login::*;
|
||||||
|
|
||||||
let name = match con.read().await? {
|
let name = match con.recieve().await? {
|
||||||
Serverbound::LoginStart(login_start) => login_start.name,
|
Serverbound::LoginStart(login_start) => login_start.name,
|
||||||
_ => {
|
_ => {
|
||||||
con.write(&Clientbound::Disconnect(Disconnect {
|
con.send(&Clientbound::Disconnect(Disconnect {
|
||||||
reason: Chat {
|
reason: Chat {
|
||||||
text: "Unexpected packet (expected Login Start).".to_string(),
|
text: "Unexpected packet (expected Login Start).".to_string(),
|
||||||
},
|
},
|
||||||
|
@ -164,14 +164,14 @@ async fn interact_login(mut con: Connection<Login>) -> io::Result<()> {
|
||||||
|
|
||||||
let server_id = "";
|
let server_id = "";
|
||||||
|
|
||||||
con.write(&Clientbound::EncryptionRequest(EncryptionRequest {
|
con.send(&Clientbound::EncryptionRequest(EncryptionRequest {
|
||||||
server_id: server_id.to_string().into_boxed_str(),
|
server_id: server_id.to_string().into_boxed_str(),
|
||||||
public_key: public_key.clone().into_boxed_slice(),
|
public_key: public_key.clone().into_boxed_slice(),
|
||||||
verify_token: verify_token.clone().into_boxed_slice(),
|
verify_token: verify_token.clone().into_boxed_slice(),
|
||||||
}))
|
}))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let secret = match con.read().await? {
|
let secret = match con.recieve().await? {
|
||||||
Serverbound::EncryptionResponse(encryption_response) => {
|
Serverbound::EncryptionResponse(encryption_response) => {
|
||||||
let token = key
|
let token = key
|
||||||
.decrypt(PaddingScheme::PKCS1v15Encrypt, &encryption_response.verify_token)
|
.decrypt(PaddingScheme::PKCS1v15Encrypt, &encryption_response.verify_token)
|
||||||
|
@ -188,7 +188,7 @@ async fn interact_login(mut con: Connection<Login>) -> io::Result<()> {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
con = con.set_encryption(&secret).expect("Failed to set encryption.");
|
con.enable_encryption(&secret).expect("Failed to set encryption.");
|
||||||
|
|
||||||
#[cfg(feature = "authentication")]
|
#[cfg(feature = "authentication")]
|
||||||
{
|
{
|
||||||
|
@ -224,9 +224,15 @@ async fn interact_login(mut con: Connection<Login>) -> io::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "compression")]
|
#[cfg(feature = "compression")]
|
||||||
con.set_compression(Some(64)).await?;
|
{
|
||||||
|
con.send(&Clientbound::SetCompression(SetCompression {
|
||||||
|
threshold: crate::net::serialize::VarInt(64),
|
||||||
|
}))
|
||||||
|
.await?;
|
||||||
|
con.set_compression(Some(64));
|
||||||
|
}
|
||||||
|
|
||||||
con.write(&Clientbound::LoginSuccess(LoginSuccess {
|
con.send(&Clientbound::LoginSuccess(LoginSuccess {
|
||||||
uuid: uuid::Uuid::nil(),
|
uuid: uuid::Uuid::nil(),
|
||||||
username: name,
|
username: name,
|
||||||
}))
|
}))
|
||||||
|
@ -235,10 +241,10 @@ async fn interact_login(mut con: Connection<Login>) -> io::Result<()> {
|
||||||
interact_play(con.into_play()).await
|
interact_play(con.into_play()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn interact_play(mut con: Connection<Play>) -> io::Result<()> {
|
async fn interact_play(mut con: PacketStream<Client, Play>) -> io::Result<()> {
|
||||||
use crate::net::protocol::state::play::*;
|
use crate::net::protocol::state::play::*;
|
||||||
|
|
||||||
con.write(&Clientbound::Disconnect(Disconnect {
|
con.send(&Clientbound::Disconnect(Disconnect {
|
||||||
reason: Chat {
|
reason: Chat {
|
||||||
text: "Goodbye!".to_string(),
|
text: "Goodbye!".to_string(),
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,124 +0,0 @@
|
||||||
mod packet_format;
|
|
||||||
mod stream;
|
|
||||||
|
|
||||||
use crate::net::connection::packet_format::default::DefaultPacketFormat;
|
|
||||||
use crate::net::connection::packet_format::PacketFormat;
|
|
||||||
use crate::net::connection::stream::Stream;
|
|
||||||
use crate::net::protocol::packet_map::PacketMap;
|
|
||||||
use crate::net::protocol::state::handshake::Handshake;
|
|
||||||
use crate::net::protocol::state::login::Login;
|
|
||||||
use crate::net::protocol::state::play::Play;
|
|
||||||
use crate::net::protocol::state::status::Status;
|
|
||||||
use crate::net::protocol::state::ProtocolState;
|
|
||||||
use std::io;
|
|
||||||
use std::marker::PhantomData;
|
|
||||||
use tokio::io::BufStream;
|
|
||||||
use tokio::net::TcpStream;
|
|
||||||
|
|
||||||
pub struct Connection<St: ProtocolState> {
|
|
||||||
rw: Box<dyn Stream>,
|
|
||||||
fmt: Box<dyn PacketFormat>,
|
|
||||||
st: PhantomData<St>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<St: ProtocolState> Connection<St> {
|
|
||||||
pub async fn write(&mut self, pkt: &St::Clientbound) -> io::Result<()> {
|
|
||||||
// Turn the packet into bytes.
|
|
||||||
let mut contents = Vec::new();
|
|
||||||
pkt.write(&mut contents);
|
|
||||||
|
|
||||||
// Send the packet with the appropriate header.
|
|
||||||
self.fmt.send(&mut self.rw, &contents).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn read(&mut self) -> io::Result<St::Serverbound> {
|
|
||||||
use crate::net::serialize::VecPacketDeserializer;
|
|
||||||
|
|
||||||
let buf = self.fmt.recieve(&mut self.rw).await?;
|
|
||||||
|
|
||||||
St::Serverbound::read(&mut VecPacketDeserializer::new(buf.as_ref()))
|
|
||||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_state<NewSt: ProtocolState>(self) -> Connection<NewSt> {
|
|
||||||
Connection {
|
|
||||||
rw: self.rw,
|
|
||||||
fmt: self.fmt,
|
|
||||||
st: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn into_disconnected(self) -> Connection<!> {
|
|
||||||
self.into_state()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connection<Handshake> {
|
|
||||||
pub fn new(stream: TcpStream) -> Self {
|
|
||||||
Connection {
|
|
||||||
rw: Box::new(BufStream::new(stream)),
|
|
||||||
fmt: Box::new(DefaultPacketFormat),
|
|
||||||
st: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_status(self) -> Connection<Status> {
|
|
||||||
self.into_state()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_login(self) -> Connection<Login> {
|
|
||||||
self.into_state()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connection<Login> {
|
|
||||||
#[cfg(feature = "compression")]
|
|
||||||
pub async fn set_compression(&mut self, threshold: Option<u32>) -> io::Result<()> {
|
|
||||||
use crate::net::connection::packet_format::compressed::CompressedPacketFormat;
|
|
||||||
use crate::net::protocol::state::login::{Clientbound, SetCompression};
|
|
||||||
use crate::net::serialize::VarInt;
|
|
||||||
|
|
||||||
// Tell the client about the new compression threshold,
|
|
||||||
// using a packet compressed with the old compression threshold.
|
|
||||||
self.write(&Clientbound::SetCompression(SetCompression {
|
|
||||||
// A negative threshold will disable compression.
|
|
||||||
threshold: VarInt(threshold.map(|x| x as i32).unwrap_or(-1)),
|
|
||||||
}))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Further packets will use the new compression threshold.
|
|
||||||
match threshold {
|
|
||||||
Some(threshold) => {
|
|
||||||
self.fmt = Box::new(CompressedPacketFormat::new(threshold as usize));
|
|
||||||
},
|
|
||||||
None => {
|
|
||||||
self.fmt = Box::new(DefaultPacketFormat);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// WARNING: This function is not idempontent.
|
|
||||||
/// Calling it twice will result in the underlying stream getting encrypted
|
|
||||||
/// twice.
|
|
||||||
#[cfg(feature = "encryption")]
|
|
||||||
pub fn set_encryption(self, secret: &[u8]) -> Result<Self, String> {
|
|
||||||
use crate::net::connection::stream::encrypted::EncryptedStream;
|
|
||||||
use cfb8::stream_cipher::NewStreamCipher;
|
|
||||||
use cfb8::Cfb8;
|
|
||||||
|
|
||||||
let cipher: Cfb8<aes::Aes128> = Cfb8::new_var(secret, secret).map_err(|err| err.to_string())?;
|
|
||||||
|
|
||||||
Ok(Connection {
|
|
||||||
rw: Box::new(EncryptedStream::new(self.rw, cipher)),
|
|
||||||
fmt: self.fmt,
|
|
||||||
st: PhantomData,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_play(self) -> Connection<Play> {
|
|
||||||
self.into_state()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
#[cfg(feature = "compression")]
|
|
||||||
pub mod compressed;
|
|
||||||
pub mod default;
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use std::io;
|
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
|
|
||||||
|
|
||||||
pub type Reader = dyn AsyncRead + Unpin + Send;
|
|
||||||
pub type Writer = dyn AsyncWrite + Unpin + Send;
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait PacketFormat: Send + Sync {
|
|
||||||
async fn recieve(&self, src: &mut Reader) -> io::Result<Box<[u8]>>;
|
|
||||||
async fn send(&self, dest: &mut Writer, data: &[u8]) -> io::Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A completely arbitrary limitation on the maximum size of a recieved packet.
|
|
||||||
pub const MAX_PACKET_SIZE: usize = 35565;
|
|
||||||
|
|
||||||
pub async fn read_varint(src: &mut Reader) -> io::Result<(usize, i32)> {
|
|
||||||
let mut num_read: usize = 0;
|
|
||||||
let mut acc = 0;
|
|
||||||
while num_read < 5 {
|
|
||||||
let byte = src.read_u8().await?;
|
|
||||||
acc |= ((byte & 0b01111111) as i32) << (num_read * 7);
|
|
||||||
|
|
||||||
num_read += 1;
|
|
||||||
|
|
||||||
if byte & 0b10000000 == 0 {
|
|
||||||
return Ok((num_read, acc));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(io::Error::new(io::ErrorKind::Other, "VarInt was too long.".to_string()))
|
|
||||||
}
|
|
|
@ -1,4 +1,4 @@
|
||||||
pub mod chat;
|
pub mod chat;
|
||||||
pub mod connection;
|
pub mod packet_stream;
|
||||||
pub mod protocol;
|
pub mod protocol;
|
||||||
pub mod serialize;
|
pub mod serialize;
|
||||||
|
|
|
@ -0,0 +1,237 @@
|
||||||
|
mod packet_format;
|
||||||
|
mod stream;
|
||||||
|
|
||||||
|
use crate::net::packet_stream::packet_format::{AutoPacketFormat, PacketFormat};
|
||||||
|
use crate::net::packet_stream::stream::Stream;
|
||||||
|
use crate::net::protocol::packet_map::PacketMap;
|
||||||
|
use crate::net::protocol::state::handshake::Handshake;
|
||||||
|
use crate::net::protocol::state::login::Login;
|
||||||
|
use crate::net::protocol::state::play::Play;
|
||||||
|
use crate::net::protocol::state::status::Status;
|
||||||
|
use crate::net::protocol::state::ProtocolState;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use std::io;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
|
/// Prevents outside types from implementing a trait.
|
||||||
|
mod sealed {
|
||||||
|
pub trait Sealed {}
|
||||||
|
}
|
||||||
|
use sealed::Sealed;
|
||||||
|
|
||||||
|
/// A remote end of a connection: either a client or server.
|
||||||
|
///
|
||||||
|
/// It is impossible to implement this trait for other types.
|
||||||
|
pub trait Remote: Sealed {}
|
||||||
|
/// The remote end of a connection is a client.
|
||||||
|
pub enum Client {}
|
||||||
|
/// The remote end of a connection is a server.
|
||||||
|
pub enum Server {}
|
||||||
|
impl Sealed for Client {}
|
||||||
|
impl Sealed for Server {}
|
||||||
|
impl Remote for Client {}
|
||||||
|
impl Remote for Server {}
|
||||||
|
|
||||||
|
/// A packet compression theshold (i.e. the minimum packet size to compress).
|
||||||
|
#[cfg(feature = "compression")]
|
||||||
|
pub type CompressionThreshold = usize;
|
||||||
|
#[cfg(not(feature = "compression"))]
|
||||||
|
pub type CompressionThreshold = !;
|
||||||
|
|
||||||
|
/// A shared secret used for packet encryption.
|
||||||
|
#[cfg(feature = "encryption")]
|
||||||
|
pub type SharedSecret = [u8];
|
||||||
|
#[cfg(not(feature = "encryption"))]
|
||||||
|
pub type SharedSecret = !;
|
||||||
|
|
||||||
|
/// A stream of packets.
|
||||||
|
///
|
||||||
|
/// The type parameters are used to ensure using the type system
|
||||||
|
/// that you can only send and recieve the correct type of packets.
|
||||||
|
pub struct PacketStream<Rem: Remote, St: ProtocolState> {
|
||||||
|
inner: Box<dyn Stream>,
|
||||||
|
compression_threshold: Option<CompressionThreshold>,
|
||||||
|
encryption_enabled: bool,
|
||||||
|
remote: PhantomData<Rem>,
|
||||||
|
state: PhantomData<St>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Rem: Remote, St: ProtocolState> PacketStream<Rem, St> {
|
||||||
|
/// Coerce the packet stream to any protocol state of your choosing.
|
||||||
|
/// This can be used to break PacketStream's type-enforced correctness guarantees,
|
||||||
|
/// so it should only be used internally.
|
||||||
|
///
|
||||||
|
/// The purpose of this function is to reduce code duplication
|
||||||
|
/// for all the pre-defined safe protocol state transitions.
|
||||||
|
fn into_state<NewSt: ProtocolState>(self) -> PacketStream<Rem, NewSt> {
|
||||||
|
PacketStream {
|
||||||
|
inner: self.inner,
|
||||||
|
compression_threshold: self.compression_threshold,
|
||||||
|
encryption_enabled: self.encryption_enabled,
|
||||||
|
remote: self.remote,
|
||||||
|
state: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send any packet over the stream, ignoring the declared ProtocolState.
|
||||||
|
/// This can be used to break PacketStream's type-enforced correctness guarantees,
|
||||||
|
/// so it should only be used internally.
|
||||||
|
///
|
||||||
|
/// The purpose of this function is to reduce code duplication
|
||||||
|
/// while implementing [`PacketStreamMaps`]'s send and recieve;
|
||||||
|
/// the sending and recieving code is always going to be the same,
|
||||||
|
/// but due to some shortcomings of Rust's type system
|
||||||
|
/// (specifically, because there's no way to approximate type-level functions;
|
||||||
|
/// in Haskell you could use multi-parameter type classes or type families),
|
||||||
|
/// we can't implement this generically safely over Remote/ProtocolState combinations.
|
||||||
|
async fn send_generic<Pkt: PacketMap>(&mut self, pkt: &Pkt) -> io::Result<()> {
|
||||||
|
let mut contents = Vec::new();
|
||||||
|
pkt.write(&mut contents);
|
||||||
|
|
||||||
|
AutoPacketFormat(self.compression_threshold)
|
||||||
|
.send(&mut self.inner, &contents)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Recieve any packet from the stream, ignoring the declared ProtocolState.
|
||||||
|
/// This can be used to break PacketStream's type-enforced correctness guarantees,
|
||||||
|
/// so it should only be used internally.
|
||||||
|
///
|
||||||
|
/// The purpose of this function is to reduce code duplication
|
||||||
|
/// while implementing [`PacketStreamMaps`]'s send and recieve;
|
||||||
|
/// the sending and recieving code is always going to be the same,
|
||||||
|
/// but due to some shortcomings of Rust's type system
|
||||||
|
/// (specifically, because there's no way to approximate type-level functions;
|
||||||
|
/// in Haskell you could use multi-parameter type classes or type families),
|
||||||
|
/// we can't implement this generically safely over Remote/ProtocolState combinations.
|
||||||
|
async fn recieve_generic<Pkt: PacketMap>(&mut self) -> io::Result<Pkt> {
|
||||||
|
use crate::net::serialize::VecPacketDeserializer;
|
||||||
|
|
||||||
|
let buf = AutoPacketFormat(self.compression_threshold)
|
||||||
|
.recieve(&mut self.inner)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Pkt::read(&mut VecPacketDeserializer::new(buf.as_ref()))
|
||||||
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Transition to the protocol disconnected state,
|
||||||
|
/// which prevents reading or writing any more packets.
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn into_disconnected(self) -> PacketStream<Rem, !> {
|
||||||
|
self.into_state()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A valid combination of inbound and outbound packet maps,
|
||||||
|
/// which are determined by the protocol state and remote.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait PacketStreamMaps: Sealed {
|
||||||
|
/// The kind of packets that can be recieved from the remote.
|
||||||
|
type Inbound: PacketMap;
|
||||||
|
/// The kind of packets that can be sent to the remote.
|
||||||
|
type Outbound: PacketMap;
|
||||||
|
|
||||||
|
/// Recieve a packet from the remote.
|
||||||
|
async fn recieve(&mut self) -> io::Result<Self::Inbound>;
|
||||||
|
|
||||||
|
/// Send a packet to the remote.
|
||||||
|
async fn send(&mut self, pkt: &Self::Outbound) -> io::Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<St: ProtocolState> Sealed for PacketStream<Client, St> {}
|
||||||
|
#[async_trait]
|
||||||
|
impl<St: ProtocolState> PacketStreamMaps for PacketStream<Client, St> {
|
||||||
|
type Inbound = St::Serverbound;
|
||||||
|
type Outbound = St::Clientbound;
|
||||||
|
|
||||||
|
async fn recieve(&mut self) -> io::Result<Self::Inbound> {
|
||||||
|
self.recieve_generic().await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send(&mut self, pkt: &Self::Outbound) -> io::Result<()> {
|
||||||
|
self.send_generic(pkt).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<St: ProtocolState> Sealed for PacketStream<Server, St> {}
|
||||||
|
#[async_trait]
|
||||||
|
impl<St: ProtocolState> PacketStreamMaps for PacketStream<Server, St> {
|
||||||
|
type Inbound = St::Clientbound;
|
||||||
|
type Outbound = St::Serverbound;
|
||||||
|
|
||||||
|
async fn recieve(&mut self) -> io::Result<Self::Inbound> {
|
||||||
|
self.recieve_generic().await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send(&mut self, pkt: &Self::Outbound) -> io::Result<()> {
|
||||||
|
self.send_generic(pkt).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Rem: Remote> PacketStream<Rem, Handshake> {
|
||||||
|
pub fn new(inner: Box<dyn Stream>) -> Self {
|
||||||
|
Self {
|
||||||
|
inner,
|
||||||
|
compression_threshold: None,
|
||||||
|
encryption_enabled: false,
|
||||||
|
remote: PhantomData,
|
||||||
|
state: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Transition to the protocol status state.
|
||||||
|
pub fn into_status(self) -> PacketStream<Rem, Status> {
|
||||||
|
self.into_state()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Transition to the protocol login state.
|
||||||
|
pub fn into_login(self) -> PacketStream<Rem, Login> {
|
||||||
|
self.into_state()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "encryption")]
|
||||||
|
pub type InvalidKeyNonceLength = cfb8::stream_cipher::InvalidKeyNonceLength;
|
||||||
|
#[cfg(not(feature = "encryption"))]
|
||||||
|
pub type InvalidKeyNonceLength = !;
|
||||||
|
|
||||||
|
impl<Rem: Remote> PacketStream<Rem, Login> {
|
||||||
|
/// Transition to the protocol play state.
|
||||||
|
pub fn into_play(self) -> PacketStream<Rem, Play> {
|
||||||
|
self.into_state()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the compression threshold, i.e. the minimum size of packet to compress.
|
||||||
|
/// Setting the compression threshold to None disables compression.
|
||||||
|
pub fn set_compression(&mut self, threshold: Option<CompressionThreshold>) {
|
||||||
|
self.compression_threshold = threshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
// I want to leave the function available even when encryption is disabled
|
||||||
|
// so that users of PacketStream can use the `Option<SharedSecret>` pattern
|
||||||
|
// to support encryption configuration without needing config features everywhere.
|
||||||
|
|
||||||
|
/// Sets the shared secret, which enables encryption.
|
||||||
|
/// Trying to enable encryption twice is an error, and the function will panic.
|
||||||
|
pub fn enable_encryption(&mut self, shared_secret: &SharedSecret) -> Result<(), InvalidKeyNonceLength> {
|
||||||
|
if self.encryption_enabled {
|
||||||
|
panic!("Tried to enable encryption twice!");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "encryption")]
|
||||||
|
{
|
||||||
|
use crate::net::packet_stream::stream::encrypted::EncryptedStream;
|
||||||
|
use cfb8::stream_cipher::NewStreamCipher;
|
||||||
|
use cfb8::Cfb8;
|
||||||
|
|
||||||
|
let cipher: Cfb8<aes::Aes128> = Cfb8::new_var(shared_secret, shared_secret)?;
|
||||||
|
take_mut::take(&mut self.inner, |inner| Box::new(EncryptedStream::new(inner, cipher)));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "encryption"))]
|
||||||
|
*shared_secret
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
#[cfg(feature = "compression")]
|
||||||
|
mod compressed;
|
||||||
|
mod default;
|
||||||
|
|
||||||
|
use crate::net::packet_stream::packet_format::default::DefaultPacketFormat;
|
||||||
|
use crate::net::packet_stream::CompressionThreshold;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use std::io;
|
||||||
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
|
||||||
|
|
||||||
|
pub type Reader = dyn AsyncRead + Unpin + Send;
|
||||||
|
pub type Writer = dyn AsyncWrite + Unpin + Send;
|
||||||
|
|
||||||
|
/// A packet format describes how to read a packet header and retrieve its data.
|
||||||
|
#[async_trait]
|
||||||
|
pub trait PacketFormat: Send + Sync {
|
||||||
|
/// Recieve the bytes of a packet's body (its id and header) from the provided stream.
|
||||||
|
/// This involves reading the packet header and performing decompression if necessary.
|
||||||
|
async fn recieve(&self, src: &mut Reader) -> io::Result<Box<[u8]>>;
|
||||||
|
|
||||||
|
/// Send the bytes of a packet's body (its id and header) through the provided stream.
|
||||||
|
/// This involves writing the packet header and performing compression if necessary.
|
||||||
|
async fn send(&self, dest: &mut Writer, data: &[u8]) -> io::Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct AutoPacketFormat(pub Option<CompressionThreshold>);
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl PacketFormat for AutoPacketFormat {
|
||||||
|
async fn recieve(&self, src: &mut Reader) -> io::Result<Box<[u8]>> {
|
||||||
|
match self.0 {
|
||||||
|
#[cfg(not(feature = "compression"))]
|
||||||
|
Some(x) => x,
|
||||||
|
#[cfg(feature = "compression")]
|
||||||
|
Some(threshold) => {
|
||||||
|
use crate::net::packet_stream::packet_format::compressed::CompressedPacketFormat;
|
||||||
|
CompressedPacketFormat(threshold).recieve(src).await
|
||||||
|
},
|
||||||
|
None => DefaultPacketFormat.recieve(src).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send(&self, dest: &mut Writer, data: &[u8]) -> io::Result<()> {
|
||||||
|
match self.0 {
|
||||||
|
#[cfg(not(feature = "compression"))]
|
||||||
|
Some(x) => x,
|
||||||
|
#[cfg(feature = "compression")]
|
||||||
|
Some(threshold) => {
|
||||||
|
use crate::net::packet_stream::packet_format::compressed::CompressedPacketFormat;
|
||||||
|
CompressedPacketFormat(threshold).send(dest, data).await
|
||||||
|
},
|
||||||
|
None => DefaultPacketFormat.send(dest, data).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A completely arbitrary limitation on the maximum size of a recieved packet.
|
||||||
|
pub const MAX_PACKET_SIZE: usize = 35565;
|
||||||
|
|
||||||
|
async fn read_varint(src: &mut Reader) -> io::Result<(usize, i32)> {
|
||||||
|
let mut num_read: usize = 0;
|
||||||
|
let mut acc = 0;
|
||||||
|
while num_read < 5 {
|
||||||
|
let byte = src.read_u8().await?;
|
||||||
|
acc |= ((byte & 0b01111111) as i32) << (num_read * 7);
|
||||||
|
|
||||||
|
num_read += 1;
|
||||||
|
|
||||||
|
if byte & 0b10000000 == 0 {
|
||||||
|
return Ok((num_read, acc));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(io::Error::new(io::ErrorKind::Other, "VarInt was too long.".to_string()))
|
||||||
|
}
|
|
@ -1,17 +1,10 @@
|
||||||
use crate::net::connection::packet_format::{read_varint, PacketFormat, Reader, Writer, MAX_PACKET_SIZE};
|
use crate::net::packet_stream::packet_format::{read_varint, PacketFormat, Reader, Writer, MAX_PACKET_SIZE};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use std::boxed::Box;
|
use std::boxed::Box;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
pub struct CompressedPacketFormat {
|
#[repr(transparent)]
|
||||||
threshold: usize,
|
pub struct CompressedPacketFormat(pub usize);
|
||||||
}
|
|
||||||
|
|
||||||
impl CompressedPacketFormat {
|
|
||||||
pub fn new(threshold: usize) -> Self {
|
|
||||||
Self { threshold }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// A compressed header is in this format:
|
// A compressed header is in this format:
|
||||||
//
|
//
|
||||||
|
@ -76,7 +69,7 @@ impl PacketFormat for CompressedPacketFormat {
|
||||||
|
|
||||||
// If the length of the uncompressed data exceeds the threshold,
|
// If the length of the uncompressed data exceeds the threshold,
|
||||||
// then we will compress this packet.
|
// then we will compress this packet.
|
||||||
if data.len() >= self.threshold {
|
if data.len() >= self.0 {
|
||||||
// Now we compress the data.
|
// Now we compress the data.
|
||||||
use flate2::{Compress, FlushCompress};
|
use flate2::{Compress, FlushCompress};
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use crate::net::connection::packet_format::{read_varint, PacketFormat, Reader, Writer, MAX_PACKET_SIZE};
|
use crate::net::packet_stream::packet_format::{read_varint, PacketFormat, Reader, Writer, MAX_PACKET_SIZE};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use std::boxed::Box;
|
use std::boxed::Box;
|
||||||
use std::io;
|
use std::io;
|
|
@ -1,4 +1,4 @@
|
||||||
use crate::net::connection::stream::Stream;
|
use crate::net::packet_stream::stream::Stream;
|
||||||
use aes::Aes128;
|
use aes::Aes128;
|
||||||
use cfb8::stream_cipher::StreamCipher;
|
use cfb8::stream_cipher::StreamCipher;
|
||||||
use cfb8::Cfb8;
|
use cfb8::Cfb8;
|
|
@ -5,7 +5,7 @@ pub mod status;
|
||||||
|
|
||||||
use crate::net::protocol::packet_map::PacketMap;
|
use crate::net::protocol::packet_map::PacketMap;
|
||||||
|
|
||||||
pub trait ProtocolState {
|
pub trait ProtocolState: Send + Sync {
|
||||||
type Clientbound: PacketMap;
|
type Clientbound: PacketMap;
|
||||||
type Serverbound: PacketMap;
|
type Serverbound: PacketMap;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue