본문으로 건너뛰기

커스텀 인덱서 만들기

이 가이드는 커스텀 인덱서(custom indexer)를 구축하는 실용적인 예시를 제공한다. indexer framework의 개념적 개요는 커스텀 인덱싱 프레임워크인덱서 파이프라인 아키텍처를 참고한다.

완전한 커스텀 인덱서를 구축하려면 sui-indexer-alt-framework를 사용한다. 다음 단계에서는 Sui checkpoint에서 transaction digest를 추출하여 로컬 PostgreSQL에 저장하는 sequential pipeline을 만드는 방법을 보여준다. framework의 source code for the framework는 GitHub의 Sui repo에서 찾을 수 있다.

이 예제는 간결함과 기본 지원을 위해 Diesel(널리 사용되는 Rust ORM 및 query builder)과 함께 PostgreSQL을 사용하지만, sui-indexer-alt-framework는 유연한 storage를 위해 설계되었다. Diesel을 사용하지 않으려면 MongoDB, CouchDB 같은 다른 database를 사용하거나 다른 database client를 활용할 수 있다. 이를 위해 framework의 StoreConnection trait을 구현하고 Handler::commit() 메서드 안에 database write logic을 직접 정의한다.

Click to open

Check installation

시스템에 필요한 소프트웨어가 제대로 설치되어 있는지 확신이 서지 않는다면 다음 명령으로 설치를 확인할 수 있다.

$ psql --version
$ diesel --version

What you build

다음 단계에서는 indexer를 생성하는 방법을 보여 준다:

  • Sui Testnet에 연결한다: https://checkpoints.testnet.sui.io 에 있는 remote checkpoint store를 사용한다.
  • checkpoint를 처리한다: checkpoint 데이터를 지속적으로 스트리밍한다.
  • transaction 데이터를 추출한다: 각 checkpoint에서 transaction digest를 끌어온다.
  • 로컬 PostgreSQL에 저장한다: 데이터를 로컬 PostgreSQL 데이터베이스에 commit한다.
  • sequential pipeline을 구현한다: 최적의 일관성과 성능을 위해 in-order 처리와 batching을 사용한다.

결국에는 모든 핵심 framework 개념을 보여 주고 더 복잡한 커스텀 indexer의 기반으로 사용할 수 있는 동작하는 indexer를 갖게 된다.

정보

Sui는 Mainnet과 Testnet 모두에 대해 checkpoint store를 제공한다.

  • Testnet: https://checkpoints.testnet.sui.io
  • Mainnet: https://checkpoints.mainnet.sui.io

##step Project setup

먼저 indexer 프로젝트를 저장하려는 디렉터리에서 콘솔을 연다. cargo new 명령을 사용해 새 Rust 프로젝트를 만든 다음 해당 디렉터리로 이동한다.

$ cargo new simple-sui-indexer
$ cd simple-sui-indexer

##step Configure dependencies

Cargo.toml 코드를 다음 설정으로 교체한 뒤 저장한다.

[package]
name = "basic-sui-indexer"
version = "0.1.0"
edition = "2024"

[dependencies]
# Core framework dependencies
sui-indexer-alt-framework = { git = "https://github.com/MystenLabs/sui.git", branch = "testnet" }

# Async runtime
tokio = { version = "1.0", features = ["full"] }

# Error handling
anyhow = "1.0"

# Diesel PostgreSQL
diesel = { version = "2.0", features = ["postgres", "r2d2"] }
diesel-async = { version = "0.5", features = ["bb8", "postgres", "async-connection-wrapper"] }
diesel_migrations = "2.0"

# Async traits
async-trait = "0.1"

# URL parsing
url = "2.0"

# Use .env file
dotenvy = "0.15"

# Command line parsing
clap = { version = "4.0", features = ["derive"] }

이제 manifest에는 다음 dependency가 포함되어 있다:

  • sui-indexer-alt-framework: pipeline 인프라를 제공하는 core framework이다.
  • diesel/diesel-async: asynchronous 지원을 제공하는 type-safe database ORM이다.
  • tokio: framework에 필요한 async runtime이다.
  • clap: 설정을 위한 command-line argument parsing을 제공한다.
  • anyhow: trait 구현을 위한 error handling과 async-trait을 제공한다.
  • dotenvy: PostgreSQL URL을 저장하는 .env 파일을 읽어 들인다.

