こんにちは。Tomoyuki(@tomoyuki65)です。
マイクロサービスとしてAPIを作る際などに、gRPC(Google Remote Procedure Call)が使われることがあります。
gRPCはGoogleが開発した高性能なオープンソースRPC(Remote Procedure Call)フレームワークで、HTTP/2上でバイナリ形式でデータをやり取りすることで高速かつ効率的な通信を実現可能です。
そんなgRPCは主にGo言語のAPIで使われるケースが多いですが、もちろんRustでも作ることが可能です。
そこでこの記事では、RustのgRPCでバックエンドAPIを開発する方法についてまとめます。
RustのtonicのgRPCでバックエンドAPIを開発する方法まとめ
gRPCについてはそもそもまだ一般的によく利用されている技術ではないため、実際にコードを書く前に、具体的にどういった場面で使われるのがを理解しておいた方がいいです。
一つ目は、iOSやAndroidアプリのバックエンドAPIとしてgRPCサーバーが使われます。
アプリ側ではgRPCネイティブクライアントというライブラリを使うことでgRPCサーバーと直接通信が可能であり、通信データのファイルサイズが小さく電力消費を抑える効果があったりするため、モバイルアプリのバックエンドAPIに最適です。
ただし、gRPCサーバーのエンドポイント(アクセス先)をインターネット上に外部公開するのはセキュリティリスクが高いため、間にプロキシサーバー(中継させて内部からgRPCへ接続させる)を置いてそのエンドポイントを外部公開させるようにするのが一般的です。
二つ目は、マイクロサービスの一つとしてgRPCサーバーを使うことです。
各種マイクロサービスの呼び出しをまとめるBFF(Backend For Frontend)から直接呼び出したり、またはマイクロサービス間連携(サーバー間通信)をさせるために使ったりします。
三つ目としてはもちろんWebブラウザからの利用もありますが、アプリのようにブラウザから直接gRPCサーバーに対して通信はできない(通信方法が異なる)ため、Webブラウザからの利用には工夫が必要になります。
その一つの方法としては、別途外部公開用のREST APIのエンドポイント(ストリーミング機能を使うならWebSocketが必要)を立て、そこからgRPCにサーバー間通信させることです。
このように、まずはgRPCが利用される場合の用途について理解しておくと、これからご紹介するgRPCサーバーの開発方法についての理解が深めやすくなると思います。
Rustのプロジェクトの初期化
まずは以下のコマンドを実行し、各種ファイルを作成します。
$ mkdir rust-grpc && cd rust-grpc
$ mkdir -p docker/local/rust && touch docker/local/rust/Dockerfile
$ touch .env compose.yml
次に作成したファイルをそれぞれ以下のように記述します。
・「docker/local/rust/Dockerfile」
FROM rust:1.87
WORKDIR /app
COPY . .
# インストール可能なパッケージ一覧の更新
RUN apt-get update && \
# パッケージのインストール
apt-get install -y \
protobuf-compiler \
curl
# gRPCのドキュメント生成やバリデーション用にgoをインストール
# goのバージョンを変更したい場合は以下リンク先のページで必要情報をご確認下さい。
# https://go.dev/dl/
ENV GO_VERSION=1.24.4
ENV GO_SHA256=d5501ee5aca0f258d5fe9bfaed401958445014495dc115f202d43d5210b45241
ENV GO_OS=linux
ENV GO_ARCH=arm64
ENV PATH="/usr/local/go/bin:/root/go/bin:${PATH}"
RUN curl -fsSLO "https://go.dev/dl/go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
echo "${GO_SHA256} go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" | sha256sum -c - && \
tar -C /usr/local -xzf "go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
rm "go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
rm -rf /var/lib/apt/lists/*
# gRPCのドキュメント生成用にgoのライブラリをインストール
RUN go install github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc@v1.5.1
# バリデーション用にgoのライブラリをインストール
RUN go install github.com/envoyproxy/protoc-gen-validate@v1.2.1
# ホットリロード用のライブラリをインストール
RUN cargo install cargo-watch
# Rust用のリンターをインストール
RUN rustup component add clippy
# Rust用のフォーマッターをインストール
RUN rustup component add rustfmt
※この記事でのRustのバージョンは「1.87」です。gRPCのドキュメントやバリデーション用のコード生成にはGo言語のライブラリを使用するため、Goのインストールもしています。
・「.env」
ENV=local
GRPC_PORT=50051
RUST_LOG=info
※gRPCの一般的なポート番号は「50051」です。
・「compose.yml」
services:
grpc:
container_name: rust-grpc
build:
context: .
dockerfile: ./docker/local/rust/Dockerfile
command: cargo watch -x run
volumes:
- .:/app
ports:
- "50051:50051"
# .env.testing利用時に上書きしたい環境変数を設定する
environment:
- ENV
- GRPC_PORT
- RUST_LOG
tty: true
stdin_open: true
次に以下のコマンドを実行し、コンテナのビルドとRustのプロジェクトの初期化を行います。
$ docker compose build --no-cache
$ docker compose run --rm grpc cargo init rust_grpc
※「cargo init rust_grpc」でCargo.tomlのpackageのnameが「rust_grpc」になります。初期化の際にgitも初期化されますが、メインブランチ名が古いmasterになっているため、mainに変更したい場合はコマンド「git branch -m master main」を実行して下さい。
コマンド実行後、下図のようにRustのプロジェクトが作成されればOKです。
.protoファイルからProtocol Buffersのコードを生成
RustでgRPCを作るには「tonic-build」と「prost-build」というクレートを使い、まずはProtocol Buffersのコードを生成する必要があります。
合わせてバリデーション用のコード生成には「prost-validate-build」というクレートを使います。
まずは以下の以下のコマンドを実行し、使用するクレートをインストールします。
$ docker compose run --rm grpc cargo add tonic@0.13.1
$ docker compose run --rm grpc cargo add tonic-reflection@0.13.1
$ docker compose run --rm grpc cargo add --build tonic-build@0.13.1 --features prost
$ docker compose run --rm grpc cargo add prost@0.13.5
$ docker compose run --rm grpc cargo add prost-validate@0.2.7 --features derive
$ docker compose run --rm grpc cargo add --build prost-build@0.13.5
$ docker compose run --rm grpc cargo add --build prost-validate-build@0.2.7
$ docker compose run --rm grpc cargo add tokio --features full
※tonicとprostなどは組み合わせる際にバージョンによって互換性が無かったりするため、上記では固定しています。
次に以下のコマンドを実行し、各種ファイルを作成します。
$ mkdir -p proto/sample && touch proto/sample/sample.proto
$ mkdir doc
$ touch build.rs
次に作成したファイルをそれぞれ以下のように記述します。
・「proto/sample/sample.proto」
syntax = "proto3";
import "validate/validate.proto";
package sample;
// 空のリクエストパラメータ
message Empty {}
// Helloメソッドのレスポンス結果
message HelloResponseBody {
// メッセージ
string message = 1;
}
// HelloAddTextメソッドのリクエストパラメータ
message HelloAddTextRequestBody {
// テキスト
string text = 1 [(validate.rules).string.min_len = 1];
}
// HelloAddTextメソッドのレスポンス結果
message HelloAddTextResponseBody {
// メッセージ
string message = 1;
}
// サンプルサービス
service SampleService {
// 「Hello World !!」を出力
rpc Hello(Empty) returns (HelloResponseBody) {}
// Returns:
// - 0 OK: HelloResponseBodyを出力
// - 2 Unknown: 不明なエラー
// 「Hello {リクエストパラメータのtext}」を出力
rpc HelloAddText(HelloAddTextRequestBody) returns (HelloAddTextResponseBody) {}
// Returns:
// - 0 OK: HelloAddTextResponseBodyを出力
// - 2 Unknown: 不明なエラー
// - 3 INVALID_ARGUMENT: バリデーションエラー
}
※gRPCはこのprotoファイルを元にAPIを作っていくため、スキーマ駆動開発です。ここではUnary RPC(1リクエスト-1レスポンス)のメソッドを2個定義しています。またドキュメント用のコメントも記述しています。(ただし、Returnsの部分はドキュメントに反映されないため、戻り値の確認はこのprotoファイルを参照することになります。)
・「build.rs」
use std::{env, path::PathBuf};
fn main() -> Result<(), Box<dyn std::error::Error>> {
// コンフィグ設定を定義
let files = &["proto/sample/sample.proto"];
let includes = &[
"proto",
"../root/go/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v1.2.1",
];
let file_descriptor_set_path = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR is not set"))
.join("sample_descriptor.bin");
let mut config = {
let mut c = prost_build::Config::new();
c.service_generator(
tonic_build::configure()
.message_attribute(".", "#[derive(::prost_validate::Validator)]")
.service_generator(),
);
c
};
// コンフィグ設定にバリデーションを適用
prost_validate_build::Builder::new()
.file_descriptor_set_path(file_descriptor_set_path)
.configure(&mut config, files, includes)?;
// コンフィング設定からコード生成
config.compile_protos(files, includes)?;
Ok(())
}
※Rustではルートディレクトリにこのbuild.rsを作り、このファイルでgRPC用のProtocol Buffersのコードを生成します。バリデーション用のコードも合わせて生成するためにprost_validate_buildを使っています。バリデーション用のコード生成にGoのライブラリを使用しているため、includesの「../root/go/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v1.2.1」でコンテナ内のライブラリのパスを指定する必要があります。
次に以下のコマンドを実行し、コンテナを起動します。
$ docker compose up -d
コンテナを起動するとホットリロード(Dockerfile内でインストールしたライブラリを使用)によりビルドされ、targetディレクトリ配下にビルドされたファイルが格納されますが、「debug > build」の直下にあるrust_grpc〜のディレクトリ内に作られます。
※上図のrust_grpcの部分は、cargo init時に指定したパッケージ名
gRPCのドキュメント生成
次に以下のコマンドを実行し、protoファイルからgRPCのドキュメントを生成します。
$ docker compose exec grpc protoc -I=.:../root/go/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v1.2.1 --doc_out=./doc --doc_opt=markdown,docs.md ./proto/sample/sample.proto
※ドキュメント生成にはDockerfileでインストールしたGo言語のライブラリを使います。
コマンド実行後、下図のようにファイル「doc/docs.md」が作成されればOKです。
※VSCodeならmdファイルのプレビューも可能です。
共通処理を追加
次に共通処理に関するものを追加するため、以下のコマンドを実行して必要なクレートを追加します。
$ docker compose exec grpc cargo add envy
$ docker compose exec grpc cargo add env_logger
$ docker compose exec grpc cargo add serde --features derive
$ docker compose exec grpc cargo add thiserror
$ docker compose exec grpc cargo add log
$ docker compose exec grpc cargo add chrono
※コンフィグ、共通コンテキスト、共通エラー、ロガーを追加します。
次に以下のコマンドを実行し、共通処理用の各種ファイルを作成します。
$ mkdir -p src/configs && touch src/configs/config.rs src/configs/mod.rs
$ mkdir -p src/contexts && touch src/contexts/context.rs src/contexts/mod.rs
$ mkdir -p src/errors && src/errors/error.rs src/errors/mod.rs
$ mkdir -p src/loggers && src/loggers/logger.rs src/loggers/mod.rs
次に作成したファイルをそれぞれ以下のように記述します。
・「src/configs/config.rs」
use envy;
use serde::Deserialize;
// 環境変数のデフォルト値を返す関数
fn default_env() -> String {
"local".to_string()
}
fn default_grpc_port() -> u16 {
50051
}
fn default_rust_log() -> String {
"info".to_string()
}
// 環境変数の構造体
#[derive(Deserialize, Debug)]
pub struct Config {
#[serde(default = "default_env")]
pub env: String,
#[serde(default = "default_grpc_port")]
pub grpc_port: u16,
#[allow(dead_code)]
#[serde(default = "default_rust_log")]
pub rust_log: String,
}
// 環境変数を返す関数
pub fn get_config() -> Config {
match envy::from_env::<Config>() {
Ok(config) => config,
Err(err) => {
println!("環境変数の初期化エラー: {}", err);
// 環境変数にデフォルト値を設定して返す
Config {
env: default_env(),
grpc_port: default_grpc_port(),
rust_log: default_rust_log(),
}
}
}
}
※これは環境変数の値を使えるようにするコンフィング設定です。
・「src/configs/mod.rs」
pub mod config;
・「src/contexts/context.rs」
use tonic::Request;
// 共通コンテキストの構造体
#[derive(Clone, Debug)]
pub struct Context {
#[allow(dead_code)]
pub request_id: String,
#[allow(dead_code)]
pub uri: String,
}
// コンテキスト作成関数
pub fn create_context<T>(req: &Request<T>) -> Context {
let request_id = req
.metadata()
.get("x-request-id")
.map(|value| value.to_str().unwrap_or("-"))
.unwrap_or("-");
let uri = req
.metadata()
.get("x-uri")
.map(|value| value.to_str().unwrap_or("-"))
.unwrap_or("-");
Context {
request_id: request_id.to_string(),
uri: uri.to_string(),
}
}
※これはリクエスト単位で共有させたいコンテキスト情報です。
・「src/contexts/mod.rs」
pub mod context;
・「src/errors/error.rs」
use thiserror::Error;
use tonic::Status;
#[derive(Error, Debug)]
pub enum CommonError {
#[error("Internal Server Error")]
InternalServerError,
#[allow(dead_code)]
#[error("{message}")]
CustomError { status: Status, message: String },
}
※これはサービスファイル等での共通エラー設定です。
・「src/errors/mod.rs」
pub mod error;
・「src/loggers/logger.rs」
use chrono::TimeZone;
use log;
use std::io::Write;
// 共通コンテキスト
use crate::contexts::context::Context;
// コンフィング設定
use crate::configs::config::get_config;
// ロガーの初期化用関数
pub fn init_logger() {
// 日本時間を取得
let jst = chrono::offset::FixedOffset::east_opt(9 * 3600)
.unwrap()
.from_utc_datetime(&chrono::Utc::now().naive_utc());
// カスタムロガーの初期化
env_logger::builder()
.format(move |buf, record| {
writeln!(
buf,
"{} {} {}",
jst.format("%Y-%m-%d %H:%M:%S"),
record.level(),
record.args()
)
})
.init();
}
// 共通コンテキストからログに追加する情報の文字列を取得する関数
fn get_info_from_request(ctx: &Context) -> String {
format!("request_id={} uri={}", ctx.request_id, ctx.uri)
}
// ログ出力用関数
pub fn info(ctx: &Context, msg: &str) {
let config = get_config();
// ENV=testing以外の場合にログ出力
if config.env != "testing" {
let info = get_info_from_request(ctx);
log::info!("[{}] {}", info, msg);
}
}
// TODO: 使用する場合にコメントアウトを外す
// pub fn warn(ctx: &Context, msg: &str) {
// let config = get_config();
// // ENV=testing以外の場合にログ出力
// if config.env != "testing" {
// let info = get_info_from_request(ctx);
// log::warn!("[{}] {}", info, msg);
// }
// }
pub fn error(ctx: &Context, msg: &str) {
let config = get_config();
// ENV=testing以外の場合にログ出力
if config.env != "testing" {
let info = get_info_from_request(ctx);
log::error!("[{}] {}", info, msg);
}
}
※これはログ出力用のロガー設定です。
・「src/loggers/mod.rs」
pub mod logger;
ミドルウェアとインターセプターを追加
次にミドルウェア(リクエストの処理前に実行させる処理)を追加しますが、gRPCにおけるミドルウェアとしてはインターセプターになります。
ただし、Rustの場合はHTTPリクエスト用のミドルウェアを併用した方がいい可能性もある(インターセプターだけだとgRPCのメソッド名が取得できなかった)ため、ミドルウェアとインターセプターの両方を追加する例を示します。
まずは以下のコマンドを実行し、必要なクレートを追加します。
$ docker compose exec grpc cargo add hyper
$ docker compose exec grpc cargo add tower
$ docker compose exec grpc cargo add uuid --features v4
次に以下のコマンドを実行し、各種ファイルを作成します。
$ mkdir -p src/middleware && touch src/middleware/request_middleware.rs src/middleware/mod.rs
$ mkdir -p src/interceptors && touch src/interceptors/interceptor.rs src/interceptors/mod.rs
次に作成したファイルをそれぞれ以下のように記述します。
・「src/middleware/request_middleware.rs」
use hyper::header::HeaderValue;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tonic::server::NamedService;
use tower::{Layer, Service};
use uuid::Uuid;
// 共通コンテキスト
use crate::contexts::context::Context as CommonContext;
// ロガー
use crate::loggers::logger;
// リクエスト用のミドルウェア
#[derive(Clone)]
pub struct RequestMiddleware<S> {
inner: S,
}
impl<S> RequestMiddleware<S> {
fn new(inner: S) -> Self {
RequestMiddleware { inner }
}
}
// リクエスト用のミドルウェアにServiceトレイトを実装
impl<B, S> Service<hyper::Request<B>> for RequestMiddleware<S>
where
S: Service<hyper::Request<B>> + Clone + Send + 'static,
S::Future: Send,
B: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut req: hyper::Request<B>) -> Self::Future {
// リクエストからuriを取得
let uri_path = req.uri().path();
// uriをリクエストヘッダーに設定
let uri_string = match HeaderValue::from_str(uri_path) {
Ok(value) => {
req.headers_mut().insert("x-uri", value.clone());
value.to_str().unwrap().to_string()
}
Err(_) => {
// エラーの場合は「-」を設定
let header_value = HeaderValue::from_str("-").unwrap();
req.headers_mut().insert("x-uri", header_value.clone());
header_value.to_str().unwrap().to_string()
}
};
// request-idをリクエストヘッダーに設定
let uuid = Uuid::new_v4().to_string();
req.headers_mut()
.insert("x-request-id", uuid.clone().parse().unwrap());
// リクエスト開始ログ
let ctx = CommonContext {
request_id: uuid,
uri: uri_string,
};
logger::info(&ctx, "Start gRPC request !!");
// 非同期処理のためself.innerをコピー
let mut inner = self.inner.clone();
// 非同期処理
Box::pin(async move {
let res = inner.call(req).await;
// 処理完了後にリクエスト終了ログ
logger::info(&ctx, "Finish gRPC request !!");
res
})
}
}
// リクエスト用のミドルウェアにtonic::server::NamedServiceを実装
impl<S> NamedService for RequestMiddleware<S>
where
S: NamedService,
{
const NAME: &'static str = S::NAME;
}
// リクエスト用のミドルウェアのレイヤー
#[derive(Clone)]
pub struct RequestMiddlewareLayer;
// リクエスト用のミドルウェアのレイヤーにLayerトレイトの実装
impl<S> Layer<S> for RequestMiddlewareLayer {
type Service = RequestMiddleware<S>;
fn layer(&self, inner: S) -> Self::Service {
RequestMiddleware::new(inner)
}
}
※「req.uri().path()」で実行されたメソッドのパスが取得可能です。そしてリクエスト単位で一意のIDを共有できるように「x-request-id」を設定しています。またミドルウェアでリクエストヘッダーに情報を追加すると、gRPCのリクエストのメタデータに情報が追加される。(実行されたメソッド名をメタデータに追加するならこの方法が必要な模様。)
・「src/middleware/mod.rs」
pub mod request_middleware;
・「src/interceptors/interceptor.rs」
use tonic::{Code, Request, Status};
#[allow(clippy::result_large_err)]
pub fn auth_interceptor(req: tonic::Request<()>) -> Result<Request<()>, Status> {
// メタデータからx-uriを取得
let uri = match req.metadata().get("x-uri") {
Some(value) => value.to_str().unwrap_or_default(),
None => "-",
};
// 対象のuriの場合はスキップ
let skip_uri = vec![
"/sample.SampleService/Hello",
];
if skip_uri.contains(&uri) {
return Ok(req);
}
// リクエストヘッダーからトークン取得
let token = match req.metadata().get("authorization") {
Some(value) => {
let bearer_token = value.to_str().unwrap_or_default();
bearer_token.trim_start_matches("Bearer ")
}
None => "",
};
// 認証トークンが設定されていない場合はエラー
if token.is_empty() {
let status = Status::new(
Code::InvalidArgument,
"認証用トークンが設定されていません。",
);
return Err(status);
}
// TODO: 認証チェック処理を追加
// 戻り値
Ok(req)
}
※これは認証処理用のインターセプターです。特定のメソッドに対してだけインターセプターを適用することはできないため、インターセプターでは必要に応じてスキップする処理が必要になる。
・「src/interceptors/mod.rs」
pub mod interceptor;
RustのgRPCでUnary RPCのAPIを作成
次にUnary RPC(1リクエスト-1レスポンス)のAPIを作成します。
まずは以下のコマンドを実行し、必要なクレートを追加します。
$ docker compose exec grpc cargo add async-trait
$ docker compose exec grpc cargo add mockall
次に以下のコマンドを実行し、各種ファイルを作成します。
$ mkdir -p src/repositories/sample
$ touch src/repositories/mod.rs src/repositories/sample/sample_repository.rs src/repositories/sample/mod.rs
$ mkdir -p src/services/sample
$ touch src/services/mod.rs src/services/sample/sample_service.rs src/services/sample/mod.rs
$ mkdir -p src/usecases/sample
$ touch src/usecases/mod.rs src/usecases/sample/hello_usecase.rs src/usecases/sample/hello_add_text_usecase.rs src/usecases/sample/mod.rs
$ mkdir -p src/server/grpc/sample
$ touch src/server/mod.rs src/server/grpc/mod.rs src/server/grpc/sample/sample_server.rs src/server/grpc/sample/sample_server_1_test.rs src/server/grpc/sample/mod.rs
$ mkdir -p src/routers && touch src/routers/router.rs src/routers/mod.rs
$ touch .env.testing
次に作成したファイルをそれぞれ以下のように記述します。
・「src/repositories/mod.rs」
pub mod sample;
・「src/repositories/sample/sample_repository.rs」
// 共通コンテキスト
use crate::contexts::context::Context;
// コンフィング設定
use crate::configs::config::get_config;
// ロガー
use crate::loggers::logger;
// 共通エラー用モジュール
use crate::errors::error::CommonError;
// サンプルリポジトリーの構造体
pub struct SampleRepository;
impl SampleRepository {
// 初期化用メソッド
pub fn new() -> Self {
SampleRepository
}
}
// サンプルリポジトリー用のトレイト(モック化もできるように定義)
#[mockall::automock]
#[async_trait::async_trait]
pub trait SampleRepositoryTrait {
async fn sample_hello(&self, ctx: &Context) -> Result<String, CommonError>;
}
#[async_trait::async_trait]
impl SampleRepositoryTrait for SampleRepository {
// 文字列「Sample Hello !!」を返す関数
async fn sample_hello(&self, ctx: &Context) -> Result<String, CommonError> {
let mut text = "Hello World !!".to_string();
let config = get_config();
if config.env == "testing" {
text = "Hello World !! [ENV=testing]".to_string();
}
if text.is_empty() {
logger::error(ctx, "textが空です");
return Err(CommonError::InternalServerError);
}
Ok(text)
}
}
・「src/repositories/sample/mod.rs」
pub mod sample_repository;
・「src/services/mod.rs」
pub mod sample;
・「src/services/sample/sample_service.rs」
// 共通コンテキスト
use crate::contexts::context::Context;
// 共通エラー用モジュール
use crate::errors::error::CommonError;
// ロガー
use crate::loggers::logger;
// リポジトリ用のモジュール
use crate::repositories::sample::sample_repository::SampleRepositoryTrait;
// 使用するリポジトリーをまとめる構造体
pub struct SampleCommonRepository {
// Box<T>型で動的にメモリ領域確保
// Send: オブジェクトが異なるスレッド間で安全に送信できることを保証
// Sync: オブジェクトが複数のスレッドから同時にアクセスできることを保証
// 'static: オブジェクトのライフタイムがプログラムが終了するまで破棄されない
pub sample_repo: Box<dyn SampleRepositoryTrait + Send + Sync + 'static>,
}
// サンプルサービス
pub struct SampleService {
repo: SampleCommonRepository,
}
impl SampleService {
pub fn new(repo: SampleCommonRepository) -> Self {
SampleService { repo }
}
}
// サンプルサービス用のトレイト(モック化もできるように定義)
#[mockall::automock]
#[async_trait::async_trait]
pub trait SampleServiceTrait {
async fn sample_get_text_hello(&self, ctx: &Context) -> Result<String, CommonError>;
}
#[async_trait::async_trait]
impl SampleServiceTrait for SampleService {
async fn sample_get_text_hello(&self, ctx: &Context) -> Result<String, CommonError> {
let text = match self.repo.sample_repo.sample_hello(ctx).await {
Ok(text) => text,
Err(err) => {
logger::error(ctx, "sample_get_text_helloのsample_hello処理でエラー");
return Err(err);
}
};
Ok(text)
}
}
・「src/services/sample/mod.rs」
pub mod sample_service;
・「src/usecases/mod.rs」
pub mod sample;
・「src/usecases/sample/hello_usecase.rs」
use tonic::{Response, Status};
// 共通コンテキスト
use crate::contexts::context::Context;
// ロガー
use crate::loggers::logger;
// ビルドしたprotoファイルのモジュール
use crate::server::grpc::sample::sample_server::sample_proto;
// サービスのモジュール
use crate::services::sample::sample_service::{SampleService, SampleServiceTrait};
// 使用するサービスをまとめる構造体
pub struct SampleCommonService {
pub sample_service: SampleService,
}
// 実行するユースケースの構造体
pub struct SampleHelloUsecase {
pub service: SampleCommonService,
}
impl SampleHelloUsecase {
pub async fn exec(
&self,
ctx: Context,
) -> Result<Response<sample_proto::HelloResponseBody>, Status> {
// サンプルテキストを取得するサービスを実行
let text = match self
.service
.sample_service
.sample_get_text_hello(&ctx)
.await
{
Ok(text) => text,
Err(e) => {
let msg = format!("Internal Server Error: {}", e);
logger::error(&ctx, &msg);
return Err(Status::unknown(msg));
}
};
// レスポンスボディの設定
let res_body = sample_proto::HelloResponseBody { message: text };
// メタデータにrequest-idを追加
let mut res = Response::new(res_body);
res.metadata_mut()
.insert("x-request-id", ctx.request_id.parse().unwrap());
// 戻り値
Ok(res)
}
}
・「src/usecases/sample/hello_add_text_usecase.rs」
// tonic
use tonic::{Response, Status};
// 共通コンテキスト
use crate::contexts::context::Context;
// ビルドしたprotoファイルのモジュール
use crate::server::grpc::sample::sample_server::sample_proto;
// 実行するユースケースの構造体
pub struct SampleHelloAddTextUsecase {}
impl SampleHelloAddTextUsecase {
pub async fn exec(
&self,
ctx: Context,
req_body: sample_proto::HelloAddTextRequestBody,
) -> Result<Response<sample_proto::HelloAddTextResponseBody>, Status> {
// レスポンスメッセージの設定
let msg = format!("Hello {}", req_body.text);
// レスポンスボディの設定
let res_body = sample_proto::HelloAddTextResponseBody { message: msg };
// メタデータにrequest-idを追加
let mut res = Response::new(res_body);
res.metadata_mut()
.insert("x-request-id", ctx.request_id.parse().unwrap());
// 戻り値
Ok(res)
}
}
・「src/usecases/sample/mod.rs」
pub mod hello_add_text_usecase;
pub mod hello_usecase;
・「src/server/mod.rs」
pub mod grpc;
・「src/server/grpc/mod.rs」
pub mod sample;
・「src/server/grpc/sample/sample_server.rs」
// tonic
use tonic::{Request, Response, Status};
// バリデーション用のトレイト
use prost_validate::Validator;
// 共通コンテキスト
use crate::contexts::context::create_context;
// ロガー
use crate::loggers::logger;
// ビルドしたprotoファイルのインポート
pub mod sample_proto {
// protoファイルに定義したpakage名を指定
tonic::include_proto!("sample");
}
// リポジトリ
use crate::repositories::sample::sample_repository::SampleRepository;
// サービス
use crate::services::sample::sample_service::{SampleCommonRepository, SampleService};
// ユースケース
use crate::usecases::sample::hello_add_text_usecase::SampleHelloAddTextUsecase;
use crate::usecases::sample::hello_usecase::{SampleCommonService, SampleHelloUsecase};
// 構造体定義
#[derive(Debug, Default)]
pub struct SampleServer {}
// protoファイルの関数の実装をメソッド定義
#[tonic::async_trait]
impl sample_proto::sample_service_server::SampleService for SampleServer {
async fn hello(
&self,
request: Request<sample_proto::Empty>,
) -> Result<Response<sample_proto::HelloResponseBody>, Status> {
// コンテキスト設定
let ctx = create_context(&request);
// インスタンス化
let sample_repo = Box::new(SampleRepository::new());
let sample_common_repo = SampleCommonRepository { sample_repo };
let sample_service = SampleService::new(sample_common_repo);
let sample_common_service = SampleCommonService { sample_service };
let usecase = SampleHelloUsecase {
service: sample_common_service,
};
// ユースケースの実行
usecase.exec(ctx).await
}
async fn hello_add_text(
&self,
request: Request<sample_proto::HelloAddTextRequestBody>,
) -> Result<Response<sample_proto::HelloAddTextResponseBody>, Status> {
// コンテキスト設定
let ctx = create_context(&request);
// リクエストボディを取得
let req_body = request.into_inner();
// バリデーションチェック
match req_body.validate() {
Ok(_) => {}
Err(e) => {
let msg = format!("バリデーションエラー: {}", e);
logger::error(&ctx, &msg);
return Err(Status::invalid_argument(msg));
}
};
// インスタンス化
let usecase = SampleHelloAddTextUsecase {};
// ユースケースの実行
usecase.exec(ctx, req_body).await
}
}
// ルーターに設定するサーバー定義を返す関数
pub fn get_sample_server() -> sample_proto::sample_service_server::SampleServiceServer<SampleServer>
{
let sample_server = SampleServer::default();
sample_proto::sample_service_server::SampleServiceServer::new(sample_server)
}
・「src/server/grpc/sample/sample_server_1_test.rs」
#[cfg(test)]
// sample_serverのテスト
mod sample_server_test {
use crate::configs::config::get_config;
use crate::contexts::context::Context;
use crate::server::grpc::sample::sample_server::sample_proto::{
HelloAddTextRequestBody, sample_service_client::SampleServiceClient,
};
use tonic::Request;
// use crate::repositories::sample::sample_repository::SampleRepository;
use crate::repositories::sample::sample_repository::MockSampleRepositoryTrait;
use crate::services::sample::sample_service::{SampleCommonRepository, SampleService};
use crate::usecases::sample::hello_usecase::{SampleCommonService, SampleHelloUsecase};
#[tokio::test]
async fn hello_should_return_succeed() {
/* ユースケースを実行して検証する場合 */
// サンプルリポシトリーのインスタンス化
// リポジトリーのモック化が必要な場合
let mut mock_repo = MockSampleRepositoryTrait::new();
mock_repo
.expect_sample_hello()
.returning(|_| Ok("Mock Hello World !!".to_string()));
let sample_repo = Box::new(mock_repo);
// インスタンス化
let sample_common_repo = SampleCommonRepository { sample_repo };
let sample_service = SampleService::new(sample_common_repo);
let sample_common_service = SampleCommonService { sample_service };
let usecase = SampleHelloUsecase {
service: sample_common_service,
};
// コンテキストの設定
let ctx = Context {
request_id: "5ccba39d-fc9e-482a-aa6b-b94a450a53d0".to_string(),
uri: "/sample.SampleService/Hello".to_string(),
};
// ユースケースの実行
let res = usecase.exec(ctx).await;
// 検証
assert!(res.is_ok());
let res_body = res.unwrap().into_inner();
assert_eq!(res_body.message, "Mock Hello World !!");
}
#[tokio::test]
async fn hello_add_text_should_succeed() {
let config = get_config();
// サーバーのインスタンス作成
let mut client =
SampleServiceClient::connect(format!("http://localhost:{}", config.grpc_port))
.await
.unwrap();
// リクエストの作成
let mut request = Request::new(HelloAddTextRequestBody {
text: "Add World !!".to_string(),
});
// メタデータにBearerトークンを追加
request
.metadata_mut()
.insert("authorization", "Bearer token".parse().unwrap());
// テストの実行
let response = client.hello_add_text(request).await;
// 検証
assert!(response.is_ok());
let response_body = response.unwrap().into_inner();
assert_eq!(response_body.message, "Hello Add World !!");
}
#[tokio::test]
async fn hello_add_text_should_fail_with_empty_text() {
let config = get_config();
// サーバーのインスタンス作成
let mut client =
SampleServiceClient::connect(format!("http://localhost:{}", config.grpc_port))
.await
.unwrap();
// リクエストの作成
let mut request = Request::new(HelloAddTextRequestBody {
text: "".to_string(),
});
// インターセプターがチェックする無効な認証ヘッダーを追加
request
.metadata_mut()
.insert("authorization", "Bearer token".parse().unwrap());
let response = client.hello_add_text(request).await;
// 検証
assert!(response.is_err());
let status = response.err().unwrap();
assert_eq!(status.code(), tonic::Code::InvalidArgument);
assert!(status.message().contains("バリデーションエラー"));
}
}
・「src/server/grpc/sample/mod.rs」
pub mod sample_server;
// テストコード用のモジュール
mod sample_server_1_test;
・「src/routers/router.rs」
use tonic::{service::InterceptorLayer, transport::Server};
use tonic_reflection::server::Builder as ReflectionBuilder;
use tower::ServiceBuilder;
// コンフィング設定
use crate::configs::config::get_config;
// ミドルウェア
use crate::middleware::request_middleware::RequestMiddlewareLayer;
// インターセプター
use crate::interceptors::interceptor::auth_interceptor;
// protoファイル
pub mod proto {
pub const SAMPLE_DESCRIPTOR: &[u8] = tonic::include_file_descriptor_set!("sample_descriptor");
}
// gRPCサーバーのサービス
use crate::server::grpc::sample::sample_server::get_sample_server;
pub async fn router() -> Result<(), Box<dyn std::error::Error>> {
// 環境変数取得
let config = get_config();
// アドレス設定
let grpc_port = config.grpc_port;
let addr = format!("[::]:{}", grpc_port).parse()?;
// サーバー設定
let sample_server = get_sample_server();
// サーバーリフレクション設定(旧ツールではv1alphaを使う)
let reflection_service_v1 = ReflectionBuilder::configure()
.register_encoded_file_descriptor_set(proto::SAMPLE_DESCRIPTOR)
.build_v1()?;
let reflection_service_v1alpha = ReflectionBuilder::configure()
.register_encoded_file_descriptor_set(proto::SAMPLE_DESCRIPTOR)
.build_v1alpha()?;
// サービス設定
let service = ServiceBuilder::new()
.layer(RequestMiddlewareLayer)
.layer(InterceptorLayer::new(auth_interceptor))
.service(sample_server);
// サーバー起動
Server::builder()
.add_service(reflection_service_v1)
.add_service(reflection_service_v1alpha)
.add_service(service)
.serve(addr)
.await?;
Ok(())
}
※サーバーリフレクション設定をすることで、protoファイルを直接読み込まなくても、利用可能なメソッドを読み込むことができるようになります。Rustのサーバーリフレクション用のメソッドは2025年6月時点で2種類(build_v1()、build_v1alpha())ありますが、後述のPostmanでは「build_v1alpha()」を設定する必要があったので両方を設定しています。
・「src/routers/mod.rs」
pub mod router;
・「.env.testing」
ENV=testing
GRPC_PORT=50051
RUST_LOG=info
次にファイル「src/main.rs」を以下のように修正します。
// モジュールのインポート
mod configs;
mod contexts;
mod errors;
mod interceptors;
mod loggers;
mod middleware;
mod repositories;
mod routers;
mod server;
mod services;
mod usecases;
use configs::config::get_config;
use loggers::logger::init_logger;
use routers::router::router;
#[tokio::main]
async fn main() {
// 環境変数取得
let config = get_config();
// ロガーの初期化
init_logger();
// サーバー起動のログ出力
log::info!("[ENV={}] Start rust_grpc !!", config.env);
// サーバー起動
router().await.unwrap();
}
※main.rsでモジュールのインポートをすることで、同階層の別のディレクトリ内でモジュールを使えるようになります。
次に以下のコマンドを実行し、コンテナのログを確認します。
$ docker compose logs
コマンド実行後、下図のように想定通りのログが出力されればOKです。
コード修正後のフォーマット修正およびコード解析について
実務ではコード修正後などにはライブラリを使ってフォーマットの統一化や静的コード解析で警告が出ていないことをチェックした方がいいです。
Rust用のライブラリはDockerfileでライブラリをインストールしているため、以下のコマンドを実行してチェックできます。
$ docker compose exec grpc cargo fmt
$ docker compose exec grpc cargo clippy
PostmanでgRPCのAPIを試す
次に作成したgRPCのAPIをPostmanを使って試します。
まずはgRPC用のリクエスト画面を表示し、URLに「localhost:50051」を入力後、サービス定義にあるサーバーリフレクションのボタンなどから利用できるメソッドを読み込みます。
※src/routers/router.rsの「ReflectionBuilder」の処理でサーバーリフレクションの機能が使えます。コードにエラーがあったりするとメソッドの読み込みに失敗するので、その場合はコードの修正をして下さい。
サーバーリフレクション等により利用できるメソッドを読み込みできたら、「SampleService/Hello」選択し、「呼び出す」をクリックします。
実行後、想定通りのレスポンス結果が出力されればOKです。
次にメソッドで「SampleService/HelloAddText」を選択し、メッセージタブからリクエストボディを入力して実行します。
実行後、インターセプターの設定によりエラーになればOKです。
次にBearerトークンを設定して再度実行し、正常終了すればOKです。
gRPCのレスポンスのステータスコード一覧
ステータスコード | ステータス名 | 概要 |
---|---|---|
0 | OK | 処理が正常に完了したことを示します。 |
1 | CANCELLED | 呼び出し元によって処理がキャンセルされたことを示します。 |
2 | UNKNOWN | 不明なエラーが発生したことを示します。他のどのエラーにも分類できない場合に使用されます。 |
3 | INVALID_ARGUMENT | クライアントが無効な引数を指定したことを示します (例: フォーマットが不正なリソース名)。 |
4 | DEADLINE_EXCEEDED | 処理が完了する前にタイムアウト(デッドライン)したことを示します。 |
5 | NOT_FOUND | 要求されたリソースが見つからなかったことを示します。 |
6 | ALREADY_EXISTS | 作成しようとしたリソースがすでに存在していることを示します。 |
7 | PERMISSION_DENIED | 呼び出し元に、指定された処理を実行する権限がないことを示します。 |
8 | RESOURCE_EXHAUSTED | リソースが枯渇したことを示します (例: ディスク容量不足、割り当て超過など)。 |
9 | FAILED_PRECONDITION | 処理を実行するために必要なシステムの状態が満たされていないことを示します。 |
10 | ABORTED | 競合状態 (例: トランザクションの中断) などが原因で処理が中断されたことを示します。 |
11 | OUT_OF_RANGE | 範囲外の操作を試みたことを示します (例: ファイルの終端を超えて読み込もうとした)。 |
12 | UNIMPLEMENTED | その操作が実装されていないか、サービスで有効になっていないことを示します。 |
13 | INTERNAL | 予期しない内部エラーが発生したことを示します。回復が困難な深刻なエラーの場合に使用されます。 |
14 | UNAVAILABLE | サービスが一時的に利用できないことを示します。リトライによって解決する可能性があります。 |
15 | DATA_LOSS | 回復不能なデータの損失や破損が発生したことを示します。 |
16 | UNAUTHENTICATED | リクエストに有効な認証情報が含まれていないことを示します。 |
※gRPCでは上記のようなステータスを使います。
テストコードを実行して試す
上記ではテストコードも追加したため、それを試します。
テストコード実行の際にはテスト用の環境変数ファイル「.env.testing」を使いたいので、以下のコマンドを実行してコンテナを再起動します。
$ docker compose down
$ docker compose --env-file ./.env.testing up -d
次に以下のコマンドを実行し、コンテナのログを確認します。
$ docker compose logs
コマンド実行後、ログに「ENV=testing」が出力されていればOKです。
次に以下のコマンドを実行し、テストを実行します。
$ docker compose exec -e CARGO_TEST=testing grpc cargo test -- --nocapture --test-threads=1
テスト実行後、全てのテストがPASSすればOKです。
※今回は最低限必要になるハンドラー層(サーバー層)の部分のみテストを書きましたが、必要に応じてユースケース層、サービス層のテストも追加して下さい。(テストコードのカバレッジ率は80%以上にするのを推薦。)
gRPCのストリーミング機能を追加する
次にgRPCを使うメリットであるストリーミング機能を追加して試します。
ストリーミング機能については、サーバーストリーミング(1リクエスト-複数レスポンス)、クライアントストリーミング(複数リクエスト-1レスポンス)、双方向ストリーミング(複数リクエスト-複数レスポンス)の三種類があります。
まずは以下のコマンドを実行し、コンテナを再起動します。
$ docker compose down
$ docker compose up -d
次に以下のコマンドを実行し、必要なクレートを追加します。
$ docker compose exec grpc cargo add tokio-stream --features full
次に.protoファイル「proto/sample/sample.proto」を以下のように修正します。
syntax = "proto3";
import "validate/validate.proto";
package sample;
// 空のリクエストパラメータ
message Empty {}
// Helloメソッドのレスポンス結果
message HelloResponseBody {
// メッセージ
string message = 1;
}
// HelloAddTextメソッドのリクエストパラメータ
message HelloAddTextRequestBody {
// テキスト
string text = 1 [(validate.rules).string.min_len = 1];
}
// HelloAddTextメソッドのレスポンス結果
message HelloAddTextResponseBody {
// メッセージ
string message = 1;
}
// HelloServerStreamメソッドのリクエストパラメータ
message HelloServerStreamRequestBody {
// テキスト
string text = 1 [(validate.rules).string.min_len = 1];
}
// HelloServerStreamメソッドのレスポンス結果
message HelloServerStreamResponseBody {
// メッセージ
string message = 1;
}
// HelloClientStreamメソッドのリクエストパラメータ
message HelloClientStreamRequestBody {
// テキスト
string text = 1 [(validate.rules).string.min_len = 1];
}
// HelloClientStreamメソッドのレスポンス結果
message HelloClientStreamResponseBody {
// メッセージ
string message = 1;
}
// HelloBidirectionalStreamメソッドのリクエストパラメータ
message HelloBidirectionalStreamRequestBody {
// テキスト
string text = 1 [(validate.rules).string.min_len = 1];
}
// HelloBidirectionalStreamメソッドのレスポンス結果
message HelloBidirectionalStreamResponseBody {
// メッセージ
string message = 1;
}
// サンプルサービス
service SampleService {
// 「Hello World !!」を出力
rpc Hello(Empty) returns (HelloResponseBody) {}
// Returns:
// - 0 OK: HelloResponseBodyを出力
// - 2 Unknown: 不明なエラー
// 「Hello {リクエストパラメータのtext}」を出力
rpc HelloAddText(HelloAddTextRequestBody) returns (HelloAddTextResponseBody) {}
// Returns:
// - 0 OK: HelloAddTextResponseBodyを出力
// - 2 Unknown: 不明なエラー
// - 3 INVALID_ARGUMENT: バリデーションエラー
// サーバーストリーミング(1リクエスト-複数のレスポンス)
rpc HelloServerStream(HelloServerStreamRequestBody) returns (stream HelloServerStreamResponseBody) {}
// Returns:
// - 0 OK: HelloServerStreamResponseBodyを出力(複数回)
// - 2 Unknown: 不明なエラー
// - 3 INVALID_ARGUMENT: バリデーションエラー
// クライアントストリーミング(複数のリクエスト-1レスポンス)
rpc HelloClientStream(stream HelloClientStreamRequestBody) returns (HelloClientStreamResponseBody) {}
// Returns:
// - 0 OK: HelloClientStreamResponseBodyを出力
// - 2 Unknown: 不明なエラー
// - 3 INVALID_ARGUMENT: バリデーションエラー)
// 双方向ストリーミング(複数のリクエスト-複数のレスポンス)
rpc HelloBidirectionalStream(stream HelloBidirectionalStreamRequestBody) returns (stream HelloBidirectionalStreamResponseBody) {}
// Returns:
// - 0 OK: HelloClientStreamResponseBodyを出力
// - 2 Unknown: 不明なエラー
// - 3 INVALID_ARGUMENT: バリデーションエラー)
}
次に以下のコマンドを実行し、各種ファイルを作成します。
$ touch src/usecases/sample/hello_server_stream_usecase.rs
$ touch src/usecases/sample/hello_client_stream_usecase.rs
$ touch src/usecases/sample/hello_bidirectional_stream_usecase.rs
$ touch src/server/grpc/sample/sample_server_2_test.rs
次に作成したファイルをそれぞれ以下のように記述します。
・「src/usecases/sample/hello_server_stream_usecase.rs」
// tonic
use tonic::{
Response, Status,
metadata::{Ascii, MetadataValue},
};
// tokio
use tokio::{
spawn,
sync::mpsc,
time::{Duration, sleep},
};
use tokio_stream::wrappers::ReceiverStream;
// 変換用
use std::str::FromStr;
// 共通コンテキスト
use crate::contexts::context::Context;
// ロガー
use crate::loggers::logger;
// ビルドしたprotoファイルのモジュール
use crate::server::grpc::sample::sample_server::sample_proto;
// 実行するユースケースの構造体
pub struct SampleHelloServerStreamUsecase {}
// ストリーミング用の型定義
type HelloServerStreamStream =
ReceiverStream<Result<sample_proto::HelloServerStreamResponseBody, Status>>;
impl SampleHelloServerStreamUsecase {
pub async fn exec(
&self,
ctx: Context,
req_body: sample_proto::HelloServerStreamRequestBody,
) -> Result<Response<HelloServerStreamStream>, Status> {
// トレーラー用
let x_request_id = MetadataValue::<Ascii>::from_str(ctx.request_id.as_str()).expect("-");
// mpsc (multi-producer, single-consumer) チャンネルの作成
// サーバーはこのチャンネルにデータ送信し、その後クライアントにストリーミングする
// バッファサイズは適宜調整が必要だが、サーバーストリーミング機能なら1などでOK
let (tx, rx) = mpsc::channel(1);
spawn(async move {
logger::info(&ctx, "Start Server Stream !!");
// n件のデータをクライアントに返す(今回は3件)
for i in 1..=3 {
// レスポンスの設定
let msg = format!("[{}] Hello {} !", i, req_body.text);
let res_body = sample_proto::HelloServerStreamResponseBody { message: msg };
// Okでラップしたレスポンスをtxで送信
if let Err(e) = tx.send(Ok(res_body)).await {
// クライアントの接続切れなどでエラーの場合
let msg = format!("Failed to send data: {:?}", e);
logger::error(&ctx, &msg);
let mut status = Status::invalid_argument(msg);
status
.metadata_mut()
.insert("x-request-id", x_request_id.clone());
let _ = tx.send(Err(status)).await;
break;
}
// 1秒間待機処理
sleep(Duration::from_secs(1)).await;
}
// トレーラーの設定
let mut status = Status::ok("Stream finished successfully");
status
.metadata_mut()
.insert("x-request-id", x_request_id.clone());
// Errでラップしたステータスを送信
if let Err(e) = tx.send(Err(status)).await {
// クライアントの接続切れなどでエラーの場合
let msg = format!("Failed to send data: {:?}", e);
logger::error(&ctx, &msg);
let mut status = Status::invalid_argument(msg);
status.metadata_mut().insert("x-request-id", x_request_id);
let _ = tx.send(Err(status)).await;
}
logger::info(&ctx, "Finish Server Stream !!");
});
// 戻り値
Ok(Response::new(ReceiverStream::new(rx)))
}
}
・「src/usecases/sample/hello_client_stream_usecase.rs」
// tonic
use tonic::{
Request, Response, Status, Streaming,
metadata::{Ascii, MetadataValue},
};
// バリデーション用のトレイト
use prost_validate::Validator;
// 変換用
use std::str::FromStr;
// 共通コンテキスト
use crate::contexts::context::create_context;
// ロガー
use crate::loggers::logger;
// ビルドしたprotoファイルのモジュール
use crate::server::grpc::sample::sample_server::sample_proto;
// 実行するユースケースの構造体
pub struct SampleHelloClientStreamUsecase {}
impl SampleHelloClientStreamUsecase {
pub async fn exec(
&self,
request: Request<Streaming<sample_proto::HelloClientStreamRequestBody>>,
) -> Result<Response<sample_proto::HelloClientStreamResponseBody>, Status> {
// コンテキスト設定
let ctx = create_context(&request);
// トレーラー用
let x_request_id = MetadataValue::<Ascii>::from_str(ctx.request_id.as_str()).expect("-");
// リクエストからストリームを取り出す
let mut stream = request.into_inner();
logger::info(&ctx, "Start Client Stream !!");
// ストリームからデータを1件ずつ受信し、ストリームが閉じるまでループ
let mut lists = Vec::new();
while let Some(req) = stream.message().await? {
// バリデーションチェック
match req.validate() {
Ok(_) => {}
Err(e) => {
let msg = format!("バリデーションエラー: {}", e);
logger::error(&ctx, &msg);
let mut status = Status::invalid_argument(msg);
status
.metadata_mut()
.insert("x-request-id", x_request_id.clone());
return Err(status);
}
}
// データを配列に格納
lists.push(req.text);
}
// 配列のデータから文字列を作成
let msg = lists.join(",");
// レスポンスを設定
let mut res = Response::new(sample_proto::HelloClientStreamResponseBody { message: msg });
// トレーラー設定
res.metadata_mut().insert("x-request-id", x_request_id);
logger::info(&ctx, "Finish Client Stream !!");
// 戻り値
Ok(res)
}
}
・「src/usecases/sample/hello_bidirectional_stream_usecase.rs」
// tonic
use tonic::{
Request, Response, Status, Streaming,
metadata::{Ascii, MetadataValue},
};
// tokio
use tokio::{
spawn,
sync::mpsc,
time::{Duration, sleep},
};
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
// 変換用
use std::str::FromStr;
// バリデーション用のトレイト
use prost_validate::Validator;
// 共通コンテキスト
use crate::contexts::context::create_context;
// ロガー
use crate::loggers::logger;
// ビルドしたprotoファイルのモジュール
use crate::server::grpc::sample::sample_server::sample_proto;
// 実行するユースケースの構造体
pub struct SampleHelloBidirectionalStreamUsecase {}
// ストリーミング用の型定義
type HelloBidirectionalStreamStream =
ReceiverStream<Result<sample_proto::HelloBidirectionalStreamResponseBody, Status>>;
impl SampleHelloBidirectionalStreamUsecase {
pub async fn exec(
&self,
request: Request<Streaming<sample_proto::HelloBidirectionalStreamRequestBody>>,
) -> Result<Response<HelloBidirectionalStreamStream>, Status> {
// コンテキスト設定
let ctx = create_context(&request);
// トレーラー用
let x_request_id = MetadataValue::<Ascii>::from_str(ctx.request_id.as_str()).expect("-");
// リクエストからストリームを取り出す
let mut stream = request.into_inner();
// mpsc (multi-producer, single-consumer) チャンネルの作成
// サーバーはこのチャンネルにデータ送信し、その後クライアントにストリーミングする
// バッファサイズは適宜調整が必要
let (tx, rx) = mpsc::channel(1);
spawn(async move {
logger::info(&ctx, "Start Bidirectional Stream !!");
while let Some(result) = stream.next().await {
match result {
Ok(req) => {
// バリデーションチェック
match req.validate() {
Ok(_) => {}
Err(e) => {
let msg = format!("バリデーションエラー: {}", e);
logger::error(&ctx, &msg);
let mut status = Status::invalid_argument(msg);
status
.metadata_mut()
.insert("x-request-id", x_request_id.clone());
let _ = tx.send(Err(status)).await;
break;
}
}
// レスポンスを設定
let res = sample_proto::HelloBidirectionalStreamResponseBody {
message: req.text,
};
// Okでラップしたレスポンスをtxで送信
if let Err(e) = tx.send(Ok(res)).await {
// クライアントの接続切れなどでエラーの場合
let msg = format!("Failed to send data: {:?}", e);
logger::error(&ctx, &msg);
let mut status = Status::invalid_argument(msg);
status
.metadata_mut()
.insert("x-request-id", x_request_id.clone());
let _ = tx.send(Err(status)).await;
break;
}
// 1秒間待機処理
sleep(Duration::from_secs(1)).await;
}
Err(e) => {
// クライアントの接続切れなどでエラーの場合
let msg = format!("Failed to send data: {:?}", e);
logger::error(&ctx, &msg);
let mut status = Status::invalid_argument(msg);
status
.metadata_mut()
.insert("x-request-id", x_request_id.clone());
let _ = tx.send(Err(status)).await;
break;
}
}
}
// トレーラーの設定
let mut status = Status::ok("Stream finished successfully");
status
.metadata_mut()
.insert("x-request-id", x_request_id.clone());
// Errでラップしたステータスを送信
if let Err(e) = tx.send(Err(status)).await {
// クライアントの接続切れなどでエラーの場合
let msg = format!("Failed to send data: {:?}", e);
logger::error(&ctx, &msg);
let mut status = Status::invalid_argument(msg);
status.metadata_mut().insert("x-request-id", x_request_id);
let _ = tx.send(Err(status)).await;
}
logger::info(&ctx, "Finish Bidirectional Stream !!");
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}
・「src/server/grpc/sample/sample_server_2_test.rs」
#[cfg(test)]
// sample_serverのストリーミングのテスト
mod sample_server_streaming_test {
use crate::configs::config::get_config;
use crate::contexts::context::Context;
use crate::server::grpc::sample::sample_server::sample_proto::{
HelloBidirectionalStreamRequestBody, HelloClientStreamRequestBody,
HelloServerStreamRequestBody, sample_service_client::SampleServiceClient,
};
use crate::usecases::sample::hello_server_stream_usecase::SampleHelloServerStreamUsecase;
use tokio_stream::{StreamExt, iter};
use tonic::Request;
#[tokio::test]
async fn hello_server_stream_should_return_succeed_by_usecase() {
/* ユースケースを実行して検証する場合 */
// インスタンス化
let usecase = SampleHelloServerStreamUsecase {};
// コンテキストの設定
let ctx = Context {
request_id: "5ccba39d-fc9e-482a-aa6b-b94a450a53d0".to_string(),
uri: "/sample.SampleService/HelloServerStream".to_string(),
};
// リクエストの設定
let req = HelloServerStreamRequestBody {
text: "Server Stream".to_string(),
};
// ユースケースの実行
let res = usecase.exec(ctx, req).await;
// 検証
assert!(res.is_ok());
// ストリームの検証
let mut stream = res.unwrap().into_inner();
let mut i = 1;
while let Some(result) = stream.next().await {
match result {
Ok(res) => {
let msg = format!("[{}] Hello Server Stream !", i);
assert_eq!(res.message, msg);
}
Err(_) => {}
}
i += 1;
}
}
#[tokio::test]
async fn hello_server_stream_should_return_succeed_by_client() {
let config = get_config();
// サーバーのインスタンス作成
let mut client =
SampleServiceClient::connect(format!("http://localhost:{}", config.grpc_port))
.await
.unwrap();
// リクエストの作成
let mut request = Request::new(HelloServerStreamRequestBody {
text: "Server Stream".to_string(),
});
// メタデータにBearerトークンを追加
request
.metadata_mut()
.insert("authorization", "Bearer token".parse().unwrap());
// テストの実行
let res = client.hello_server_stream(request).await;
// 検証
assert!(res.is_ok());
// ストリームの検証
let mut stream = res.unwrap().into_inner();
let mut i = 1;
while let Some(result) = stream.next().await {
match result {
Ok(res) => {
let msg = format!("[{}] Hello Server Stream !", i);
assert_eq!(res.message, msg);
}
Err(_) => {}
}
i += 1;
}
}
#[tokio::test]
async fn hello_client_stream_should_return_succeed() {
let config = get_config();
// サーバーのインスタンス作成
let mut client =
SampleServiceClient::connect(format!("http://localhost:{}", config.grpc_port))
.await
.unwrap();
// 送信するメッセージリスト作成
let message_lists = vec![
HelloClientStreamRequestBody {
text: "A".to_string(),
},
HelloClientStreamRequestBody {
text: "B".to_string(),
},
HelloClientStreamRequestBody {
text: "C".to_string(),
},
];
// ストリームの作成
let mut request = Request::new(iter(message_lists));
// メタデータにBearerトークンを追加
request
.metadata_mut()
.insert("authorization", "Bearer token".parse().unwrap());
// テストの実行
let res = client.hello_client_stream(request).await;
// 検証
assert!(res.is_ok());
let res_body = res.unwrap().into_inner();
assert_eq!(res_body.message, "A,B,C");
}
#[tokio::test]
async fn hello_bidirectional_stream_should_return_succeed() {
let config = get_config();
// サーバーのインスタンス作成
let mut client =
SampleServiceClient::connect(format!("http://localhost:{}", config.grpc_port))
.await
.unwrap();
// 送信するメッセージリスト作成
let message_lists = vec![
HelloBidirectionalStreamRequestBody {
text: "A".to_string(),
},
HelloBidirectionalStreamRequestBody {
text: "B".to_string(),
},
];
// ストリームの作成
let mut request = Request::new(iter(message_lists.clone()));
// メタデータにBearerトークンを追加
request
.metadata_mut()
.insert("authorization", "Bearer token".parse().unwrap());
// テストの実行
let res = client.hello_bidirectional_stream(request).await;
// 検証
assert!(res.is_ok());
// ストリームの検証
let mut stream = res.unwrap().into_inner();
let mut i = 0;
while let Some(result) = stream.next().await {
match result {
Ok(res) => {
let msg = message_lists[i].text.clone();
assert_eq!(res.message, msg);
}
Err(_) => {}
}
i += 1;
}
}
}
次にファイル「src/usecases/sample/mod.rs」、「src/server/grpc/sample/mod.rs」、「src/server/grpc/sample/sample_server.rs」をそれぞれ以下のように修正します。
・「src/usecases/sample/mod.rs」
pub mod hello_add_text_usecase;
pub mod hello_bidirectional_stream_usecase;
pub mod hello_client_stream_usecase;
pub mod hello_server_stream_usecase;
pub mod hello_usecase;
・「src/server/grpc/sample/mod.rs」
pub mod sample_server;
// テストコード用のモジュール
mod sample_server_1_test;
mod sample_server_2_test;
・「src/server/grpc/sample/sample_server.rs」
// tonic
use tonic::{Request, Response, Status, Streaming};
// tokio
use tokio_stream::wrappers::ReceiverStream;
// バリデーション用のトレイト
use prost_validate::Validator;
// 共通コンテキスト
use crate::contexts::context::create_context;
// ロガー
use crate::loggers::logger;
// ビルドしたprotoファイルのインポート
pub mod sample_proto {
// protoファイルに定義したpakage名を指定
tonic::include_proto!("sample");
}
// リポジトリ
use crate::repositories::sample::sample_repository::SampleRepository;
// サービス
use crate::services::sample::sample_service::{SampleCommonRepository, SampleService};
// ユースケース
use crate::usecases::sample::hello_add_text_usecase::SampleHelloAddTextUsecase;
use crate::usecases::sample::hello_bidirectional_stream_usecase::SampleHelloBidirectionalStreamUsecase;
use crate::usecases::sample::hello_client_stream_usecase::SampleHelloClientStreamUsecase;
use crate::usecases::sample::hello_server_stream_usecase::SampleHelloServerStreamUsecase;
use crate::usecases::sample::hello_usecase::{SampleCommonService, SampleHelloUsecase};
// 構造体定義
#[derive(Debug, Default)]
pub struct SampleServer {}
// protoファイルの関数の実装をメソッド定義
#[tonic::async_trait]
impl sample_proto::sample_service_server::SampleService for SampleServer {
async fn hello(
&self,
request: Request<sample_proto::Empty>,
) -> Result<Response<sample_proto::HelloResponseBody>, Status> {
// コンテキスト設定
let ctx = create_context(&request);
// インスタンス化
let sample_repo = Box::new(SampleRepository::new());
let sample_common_repo = SampleCommonRepository { sample_repo };
let sample_service = SampleService::new(sample_common_repo);
let sample_common_service = SampleCommonService { sample_service };
let usecase = SampleHelloUsecase {
service: sample_common_service,
};
// ユースケースの実行
usecase.exec(ctx).await
}
async fn hello_add_text(
&self,
request: Request<sample_proto::HelloAddTextRequestBody>,
) -> Result<Response<sample_proto::HelloAddTextResponseBody>, Status> {
// コンテキスト設定
let ctx = create_context(&request);
// リクエストボディを取得
let req_body = request.into_inner();
// バリデーションチェック
match req_body.validate() {
Ok(_) => {}
Err(e) => {
let msg = format!("バリデーションエラー: {}", e);
logger::error(&ctx, &msg);
return Err(Status::invalid_argument(msg));
}
}
// インスタンス化
let usecase = SampleHelloAddTextUsecase {};
// ユースケースの実行
usecase.exec(ctx, req_body).await
}
// サーバーストリーミングの追加(typeの定義必須)
// ※protoファイルで定義した名称+Streamという型の定義が必要になる
type HelloServerStreamStream =
ReceiverStream<Result<sample_proto::HelloServerStreamResponseBody, Status>>;
async fn hello_server_stream(
&self,
request: Request<sample_proto::HelloServerStreamRequestBody>,
) -> Result<Response<Self::HelloServerStreamStream>, Status> {
// コンテキスト設定
let ctx = create_context(&request);
// リクエストボディを取得
let req_body = request.into_inner();
// バリデーションチェック
match req_body.validate() {
Ok(_) => {}
Err(e) => {
let msg = format!("バリデーションエラー: {}", e);
logger::error(&ctx, &msg);
return Err(Status::invalid_argument(msg));
}
}
// インスタンス化
let usecase = SampleHelloServerStreamUsecase {};
// ユースケースの実行
usecase.exec(ctx, req_body).await
}
// クライアントストリーミングの追加
async fn hello_client_stream(
&self,
request: Request<Streaming<sample_proto::HelloClientStreamRequestBody>>,
) -> Result<Response<sample_proto::HelloClientStreamResponseBody>, Status> {
// インスタンス化
let usecase = SampleHelloClientStreamUsecase {};
// ユースケースの実行
usecase.exec(request).await
}
// 双方向ストリーミングの追加(typeの定義必須)
// ※protoファイルで定義した名称+Streamという型の定義が必要になる
type HelloBidirectionalStreamStream =
ReceiverStream<Result<sample_proto::HelloBidirectionalStreamResponseBody, Status>>;
async fn hello_bidirectional_stream(
&self,
request: Request<Streaming<sample_proto::HelloBidirectionalStreamRequestBody>>,
) -> Result<Response<Self::HelloBidirectionalStreamStream>, Status> {
// インスタンス化
let usecase = SampleHelloBidirectionalStreamUsecase {};
// ユースケースの実行
usecase.exec(request).await
}
}
// ルーターに設定するサーバー定義を返す関数
pub fn get_sample_server() -> sample_proto::sample_service_server::SampleServiceServer<SampleServer>
{
let sample_server = SampleServer::default();
sample_proto::sample_service_server::SampleServiceServer::new(sample_server)
}
サーバーストリーミングを試す
次にPostmanを使ってサーバーストリーミング機能(1リクエスト-複数レスポンス)を試します。
サーバーリフレクションの再実行すると追加したメソッドが表示されるので、メソッド「SampleService/HelloServerStream」を選択して実行します。
実行後、想定通りに3件のレスポンスが返ってこればOKです。
※Bearerトークンも付与して実行して下さい。
クライアントストリーミングを試す
次にPostmanを使ってクライアントストリーミング機能(複数リクエスト-1レスポンス)を試します。
追加されたメソッド「SampleService/HelloClientStream」を選択して実行し、「呼び出す」をクリックして接続します。
接続後に画面右下の「送信」からリクエストを送信できます。
リクエスト送信後、画面下のレスポンスタブに送信結果が表示されるので、複数回リクエスト送信後に画面右下の「ストリーミング終了」をクリックします。
ストリーミング終了後、想定通りのレスポンスが返ってこればOKです。
双方向ストリーミングを試す
次にPostmanを使って双方向ストリーミング機能(複数リクエスト-複数レスポンス)を試します。
追加されたメソッド「SampleService/HelloBidirectionalStream」を選択して実行し、「呼び出す」をクリックして接続します。
次にリクエストを送信すると、すぐにレスポンス結果が返ってこればOKです。
※接続を終了する場合は「ストリーミングを終了」をクリックして下さい。
本番環境用のDockerコンテナを作る
本番環境にデプロイする際は、デプロイ用のDockerコンテナを作る必要があるため、それについても試します。
まず上記でコンテナを起動中なら以下のコマンドを実行して停止させます。
$ docker compose down
次に以下のコマンドを実行し、各種ファイルを作成します。
$ touch .env.production
$ mkdir -p docker/prod && touch docker/prod/Dockerfile
次に作成したファイルをそれぞれ以下のように記述します。
・「.env.production」
ENV=production
GRPC_PORT=50051
RUST_LOG=info
※本番環境用の機密情報を含まない環境変数の設定用として「.env.production」を使いますが、実際の本番環境における機密情報を含む環境変数についてはインフラの方のサービスなどで設定するようにして下さい。
・「docker/prod/Dockerfile」
# ####################
# # ビルドステージ
# ####################
FROM rust:1.87.0-alpine3.21 AS builder
WORKDIR /build
# ビルドに必要なパッケージをインストール
RUN apk update && \
apk add --no-cache \
openssl-dev \
alpine-sdk \
protobuf-dev \
curl
# gRPCのバリデーション用にgoをインストール
# goのバージョンを変更したい場合は以下リンク先のページで必要情報をご確認下さい。
# https://go.dev/dl/
ENV GO_VERSION=1.24.4
ENV GO_SHA256=d5501ee5aca0f258d5fe9bfaed401958445014495dc115f202d43d5210b45241
ENV GO_OS=linux
ENV GO_ARCH=arm64
ENV PATH="/usr/local/go/bin:/root/go/bin:${PATH}"
RUN curl -fsSLO "https://go.dev/dl/go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
echo "${GO_SHA256} go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" | sha256sum -c - && \
tar -C /usr/local -xzf "go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
rm "go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
rm -rf /var/lib/apt/lists/*
# バリデーション用にgoのライブラリをインストール
RUN go install github.com/envoyproxy/protoc-gen-validate@v1.2.1
COPY . .
# ビルド
RUN cargo build --release --locked
# ####################
# # 実行ステージ
# ####################
FROM alpine:3.21 AS runner
WORKDIR /app
# コンテナ用ユーザー作成
RUN addgroup --system --gid 1001 appuser && \
adduser --system --uid 1001 appuser
# ビルドステージで作成したバイナリをコピー
COPY --from=builder --chown=appuser:appuser /build/target/release/rust_grpc .
# ポートを設定
EXPOSE 50051
# コンテナ起動ユーザー設定
USER appuser
# APIサーバー起動コマンド
CMD ["./rust_grpc"]
※バリデーション用のコード生成にGo言語のライブラリを使用しているため、ビルドステージでGoのインストールが必要です。
次に以下のコマンドを実行し、コンテナをビルドします。
$ docker build --no-cache -f ./docker/prod/Dockerfile -t rust-grpc:latest .
次に以下のコマンドを実行し、ビルドしたコンテナを起動します。
$ docker run -d -p 50051:50051 --env-file .env.production rust-grpc:latest
コンテナ起動後、Docker Desktopなどでコンテナが起動されているのを確認できればOKです。
次に上記で作成した各種APIを実行して試し、想定通りに動作すればOKです。
データベース関連について
データベース関連については、REST API用の以下の記事と同様の方法で実装できるため、この記事では割愛させていただきます。

最後に
今回はRustのgRPCでバックエンドAPIを開発する方法について解説しました。
まだgRPC自体が一般的ではなく、Rust自体もこれから普及しているプログラミグ言語なため、現時点で使う可能性があるとすればマイクロサービスの一つとして利用するケースだと思います。
調べても情報が少なくてRustでgRPCを実現するのに非常に苦労しましたが、一通り情報をまとめられたので、よければぜひ参考にしてみて下さい!
コメント