PR

Rustのtonic(gRPC)でDDD(ドメイン駆動設計)構成のバックエンドAPIを開発する方法まとめ

応用

こんにちは。Tomoyuki(@tomoyuki65)です。

直近にRustのaxumによるDDD(ドメイン駆動設計)に関する記事を書きましたが、tonicを使ったgRPCにおいてもDDDが必要になっていく可能性があると思います。

そこでこの記事では、Rustのtonic(gRPC)でDDD構成のバックエンドAPIを開発する方法についてまとめます。

 

【関連記事】

RustのaxumでDDD(ドメイン駆動設計)構成のバックエンドAPIを開発する方法まとめ
こんにちは。Tomoyuki(@tomoyuki65)です。Rustについては当ブログなどもきっかけになったりして、これから流行っていく可能性があるプログラミング言語ですが、これまではクリーンアーキテクチャを参考にしてRustのAPIの作り...
RustのtonicのgRPCでバックエンドAPIを開発する方法まとめ
こんにちは。Tomoyuki(@tomoyuki65)です。マイクロサービスとしてAPIを作る際などに、gRPC(Google Remote Procedure Call)が使われることがあります。gRPCはGoogleが開発した高性能なオ...

 

Rustのtonic(gRPC)でDDD(ドメイン駆動設計)構成のバックエンドAPIを開発する方法まとめ

まずは以下のコマンド実行して各種ファイルを作成します。

$ mkdir rust-grpc-domain && cd rust-grpc-domain
$ mkdir -p docker/local/rust && touch docker/local/rust/Dockerfile
$ touch compose.yml .env

※開発環境の構築にはDockerを使うため、もしまだ使えないという方はDocker Desktopなどをインストールして事前に使えるようにして下さい。

 

次に作成した各種ファイルについて、それぞれ以下のように記述します。

・「docker/local/rust/Dockerfile」

FROM rust:1.87

WORKDIR /app

COPY . .

# インストール可能なパッケージ一覧の更新
RUN apt-get update && \
    # パッケージのインストール
    apt-get install -y \
      protobuf-compiler \
      curl

# gRPCのドキュメント生成やバリデーション用にgoをインストール
# goのバージョンを変更したい場合は以下リンク先のページで必要情報をご確認下さい。
# https://go.dev/dl/
ENV GO_VERSION=1.24.4
ENV GO_SHA256=d5501ee5aca0f258d5fe9bfaed401958445014495dc115f202d43d5210b45241
ENV GO_OS=linux
ENV GO_ARCH=arm64
ENV PATH="/usr/local/go/bin:/root/go/bin:${PATH}"