##step Create database

migrations를 구성하기 전에 로컬 PostgreSQL 데이터베이스를 생성하고 확인해야 한다:

$ createdb sui_indexer

연결 정보를 가져와야 한다:

$ psql sui_indexer -c "\conninfo"

성공하면 콘솔에 다음과 유사한 메시지가 표시되어야 한다:

You are connected to database "sui_indexer" as user "username" via socket in "/tmp" at port "5432".

다음과 비슷한 createdb 오류를 받는 경우이다

createdb: error: connection to server on socket "/tmp/.s.PGSQL.5432" failed: FATAL:  role "username" does not exist

이는 오류 메시지에 제공된 이름으로 username을 바꾸어 사용자를 생성해야 함을 의미한다.

$ sudo -u postgres createuser --superuser username

프롬프트가 표시되면 pgAdmin 계정의 비밀번호를 입력한 다음 createdb 명령을 다시 시도한다.

이제 이후 명령에서 사용될 데이터베이스 URL을 변수로 설정할 수 있으며 username을 실제 사용자 이름으로 변경해야 한다.

$ PSQL_URL=postgres://username@localhost:5432/sui_indexer

이제 다음 명령으로 연결을 테스트할 수 있다:

$ psql $PSQL_URL -c "SELECT 'Connected';"

성공하면 콘솔이나 터미널에 다음과 유사한 메시지가 표시되어야 한다:

?column?
-----------
Connected
(1 row)

##step Database setup

코드를 작성하기 전에 이전 단계에서 로컬 PostgreSQL 데이터베이스를 설정했는지 반드시 확인해야 하며, 이는 indexer가 추출된 transaction 데이터를 저장하는 데 필요하다.

다음 데이터베이스 설정 단계에서는 다음을 수행한다:

  1. 데이터를 저장할 데이터베이스 테이블을 생성한다.
  2. 이 과정을 관리하기 위해 Diesel을 사용한다.
  3. 데이터베이스 테이블에 매핑되는 Rust 코드를 생성한다.

###substep Configure Diesel

먼저 데이터베이스 migration을 구성하기 위해 (cargo.toml과 동일한 폴더에) diesel.toml 파일을 생성한다.

$ touch diesel.toml

파일을 다음 코드로 업데이트하고 저장한다:

[print_schema]
file = "src/schema.rs"

[migrations_directory]
dir = "migrations"

###substep Create database table using Diesel migrations

Diesel migration은 SQL 파일을 사용해 데이터베이스 테이블을 생성하고 관리하는 방식이다. 각 migration은 두 개의 파일로 구성된다:

  • up.sql: 테이블을 생성하고 변경한다.
  • down.sql: 변경 사항을 제거하고 되돌린다.

데이터베이스 URL을 --database-url 인수로 전달하여 필요한 디렉터리 구조를 생성하려면 diesel setup 명령을 사용한다.

$ diesel setup --database-url $PSQL_URL

프로젝트 루트에서 diesel migration 명령을 사용해 migration 파일을 생성한다.

$ diesel migration generate transaction_digests

이제 프로젝트에는 migrations 폴더가 있어야 한다. 이 폴더 안에는 이름 형식이 YYYY-MM-DD-HHMMSS_transaction_digests인 하위 디렉터리가 하나 있어야 한다. 이 폴더에는 up.sql 파일과 down.sql 파일이 들어 있어야 한다.

up.sql을 열어 실제 폴더 이름을 사용해 내용을 다음 코드로 교체한다:

CREATE TABLE IF NOT EXISTS transaction_digests (
tx_digest TEXT PRIMARY KEY,
checkpoint_sequence_number BIGINT NOT NULL
);

이 예제에서는 tx_digest에 대해 TEXT 데이터 타입을 사용하지만, 운영 환경의 indexer에서는 BYTEA 데이터 타입을 사용하는 것이 모범 사례이다.

TEXT 타입은 transaction digest를 쉽게 읽을 수 있고 외부 도구에서 바로 사용할 수 있도록 하기 위해 사용된다. digest는 Base58로 인코딩되며 PostgreSQL은 이 형식으로 BYTEA 데이터를 기본적으로 표시할 수 없기 때문에 이를 TEXT로 저장하면 쿼리에서 digest를 복사해 SuiScan과 같은 explorer에 붙여넣어 바로 사용할 수 있다.

