본문으로 건너뛰기

커스텀 인덱서 만들기

이 가이드는 커스텀 인덱서(custom indexer)를 구축하는 실용적인 예시를 제공한다. indexer framework의 개념적 개요는 커스텀 인덱싱 프레임워크인덱서 파이프라인 아키텍처를 참고한다.

완전한 커스텀 인덱서를 구축하려면 sui-indexer-alt-framework를 사용한다. 다음 단계에서는 Sui checkpoint에서 transaction digest를 추출하여 로컬 PostgreSQL에 저장하는 sequential pipeline을 만드는 방법을 보여준다. framework의 source code for the framework는 GitHub의 Sui repo에서 찾을 수 있다.

이 예제는 간결함과 기본 지원을 위해 Diesel(널리 사용되는 Rust ORM 및 query builder)과 함께 PostgreSQL을 사용하지만, sui-indexer-alt-framework는 유연한 storage를 위해 설계되었다. Diesel을 사용하지 않으려면 MongoDB, CouchDB 같은 다른 database를 사용하거나 다른 database client를 활용할 수 있다. 이를 위해 framework의 StoreConnection trait을 구현하고 Handler::commit() 메서드 안에 database write logic을 직접 정의한다.

Click to open

Check installation

시스템에 필요한 소프트웨어가 제대로 설치되어 있는지 확신이 서지 않는다면 다음 명령으로 설치를 확인할 수 있다.

$ psql --version
$ diesel --version

다음 단계에서는 다음과 같은 indexer를 생성하는 방법을 보여준다:

  • Sui Testnet에 연결한다: https://checkpoints.testnet.sui.io 에 있는 remote checkpoint store를 fallback source로 사용한다. Testnet은 여전히 전체 checkpoint history를 제공한다.
  • checkpoint를 처리한다: checkpoint 데이터를 지속적으로 스트리밍한다.
  • transaction 데이터를 추출한다: 각 checkpoint에서 transaction digest를 끌어온다.
  • 로컬 PostgreSQL에 저장한다: 데이터를 로컬 PostgreSQL 데이터베이스에 commit한다.
  • sequential pipeline을 구현한다: 최적의 일관성과 성능을 위해 in-order 처리와 batching을 사용한다.

결국에는 모든 핵심 framework 개념을 보여 주고 더 복잡한 커스텀 indexer의 기반으로 사용할 수 있는 동작하는 indexer를 갖게 된다.

정보

Sui는 Mainnet과 Testnet 모두에 대해 checkpoint store를 제공한다.

  • Testnet: https://checkpoints.testnet.sui.io with full history
  • Mainnet: https://checkpoints.mainnet.sui.io for the most recent 30 days only

##step Project setup

먼저 indexer 프로젝트를 저장하려는 디렉터리에서 콘솔을 연다. cargo new 명령을 사용해 새 Rust 프로젝트를 만든 다음 해당 디렉터리로 이동한다.

$ cargo new simple-sui-indexer
$ cd simple-sui-indexer

##step Configure dependencies

Cargo.toml 코드를 다음 설정으로 교체한 뒤 저장한다.

[package]
name = "basic-sui-indexer"
version = "0.1.0"
edition = "2024"

[dependencies]
# Core framework dependencies
sui-indexer-alt-framework = { git = "https://github.com/MystenLabs/sui.git", branch = "testnet" }

# Async runtime
tokio = { version = "1.0", features = ["full"] }

# Error handling
anyhow = "1.0"

# Diesel PostgreSQL
diesel = { version = "2.0", features = ["postgres", "r2d2"] }
diesel-async = { version = "0.5", features = ["bb8", "postgres", "async-connection-wrapper"] }
diesel_migrations = "2.0"

# Async traits
async-trait = "0.1"

# URL parsing
url = "2.0"

# Use .env file
dotenvy = "0.15"

# Command line parsing
clap = { version = "4.0", features = ["derive"] }