RUN curl -fsSLO "https://go.dev/dl/go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
    echo "${GO_SHA256} go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" | sha256sum -c - && \
    tar -C /usr/local -xzf "go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
    rm "go${GO_VERSION}.${GO_OS}-${GO_ARCH}.tar.gz" && \
    rm -rf /var/lib/apt/lists/*

# gRPCのドキュメント生成用にgoのライブラリをインストール
RUN go install github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc@v1.5.1

# バリデーション用にgoのライブラリをインストール
RUN go install github.com/envoyproxy/protoc-gen-validate@v1.2.1

# ホットリロード用のライブラリをインストール
RUN cargo install cargo-watch

# Rust用のリンターをインストール
RUN rustup component add clippy

# Rust用のフォーマッターをインストール
RUN rustup component add rustfmt

※Rustのバージョンについては、以前の記事と同様に2025年5月時点で最新の「1.87」を使います。また後で使う各種ライブラリも記述しています。

 

・「.env」
ENV=local
GRPC_PORT=50051
RUST_LOG=info

 

・「compose.yml」

services:
  grpc:
    container_name: rust-grpc-domain
    build:
      context: .
      dockerfile: ./docker/local/rust/Dockerfile
    command: cargo watch -x run
    volumes:
      - .:/app
    ports:
      - "50051:50051"
    # .env.testing利用時に上書きしたい環境変数を設定する
    environment:
      - ENV
      - GRPC_PORT
      - RUST_LOG
    tty: true
    stdin_open: true

※環境変数はデフォルトで「.env」ファイルを読み込みますが、テスト用の「.env.testing」を使う場合は「environment」の定義も必要です。

 

次に以下のコマンドを実行し、DockerコンテナのビルドおよびRustのプロジェクトの初期化を行います。

$ docker compose build --no-cache
$ docker compose run --rm grpc cargo init --name rust_grpc_domain

 

コマンド実行後、下図のようにRustの各種ファイルが作成されればOKです。

 

合わせて作成されたファイル「.gitignore」に「.env」を追加しておきます。

 

スポンサーリンク

共通設定のファイルを追加

次に以下のコマンドを実行し、必要になる各種クレート(ライブラリ)を追加します。

$ docker compose run --rm grpc cargo add tonic@0.13.1
$ docker compose run --rm grpc cargo add tonic-reflection@0.13.1
$ docker compose run --rm grpc cargo add tower
$ docker compose run --rm grpc cargo add serde --features derive
$ docker compose run --rm grpc cargo add hyper
$ docker compose run --rm grpc cargo add envy
$ docker compose run --rm grpc cargo add env_logger
$ docker compose run --rm grpc cargo add log
$ docker compose run --rm grpc cargo add mockall
$ docker compose run --rm grpc cargo add async-trait
$ docker compose run --rm grpc cargo add chrono --features serde
$ docker compose run --rm grpc cargo add thiserror
$ docker compose run --rm grpc cargo add uuid --features v4

※この記事では以前と同様にtonicのバージョンは0.13.1を使います。

 

次に以下のコマンドを実行し、共通設定のファイルを追加します。

$ mkdir -p src/config && touch src/config/config_settings.rs src/config/mod.rs
$ mkdir -p src/application/usecase/context && touch src/application/usecase/context/context_request.rs src/application/usecase/context/mod.rs
$ mkdir -p src/application/usecase/logger && touch src/application/usecase/logger/logger_trait.rs src/application/usecase/logger/mod.rs
$ touch src/application/usecase/mod.rs src/application/mod.rs
$ mkdir -p src/infrastructure/logger && touch src/infrastructure/logger/logger_log.rs src/infrastructure/logger/mod.rs
$ touch src/infrastructure/mod.rs
$ mkdir -p src/presentation/middleware && touch src/presentation/middleware/request_middleware.rs src/presentation/middleware/mod.rs
$ mkdir -p src/presentation/interceptor && touch src/presentation/interceptor/auth_interceptor.rs src/presentation/interceptor/mod.rs
$ touch src/presentation/mod.rs
$ mkdir -p src/domain/error && touch src/domain/error/error_common.rs src/domain/error/mod.rs
$ touch src/domain/mod.rs

 

次に作成したファイルをそれぞれ以下のように記述します。

・「src/config/config_settings.rs」

use envy;
use serde::Deserialize;

// 環境変数のデフォルト値を返す関数
fn default_env() -> String {
    "local".to_string()
}

fn default_grpc_port() -> u16 {
    50051
}

fn default_rust_log() -> String {
    "info".to_string()
}

// 環境変数の構造体
#[derive(Deserialize, Debug)]
pub struct Config {
    #[serde(default = "default_env")]
    pub env: String,
    #[serde(default = "default_grpc_port")]
    pub grpc_port: u16,
    #[allow(dead_code)]
    #[serde(default = "default_rust_log")]
    pub rust_log: String,
}

// 環境変数を返す関数
pub fn get_config() -> Config {
    match envy::from_env::<Config>() {
        Ok(config) => config,
        Err(err) => {
            println!("環境変数の初期化エラー: {}", err);

            // 環境変数にデフォルト値を設定して返す
            Config {
                env: default_env(),
                grpc_port: default_grpc_port(),
                rust_log: default_rust_log(),
            }
        }
    }
}

※環境変数の値をまとめたコンフィグ設定です。

 

・「src/config/mod.rs」

pub mod config_settings;

 

・「src/application/usecase/context/context_request.rs」

use tonic::Request;

// 共通コンテキストの構造体
#[derive(Clone, Debug)]
pub struct ContextRequest {
    pub request_id: String,
    pub uri: String,
}

// リクエスト用コンテキストの作成
pub fn new_context_request<T>(req: &Request<T>) -> ContextRequest {
    let request_id = req
        .metadata()
        .get("x-request-id")
        .map(|value| value.to_str().unwrap_or("-"))
        .unwrap_or("-");

    let uri = req
        .metadata()
        .get("x-uri")
        .map(|value| value.to_str().unwrap_or("-"))
        .unwrap_or("-");

    ContextRequest {
        request_id: request_id.to_string(),
        uri: uri.to_string(),
    }
}

※リクエスト単位で共有させたいコンテキスト情報です。

 

・「src/application/usecase/context/mod.rs」

pub mod context_request;

 

・「src/application/usecase/logger/logger_trait.rs」

// 共通コンテキスト
use crate::application::usecase::context::context_request::ContextRequest;

// ロガーのトレイト(モック化もできるように定義)
#[mockall::automock]
#[async_trait::async_trait]
pub trait LoggerTrait: Send + Sync {
    fn info(&self, ctx: &ContextRequest, msg: &str);
    #[allow(dead_code)]
    fn warn(&self, ctx: &ContextRequest, msg: &str);
    #[allow(dead_code)]
    fn error(&self, ctx: &ContextRequest, msg: &str);
}

※ロガーの実装はインフラストラクチャ層で行います。

 

・「src/application/usecase/logger/mod.rs」

pub mod logger_trait;

 

・「src/application/usecase/mod.rs」

pub mod context;
pub mod logger;

 

・「src/application/mod.rs」

pub mod usecase;

 

・「src/infrastructure/logger/logger_log.rs」

use chrono::TimeZone;
use std::io::Write;

// ロガー用のトレイト
use crate::application::usecase::logger::logger_trait::LoggerTrait;

// 共通コンテキスト
use crate::application::usecase::context::context_request::ContextRequest;

// ロガーの構造体
#[derive(Clone, Debug)]
pub struct Logger {}

impl Logger {
    // ロガーの初期化処理
    pub fn init() {
        // 日本時間を取得
        let jst = chrono::offset::FixedOffset::east_opt(9 * 3600)
            .unwrap()
            .from_utc_datetime(&chrono::Utc::now().naive_utc());

        // カスタムロガーの初期化
        env_logger::builder()
            .format(move |buf, record| {
                writeln!(
                    buf,
                    "{} {} {}",
                    jst.format("%Y-%m-%d %H:%M:%S"),
                    record.level(),
                    record.args()
                )
            })
            .init();
    }

    // インスタンス生成
    pub fn new() -> Self {
        Logger {}
    }

    // コンテキストからリクエスト情報取得
    fn get_req_info_from_ctx(ctx: &ContextRequest) -> String {
        format!("request_id={} uri={}", ctx.request_id, ctx.uri)
    }
}

#[async_trait::async_trait]
impl LoggerTrait for Logger {
    fn info(&self, ctx: &ContextRequest, msg: &str) {
        let req_info = Logger::get_req_info_from_ctx(ctx);
        log::info!("[{}] {}", req_info, msg);
    }

    fn warn(&self, ctx: &ContextRequest, msg: &str) {
        let req_info = Logger::get_req_info_from_ctx(ctx);
        log::warn!("[{}] {}", req_info, msg);
    }

    fn error(&self, ctx: &ContextRequest, msg: &str) {
        let req_info = Logger::get_req_info_from_ctx(ctx);
        log::error!("[{}] {}", req_info, msg);
    }
}

 

・「src/infrastructure/logger/mod.rs」

pub mod logger_log;

 

・「src/infrastructure/mod.rs」

pub mod logger;

 

・「src/presentation/middleware/request_middleware.rs」

use hyper::header::HeaderValue;
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use tonic::server::NamedService;
use tower::{Layer, Service};
use uuid::Uuid;

// 共通コンテキスト
use crate::application::usecase::context::context_request::ContextRequest;

// ロガー
use crate::application::usecase::logger::logger_trait::LoggerTrait;
use crate::infrastructure::logger::logger_log::Logger;

// リクエスト用のミドルウェア
#[derive(Clone)]
pub struct RequestMiddleware<S> {
    inner: S,
}

impl<S> RequestMiddleware<S> {
    fn new(inner: S) -> Self {
        RequestMiddleware { inner }
    }
}

// リクエスト用のミドルウェアにServiceトレイトを実装
impl<B, S> Service<hyper::Request<B>> for RequestMiddleware<S>
where
    S: Service<hyper::Request<B>> + Clone + Send + 'static,
    S::Future: Send,
    B: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, mut req: hyper::Request<B>) -> Self::Future {
        // リクエストからuriを取得
        let uri_path = req.uri().path();

        // uriをリクエストヘッダーに設定
        let uri_string = match HeaderValue::from_str(uri_path) {
            Ok(value) => {
                req.headers_mut().insert("x-uri", value.clone());
                value.to_str().unwrap().to_string()
            }
            Err(_) => {
                // エラーの場合は「-」を設定
                let header_value = HeaderValue::from_str("-").unwrap();
                req.headers_mut().insert("x-uri", header_value.clone());
                header_value.to_str().unwrap().to_string()
            }
        };

        // request-idをリクエストヘッダーに設定
        let uuid = Uuid::new_v4().to_string();
        req.headers_mut()
            .insert("x-request-id", uuid.clone().parse().unwrap());

        // リクエスト開始ログ
        let ctx = ContextRequest {
            request_id: uuid,
            uri: uri_string,
        };
        let logger = Logger::new();
        logger.info(&ctx, "Execute gRPC request !!");

        // 非同期処理のためself.innerをコピー
        let mut inner = self.inner.clone();

        // 非同期処理
        Box::pin(async move { inner.call(req).await })
    }
}

// リクエスト用のミドルウェアにtonic::server::NamedServiceを実装
impl<S> NamedService for RequestMiddleware<S>
where
    S: NamedService,
{
    const NAME: &'static str = S::NAME;
}

// リクエスト用のミドルウェアのレイヤー
#[derive(Clone)]
pub struct RequestMiddlewareLayer;

// リクエスト用のミドルウェアのレイヤーにLayerトレイトの実装
impl<S> Layer<S> for RequestMiddlewareLayer {
    type Service = RequestMiddleware<S>;

    fn layer(&self, inner: S) -> Self::Service {
        RequestMiddleware::new(inner)
    }
}

※「req.uri().path()」で実行されたメソッドのパスが取得可能です。そしてリクエスト単位で一意のIDを共有できるように「x-request-id」を設定しています。またミドルウェアでリクエストヘッダーに情報を追加すると、gRPCのリクエストのメタデータに情報が追加される。

 

・「src/presentation/middleware/mod.rs」

pub mod request_middleware;

 

・「src/presentation/interceptor/auth_interceptor.rs」

use tonic::{Code, Request, Status};

#[allow(clippy::result_large_err)]
pub fn auth_interceptor(req: tonic::Request<()>) -> Result<Request<()>, Status> {
    // メタデータからx-uriを取得
    let uri = match req.metadata().get("x-uri") {
        Some(value) => value.to_str().unwrap_or_default(),
        None => "-",
    };

    // 対象のuriの場合はスキップ
    let skip_uri: &[&str] = &[
        // "/chat.ChatService/Bidirectional",
    ];
    if skip_uri.contains(&uri) {
        return Ok(req);
    }

    // リクエストヘッダーからトークン取得
    let token = match req.metadata().get("authorization") {
        Some(value) => {
            let bearer_token = value.to_str().unwrap_or_default();
            bearer_token.trim_start_matches("Bearer ")
        }
        None => "",
    };

    // 認証トークンが設定されていない場合はエラー
    if token.is_empty() {
        let status = Status::new(
            Code::InvalidArgument,
            "認証用トークンが設定されていません。",
        );
        return Err(status);
    }

    // TODO: 認証チェック処理を追加

    // 戻り値
    Ok(req)
}

※これは認証処理用のインターセプターです。特定のメソッドに対してだけインターセプターを適用することはできないため、インターセプターでは必要に応じてスキップする処理が必要になる。

 

・「src/presentation/interceptor/mod.rs」

pub mod auth_interceptor;

 

・「src/presentation/mod.rs」

pub mod interceptor;
pub mod middleware;

 

・「src/domain/error/error_common.rs」

use thiserror::Error;
use tonic::Status;

#[derive(Clone, Error, Debug)]
pub enum ErrorCommon {
    #[allow(dead_code)]
    #[error("Internal Server Error")]
    InternalServerError,
    #[allow(dead_code)]
    #[error("{message}")]
    CustomError { status: Status, message: String },
}

※リポジトリの戻り値などで使う共通エラー定義

 

・「src/domain/error/mod.rs」

pub mod error_common;

 

・「src/domain/mod.rs」

pub mod error;

 

スポンサーリンク

DDD(ドメイン駆動設計)のディレクトリ構成について

この後にDDD(ドメイン駆動設計)でAPIを作成していきますが、ディレクトリ構成としてはDDDの思想に基づいたレイヤードアーキテクチャを採用しています。

/proto(スキーマ定義)
/src
 ├── /application(アプリケーション層)
 |    └── usecase(ユースケース層)
 |
 ├── /config(コンフィグ設定)
 |
 ├── /domain(ドメイン層)
 |    ├── model(ドメインモデルの定義。ビジネスロジックは可能な限りドメインに集約させる。)
 |    ├── (仮)repository(リポジトリのインターフェース定義)
 |    └── (仮)service(外部サービスのインターフェース定義)
 |
 ├── /infrastructure(インフラストラクチャ層)
 |    ├── logger(ロガーの実装。インターフェース部分はユースケース層で定義。)
 |    ├── (仮)database(データベース設定)
 |    ├── (仮)persistence(リポジトリの実装。DB操作による永続化層。)
 |    ├── (仮)cache(キャッシュを含めたリポジトリの実装。インターフェースはリポジトリと同一。)
 |    └── (仮)externalapi(外部サービスの実装)
 |
 ├── /presentation(プレゼンテーション層)
 |    ├── interceptor(インターセプターの定義)
 |    ├── middleware(ミドルウェアの定義)
 |    ├── router(ルーター設定)
 |    └── server(サーバー層(ハンドラー層))
 |
 └── /registry(レジストリ。依存注入によるユースケースのインスタンスをAppStateにまとめる。)

※(仮)のものは将来的に追加する想定の例です。

 

Chatドメインを例にAPIを作る

次に以下の手順でChatドメインを例に、gRPCの双方向ストリーミングを利用したAPIを作成します。

 

ドメインの定義

まずは以下のコマンドを実行し、各種ファイルを作成します。

$ mkdir -p src/domain/chat
$ touch src/domain/chat/chat_model.rs src/domain/chat/chat_model_test.rs src/domain/chat/mod.rs

 

次に作成したファイルをそれぞれ以下のように記述します。

・「src/domain/chat/chat_model.rs」

use serde::{Deserialize, Serialize};

// chatモデル
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Chat {
    pub input_text: String,
}

impl Chat {
    // 新規作成
    pub fn new(input_text: String) -> Self {
        Self { input_text }
    }

    // 大文字変換
    pub fn text_to_upper(&self) -> String {
        self.input_text.to_uppercase()
    }
}

 

・「src/domain/chat/chat_model_test.rs」

#[cfg(test)]
mod tests {
    use crate::domain::chat::chat_model::Chat;

    #[test]
    fn test_new_chat() {
        // textパラメータ
        let text = "hello".to_string();

        // テスト実行
        let chat = Chat::new(text.clone());

        // 検証
        assert_eq!(chat.input_text, text);
    }

    #[test]
    fn test_text_to_upper() {
        // textパラメータ
        let text = "hello".to_string();

        // テスト実行
        let chat = Chat::new(text.clone());

        // 検証
        assert_eq!(chat.text_to_upper(), "HELLO");
    }
}

 

・「src/domain/chat/mod.rs」

pub mod chat_model;

// テストコード用のモジュール
pub mod chat_model_test;

 

次にファイル「src/domain/mod.rs」を以下のように修正します。

pub mod chat;
pub mod error;

 

リポジトリやサービスの実装

今回は省略しているのでありませんが、DB操作や外部サービスを利用する場合、infrastructure層に実装します。

 

スキーマ定義

次にAPIのスキーマを定義します。まずは以下のコマンドを実行し、必要なクレート(ライブラリ)を追加します。

$ docker compose run --rm grpc cargo add --build tonic-build@0.13.1 --features prost
$ docker compose run --rm grpc cargo add prost@0.13.5
$ docker compose run --rm grpc cargo add prost-validate@0.2.7 --features derive
$ docker compose run --rm grpc cargo add --build prost-build@0.13.5
$ docker compose run --rm grpc cargo add --build prost-validate-build@0.2.7
$ docker compose run --rm grpc cargo add tokio --features full

※tonicのバージョン0.13.1に合わせてprostのバージョンを合わせるようにします。バージョンの組み合わせによって互換性が無かったりするため、上記のように固定推薦です。

 

次に以下のコマンドを実行し、各種ファイルを作成します。

$ mkdir -p proto/chat && touch proto/chat/chat.proto
$ mkdir doc
$ touch build.rs

 

次に作成したファイルをそれぞれ以下のように記述します。

・「proto/chat/chat.proto」

syntax = "proto3";

import "validate/validate.proto";

package chat;

option go_package="pb/chat";

message TextInput {
  string text = 1 [(validate.rules).string.min_len = 1];
}

message TextOutput {
  string text = 1;
}

// サンプルサービス
service ChatService {
  // 双方向ストリーミング(複数のリクエスト-複数のレスポンス)
  rpc Bidirectional(stream TextInput) returns (stream TextOutput) {}
    // Returns:
    // - 0 OK: TextOutputを出力
    // - 2 Unknown: 不明なエラー
    // - 3 INVALID_ARGUMENT: バリデーションエラー)
}

※gRPCはこのprotoファイルを元にAPIを作っていくため、スキーマ駆動開発です。ここでは双方向ストリーミング(複数のリクエスト-複数のレスポンス)のメソッドを定義しています。

 

・「build.rs」

use std::{env, path::PathBuf};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // コンフィグ設定を定義
    let files = &["proto/chat/chat.proto"];
    let includes = &[
        "proto",
        "../root/go/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v1.2.1",
    ];
    let file_descriptor_set_path =
        PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR is not set")).join("chat_descriptor.bin");
    let mut config = {
        let mut c = prost_build::Config::new();
        c.service_generator(
            tonic_build::configure()
                .message_attribute(".", "#[derive(::prost_validate::Validator)]")
                .service_generator(),
        );
        c
    };

    // コンフィグ設定にバリデーションを適用
    prost_validate_build::Builder::new()
        .file_descriptor_set_path(file_descriptor_set_path)
        .configure(&mut config, files, includes)?;

    // コンフィング設定からコード生成
    config.compile_protos(files, includes)?;

    Ok(())
}

※Rustではルートディレクトリにこのbuild.rsを作り、このファイルでgRPC用のProtocol Buffersのコードを生成します。バリデーション用のコードも合わせて生成するためにprost_validate_buildを使っています。バリデーション用のコード生成にGoのライブラリを使用しているため、includesの「../root/go/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v1.2.1」でコンテナ内のライブラリのパスを指定する必要があります。

 

次に以下のコマンドを実行し、protoファイルからgRPCのドキュメントも生成しておきます。

$ docker compose run --rm grpc protoc -I=.:../root/go/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v1.2.1 --doc_out=./doc --doc_opt=markdown,docs.md ./proto/chat/chat.proto

※ドキュメント生成にはDockerfileでインストールしたGo言語のライブラリを使います。

 

ユースケースの定義

次にユースケースを定義します。まずは以下のコマンドを実行し、必要になるクレート(ライブラリ)を追加しておきます。

$ docker compose run --rm grpc cargo add tokio-stream --features full
$ docker compose run --rm grpc cargo add tonic-mock

 

次に以下のコマンドを実行して各種ファイルを作成します。

$ mkdir -p src/application/usecase/chat
$ touch src/application/usecase/chat/chat_bidirectional.rs src/application/usecase/chat/chat_bidirectional_test.rs src/application/usecase/chat/mod.rs

 

次に作成したファイルをそれぞれ以下のように記述します。

・「src/application/usecase/chat/chat_bidirectional.rs」

// tonic
use tonic::{
    Request, Response, Status, Streaming,
    metadata::{Ascii, MetadataValue},
};

// tokio
use tokio::{
    spawn,
    sync::mpsc,
    time::{Duration, sleep},
};
use tokio_stream::{StreamExt, wrappers::ReceiverStream};

// 変換用
use std::str::FromStr;

// バリデーション用のトレイト
use prost_validate::Validator;

// Arc(ヒープ上に確保されたある値の所有権を、複数のスレッド間で安全に共有するためのスマートポインタ)
use std::sync::Arc;

// 共通コンテキスト
use crate::application::usecase::context::context_request::new_context_request;

// ロガー
use crate::application::usecase::logger::logger_trait::LoggerTrait;

// Chatドメイン
use crate::domain::chat::chat_model::Chat;

// ビルドしたprotoファイルのインポート
pub mod chat_proto {
    // protoファイルに定義したpakage名を指定
    tonic::include_proto!("chat");
}

// ストリーミング用の型定義
type BidirectionalStream = ReceiverStream<Result<chat_proto::TextOutput, Status>>;

// ユースケース用のトレイト(モック化もできるように定義)
#[mockall::automock]
#[async_trait::async_trait]
pub trait ChatBidirectionalUsecaseTrait {
    async fn exec(
        &self,
        request: Request<Streaming<chat_proto::TextInput>>,
    ) -> Result<Response<BidirectionalStream>, Status>;
}

// 実行するユースケースの構造体
#[derive(Clone)]
pub struct ChatBidirectionalUsecase {
    pub logger: Arc<dyn LoggerTrait + 'static>,
}

impl ChatBidirectionalUsecase {
    pub fn new(logger: Arc<dyn LoggerTrait + 'static>) -> Self {
        ChatBidirectionalUsecase { logger }
    }
}

#[async_trait::async_trait]
impl ChatBidirectionalUsecaseTrait for ChatBidirectionalUsecase {
    async fn exec(
        &self,
        request: Request<Streaming<chat_proto::TextInput>>,
    ) -> Result<Response<BidirectionalStream>, Status> {
        // コンテキスト設定
        let ctx = new_context_request(&request);

        // トレーラー用
        let x_request_id = MetadataValue::<Ascii>::from_str(ctx.request_id.as_str()).expect("-");

        // リクエストからストリームを取り出す
        let mut stream = request.into_inner();

        // mpsc (multi-producer, single-consumer) チャンネルの作成
        // サーバーはこのチャンネルにデータ送信し、その後クライアントにストリーミングする
        // バッファサイズは適宜調整が必要
        let (tx, rx) = mpsc::channel(1);

        // ロガー
        let logger = self.logger.clone();

        spawn(async move {
            logger.info(&ctx, "Start Bidirectional Stream !!");
            while let Some(result) = stream.next().await {
                match result {
                    Ok(req) => {
                        // バリデーションチェック
                        match req.validate() {
                            Ok(_) => {}
                            Err(e) => {
                                let msg = format!("バリデーションエラー: {}", e);
                                logger.error(&ctx, &msg);
                                let mut status = Status::invalid_argument(msg);
                                status
                                    .metadata_mut()
                                    .insert("x-request-id", x_request_id.clone());
                                let _ = tx.send(Err(status)).await;
                                break;
                            }
                        }

                        // Chatモデルの設定
                        let chat = Chat::new(req.text);

                        // レスポンスの設定
                        let res = chat_proto::TextOutput {
                            text: chat.text_to_upper(),
                        };

                        // Okでラップしたレスポンスをtxで送信
                        if let Err(e) = tx.send(Ok(res)).await {
                            // クライアントの接続切れなどでエラーの場合
                            let msg = format!("Failed to send data: {:?}", e);
                            logger.error(&ctx, &msg);
                            let mut status = Status::invalid_argument(msg);
                            status
                                .metadata_mut()
                                .insert("x-request-id", x_request_id.clone());
                            let _ = tx.send(Err(status)).await;
                            break;
                        }

                        // 1秒間待機処理
                        sleep(Duration::from_secs(1)).await;
                    }
                    Err(e) => {
                        // クライアントの接続切れなどでエラーの場合
                        let msg = format!("Failed to send data: {:?}", e);
                        logger.error(&ctx, &msg);
                        let mut status = Status::invalid_argument(msg);
                        status
                            .metadata_mut()
                            .insert("x-request-id", x_request_id.clone());
                        let _ = tx.send(Err(status)).await;
                        break;
                    }
                }
            }

            // トレーラーの設定
            let mut status = Status::ok("Stream finished successfully");
            status
                .metadata_mut()
                .insert("x-request-id", x_request_id.clone());

            // Errでラップしたステータスを送信
            if let Err(e) = tx.send(Err(status)).await {
                // クライアントの接続切れなどでエラーの場合
                let msg = format!("Failed to send data: {:?}", e);
                logger.error(&ctx, &msg);
                let mut status = Status::invalid_argument(msg);
                status.metadata_mut().insert("x-request-id", x_request_id);
                let _ = tx.send(Err(status)).await;
            }

            logger.info(&ctx, "Finish Bidirectional Stream !!");
        });

        Ok(Response::new(ReceiverStream::new(rx)))
    }
}

 

・「src/application/usecase/chat/chat_bidirectional_test.rs」

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use tokio_stream::StreamExt;
    use tonic_mock::streaming_request;

    // ロガーのモック
    use crate::application::usecase::logger::logger_trait::MockLoggerTrait;

    // ユースケース
    use crate::application::usecase::chat::chat_bidirectional::{
        ChatBidirectionalUsecase, ChatBidirectionalUsecaseTrait, chat_proto,
    };

    #[tokio::test]
    async fn test_exec_success() {
        // ロガーのモック化
        let mut mock_logger = MockLoggerTrait::new();
        mock_logger.expect_info().returning(|_, _| ());

        // ユースケースのインスタンス化
        let chat_bidirectional_usecase = ChatBidirectionalUsecase {
            logger: Arc::new(mock_logger),
        };

        // 送信するメッセージリスト作成
        let message_lists = vec![chat_proto::TextInput {
            text: "hello".to_string(),
        }];

        // tonic-mockを使ってストリーミングリクエストを作成
        let request = streaming_request(message_lists);

        // テスト実行
        let res = chat_bidirectional_usecase.exec(request).await;

        // 検証
        assert!(res.is_ok());

        // ストリームの検証
        let mut stream = res.unwrap().into_inner();
        let mut i = 0;
        let outputs = vec![chat_proto::TextOutput {
            text: "HELLO".to_string(),
        }];
        while let Some(result) = stream.next().await {
            match result {
                Ok(res) => {
                    let msg = outputs[i].text.clone();
                    assert_eq!(res.text, msg);
                }
                Err(_) => {}
            }
            i += 1;
        }
    }

    #[tokio::test]
    async fn test_exec_validation_error() {
        // ロガーのモック化
        let mut mock_logger = MockLoggerTrait::new();
        mock_logger.expect_info().returning(|_, _| ());
        mock_logger.expect_error().returning(|_, _| ());

        // ユースケースのインスタンス化
        let chat_bidirectional_usecase = ChatBidirectionalUsecase {
            logger: Arc::new(mock_logger),
        };

        // 送信するメッセージリスト作成
        let message_lists = vec![chat_proto::TextInput {
            text: "".to_string(),
        }];

        // tonic-mockを使ってストリーミングリクエストを作成
        let request = streaming_request(message_lists);

        // テスト実行
        let res = chat_bidirectional_usecase.exec(request).await;

        // 検証
        assert!(res.is_ok());

        // ストリームの検証
        let mut stream = res.unwrap().into_inner();
        while let Some(result) = stream.next().await {
            match result {
                Ok(_) => {}
                Err(err) => {
                    assert_eq!(err.code(), tonic::Code::InvalidArgument);
                    assert!(err.message().contains("バリデーションエラー"));
                    break;
                }
            }
        }
    }
}

 

・「src/application/usecase/chat/mod.rs」

pub mod chat_bidirectional;

// テストコード用のモジュール
pub mod chat_bidirectional_test;

 

次にファイル「src/application/usecase/mod.rs」を以下のように修正します。

pub mod chat;
pub mod context;
pub mod logger;

 

レジストリ登録

次にサーバー定義(ハンドラー定義)で利用するユースケースのインスタンスをまとめるため、以下のコマンドを実行してレジストリ登録用のファイルを作成します。

$ mkdir -p src/registry
$ touch src/registry/registry_settings.rs src/registry/mod.rs

 

次に作成したファイルをそれぞれ以下のように記述します。

・「src/registry/registry_settings.rs」

use std::sync::Arc;

// ロガー
use crate::infrastructure::logger::logger_log::Logger;

// ユースケース
use crate::application::usecase::chat::chat_bidirectional::ChatBidirectionalUsecase;

// Chatユースケース
#[derive(Clone)]
pub struct ChatUsecase {
    pub chat_bidirectional: ChatBidirectionalUsecase,
}

// アプリケーション全体で共有する状態(DIコンテナ)
#[derive(Clone)]
pub struct AppState {
    pub chat_usecase: ChatUsecase,
}

impl AppState {
    pub async fn new() -> Self {
        // ロガー設定
        let logger = Arc::new(Logger::new());

        // Chatユースケースのインスタンス化とまとめ
        let chat_bidirectional_usecase = ChatBidirectionalUsecase::new(logger.clone());
        let chat_usecase = ChatUsecase {
            chat_bidirectional: chat_bidirectional_usecase,
        };

        // 戻り値の設定
        Self { chat_usecase }
    }
}

 

・「src/registry/mod.rs」

pub mod registry_settings;

 

サーバーの定義

次にサーバー定義(ハンドラー定義)をするため、以下のコマンドを実行して各種ファイルを作成します。

$ mkdir -p src/presentation/server/grpc/chat
$ touch src/presentation/server/grpc/chat/chat_server.rs src/presentation/server/grpc/chat/chat_server_test.rs src/presentation/server/grpc/chat/mod.rs
$ touch src/presentation/server/grpc/mod.rs src/presentation/server/mod.rs

 

次に作成したファイルをそれぞれ以下のように記述します。

・「src/presentation/server/grpc/chat/chat_server.rs」

// tonic
use tonic::{Request, Response, Status, Streaming};

// tokio
use tokio_stream::wrappers::ReceiverStream;

// Arc(ヒープ上に確保されたある値の所有権を、複数のスレッド間で安全に共有するためのスマートポインタ)
use std::sync::Arc;

// レジストリ
use crate::registry::registry_settings::AppState;

// Chatユースケース
use crate::application::usecase::chat::chat_bidirectional::{
    ChatBidirectionalUsecaseTrait, chat_proto,
};

// 構造体定義
pub struct ChatServer {
    state: Arc<AppState>,
}

// protoファイルの関数の実装をメソッド定義
#[tonic::async_trait]
impl chat_proto::chat_service_server::ChatService for ChatServer {
    // 双方向ストリーミングの追加(typeの定義必須)
    // ※protoファイルで定義した名称+Streamという型の定義が必要になる
    type BidirectionalStream = ReceiverStream<Result<chat_proto::TextOutput, Status>>;

    async fn bidirectional(
        &self,
        request: Request<Streaming<chat_proto::TextInput>>,
    ) -> Result<Response<Self::BidirectionalStream>, Status> {
        // ユースケースの実行
        self.state
            .chat_usecase
            .chat_bidirectional
            .exec(request)
            .await
    }
}

// ルーターに設定するサーバー定義を返す関数
pub fn get_chat_server(
    state: Arc<AppState>,
) -> chat_proto::chat_service_server::ChatServiceServer<ChatServer> {
    let chat_server = ChatServer { state };
    chat_proto::chat_service_server::ChatServiceServer::new(chat_server)
}

 

・「src/presentation/server/grpc/chat/chat_server_test.rs」

#[cfg(test)]
mod tests {
    use tokio_stream::{StreamExt, iter};
    use tonic::Request;

    // コンフィグ設定
    use crate::config::config_settings::get_config;

    use crate::application::usecase::chat::chat_bidirectional::chat_proto::{
        TextInput, TextOutput, chat_service_client::ChatServiceClient,
    };

    #[tokio::test]
    async fn bidirectional_should_return_succeed() {
        let config = get_config();

        // サーバーのインスタンス作成
        let mut client =
            ChatServiceClient::connect(format!("http://localhost:{}", config.grpc_port))
                .await
                .unwrap();

        // 送信するメッセージリスト作成
        let message_lists = vec![TextInput {
            text: "hello".to_string(),
        }];

        // ストリームの作成
        let mut request = Request::new(iter(message_lists.clone()));

        // メタデータにBearerトークンを追加
        request
            .metadata_mut()
            .insert("authorization", "Bearer token".parse().unwrap());

        // テストの実行
        let res = client.bidirectional(request).await;

        // 検証
        assert!(res.is_ok());

        // ストリームの検証
        let mut stream = res.unwrap().into_inner();
        let mut i = 0;
        let outputs = vec![TextOutput {
            text: "HELLO".to_string(),
        }];
        while let Some(result) = stream.next().await {
            match result {
                Ok(res) => {
                    let msg = outputs[i].text.clone();
                    assert_eq!(res.text, msg);
                }
                Err(_) => {}
            }
            i += 1;
        }
    }

    #[tokio::test]
    async fn bidirectional_should_return_auth_error() {
        let config = get_config();

        // サーバーのインスタンス作成
        let mut client =
            ChatServiceClient::connect(format!("http://localhost:{}", config.grpc_port))
                .await
                .unwrap();

        // 送信するメッセージリスト作成
        let message_lists = vec![TextInput {
            text: "hello".to_string(),
        }];

        // ストリームの作成
        let request = Request::new(iter(message_lists.clone()));

        // テストの実行
        let res = client.bidirectional(request).await;

        // 検証
        assert!(res.is_err());
        let err = res.unwrap_err();
        assert_eq!(err.code(), tonic::Code::InvalidArgument);
        assert!(
            err.message()
                .contains("認証用トークンが設定されていません。")
        );
    }
}

 

・「src/presentation/server/grpc/chat/mod.rs」

pub mod chat_server;

// テストコード用のモジュール
pub mod chat_server_test;

 

・「src/presentation/server/grpc/mod.rs」

pub mod chat;

 

・「src/presentation/server/mod.rs」

pub mod grpc;

 

ルーター設定の追加

次にルーター設定をするため、以下のコマンドを実行してファイルを作成します。

$ mkdir -p src/presentation/router && touch src/presentation/router/router_settings.rs src/presentation/router/mod.rs

 

・「src/presentation/router/router_settings.rs」

// tonic
use tonic::{service::InterceptorLayer, transport::Server};
use tonic_reflection::server::Builder as ReflectionBuilder;

// tower
use tower::ServiceBuilder;

// Arc(ヒープ上に確保されたある値の所有権を、複数のスレッド間で安全に共有するためのスマートポインタ)
use std::sync::Arc;

// レジストリ
use crate::registry::registry_settings::AppState;

// コンフィング設定
use crate::config::config_settings::get_config;

// ミドルウェア
use crate::presentation::middleware::request_middleware::RequestMiddlewareLayer;

// インターセプター
use crate::presentation::interceptor::auth_interceptor::auth_interceptor;

// gRPCサーバーのサービス
use crate::presentation::server::grpc::chat::chat_server::get_chat_server;

// protoファイル
pub mod proto {
    pub const CHAT_DESCRIPTOR: &[u8] = tonic::include_file_descriptor_set!("chat_descriptor");
}

pub async fn router(state: Arc<AppState>) -> Result<(), Box<dyn std::error::Error>> {
    // 環境変数取得
    let config = get_config();

    // アドレス設定
    let grpc_port = config.grpc_port;
    let addr = format!("[::]:{}", grpc_port).parse()?;

    // サーバー設定
    let chat_server = get_chat_server(state.clone());

    // サーバーリフレクション設定(旧ツールではv1alphaを使う)
    let reflection_service_v1 = ReflectionBuilder::configure()
        .register_encoded_file_descriptor_set(proto::CHAT_DESCRIPTOR)
        .build_v1()?;

    let reflection_service_v1alpha = ReflectionBuilder::configure()
        .register_encoded_file_descriptor_set(proto::CHAT_DESCRIPTOR)
        .build_v1alpha()?;

    // サービス設定
    let service = ServiceBuilder::new()
        .layer(RequestMiddlewareLayer)
        .layer(InterceptorLayer::new(auth_interceptor))
        .service(chat_server);

    // サーバー起動
    Server::builder()
        .add_service(reflection_service_v1)
        .add_service(reflection_service_v1alpha)
        .add_service(service)
        .serve(addr)
        .await?;

    Ok(())
}

 

・「src/presentation/router/mod.rs」

pub mod router_settings;

 

次にファイル「src/presentation/mod.rs」を以下のように修正します。

pub mod interceptor;
pub mod middleware;
pub mod router;
pub mod server;

 

main.goの修正

次にファイル「src/main.go」を以下のように修正します。

// Arc(ヒープ上に確保されたある値の所有権を、複数のスレッド間で安全に共有するためのスマートポインタ)
use std::sync::Arc;

// モジュールのインポート
mod application;
mod config;
mod domain;
mod infrastructure;
mod presentation;
mod registry;

// コンフィグ設定
use crate::config::config_settings::get_config;

// ルーター設定
use crate::presentation::router::router_settings::router;

// ロガー設定
use crate::infrastructure::logger::logger_log::Logger;

// レジストリ設定
use crate::registry::registry_settings::AppState;

#[tokio::main]
async fn main() {
    // 環境変数取得
    let config = get_config();

    // ロガーの初期化
    Logger::init();

    // サーバー起動のログ出力
    log::info!("[ENV={}] Start rust_grpc_domain !!", config.env);

    // サーバー起動
    let state = Arc::new(AppState::new().await);
    router(state).await.unwrap();
}

 

次に以下のコマンドを実行し、フォーマット修正および静的コード解析を行い、警告が出ないことを確認します。

$ docker compose run --rm grpc cargo fmt
$ docker compose run --rm grpc cargo clippy

 

コンテナの再ビルドと起動

次に以下のコマンドを実行し、コンテナを再びルドします。

$ docker compose build --no-cache

 

次に以下のコマンドを実行し、コンテナを起動します。

$ docker compose up -d

 

次に以下のコマンドを実行し、ログ出力を確認します。

$ docker compose logs

 

ログ出力を確認し、エラーがなければOKです。

 

スポンサーリンク

ChatドメインのAPIを試す

次に上記で作成したChatドメインのAPIをPostmanを使って試します。

まずはgRPC用のリクエスト画面を表示し、URLに「localhost:50051」を入力後、タブ「認可」からBearerトークンを設定して下さい。

※インターセプターで認証チェックを入れているため

 

次にタブ「サービス定義」などからサーバーリフレクションを使って利用できるメソッドを読み込みます。

※「src/presentation/router/router_settings.rs」の「ReflectionBuilder::configure()」部分でサーバーリフレクションの設定をしています。

 

次にメソッド「Bidirectional」を選択し、「呼び出す」をクリックします。

 

次にリクエストボディを設定し、画面右下の「送信」をクリックしてリクエストを送信します。

リクエスト送信後、想定通りのレスポンス結果が返ってこればOKです。

※接続を終了する場合は「ストリーミングを終了」をクリックして下さい。

 

データベース関連について

今回はデータベースに関する部分は省略しています。必要な場合は以下の記事を参考にしてみて下さい。

RustのaxumでバックエンドAPIを開発する方法まとめ
こんにちは。Tomoyuki(@tomoyuki65)です。パフォーマンスやメモリ安全性を最重視してAPIを作りたい場合、「Rust」というプログラミング言語が候補に上がります。ただRustについてはまだまだ普及しておらず、学習コストも非常...
RustのaxumでDDD(ドメイン駆動設計)構成のバックエンドAPIを開発する方法まとめ
こんにちは。Tomoyuki(@tomoyuki65)です。Rustについては当ブログなどもきっかけになったりして、これから流行っていく可能性があるプログラミング言語ですが、これまではクリーンアーキテクチャを参考にしてRustのAPIの作り...

 

スポンサーリンク

最後に

今回はRustのtonic(gRPC)でDDD構成のバックエンドAPIを開発する方法について解説しました。

実務ではドメイン駆動設計での開発が求められることが多いと思うので、Rustのtonic(gRPC)でDDD(ドメイン駆動設計)が必要になる場合は、ぜひ参考にしてみて下さい!

 

この記事を書いた人
Tomoyuki

SE→ブロガーを経て、現在はWeb系エンジニアをしています!

Tomoyukiをフォローする
応用
スポンサーリンク
Tomoyukiをフォローする

コメント

タイトルとURLをコピーしました