그러나 운영 환경에서는 BYTEA를 강력히 권장한다. 이는 raw byte 표현을 저장함으로써 문자열보다 더 압축된 형태로 공간을 사용하고 비교 역시 훨씬 빠르게 수행할 수 있어 우수한 storage 및 쿼리 효율을 제공하기 때문이다. 더 많은 정보를 확인하려면 CYBERTEC 웹사이트의 Binary data performance in PostgreSQL을 참고한다.

up.sql을 저장한 다음 편집을 위해 down.sql을 연다.

###substep Apply migration and generate Rust schema

프로젝트 루트에서 diesel migration 명령을 사용해 테이블을 생성한다.

$ diesel migration run --database-url $PSQL_URL

그런 다음 diesel print-schema 명령을 사용해 실제 데이터베이스에서 schema.rs 파일을 생성한다.

$ diesel print-schema --database-url $PSQL_URL > src/schema.rs

이제 src/schema.rs 파일은 다음과 같은 모습을 하고 있어야 한다:

// @generated automatically by Diesel CLI.

diesel::table! {
transaction_digests (tx_digest) {
tx_digest -> Text,
checkpoint_sequence_number -> Int8,
}
}

이전 명령을 실행한 후 프로젝트는 다음 단계를 수행할 준비가 된 상태이다:

  • PostgreSQL에는 이제 정의된 컬럼을 가진 transaction_digests 테이블이 생성되어 있다.
  • src/schema.rs에는 이 테이블 구조를 나타내는 자동 생성된 Rust 코드가 들어 있다.
  • 이제 이 특정 테이블과 통신하는 type-safe Rust 코드를 작성할 수 있다.

Diesel의 migration 시스템은 구조화되고 버전 관리되는 방식으로 시간이 지나면서 데이터베이스 schema를 발전시킨다. 전체 walkthrough는 공식 Diesel Getting Started guide에서 확인할 수 있다.

##step Create data structure

Diesel에 대한 write 작업을 단순화하기 위해 transaction_digests 테이블의 레코드를 나타내는 struct를 정의할 수 있다.

use diesel::prelude::*;
use sui_indexer_alt_framework::FieldCount;
use crate::schema::transaction_digests;

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = transaction_digests)]
pub struct StoredTransactionDigest {
pub tx_digest: String,
pub checkpoint_sequence_number: i64,
}

주요 annotation:

  • FieldCount: 메모리 최적화와 batch 처리 효율성을 위해 sui-indexer-alt-framework에서 요구되며 단일 SQL 문이 가질 수 있는 bind parameter 수에 대한 postgres 제한을 초과하지 않도록 batch의 최대 크기를 제한하는 데 사용된다.
  • diesel(table_name = transaction_digests): 이 Rust struct를 이전 단계에서 schema가 생성된 transaction_digests 테이블에 매핑한다.
  • Insertable: 이 struct를 Diesel을 사용해 데이터베이스에 insert할 수 있도록 해 준다.

##step Define the Handler struct in handler.rs

src 디렉터리에 handlers.rs 파일을 생성한다.

$ touch ./src/handlers.rs

파일을 열어 ProcessorHandler trait을 구현할 concrete struct를 정의한다.