이제 manifest에는 다음 dependency가 포함되어 있다:

  • sui-indexer-alt-framework: pipeline 인프라를 제공하는 core framework이다.
  • diesel/diesel-async: asynchronous 지원을 제공하는 type-safe database ORM이다.
  • tokio: framework에 필요한 async runtime이다.
  • clap: 설정을 위한 command-line argument parsing을 제공한다.
  • anyhow: trait 구현을 위한 error handling과 async-trait을 제공한다.
  • dotenvy: PostgreSQL URL을 저장하는 .env 파일을 읽어 들인다.

##step Create database

migrations를 구성하기 전에 로컬 PostgreSQL 데이터베이스를 생성하고 확인해야 한다:

$ createdb sui_indexer

연결 정보를 가져와야 한다:

$ psql sui_indexer -c "\conninfo"

성공하면 콘솔에 다음과 유사한 메시지가 표시되어야 한다:

You are connected to database "sui_indexer" as user "username" via socket in "/tmp" at port "5432".

다음과 비슷한 createdb 오류를 받는 경우이다

createdb: error: connection to server on socket "/tmp/.s.PGSQL.5432" failed: FATAL:  role "username" does not exist

이는 오류 메시지에 제공된 이름으로 username을 바꾸어 사용자를 생성해야 함을 의미한다.

$ sudo -u postgres createuser --superuser username

프롬프트가 표시되면 pgAdmin 계정의 비밀번호를 입력한 다음 createdb 명령을 다시 시도한다.

이제 이후 명령에서 사용될 데이터베이스 URL을 변수로 설정할 수 있으며 username을 실제 사용자 이름으로 변경해야 한다.

$ PSQL_URL=postgres://username@localhost:5432/sui_indexer

이제 다음 명령으로 연결을 테스트할 수 있다:

$ psql $PSQL_URL -c "SELECT 'Connected';"

성공하면 콘솔이나 터미널에 다음과 유사한 메시지가 표시되어야 한다:

?column?
-----------
Connected
(1 row)

##step Database setup

코드를 작성하기 전에 이전 단계에서 로컬 PostgreSQL 데이터베이스를 설정했는지 반드시 확인해야 하며, 이는 indexer가 추출된 transaction 데이터를 저장하는 데 필요하다.

다음 데이터베이스 설정 단계에서는 다음을 수행한다:

  1. 데이터를 저장할 데이터베이스 테이블을 생성한다.
  2. 이 과정을 관리하기 위해 Diesel을 사용한다.
  3. 데이터베이스 테이블에 매핑되는 Rust 코드를 생성한다.

###substep Configure Diesel

먼저 데이터베이스 migration을 구성하기 위해 (cargo.toml과 동일한 폴더에) diesel.toml 파일을 생성한다.

$ touch diesel.toml

파일을 다음 코드로 업데이트하고 저장한다:

[print_schema]
file = "src/schema.rs"

[migrations_directory]
dir = "migrations"

###substep Create database table using Diesel migrations

Diesel migration은 SQL 파일을 사용해 데이터베이스 테이블을 생성하고 관리하는 방식이다. 각 migration은 두 개의 파일로 구성된다:

  • up.sql: 테이블을 생성하고 변경한다.
  • down.sql: 변경 사항을 제거하고 되돌린다.

데이터베이스 URL을 --database-url 인수로 전달하여 필요한 디렉터리 구조를 생성하려면 diesel setup 명령을 사용한다.

$ diesel setup --database-url $PSQL_URL

프로젝트 루트에서 diesel migration 명령을 사용해 migration 파일을 생성한다.

$ diesel migration generate transaction_digests

이제 프로젝트에는 migrations 폴더가 있어야 한다. 이 폴더 안에는 이름 형식이 YYYY-MM-DD-HHMMSS_transaction_digests인 하위 디렉터리가 하나 있어야 한다. 이 폴더에는 up.sql 파일과 down.sql 파일이 들어 있어야 한다.

up.sql을 열어 실제 폴더 이름을 사용해 내용을 다음 코드로 교체한다:

CREATE TABLE IF NOT EXISTS transaction_digests (
tx_digest TEXT PRIMARY KEY,
checkpoint_sequence_number BIGINT NOT NULL
);

