Mastering Rust gRPC Streaming with Tonic: Build Server & Client

This guide walks through creating a Rust project that uses the Tonic library to implement gRPC streaming, covering project setup, protobuf definitions, server and client code, testing with grpcurl, and enabling the reflection API for service introspection.

JD Cloud Developers

Project Planning

We create a single Cargo package named tonic_sample that builds both the gRPC server and client binaries:

cargo new tonic_sample

The Cargo.toml file declares the two binaries and the required dependencies, including tokio, tonic, prost, and tonic-build:

[package]
name = "tonic_sample"
version = "0.1.0"
edition = "2021"

[[bin]] # Server binary
name = "stream-server"
path = "src/stream_server.rs"

[[bin]] # Client binary
name = "stream-client"
path = "src/stream_client.rs"

[dependencies]
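# The original project inherits tokio from a parent workspace (tokio.workspace = true).
# A standalone package needs an explicit entry; the version and features here are an assumption.
tokio = { version = "1", features = ["full"] }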
tonic = "0.9"
tonic-reflection = "0.9.2"
prost = "0.11"
tokio-stream = "0.1"
async-stream = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rand = "0.7"
h2 = { version = "0.3" }
anyhow = "1.0.75"
futures-util = "0.3.28"

[build-dependencies]
tonic-build = "0.9"

We define the protobuf file proto/echo.proto that describes two messages and four RPC methods, including unary, server‑streaming, client‑streaming, and bidirectional streaming.

syntax = "proto3";
package stream;

message EchoRequest { string message = 1; }
message EchoResponse { string message = 1; }

service Echo {
  rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
  rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
  rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
  rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}
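
The four call shapes differ only in which side is marked stream in the proto file. As a rough illustration (plain Rust with no tonic types; RpcKind, rpc_kind, and echo_methods are hypothetical names introduced here), the mapping can be sketched as:

```rust
/// The four gRPC call shapes, determined by which side sends a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RpcKind {
    Unary,
    ServerStreaming,
    ClientStreaming,
    Bidirectional,
}

/// Classify an RPC by whether its request and/or response is marked `stream`.
pub fn rpc_kind(request_is_stream: bool, response_is_stream: bool) -> RpcKind {
    match (request_is_stream, response_is_stream) {
        (false, false) => RpcKind::Unary,
        (false, true) => RpcKind::ServerStreaming,
        (true, false) => RpcKind::ClientStreaming,
        (true, true) => RpcKind::Bidirectional,
    }
}

/// The methods of the Echo service in echo.proto, paired with their call shape.
pub fn echo_methods() -> Vec<(&'static str, RpcKind)> {
    vec![
        ("UnaryEcho", rpc_kind(false, false)),
        ("ServerStreamingEcho", rpc_kind(false, true)),
        ("ClientStreamingEcho", rpc_kind(true, false)),
        ("BidirectionalStreamingEcho", rpc_kind(true, true)),
    ]
}
```

Calling echo_methods() returns one entry per rpc line of the service, in declaration order.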

The build.rs script invokes tonic_build::compile_protos to generate Rust code from the protobuf definition.

// build.rs: generate the Rust message types and service traits from proto/echo.proto at build time.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/echo.proto")?;
    Ok(())
}
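
The generated code follows Rust naming conventions, which is why the listings in this guide refer to pb::echo_server, pb::echo_client, and methods such as unary_echo. The sketch below approximates that renaming for simple CamelCase identifiers only. It is not the converter tonic-build actually uses, and to_snake_case and generated_modules are hypothetical helpers introduced for illustration:

```rust
/// Convert a simple CamelCase identifier to snake_case.
/// Simplification: an underscore is inserted before every ASCII uppercase
/// letter except the first, so acronyms such as "HTTPEcho" are not handled well.
pub fn to_snake_case(name: &str) -> String {
    let mut out = String::with_capacity(name.len() + 4);
    for (i, ch) in name.chars().enumerate() {
        if ch.is_ascii_uppercase() {
            if i > 0 {
                out.push('_');
            }
            out.push(ch.to_ascii_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}

/// Module names matching the ones used in this guide for a service:
/// (server module, client module).
pub fn generated_modules(service: &str) -> (String, String) {
    let base = to_snake_case(service);
    (format!("{}_server", base), format!("{}_client", base))
}
```

For example, to_snake_case("ServerStreamingEcho") yields the method name server_streaming_echo that the server implements.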

Server Implementation

The server imports the generated protobuf module, implements the four RPC methods, and uses tokio_stream to produce a repeating stream for the server‑streaming case. It also sets up a channel to forward streamed items to the client.

pub mod pb { tonic::include_proto!("stream"); }

// Only the imports the server actually uses; Result below is std's Result.
use pb::{EchoRequest, EchoResponse};
use std::{net::ToSocketAddrs, pin::Pin, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};

type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;

#[derive(Debug)]
pub struct EchoServer {}

#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {
    async fn unary_echo(&self, req: Request<EchoRequest>) -> EchoResult<EchoResponse> {
        let req_str = req.into_inner().message;
        Ok(Response::new(EchoResponse { message: req_str }))
    }

    type ServerStreamingEchoStream = ResponseStream;
    async fn server_streaming_echo(&self, req: Request<EchoRequest>) -> EchoResult<Self::ServerStreamingEchoStream> {
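        // Endless stream that repeats the request message, emitting one item every 200 ms.
        let repeat = std::iter::repeat(EchoResponse { message: req.into_inner().message });
        let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));
        let (tx, rx) = mpsc::channel(128);
        tokio::spawn(async move {
            while let Some(item) = stream.next().await {
                // send() fails once the client has disconnected and the receiver is dropped.
                if tx.send(Ok::<_, Status>(item)).await.is_err() { break; }
            }
        });
        // Box and pin the receiver stream so it matches the ResponseStream trait object type.
        Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::ServerStreamingEchoStream))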
    }

    async fn client_streaming_echo(&self, _: Request<Streaming<EchoRequest>>) -> EchoResult<EchoResponse> {
        Err(Status::unimplemented("not implemented"))
    }

    type BidirectionalStreamingEchoStream = ResponseStream;
    async fn bidirectional_streaming_echo(&self, req: Request<Streaming<EchoRequest>>) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
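        let mut in_stream = req.into_inner();
        let (tx, rx) = mpsc::channel(128);
        tokio::spawn(async move {
            while let Some(result) = in_stream.next().await {
                // Echo each message back; stream errors are forwarded to the client unchanged.
                let reply = result.map(|v| EchoResponse { message: v.message });
                // Stop once the client has gone away instead of panicking on a closed channel.
                if tx.send(reply).await.is_err() { break; }
            }
        });
        // Box and pin the receiver stream so it matches the ResponseStream trait object type.
        Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::BidirectionalStreamingEchoStream))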
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = EchoServer {};
    Server::builder()
        .add_service(pb::echo_server::EchoServer::new(server))
        .serve("0.0.0.0:50051".to_socket_addrs().unwrap().next().unwrap())
        .await?;
    Ok(())
}
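
The server-streaming handler relies on one detail: the producer loop ends when sending into the channel fails, which happens after the receiving side has been dropped. The following is a std-only analogue of that pattern, using std::sync::mpsc and a thread in place of tokio's channel and task. It is a sketch of the idea rather than the tonic code, and take_from_endless_producer is a hypothetical name:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Receive `n` copies of `message` from an endless producer, then hang up.
/// The producer would loop forever if it ignored send errors; it exits
/// because send() returns Err once the receiver has been dropped.
pub fn take_from_endless_producer(message: &str, n: usize) -> Vec<String> {
    // Bounded channel: the producer blocks when 128 items are waiting.
    let (tx, rx) = sync_channel::<String>(128);
    let msg = message.to_string();

    let producer = thread::spawn(move || loop {
        if tx.send(msg.clone()).is_err() {
            break; // receiver is gone, stop producing
        }
    });

    // Consume only the first n items, like a client that stops reading.
    let received: Vec<String> = rx.iter().take(n).collect();

    // Dropping the receiver plays the role of the client disconnecting.
    drop(rx);
    producer.join().expect("producer thread panicked");
    received
}
```

Because the function joins the producer thread before returning, it would hang if the producer did not check the result of send(), which is the failure mode the break in the handler avoids.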

Client Implementation

The client creates an EchoClient, sends unary requests, consumes a limited number of server‑streaming responses, and demonstrates bidirectional streaming with throttled request intervals.

pub mod pb { tonic::include_proto!("stream"); }

use std::time::Duration;
use tokio_stream::{Stream, StreamExt};
use tonic::transport::Channel;
use pb::{echo_client::EchoClient, EchoRequest};

fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
    tokio_stream::iter(1..usize::MAX).map(|i| EchoRequest { message: format!("msg {:02}", i) })
}

async fn unary_echo(client: &mut EchoClient<Channel>, num: usize) {
    for i in 0..num {
        let req = tonic::Request::new(EchoRequest { message: format!("msg{}", i) });
        let resp = client.unary_echo(req).await.unwrap();
        println!("resp:{}", resp.into_inner().message);
    }
}

async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let stream = client.server_streaming_echo(EchoRequest { message: "foo".into() }).await.unwrap().into_inner();
    let mut stream = stream.take(num);
    while let Some(item) = stream.next().await { println!("\treceived: {}", item.unwrap().message); }
}

async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let in_stream = echo_requests_iter().take(num);
    let response = client.bidirectional_streaming_echo(in_stream).await.unwrap();
    let mut resp_stream = response.into_inner();
    while let Some(received) = resp_stream.next().await { println!("\treceived message: `{}`", received.unwrap().message); }
}

async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
    let in_stream = echo_requests_iter().throttle(dur);
    let response = client.bidirectional_streaming_echo(in_stream).await.unwrap();
    let mut resp_stream = response.into_inner();
    while let Some(received) = resp_stream.next().await { println!("\treceived message: `{}`", received.unwrap().message); }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = EchoClient::connect("http://127.0.0.1:50051").await.unwrap();
    println!("Unary echo:");
    unary_echo(&mut client, 10).await;
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("Streaming echo:");
    streaming_echo(&mut client, 5).await;
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("\nBidirectional stream echo:");
    bidirectional_streaming_echo(&mut client, 17).await;
    println!("\nBidirectional stream echo (kill client with CTRL+C):");
    bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;
    Ok(())
}
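
echo_requests_iter() describes a practically endless sequence, and take(num) is what bounds it. The same lazy behaviour can be seen with a plain std iterator. This is a synchronous sketch, not the tokio_stream code, and echo_messages and first_messages are hypothetical names:

```rust
/// Lazily numbered messages, using the same format string as echo_requests_iter().
/// Nothing is formatted until the iterator is advanced.
pub fn echo_messages() -> impl Iterator<Item = String> {
    (1..usize::MAX).map(|i| format!("msg {:02}", i))
}

/// Materialize only the first `n` messages of the endless sequence.
pub fn first_messages(n: usize) -> Vec<String> {
    echo_messages().take(n).collect()
}
```

first_messages(3) produces "msg 01", "msg 02", and "msg 03". The {:02} format pads to two digits, so numbering keeps a fixed width up to message 99.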

Testing the Service

Start the server in one terminal, then run the client in another:

cargo run --bin stream-server
cargo run --bin stream-client

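During development you can use grpcurl to exercise the service without writing a client. With -import-path and -proto, grpcurl reads the schema from the local proto file, so the following commands work before reflection is enabled (see below). The first two only inspect the proto file and do not contact the server: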

grpcurl -import-path ./proto -proto echo.proto list
grpcurl -import-path ./proto -proto echo.proto describe stream.Echo
grpcurl -plaintext -import-path ./proto -proto echo.proto -d '{"message":"1234"}' 127.0.0.1:50051 stream.Echo/UnaryEcho
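
The last argument of the call above, stream.Echo/UnaryEcho, is assembled from the proto package, the service name, and the method name. A small sketch of that composition follows; grpcurl_method and http2_path are hypothetical helpers, and the leading-slash form is the request path gRPC uses over HTTP/2:

```rust
/// Build the `package.Service/Method` symbol passed to grpcurl.
pub fn grpcurl_method(package: &str, service: &str, method: &str) -> String {
    format!("{}.{}/{}", package, service, method)
}

/// Build the HTTP/2 request path for the same method: the symbol with a leading slash.
pub fn http2_path(package: &str, service: &str, method: &str) -> String {
    format!("/{}", grpcurl_method(package, service, method))
}
```

With the definitions in echo.proto, grpcurl_method("stream", "Echo", "ServerStreamingEcho") gives the symbol for the server-streaming method.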

Enabling Reflection API

Modify build.rs to generate a file descriptor set:

use std::{env, path::PathBuf};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
    tonic_build::configure()
        .file_descriptor_set_path(out_dir.join("stream_descriptor.bin"))
        .compile(&["proto/echo.proto"], &["proto"])?;
    Ok(())
}

Expose the descriptor in the generated module and register it with tonic_reflection in stream_server.rs:

pub mod pb {
    tonic::include_proto!("stream");
    pub const STREAM_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("stream_descriptor");
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let service = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(pb::STREAM_DESCRIPTOR_SET)
        .with_service_name("stream.Echo")
        .build()?;
    let addr = "0.0.0.0:50051".parse().unwrap();
    let server = EchoServer {};
    Server::builder()
        .add_service(service)
        .add_service(pb::echo_server::EchoServer::new(server))
        .serve(addr)
        .await?;
    Ok(())
}

After rebuilding, grpcurl -plaintext 127.0.0.1:50051 list correctly lists the service.

The full source code is available at https://github.com/jiashiwen/wenpanrust/tree/main/tonic_sample .