pub struct TransactionDigestHandler;
#[async_trait::async_trait]
impl Processor for TransactionDigestHandler {
const NAME: &'static str = "transaction_digest_handler";

type Value = StoredTransactionDigest;

async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let checkpoint_seq = checkpoint.summary.sequence_number as i64;

let digests = checkpoint
.transactions
.iter()
.map(|tx| StoredTransactionDigest {
tx_digest: tx.transaction.digest().to_string(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

Ok(digests)
}
}

파일을 저장하되 다음 단계에서 이 코드에 내용을 추가하므로 계속 열어 둔다.

##step Implement the Processor

Processor trait은 checkpoint에서 데이터를 추출하고 변환하는 방법을 정의한다.

그 결과 데이터는 Handler::commit으로 전달된다.

파일 상단에 필요한 dependency를 추가한다.

use anyhow::Result;
use std::sync::Arc;
use sui_indexer_alt_framework::pipeline::Processor;
use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;

use crate::models::StoredTransactionDigest;
use crate::schema::transaction_digests::dsl::*;

TransactionDigestHandler struct 뒤에 Processor 코드를 추가한다.

#[async_trait::async_trait]
impl Processor for TransactionDigestHandler {
const NAME: &'static str = "transaction_digest_handler";

type Value = StoredTransactionDigest;

async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let checkpoint_seq = checkpoint.summary.sequence_number as i64;

let digests = checkpoint
.transactions
.iter()
.map(|tx| StoredTransactionDigest {
tx_digest: tx.transaction.digest().to_string(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

Ok(digests)
}
}

핵심 개념:

  • NAME: monitoring과 logging에서 사용되는 이 processor의 고유 식별자이다.
  • type Value: pipeline을 통해 흐르는 데이터가 무엇인지 정의하여 type safety를 보장한다.
  • process(): checkpoint 데이터를 커스텀 데이터 구조로 변환하는 core 로직이다.

handlers.rs 파일을 저장한다.

Click to open

Processor trait 정의

/// Implementors of this trait are responsible for transforming checkpoint into rows for their
/// table.
#[async_trait]
pub trait Processor: Send + Sync + 'static {
/// Used to identify the pipeline in logs and metrics.
const NAME: &'static str;

/// The type of value being inserted by the handler.
type Value: Send + Sync + 'static;

/// The processing logic for turning a checkpoint into rows of the table.
///
/// All errors returned from this method are treated as transient and will be retried
/// indefinitely with exponential backoff.
///
/// If you encounter a permanent error that will never succeed on retry (e.g., invalid data
/// format, unsupported protocol version), you should panic! This stops the indexer and alerts
/// operators that manual intervention is required. Do not return permanent errors as they will
/// cause infinite retries and block the pipeline.
///
/// For transient errors (e.g., network issues, rate limiting), simply return the error and
/// let the framework retry automatically.
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>>;
}

##step Implement the Handler

Handler trait은 데이터를 데이터베이스에 commit하는 방법을 정의한다.

이전 단계에서 만든 dependency 목록의 하단에 Handler 관련 dependency를 추가한다.

use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
pipeline::sequential::Handler,
postgres::{Connection, Db},
};

Processor 코드 뒤에 Handler 로직을 추가한다.

완전한 코드는 이 단계의 끝에서 확인할 수 있다.

#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
type Store = Db;
type Batch = Vec<Self::Value>;

fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
batch.extend(values);
}

async fn commit<'a>(&self, batch: &Self::Batch, conn: &mut Connection<'a>) -> Result<usize> {
let inserted = diesel::insert_into(transaction_digests)
.values(batch)
.on_conflict(tx_digest)
.do_nothing()
.execute(conn)
.await?;

Ok(inserted)
}
}

sequential batching이 동작하는 방식은 다음과 같다:

  1. process()는 각 checkpoint에 대한 값을 반환한다.
  2. batch()는 여러 checkpoint에서 나온 값을 누적한다.
  3. commit()는 framework가 H::MAX_BATCH_CHECKPOINTS 한도에 도달했을 때 batch를 write한다.
while batch_checkpoints < max_batch_checkpoints {
if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
break;
}

let Some(entry) = pending.first_entry() else {
break;
};

match next_checkpoint.cmp(entry.key()) {
// Next pending checkpoint is from the future.
Ordering::Less => break,

// This is the next checkpoint -- include it.
Ordering::Equal => {
let indexed = entry.remove();
batch_rows += indexed.len();
batch_checkpoints += 1;
handler.batch(&mut batch, indexed.values.into_iter());
watermark = Some(indexed.watermark);
next_checkpoint += 1;
}

// Next pending checkpoint is in the past, ignore it to avoid double
// writes.
Ordering::Greater => {
metrics
.total_watermarks_out_of_order
.with_label_values(&[H::NAME])
.inc();

let indexed = entry.remove();
pending_rows -= indexed.len();
}
}
}

Handler 안에서 상수를 구현해 기본 batch 한도를 재정의할 수 있다.

이제 handlers.rs 파일이 완성되었다. 파일을 저장한다.

Click to open

완성된 handler.rs 파일

use anyhow::Result;
use std::sync::Arc;
use sui_indexer_alt_framework::pipeline::Processor;
use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;

use crate::models::StoredTransactionDigest;
use crate::schema::transaction_digests::dsl::*;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
pipeline::sequential::Handler,
postgres::{Connection, Db},
};