이 예제에서는 tx_digest에 대해 TEXT 데이터 타입을 사용하지만, 운영 환경의 indexer에서는 BYTEA 데이터 타입을 사용하는 것이 모범 사례이다.

TEXT 타입은 transaction digest를 쉽게 읽을 수 있고 외부 도구에서 바로 사용할 수 있도록 하기 위해 사용된다. digest는 Base58로 인코딩되며 PostgreSQL은 이 형식으로 BYTEA 데이터를 기본적으로 표시할 수 없기 때문에 이를 TEXT로 저장하면 쿼리에서 digest를 복사해 SuiScan과 같은 explorer에 붙여넣어 바로 사용할 수 있다.

그러나 운영 환경에서는 BYTEA를 강력히 권장한다. 이는 raw byte 표현을 저장함으로써 문자열보다 더 압축된 형태로 공간을 사용하고 비교 역시 훨씬 빠르게 수행할 수 있어 우수한 storage 및 쿼리 효율을 제공하기 때문이다. 더 많은 정보를 확인하려면 CYBERTEC 웹사이트의 Binary data performance in PostgreSQL을 참고한다.

up.sql을 저장한 다음 편집을 위해 down.sql을 연다. 파일 내용을 다음 코드로 교체하고 저장한다:

###substep Apply migration and generate Rust schema

프로젝트 루트에서 diesel migration 명령을 사용해 테이블을 생성한다.

$ diesel migration run --database-url $PSQL_URL

그런 다음 diesel print-schema 명령을 사용해 실제 데이터베이스에서 schema.rs 파일을 생성한다.

$ diesel print-schema --database-url $PSQL_URL > src/schema.rs

이제 src/schema.rs 파일은 다음과 같은 모습을 하고 있어야 한다:

// @generated automatically by Diesel CLI.

diesel::table! {
transaction_digests (tx_digest) {
tx_digest -> Text,
checkpoint_sequence_number -> Int8,
}
}

이전 명령을 실행한 후 프로젝트는 다음 단계를 수행할 준비가 된 상태이다:

  • PostgreSQL에는 이제 정의된 컬럼을 가진 transaction_digests 테이블이 생성되어 있다.
  • src/schema.rs에는 이 테이블 구조를 나타내는 자동 생성된 Rust 코드가 들어 있다.
  • 이제 이 특정 테이블과 통신하는 type-safe Rust 코드를 작성할 수 있다.

Diesel의 migration 시스템은 구조화되고 버전 관리되는 방식으로 시간이 지나면서 데이터베이스 schema를 발전시킨다. 전체 walkthrough는 공식 Diesel Getting Started guide에서 확인할 수 있다.

##step Create data structure

Diesel에 대한 write 작업을 단순화하기 위해 transaction_digests 테이블의 레코드를 나타내는 struct를 정의할 수 있다.

use diesel::prelude::*;
use sui_indexer_alt_framework::FieldCount;
use crate::schema::transaction_digests;

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = transaction_digests)]
pub struct StoredTransactionDigest {
pub tx_digest: String,
pub checkpoint_sequence_number: i64,
}

주요 annotation:

  • FieldCount: 메모리 최적화와 batch 처리 효율성을 위해 sui-indexer-alt-framework에서 요구되며 단일 SQL 문이 가질 수 있는 bind parameter 수에 대한 postgres 제한을 초과하지 않도록 batch의 최대 크기를 제한하는 데 사용된다.
  • diesel(table_name = transaction_digests): 이 Rust struct를 이전 단계에서 schema가 생성된 transaction_digests 테이블에 매핑한다.
  • Insertable: 이 struct를 Diesel을 사용해 데이터베이스에 insert할 수 있도록 해 준다.

##step Define the Handler struct in handler.rs

src 디렉터리에 handlers.rs 파일을 생성한다.

$ touch ./src/handlers.rs

파일을 열어 ProcessorHandler trait을 구현할 concrete struct를 정의한다.

