From da1503350178984fd9d7816e21a81e3ffecfa00e Mon Sep 17 00:00:00 2001 From: Masato Imai Date: Tue, 25 Feb 2025 09:36:29 +0000 Subject: [PATCH] transition to OpenConfirm --- .cargo/config.toml | 3 +++ src/connection.rs | 54 ++++++++++++++++++++++++++++++++++++++-- src/event.rs | 3 +++ src/packets/message.rs | 1 + src/peer.rs | 56 ++++++++++++++++++++++++++++++++++++++++++ src/state.rs | 1 + 6 files changed, 116 insertions(+), 2 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 2967cf3..e158a70 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,5 @@ [target.x86_64-unknown-linux-gnu] runner = "sudo -E" + +[alias] +t = "test -- --test-threads=1" diff --git a/src/connection.rs b/src/connection.rs index 24a8bc2..8a92477 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,5 +1,8 @@ -use anyhow::{Context, Ok, Result}; -use bytes::BytesMut; +use std::io; +use std::result::Result::Ok; + +use anyhow::{Context, Result}; +use bytes::{BufMut, BytesMut}; use tokio::{ io::AsyncWriteExt, net::{TcpListener, TcpStream}, @@ -32,6 +35,53 @@ impl Connection { Ok(Self { connection, buffer }) } + pub async fn get_message(&mut self) -> Option { + self.read_data_from_tcp_connection().await; + + let buffer = self.split_buffer_at_message_separator()?; + + Message::try_from(buffer).ok() + } + + async fn read_data_from_tcp_connection(&mut self) { + loop { + let mut buf: Vec = vec![]; + + match self.connection.try_read_buf(&mut buf) { + Ok(0) => {} + Ok(n) => self.buffer.put(&buf[..]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => panic!( + "An error occured while reading data from TCP connection: {:?}", + e + ), + } + } + } + + fn split_buffer_at_message_separator(&mut self) -> Option { + let index = self.get_index_of_message_separator().ok()?; + + if self.buffer.len() < index { + return None; + } + + Some(self.buffer.split_to(index)) + } + + fn get_index_of_message_separator(&self) -> Result { + let minimum_message_length = 19; + + if self.buffer.len() < 19 { + return Err(anyhow::anyhow!( + "Message length is too short: {}", + self.buffer.len() + )); + } + + Ok(u16::from_be_bytes([self.buffer[16], self.buffer[17]]) as usize) + } + pub async fn send(&mut self, message: Message) { let bytes: BytesMut = message.into(); self.connection.write_all(&bytes[..]).await; diff --git a/src/event.rs b/src/event.rs index b03e0b9..c51694a 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,5 +1,8 @@ +use crate::packets::open::OpenMessage; + #[derive(PartialEq, Eq, Debug, Clone, Hash)] pub enum Event { ManualStart, TcpConnectionConfirmed, + BgpOpen(OpenMessage), } diff --git a/src/packets/message.rs b/src/packets/message.rs index e1fafd5..2af2283 100644 --- a/src/packets/message.rs +++ b/src/packets/message.rs @@ -9,6 +9,7 @@ use super::{ open::OpenMessage, }; +#[derive(Debug)] pub enum Message { Open(OpenMessage), } diff --git a/src/peer.rs b/src/peer.rs index 650e8d5..7e9311e 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -36,6 +36,21 @@ impl Peer { info!("event occurred, event={:?}", event); self.handle_event(event).await; } + + if let Some(connection) = &mut self.tcp_connection { + if let Some(message) = connection.get_message().await { + info!("message received, message={:?}", message); + self.handle_message(message).await; + } + } + } + + async fn handle_message(&mut self, message: Message) { + match message { + Message::Open(open) => { + self.event_queue.enqueue(Event::BgpOpen(open)); + } + } } async fn handle_event(&mut self, event: Event) { @@ -68,6 +83,13 @@ impl Peer { } _ => {} }, + State::OpenSent => match event { + Event::BgpOpen(open) => { + //TODO: send keepalive message + self.state = State::OpenConfirm; + } + _ => {} + }, _ => {} } @@ -88,6 +110,40 @@ mod tests { use crate::peer::Peer; use crate::state::State; + #[tokio::test] + async fn peer_can_transition_to_open_confirm_state() { + let config: Config = "64512 127.0.0.1 64513 127.0.0.2 active".parse().unwrap(); + let mut peer = Peer::new(config); + peer.start(); + + tokio::spawn(async move { + let remote_config: Config = "64513 127.0.0.2 64512 127.0.0.1 passive".parse().unwrap(); + let mut remote_peer = Peer::new(remote_config); + remote_peer.start(); + + let max_step = 50; + for _ in 0..max_step { + remote_peer.next().await; + if remote_peer.state == State::OpenConfirm { + break; + } + tokio::time::sleep(Duration::from_secs_f32(0.1)).await; + } + }); + + tokio::time::sleep(Duration::from_secs(1)).await; + let max_step = 50; + for _ in 0..max_step { + peer.next().await; + if peer.state == State::OpenConfirm { + break; + } + tokio::time::sleep(Duration::from_secs_f32(0.1)).await; + } + + assert_eq!(peer.state, State::OpenConfirm); + } + #[tokio::test] async fn peer_can_transition_to_connect_state() { let config: Config = "64512 127.0.0.1 65413 127.0.0.2 active".parse().unwrap(); diff --git a/src/state.rs b/src/state.rs index 3113252..4f2cca3 100644 --- a/src/state.rs +++ b/src/state.rs @@ -3,4 +3,5 @@ pub enum State { Idle, Connect, OpenSent, + OpenConfirm, }