pub struct TransactionDigestHandler;
#[async_trait::async_trait]
impl Processor for TransactionDigestHandler {
const NAME: &'static str = "transaction_digest_handler";

type Value = StoredTransactionDigest;

async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let checkpoint_seq = checkpoint.summary.sequence_number as i64;

let digests = checkpoint
.transactions
.iter()
.map(|tx| StoredTransactionDigest {
tx_digest: tx.transaction.digest().to_string(),
checkpoint_sequence_number: checkpoint_seq,
})
.collect();

Ok(digests)
}
}
#[async_trait::async_trait]
impl Handler for TransactionDigestHandler {
type Store = Db;
type Batch = Vec<Self::Value>;

fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
batch.extend(values);
}

async fn commit<'a>(&self, batch: &Self::Batch, conn: &mut Connection<'a>) -> Result<usize> {
let inserted = diesel::insert_into(transaction_digests)
.values(batch)
.on_conflict(tx_digest)
.do_nothing()
.execute(conn)
.await?;

Ok(inserted)
}
}
Click to open

Handler trait 정의

/// Handlers implement the logic for a given indexing pipeline: How to process checkpoint data (by
/// implementing [Processor]) into rows for their table, and how to write those rows to the database.
///
/// The handler is also responsible for tuning the various parameters of the pipeline (provided as
/// associated values). Reasonable defaults have been chosen to balance concurrency with memory
/// usage, but each handle may choose to override these defaults, e.g.
///
/// - Handlers that produce many small rows may wish to increase their batch/chunk/max-pending
/// sizes).
/// - Handlers that do more work during processing may wish to increase their fanout so more of it
/// can be done concurrently, to preserve throughput.
///
/// Concurrent handlers can only be used in concurrent pipelines, where checkpoint data is
/// processed and committed out-of-order and a watermark table is kept up-to-date with the latest
/// checkpoint below which all data has been committed.
///
/// Back-pressure is handled through the `MAX_PENDING_SIZE` constant -- if more than this many rows
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
#[async_trait]
pub trait Handler: Processor {
type Store: Store;
type Batch: Default + Send + Sync + 'static;

/// If at least this many rows are pending, the committer will commit them eagerly.
const MIN_EAGER_ROWS: usize = 50;

/// If there are more than this many rows pending, the committer applies backpressure.
const MAX_PENDING_ROWS: usize = 5000;

/// The maximum number of watermarks that can show up in a single batch.
/// This limit exists to deal with pipelines that produce no data for a majority of
/// checkpoints -- the size of these pipeline's batches will be dominated by watermark updates.
const MAX_WATERMARK_UPDATES: usize = 10_000;

/// Add values from the iterator to the batch. The implementation may take all, some, or none
/// of the values from the iterator by calling `.next()`.
///
/// Returns `BatchStatus::Ready` if the batch is full and should be committed,
/// or `BatchStatus::Pending` if the batch can accept more values.
///
/// Note: The handler can signal batch readiness via `BatchStatus::Ready`, but the framework
/// may also decide to commit a batch based on the trait parameters above.
fn batch(
&self,
batch: &mut Self::Batch,
values: &mut std::vec::IntoIter<Self::Value>,
) -> BatchStatus;

/// Commit the batch to the database, returning the number of rows affected.
async fn commit<'a>(
&self,
batch: &Self::Batch,
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> anyhow::Result<usize>;

/// Clean up data between checkpoints `_from` and `_to_exclusive` (exclusive) in the database, returning
/// the number of rows affected. This function is optional, and defaults to not pruning at all.
async fn prune<'a>(
&self,
_from: u64,
_to_exclusive: u64,
_conn: &mut <Self::Store as Store>::Connection<'a>,
) -> anyhow::Result<usize> {
Ok(0)
}
}

##step Create .env file

다음 단계에서 생성하는 main 함수는 shell 변수 $PSQL_URL에 저장한 값이 필요하므로, 이 값을 포함하는 .env 파일을 생성해 사용할 수 있도록 해야 한다.

echo "DATABASE_URL=$PSQL_URL" > .env

환경에 맞는 명령을 실행한 후 프로젝트 루트에 .env 파일이 존재하고 올바른 데이터가 들어 있는지 확인해야 한다.