pub struct TransactionDigestHandler;
#[async_trait::async_trait]
impl Processor for TransactionDigestHandler {
const NAME: &'static str = "transaction_digest_handler";

type Value = StoredTransactionDigest;

async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let checkpoint_seq = checkpoint.summary.sequence_number as i64;

let digests = checkpoint
.transactions
.iter()
.map(|tx| StoredTransactionDigest {
tx_digest: tx.transaction.digest().to_string(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

Ok(digests)
}
}

파일을 저장하되 다음 단계에서 이 코드에 내용을 추가하므로 계속 열어 둔다.

##step Implement the Processor

Processor trait은 checkpoint에서 데이터를 추출하고 변환하는 방법을 정의한다.

그 결과 데이터는 Handler::commit으로 전달된다.

파일 상단에 필요한 dependency를 추가한다.

use anyhow::Result;
use std::sync::Arc;
use sui_indexer_alt_framework::pipeline::Processor;
use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;

use crate::models::StoredTransactionDigest;
use crate::schema::transaction_digests::dsl::*;

TransactionDigestHandler struct 뒤에 Processor 코드를 추가한다.

#[async_trait::async_trait]
impl Processor for TransactionDigestHandler {
const NAME: &'static str = "transaction_digest_handler";

type Value = StoredTransactionDigest;

async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let checkpoint_seq = checkpoint.summary.sequence_number as i64;

let digests = checkpoint
.transactions
.iter()
.map(|tx| StoredTransactionDigest {
tx_digest: tx.transaction.digest().to_string(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

Ok(digests)
}
}

핵심 개념:

  • NAME: monitoring과 logging에서 사용되는 이 processor의 고유 식별자이다.
  • type Value: pipeline을 통해 흐르는 데이터가 무엇인지 정의하여 type safety를 보장한다.
  • process(): checkpoint 데이터를 커스텀 데이터 구조로 변환하는 core 로직이다.

handlers.rs 파일을 저장한다.

Click to open

Processor trait 정의

/// Implementors of this trait are responsible for transforming checkpoint into rows for their
/// table.
#[async_trait]
pub trait Processor: Send + Sync + 'static {
/// Used to identify the pipeline in logs and metrics.
const NAME: &'static str;

/// The type of value being inserted by the handler.
type Value: Send + Sync + 'static;

/// The processing logic for turning a checkpoint into rows of the table.
///
/// All errors returned from this method are treated as transient and will be retried
/// indefinitely with exponential backoff.
///
/// If you encounter a permanent error that will never succeed on retry (e.g., invalid data
/// format, unsupported protocol version), you should panic! This stops the indexer and alerts
/// operators that manual intervention is required. Do not return permanent errors as they will
/// cause infinite retries and block the pipeline.
///
/// For transient errors (e.g., network issues, rate limiting), simply return the error and
/// let the framework retry automatically.
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}

##step Implement the Handler

Handler trait은 데이터를 데이터베이스에 commit하는 방법을 정의한다.

이전 단계에서 만든 dependency 목록의 하단에 Handler 관련 dependency를 추가한다.

use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
pipeline::sequential::Handler,
postgres::{Connection, Db},
};

Processor 코드 뒤에 Handler 로직을 추가한다.

완전한 코드는 이 단계의 끝에서 확인할 수 있다.

#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
type Store = Db;
type Batch = Vec<Self::Value>;

fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
batch.extend(values);
}

async fn commit<'a>(&self, batch: &Self::Batch, conn: &mut Connection<'a>) -> Result<usize> {
let inserted = diesel::insert_into(transaction_digests)
.values(batch)
.on_conflict(tx_digest)
.do_nothing()
.execute(conn)
.await?;

Ok(inserted)
}
}

sequential batching이 동작하는 방식은 다음과 같다:

  1. process()는 각 checkpoint에 대한 값을 반환한다.
  2. batch()는 여러 checkpoint에서 나온 값을 누적한다.
  3. commit()는 framework가 H::MAX_BATCH_CHECKPOINTS 한도에 도달했을 때 batch를 write한다.
