Multi Connection Example
This example starts off several Tcp connections on a loop to a remote endpoint: in this case the TcpListener
behind the NymProxyServer
instance on the echo server found in
nym/tools/echo-server/
(opens in a new tab). It pipes a few messages to it, logs the replies, and keeps track of the number of replies received per connection.
You can find this code here (opens in a new tab)
use nym_sdk::mixnet::Recipient;
use nym_sdk::tcp_proxy;
use rand::rngs::SmallRng;
use rand::Rng;
use rand::SeedableRng;
use serde::{Deserialize, Serialize};
use std::env;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::signal;
use tokio_stream::StreamExt;
use tokio_util::codec;
#[derive(Serialize, Deserialize, Debug)]
struct ExampleMessage {
message_id: i8,
message_bytes: Vec<u8>,
tcp_conn: i8,
}
// To run:
// - run the echo server with `cargo run`
// - run this example with `cargo run --example tcp_proxy_multistream -- <ECHO_SERVER_NYM_ADDRESS> <ENV_FILE_PATH> <CLIENT_PORT>` e.g.
// cargo run --example tcp_proxy_multistream -- DMHyxo8n6sKWHHTVvjRVDxDSMX8gYXRU1AQ6UpwsrWiB.6STYCWGWyRxqn2juWdgjMkAMsT9EaAzPpLWq5zkS68MB@CJG5zTcmoLijmDrtAiLV9PZHxNz8LQu6hmgA89V2RxxL ../../../envs/canary.env 8080
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let server_address = env::args().nth(1).expect("Server address not provided");
let server: Recipient =
Recipient::try_from_base58_string(&server_address).expect("Invalid server address");
// Comment this out to just see println! statements from this example.
// Nym client logging is very informative but quite verbose.
// The Message Decay related logging gives you an ideas of the internals of the proxy message ordering: you need to switch
// to DEBUG to see the contents of the msg buffer, sphinx packet chunking, etc.
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
let env_path = env::args().nth(2).expect("Env file not specified");
let env = env_path.to_string();
let listen_port = env::args().nth(3).expect("Port not specified");
// Within the TcpProxyClient, individual client shutdown is triggered by the timeout.
let proxy_client =
tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env)).await?;
tokio::spawn(async move {
proxy_client.run().await?;
Ok::<(), anyhow::Error>(())
});
println!("waiting for everything to be set up..");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("done. sending bytes");
// In the info traces you will see the different session IDs being set up, one for each TcpStream.
for i in 0..4 {
let conn_id = i;
println!("Starting TCP connection {}", conn_id);
let local_tcp_addr = format!("127.0.0.1:{}", listen_port.clone());
tokio::spawn(async move {
// Now the client and server proxies are running we can create and pipe traffic to/from
// a socket on the same port as our ProxyClient instance as if we were just communicating
// between a client and host via a normal TcpStream - albeit with a decent amount of additional latency.
//
// The assumption regarding integration is that you know what you're sending, and will do proper
// framing before and after, know what data types you're expecting; the proxies are just piping bytes
// back and forth using tokio's `Bytecodec` under the hood.
let stream = TcpStream::connect(local_tcp_addr).await?;
let (read, mut write) = stream.into_split();
// Lets just send a bunch of messages to the server with variable delays between them, with a message and tcp connection ids to keep track of ordering on the server side (for illustrative purposes **only**; keeping track of anonymous replies is handled by the proxy under the hood with Single Use Reply Blocks (SURBs); for this illustration we want some kind of app-level message id, but irl most of the time you'll probably be parsing on e.g. the incoming response type instead)
tokio::spawn(async move {
for i in 0..4 {
let mut rng = SmallRng::from_entropy();
let delay: f64 = rng.gen_range(2.5..5.0);
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
let random_bytes = gen_bytes_fixed(i as usize);
let msg = ExampleMessage {
message_id: i,
message_bytes: random_bytes,
tcp_conn: conn_id,
};
let serialised = bincode::serialize(&msg)?;
write
.write_all(&serialised)
.await
.expect("couldn't write to stream");
println!(
">> client sent {}: {} bytes on conn {}",
&i,
msg.message_bytes.len(),
&conn_id
);
}
Ok::<(), anyhow::Error>(())
});
tokio::spawn(async move {
let mut reply_counter = 0;
let codec = codec::BytesCodec::new();
let mut framed_read = codec::FramedRead::new(read, codec);
while let Some(Ok(bytes)) = framed_read.next().await {
match bincode::deserialize::<ExampleMessage>(&bytes) {
Ok(msg) => {
println!(
"<< client received {}: {} bytes on conn {}",
msg.message_id,
msg.message_bytes.len(),
msg.tcp_conn
);
reply_counter += 1;
println!(
"tcp connection {} replies received {}/4",
msg.tcp_conn, reply_counter
);
}
Err(e) => {
println!("<< client received something that wasn't an example message of {} bytes. error: {}", bytes.len(), e);
}
}
}
});
Ok::<(), anyhow::Error>(())
});
let mut rng = SmallRng::from_entropy();
let delay: f64 = rng.gen_range(4.5..7.0);
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
}
// Once timeout is passed, you can either wait for graceful shutdown or just hard stop it.
signal::ctrl_c().await?;
println!("CTRL+C received, shutting down");
Ok(())
}
// emulate a series of small messages followed by a closing larger one
fn gen_bytes_fixed(i: usize) -> Vec<u8> {
let amounts = [10, 15, 50, 1000];
let len = amounts[i];
let mut rng = rand::thread_rng();
(0..len).map(|_| rng.gen::<u8>()).collect()
}