##step Create main function

이제 모든 요소를 main 함수에서 하나로 묶기 위해 main.rs 파일을 열고 기본 코드를 다음 코드로 교체한 뒤 파일을 저장해야 한다.

mod handlers;
mod models;

use handlers::TransactionDigestHandler;

pub mod schema;

use anyhow::{Result, bail};
use clap::Parser;
use diesel_migrations::{EmbeddedMigrations, embed_migrations};
use sui_indexer_alt_framework::{
cluster::{Args, IndexerCluster},
pipeline::sequential::SequentialConfig,
service::Error,
};
use tokio;
use url::Url;

// Embed database migrations into the binary so they run automatically on startup
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

#[tokio::main]
async fn main() -> Result<()> {
// Load .env data
dotenvy::dotenv().ok();

// Local database URL created in step 3 above
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set in the environment")
.parse::<Url>()
.expect("Invalid database URL");

// Parse command-line arguments (checkpoint range, URLs, performance settings)
let args = Args::parse();

// Build and configure the indexer cluster
let mut cluster = IndexerCluster::builder()
.with_args(args) // Apply command-line configuration
.with_database_url(database_url) // Set up database URL
.with_migrations(&MIGRATIONS) // Enable automatic schema migrations
.build()
.await?;

// Register our custom sequential pipeline with the cluster
cluster
.sequential_pipeline(
TransactionDigestHandler, // Our processor/handler implementation
SequentialConfig::default(), // Use default batch sizes and checkpoint lag
)
.await?;

// Start the indexer and wait for completion
match cluster.run().await?.main().await {
Ok(()) | Err(Error::Terminated) => Ok(()),
Err(Error::Aborted) => {
bail!("Indexer aborted due to an unexpected error")
}
Err(Error::Task(e)) => {
bail!(e)
}
}
}

핵심 구성 요소는 다음과 같다:

  • embed_migrations!: migration 파일을 binary에 포함해 indexer가 시작 시점에 데이터베이스 schema를 자동으로 업데이트하도록 한다.
  • Args::parse(): --first-checkpoint, --remote-store-url 등의 command-line 구성을 제공한다.
  • IndexerCluster::builder(): 데이터베이스 연결, checkpoint 스트리밍, 모니터링 등 framework 인프라를 설정한다.
  • sequential_pipeline(): smart batching이 적용된 순서 기반 처리로 checkpoint를 처리하는 sequential pipeline을 등록한다.
  • SequentialConfig::default(): batch 크기와 checkpoint lag(여러 checkpoint를 함께 batch하는 개수)에 대해 framework의 기본값을 사용한다.
  • cluster.run(): checkpoint 처리를 시작하고 완료될 때까지 블로킹한다.

이제 indexer가 완성되었다. 다음 단계에서는 indexer를 실행하고 기능을 확인하는 방법을 설명한다.

##step Run your indexer

원격 checkpoint storage를 사용하는 Testnet에 대해 indexer를 실행하려면 cargo run 명령을 사용해야 한다.

$ cargo run -- --remote-store-url https://checkpoints.testnet.sui.io
정보

운영 체제가 basic-sui-indexer 애플리케이션에 대해 요청하는 경우 이 애플리케이션에 대한 incoming network 요청을 허용해야 한다.

성공하면 콘솔에서 indexer가 실행 중임을 알려 준다.

##step Verify results

결과를 확인하려면 새 터미널이나 콘솔을 열고 데이터베이스에 연결해야 한다.

$ psql sui_indexer

연결된 후에는 indexer가 제대로 동작하는지 확인하기 위해 몇 가지 쿼리를 실행해야 한다:

index된 transaction digest 개수를 확인한다:

$ SELECT COUNT(*) FROM transaction_digests;

샘플 레코드를 확인한다:

$ SELECT * FROM transaction_digests LIMIT 5;

데이터가 올바른지 확인하려면 데이터베이스에서 임의의 transaction digest를 복사한 후 SuiScan에서 https://suiscan.xyz/testnet/home을 사용해 검증해야 한다.

동작하는 커스텀 indexer를 구축했다. 🎉

여기에서 다룬 핵심 개념은 모든 커스텀 indexer에 적용되며, 데이터 구조를 정의하고 ProcessorHandler trait을 구현한 다음 framework에 인프라 관리를 맡기면 된다.