while batch_checkpoints < max_batch_checkpoints {
let Some(entry) = pending.first_entry() else {
break;
};

match next_checkpoint.cmp(entry.key()) {
// Next pending checkpoint is from the future.
Ordering::Less => break,

// This is the next checkpoint -- include it.
Ordering::Equal => {
let indexed = entry.remove();
batch_rows += indexed.len();
batch_checkpoints += 1;
handler.batch(&mut batch, indexed.values.into_iter());
watermark = Some(indexed.watermark);
next_checkpoint += 1;
}

// Next pending checkpoint is in the past, ignore it to avoid double
// writes.
Ordering::Greater => {
metrics
.total_watermarks_out_of_order
.with_label_values(&[H::NAME])
.inc();

let indexed = entry.remove();
pending_rows -= indexed.len();
}
}
}

Handler 안에서 상수를 구현해 기본 batch 한도를 재정의할 수 있다.

이제 handlers.rs 파일이 완성되었다. 파일을 저장한다.

Click to open

완성된 handler.rs 파일

use anyhow::Result;
use std::sync::Arc;
use sui_indexer_alt_framework::pipeline::Processor;
use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;

use crate::models::StoredTransactionDigest;
use crate::schema::transaction_digests::dsl::*;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
pipeline::sequential::Handler,
postgres::{Connection, Db},
};

pub struct TransactionDigestHandler;
#[async_trait::async_trait]
impl Processor for TransactionDigestHandler {
const NAME: &'static str = "transaction_digest_handler";

type Value = StoredTransactionDigest;

async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let checkpoint_seq = checkpoint.summary.sequence_number as i64;

let digests = checkpoint
.transactions
.iter()
.map(|tx| StoredTransactionDigest {
tx_digest: tx.transaction.digest().to_string(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

Ok(digests)
}
}
#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
type Store = Db;
type Batch = Vec<Self::Value>;

fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
batch.extend(values);
}

async fn commit<'a>(&self, batch: &Self::Batch, conn: &mut Connection<'a>) -> Result<usize> {
let inserted = diesel::insert_into(transaction_digests)
.values(batch)
.on_conflict(tx_digest)
.do_nothing()
.execute(conn)
.await?;

Ok(inserted)
}
}
Click to open

Handler trait 정의

Trait not found. If code is formatted correctly, consider using code comments instead.

##step Create .env file

다음 단계에서 생성하는 main 함수는 shell 변수 $PSQL_URL에 저장한 값이 필요하므로, 이 값을 포함하는 .env 파일을 생성해 사용할 수 있도록 해야 한다.

echo "DATABASE_URL=$PSQL_URL" > .env

환경에 맞는 명령을 실행한 후 프로젝트 루트에 .env 파일이 존재하고 올바른 데이터가 들어 있는지 확인해야 한다.

##step Create main function

이제 모든 요소를 main 함수에서 하나로 묶기 위해 main.rs 파일을 열고 기본 코드를 다음 코드로 교체한 뒤 파일을 저장해야 한다.

mod handlers;
mod models;

use handlers::TransactionDigestHandler;

pub mod schema;

use anyhow::{Result, bail};
use clap::Parser;
use diesel_migrations::{EmbeddedMigrations, embed_migrations};
use sui_indexer_alt_framework::{
cluster::{Args, IndexerCluster},
pipeline::sequential::SequentialConfig,
service::Error,
};
use tokio;
use url::Url;

// Embed database migrations into the binary so they run automatically on startup
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

#[tokio::main]
async fn main() -> Result<()> {
// Load .env data
dotenvy::dotenv().ok();

// Local database URL created in step 3 above
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set in the environment")
.parse::<Url>()
.expect("Invalid database URL");

// Parse command-line arguments (checkpoint range, URLs, performance settings)
let args = Args::parse();

// Build and configure the indexer cluster
let mut cluster = IndexerCluster::builder()
.with_args(args) // Apply command-line configuration
.with_database_url(database_url) // Set up database URL
.with_migrations(&MIGRATIONS) // Enable automatic schema migrations
.build()
.await?;

// Register our custom sequential pipeline with the cluster
cluster
.sequential_pipeline(
TransactionDigestHandler, // Our processor/handler implementation
SequentialConfig::default(), // Use default batch sizes and checkpoint lag
)
.await?;

// Start the indexer and wait for completion
match cluster.run().await?.main().await {
Ok(()) | Err(Error::Terminated) => Ok(()),
Err(Error::Aborted) => {
bail!("Indexer aborted due to an unexpected error")
}
Err(Error::Task(e)) => {
bail!(e)
}
}
}

