transition to OpenConfirm

This commit is contained in:
Masato Imai
2025-02-25 09:36:29 +00:00
parent 5c7f26c6be
commit da15033501
6 changed files with 116 additions and 2 deletions

View File

@ -1,2 +1,5 @@
[target.x86_64-unknown-linux-gnu]
runner = "sudo -E"
[alias]
t = "test -- --test-threads=1"

View File

@ -1,5 +1,8 @@
use anyhow::{Context, Ok, Result};
use bytes::BytesMut;
use std::io;
use std::result::Result::Ok;
use anyhow::{Context, Result};
use bytes::{BufMut, BytesMut};
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
@ -32,6 +35,53 @@ impl Connection {
Ok(Self { connection, buffer })
}
pub async fn get_message(&mut self) -> Option<Message> {
self.read_data_from_tcp_connection().await;
let buffer = self.split_buffer_at_message_separator()?;
Message::try_from(buffer).ok()
}
async fn read_data_from_tcp_connection(&mut self) {
loop {
let mut buf: Vec<u8> = vec![];
match self.connection.try_read_buf(&mut buf) {
Ok(0) => {}
Ok(n) => self.buffer.put(&buf[..]),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => panic!(
"An error occured while reading data from TCP connection: {:?}",
e
),
}
}
}
fn split_buffer_at_message_separator(&mut self) -> Option<BytesMut> {
let index = self.get_index_of_message_separator().ok()?;
if self.buffer.len() < index {
return None;
}
Some(self.buffer.split_to(index))
}
fn get_index_of_message_separator(&self) -> Result<usize> {
let minimum_message_length = 19;
if self.buffer.len() < 19 {
return Err(anyhow::anyhow!(
"Message length is too short: {}",
self.buffer.len()
));
}
Ok(u16::from_be_bytes([self.buffer[16], self.buffer[17]]) as usize)
}
pub async fn send(&mut self, message: Message) {
let bytes: BytesMut = message.into();
self.connection.write_all(&bytes[..]).await;

View File

@ -1,5 +1,8 @@
use crate::packets::open::OpenMessage;
#[derive(PartialEq, Eq, Debug, Clone, Hash)]
pub enum Event {
ManualStart,
TcpConnectionConfirmed,
BgpOpen(OpenMessage),
}

View File

@ -9,6 +9,7 @@ use super::{
open::OpenMessage,
};
#[derive(Debug)]
pub enum Message {
Open(OpenMessage),
}

View File

@ -36,6 +36,21 @@ impl Peer {
info!("event occurred, event={:?}", event);
self.handle_event(event).await;
}
if let Some(connection) = &mut self.tcp_connection {
if let Some(message) = connection.get_message().await {
info!("message received, message={:?}", message);
self.handle_message(message).await;
}
}
}
async fn handle_message(&mut self, message: Message) {
match message {
Message::Open(open) => {
self.event_queue.enqueue(Event::BgpOpen(open));
}
}
}
async fn handle_event(&mut self, event: Event) {
@ -68,6 +83,13 @@ impl Peer {
}
_ => {}
},
State::OpenSent => match event {
Event::BgpOpen(open) => {
//TODO: send keepalive message
self.state = State::OpenConfirm;
}
_ => {}
},
_ => {}
}
@ -88,6 +110,40 @@ mod tests {
use crate::peer::Peer;
use crate::state::State;
#[tokio::test]
async fn peer_can_transition_to_open_confirm_state() {
let config: Config = "64512 127.0.0.1 64513 127.0.0.2 active".parse().unwrap();
let mut peer = Peer::new(config);
peer.start();
tokio::spawn(async move {
let remote_config: Config = "64513 127.0.0.2 64512 127.0.0.1 passive".parse().unwrap();
let mut remote_peer = Peer::new(remote_config);
remote_peer.start();
let max_step = 50;
for _ in 0..max_step {
remote_peer.next().await;
if remote_peer.state == State::OpenConfirm {
break;
}
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
}
});
tokio::time::sleep(Duration::from_secs(1)).await;
let max_step = 50;
for _ in 0..max_step {
peer.next().await;
if peer.state == State::OpenConfirm {
break;
}
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
}
assert_eq!(peer.state, State::OpenConfirm);
}
#[tokio::test]
async fn peer_can_transition_to_connect_state() {
let config: Config = "64512 127.0.0.1 65413 127.0.0.2 active".parse().unwrap();

View File

@ -3,4 +3,5 @@ pub enum State {
Idle,
Connect,
OpenSent,
OpenConfirm,
}