Mastering Rust gRPC Streaming with Tonic: Build Server & Client
This guide walks through creating a Rust project that uses the Tonic library to implement gRPC streaming, covering project setup, protobuf definitions, server and client code, testing with grpcurl, and enabling the reflection API for service introspection.
Project Planning
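We create a single Cargo package named tonic_sample that contains both the gRPC server and client binaries:
cargo new tonic_sample
The Cargo.toml file includes the necessary dependencies such as tokio, tonic, prost, and tonic-build. Note that tokio.workspace = true inherits the tokio version from an enclosing workspace's [workspace.dependencies] table; if you build the package on its own, replace it with an explicit tokio dependency.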
[package]
name = "tonic_sample"
version = "0.1.0"
edition = "2021"
[[bin]] # Server binary
name = "stream-server"
path = "src/stream_server.rs"
[[bin]] # Client binary
name = "stream-client"
path = "src/stream_client.rs"
[dependencies]
tokio.workspace = true
tonic = "0.9"
tonic-reflection = "0.9.2"
prost = "0.11"
tokio-stream = "0.1"
async-stream = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.7"
h2 = { version = "0.3" }
anyhow = "1.0.75"
futures-util = "0.3.28"
[build-dependencies]
tonic-build = "0.9"
We define the protobuf file proto/echo.proto, which describes two messages and four RPC methods: unary, server‑streaming, client‑streaming, and bidirectional streaming.
syntax = "proto3";
package stream;
message EchoRequest { string message = 1; }
message EchoResponse { string message = 1; }
service Echo {
rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}
The build.rs script invokes tonic_build::compile_protos to generate Rust code from the protobuf definition.
use std::{env, path::PathBuf};
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/echo.proto")?;
Ok(())
}
Server Implementation
The server imports the generated protobuf module, implements the four RPC methods, and uses tokio_stream to produce a repeating stream for the server‑streaming case. It also sets up a channel to forward streamed items to the client.
pub mod pb { tonic::include_proto!("stream"); }
use anyhow::Result;
use futures_util::FutureExt;
use pb::{EchoRequest, EchoResponse};
use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, thread, time::Duration};
use tokio::{net::TcpListener, sync::{mpsc, oneshot::{self, Receiver, Sender}, Mutex}, task::{self, JoinHandle}};
use tokio_stream::{wrappers::{ReceiverStream, TcpListenerStream}, Stream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};
type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;
/// Stateless handler type on which the `pb::echo_server::Echo` service trait is implemented.
#[derive(Debug)]
pub struct EchoServer {}
#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {
async fn unary_echo(&self, req: Request<EchoRequest>) -> EchoResult<EchoResponse> {
let req_str = req.into_inner().message;
Ok(Response::new(EchoResponse { message: req_str }))
}
type ServerStreamingEchoStream = ResponseStream;
async fn server_streaming_echo(&self, req: Request<EchoRequest>) -> EchoResult<Self::ServerStreamingEchoStream> {
let repeat = std::iter::repeat(EchoResponse { message: req.into_inner().message });
let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
while let Some(item) = stream.next().await {
if tx.send(Ok(item)).await.is_err() { break; }
}
});
Ok(Response::new(ReceiverStream::new(rx) as Self::ServerStreamingEchoStream))
}
async fn client_streaming_echo(&self, _: Request<Streaming<EchoRequest>>) -> EchoResult<EchoResponse> {
Err(Status::unimplemented("not implemented"))
}
type BidirectionalStreamingEchoStream = ResponseStream;
async fn bidirectional_streaming_echo(&self, req: Request<Streaming<EchoRequest>>) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
while let Some(result) = in_stream.next().await {
match result {
Ok(v) => tx.send(Ok(EchoResponse { message: v.message })).await.unwrap(),
Err(err) => { let _ = tx.send(Err(err)).await; }
}
}
});
Ok(Response::new(ReceiverStream::new(rx) as Self::BidirectionalStreamingEchoStream))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = EchoServer {};
Server::builder()
.add_service(pb::echo_server::EchoServer::new(server))
.serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap())
.await?;
Ok(())
}
Client Implementation
The client creates an EchoClient, sends unary requests, consumes a limited number of server‑streaming responses, and demonstrates bidirectional streaming with throttled request intervals.
pub mod pb { tonic::include_proto!("stream"); }
use std::time::Duration;
use tokio_stream::{Stream, StreamExt};
use tonic::transport::Channel;
use pb::{echo_client::EchoClient, EchoRequest};
/// Produces a stream of `EchoRequest`s whose messages are `msg 01`, `msg 02`, …,
/// one per integer in `1..usize::MAX`.
fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
    let numbers = tokio_stream::iter(1..usize::MAX);
    numbers.map(|seq| {
        let message = format!("msg {:02}", seq);
        EchoRequest { message }
    })
}
/// Sends `num` unary requests (`msg0`, `msg1`, …) one after another and prints
/// each echoed message. Panics if a call fails.
async fn unary_echo(client: &mut EchoClient<Channel>, num: usize) {
    for seq in 0..num {
        let payload = EchoRequest { message: format!("msg{}", seq) };
        let reply = client
            .unary_echo(tonic::Request::new(payload))
            .await
            .unwrap()
            .into_inner();
        println!("resp:{}", reply.message);
    }
}
/// Starts a server-streaming call with the message `foo` and prints at most
/// `num` responses. Panics if the call or any received item fails.
async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let request = EchoRequest { message: "foo".into() };
    let response = client.server_streaming_echo(request).await.unwrap();

    // `take` caps how many items are read from the response stream.
    let mut limited = response.into_inner().take(num);
    while let Some(item) = limited.next().await {
        let reply = item.unwrap();
        println!("\treceived: {}", reply.message);
    }
}
/// Sends the first `num` generated requests over a bidirectional stream and
/// prints every response until the server closes the stream.
/// Panics if the call or any received item fails.
async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let outbound = echo_requests_iter().take(num);
    let mut inbound = client
        .bidirectional_streaming_echo(outbound)
        .await
        .unwrap()
        .into_inner();

    while let Some(received) = inbound.next().await {
        let reply = received.unwrap();
        println!("\treceived message: `{}`", reply.message);
    }
}
/// Like the bidirectional echo, but the outbound request stream is not capped
/// and is throttled by `dur` between requests.
/// Prints each response; panics if the call or any received item fails.
async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
    let outbound = echo_requests_iter().throttle(dur);
    let mut inbound = client
        .bidirectional_streaming_echo(outbound)
        .await
        .unwrap()
        .into_inner();

    while let Some(received) = inbound.next().await {
        let reply = received.unwrap();
        println!("\treceived message: `{}`", reply.message);
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap();
println!("Unary echo:");
unary_echo(&mut client, 10).await;
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Streaming echo:");
streaming_echo(&mut client, 5).await;
tokio::time::sleep(Duration::from_secs(1)).await;
println!("
Bidirectional stream echo:");
bidirectional_streaming_echo(&mut client, 17).await;
println!("
Bidirectional stream echo (kill client with CTRL+C):");
bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
Ok(())
}
Testing the Service
Run the server and client binaries with:
cargo run --bin stream-server
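cargo run --bin stream-client
During development you can use grpcurl to query the service without a client binary. Because the following commands pass the .proto file explicitly (-import-path / -proto), they work even before the reflection API is enabled; reflection (see below) is only needed to drop those flags: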
grpcurl -import-path ./proto -proto echo.proto list
grpcurl -import-path ./proto -proto echo.proto describe stream.Echo
grpcurl -plaintext -import-path ./proto -proto echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho
Enabling Reflection API
Modify build.rs to generate a file descriptor set:
use std::{env, path::PathBuf};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
tonic_build::configure()
.file_descriptor_set_path(out_dir.join("stream_descriptor.bin"))
.compile(&["proto/echo.proto"], &["proto"])?;
Ok(())
}
Expose the descriptor in the generated module and register it with tonic_reflection in stream_server.rs:
pub mod pb {
tonic::include_proto!("stream");
pub const STREAM_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("stream_descriptor");
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET)
.with_service_name("stream.Echo")
.build()?;
let addr = "0.0.0.0:50051".parse().unwrap();
let server = EchoServer {};
Server::builder()
.add_service(service)
.add_service(pb::echo_server::EchoServer::new(server))
.serve(addr)
.await?;
Ok(())
}
After rebuilding, grpcurl -plaintext 127.0.0.1:50051 list correctly lists the service, with no .proto file needed on the command line.
The full source code is available at https://github.com/jiashiwen/wenpanrust/tree/main/tonic_sample .
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
JD Cloud Developers
JD Cloud Developers (Developer of JD Technology) is a JD Technology Group platform offering technical sharing and communication for AI, cloud computing, IoT and related developers. It publishes JD product technical information, industry content, and tech event news. Embrace technology and partner with developers to envision the future.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