핵심 구성 요소는 다음과 같다:

  • embed_migrations!: migration 파일을 binary에 포함해 indexer가 시작 시점에 데이터베이스 schema를 자동으로 업데이트하도록 한다.
  • Args::parse(): --first-checkpoint, --remote-store-url 등의 command-line 구성을 제공한다.
  • IndexerCluster::builder(): 데이터베이스 연결, checkpoint 스트리밍, 모니터링 등 framework 인프라를 설정한다.
  • sequential_pipeline(): smart batching이 적용된 순서 기반 처리로 checkpoint를 처리하는 sequential pipeline을 등록한다.
  • SequentialConfig::default(): batch 크기와 checkpoint lag(여러 checkpoint를 함께 batch하는 개수)에 대해 framework의 기본값을 사용한다.
  • cluster.run(): checkpoint 처리를 시작하고 완료될 때까지 블로킹한다.

이제 indexer가 완성되었다. 다음 단계에서는 indexer를 실행하고 기능을 확인하는 방법을 설명한다.

##step Run your indexer

Testnet에 대해 indexer를 실행하려면 cargo run 명령을 사용한다. 일반 운영에서는 full node gRPC를 primary source로 사용하고, remote checkpoint store는 fallback 또는 backfill 용도로만 사용하는 것을 권장한다. 자세한 내용은 Checkpoint Data Sources를 참고한다:

권장: full node에서 gRPC 스트리밍 사용(remote checkpoint store fallback 포함):

$ cargo run -- --remote-store-url https://checkpoints.testnet.sui.io --streaming-url https://fullnode.testnet.sui.io:443

remote checkpoint storage 사용:

$ cargo run -- --remote-store-url https://checkpoints.testnet.sui.io

GCS checkpoint bucket 직접 사용:

$ cargo run -- --remote-store-gcs mysten-mainnet-checkpoints-use4

공개 HTTPS checkpoint endpoint는 가장 최근 30일의 checkpoint만 보관한다. 30일보다 오래된 Mainnet backfill에는 --remote-store-gcs mysten-mainnet-checkpoints-use4를 통해 gs://mysten-mainnet-checkpoints-use4를 사용한다. 이 bucket은 Requester Pays가 활성화되어 있으며 --remote-store-url 대신 indexing framework의 --remote-store-gcs 옵션으로 접근해야 한다. Use Requester PaysRunning a Remote Store를 참고한다.

정보

운영 체제가 basic-sui-indexer 애플리케이션에 대해 요청하는 경우 이 애플리케이션에 대한 incoming network 요청을 허용해야 한다.

성공하면 콘솔에서 indexer가 실행 중임을 알려 준다.

##step Verify results

결과를 확인하려면 새 터미널이나 콘솔을 열고 데이터베이스에 연결해야 한다.

$ psql sui_indexer

연결된 후에는 indexer가 제대로 동작하는지 확인하기 위해 몇 가지 쿼리를 실행해야 한다:

index된 transaction digest 개수를 확인한다:

$ SELECT COUNT(*) FROM transaction_digests;

샘플 레코드를 확인한다:

$ SELECT * FROM transaction_digests LIMIT 5;

데이터가 올바른지 확인하려면 데이터베이스에서 임의의 transaction digest를 복사한 후 SuiScan에서 https://suiscan.xyz/testnet/home을 사용해 검증해야 한다.

동작하는 커스텀 indexer를 구축했다. 🎉

여기에서 다룬 핵심 개념은 모든 커스텀 indexer에 적용되며, 데이터 구조를 정의하고 ProcessorHandler trait을 구현한 다음 framework에 인프라 관리를 맡기면 된다.