Bring your own store (BYOS)
IndexerCluster offers a convenient way to get started with PostgreSQL, but you might want to use a different database or storage system. To do that, use a manual Indexer and implement the custom `Store` and `Connection` traits from sui-indexer-alt-framework-store-traits for your own storage backend.
lib.rs in sui-indexer-alt-framework-store-traits
/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
#[async_trait]
pub trait Connection: Send {
/// Initializes a watermark by either returning the existing watermark or, if the impl supports it, attempting to create it.
/// Returns `Ok(Some(_))` if a watermark existed or was created by this impl.
/// Returns `Ok(None)` if a watermark does not exist and this impl does not attempt to create one.
/// Returns `Err(_)` if the store encountered an error while trying to read or create the watermark.
async fn init_watermark(
&mut self,
pipeline_task: &str,
checkpoint_hi_inclusive: Option<u64>,
) -> anyhow::Result<Option<InitWatermark>>;
/// Checks if the store can accept a `chain_id`.
/// Returns `Ok(true)` if the store accepts this `chain_id` thereby allowing processing to continue.
/// Returns `Ok(false)` if the store does not accept the `chain_id` thereby halting processing with an error.
/// Returns `Err(_)` if the store encountered an error while trying to determine if it could accept
/// the `chain_id` which will cause `accepts_chain_id` to be retried.
async fn accepts_chain_id(
&mut self,
pipeline_task: &str,
chain_id: [u8; 32],
) -> anyhow::Result<bool>;
/// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
/// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
/// from the `Store`. The indexer fetches this value for each pipeline added to determine which
/// checkpoint to resume processing from.
async fn committer_watermark(
&mut self,
pipeline_task: &str,
) -> anyhow::Result<Option<CommitterWatermark>>;
/// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
/// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
/// long as it raises the watermark stored in the database. Returns a boolean indicating whether
/// the watermark was actually updated or not.
async fn set_committer_watermark(
&mut self,
pipeline_task: &str,
watermark: CommitterWatermark,
) -> anyhow::Result<bool>;
}
/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermarks operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
type Connection<'c>: Connection
where
Self: 'c;
async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>>;
}
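The `pipeline_task` key that appears throughout these trait methods is just a string composed of a pipeline name and an optional task suffix, joined by `Store::DELIMITER`. The following std-only sketch illustrates that composition; the actual delimiter value is defined by each store implementation, and `'/'` here is only an assumption for the example:

```rust
/// Illustrative only: how a `pipeline_task` key might be built and split.
/// The real delimiter is `Store::DELIMITER`; '/' is an assumption here.
const DELIMITER: char = '/';

/// Compose a key from a pipeline name and an optional task name.
fn pipeline_task(pipeline: &str, task: Option<&str>) -> String {
    match task {
        Some(task) => format!("{pipeline}{DELIMITER}{task}"),
        None => pipeline.to_string(),
    }
}

/// Split a key back into (pipeline, optional task).
fn split_pipeline_task(key: &str) -> (&str, Option<&str>) {
    match key.split_once(DELIMITER) {
        Some((pipeline, task)) => (pipeline, Some(task)),
        None => (key, None),
    }
}

fn main() {
    let key = pipeline_task("tx_digests", Some("backfill"));
    let (pipeline, task) = split_pipeline_task(&key);
    println!("{pipeline} {task:?}");
}
```

A store implementation can use this shape to keep one watermark row per pipeline (or per pipeline-and-task) under a single string key.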
When to use BYOS:
- Different database: You want to use MongoDB, CouchDB, or another non-PostgreSQL database. This also applies if you want to use PostgreSQL without the default Diesel ORM.
- Custom requirements: You need specialized storage logic, partitioning, or performance optimizations.
Core implementation requirements
To implement BYOS, you need to:
- Define `Store` and `Connection` structs that manage connections.
- Implement the `Store` trait for connection management.
- Implement the `Connection` trait for the watermark operations used in pipeline coordination.
- Use a manual `Indexer` instead of `IndexerCluster`.
Step 1: Define your store structure
use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;
#[derive(Clone)]
pub struct MyCustomStore {
    // Your database connection details
connection_pool: MyDatabasePool,
config: MyConfig,
}
pub struct MyCustomConnection<'a> {
// connection instance
conn: MyDatabaseConnection<'a>,
}
Step 2: Implement the Store trait
The `Store` trait manages the connection lifecycle:
#[async_trait]
impl Store for MyCustomStore {
type Connection<'c> = MyCustomConnection<'c>;
async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
        // Create and return a connection to your store
        todo!("Implement for your storage system")
}
}
Step 3: Implement the Connection trait
The `Connection` trait handles the watermark operations used for pipeline coordination:
#[async_trait]
impl Connection for MyCustomConnection<'_> {
    // Get the highest checkpoint this pipeline has processed
    async fn committer_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<CommitterWatermark>> {
        // Query your database for the watermark data
        todo!("Implement for your storage system")
    }

    // Get the lowest checkpoint available to readers
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>> {
        // Implementation depends on your database schema
        todo!("Implement for your storage system")
    }

    // Implement the other required methods...
}
For a complete reference, look at the sui-pg-db implementation of `Connection`:
#[async_trait]
impl store::Connection for Connection<'_> {
async fn init_watermark(
&mut self,
pipeline_task: &str,
checkpoint_hi_inclusive: Option<u64>,
) -> anyhow::Result<Option<store::InitWatermark>> {
let checkpoint_hi_inclusive = checkpoint_hi_inclusive.map_or(-1, |c| c as i64);
let stored_watermark = StoredWatermark::for_init(
pipeline_task,
checkpoint_hi_inclusive,
checkpoint_hi_inclusive + 1,
);
use diesel::pg::upsert::excluded;
let (checkpoint_hi_inclusive, reader_lo): (i64, i64) =
diesel::insert_into(watermarks::table)
.values(&stored_watermark)
// If there is an existing row, return it without updating it.
.on_conflict(watermarks::pipeline)
// Use `do_update` instead of `do_nothing` to return the existing row with `returning`.
.do_update()
// When using `do_update`, at least one change needs to be set, so set the pipeline to itself (nothing changes).
// `excluded` is a virtual table containing the existing row that there was a conflict with.
.set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
.returning((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
.get_result(self)
.await?;
Ok(Some(store::InitWatermark {
checkpoint_hi_inclusive: u64::try_from(checkpoint_hi_inclusive).ok(),
reader_lo: Some(reader_lo as u64),
}))
}
async fn accepts_chain_id(
&mut self,
pipeline_task: &str,
chain_id: [u8; 32],
) -> anyhow::Result<bool> {
let stored_chain_id: Option<Vec<u8>> = diesel::update(watermarks::table)
.filter(watermarks::pipeline.eq(pipeline_task))
// "coalesce" only updates the value if it is null in the existing row
.set(watermarks::chain_id.eq(coalesce(watermarks::chain_id, chain_id)))
.returning(watermarks::chain_id)
.get_result(self)
.await?;
let stored_chain_id = stored_chain_id.context("missing chain id after update")?;
let stored_chain_id: [u8; 32] = stored_chain_id
.try_into()
.map_err(|v: Vec<u8>| anyhow::anyhow!("chain id has wrong length: {}", v.len()))?;
Ok(stored_chain_id == chain_id)
}
async fn committer_watermark(
&mut self,
pipeline_task: &str,
) -> anyhow::Result<Option<store::CommitterWatermark>> {
let (
epoch_hi_inclusive,
checkpoint_hi_inclusive,
tx_hi,
timestamp_ms_hi_inclusive,
reader_lo,
): (i64, i64, i64, i64, i64) = watermarks::table
.select((
watermarks::epoch_hi_inclusive,
watermarks::checkpoint_hi_inclusive,
watermarks::tx_hi,
watermarks::timestamp_ms_hi_inclusive,
watermarks::reader_lo,
))
.filter(watermarks::pipeline.eq(pipeline_task))
.first(self)
.await?;
Ok(
(reader_lo <= checkpoint_hi_inclusive).then_some(store::CommitterWatermark {
epoch_hi_inclusive: epoch_hi_inclusive as u64,
checkpoint_hi_inclusive: checkpoint_hi_inclusive as u64,
tx_hi: tx_hi as u64,
timestamp_ms_hi_inclusive: timestamp_ms_hi_inclusive as u64,
}),
)
}
async fn set_committer_watermark(
&mut self,
pipeline_task: &str,
watermark: store::CommitterWatermark,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set((
watermarks::epoch_hi_inclusive.eq(watermark.epoch_hi_inclusive as i64),
watermarks::checkpoint_hi_inclusive.eq(watermark.checkpoint_hi_inclusive as i64),
watermarks::tx_hi.eq(watermark.tx_hi as i64),
watermarks::timestamp_ms_hi_inclusive
.eq(watermark.timestamp_ms_hi_inclusive as i64),
))
.filter(watermarks::pipeline.eq(pipeline_task))
.filter(
watermarks::checkpoint_hi_inclusive.lt(watermark.checkpoint_hi_inclusive as i64),
)
.execute(self)
.await?
> 0)
}
}
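Note how `set_committer_watermark` above only applies its `UPDATE` when the new checkpoint strictly exceeds the stored one, so a stale or duplicate commit can never move the watermark backwards. The same monotonic-advance rule can be sketched store-agnostically with plain structs (`InMemoryWatermarks` is illustrative, not framework API; unlike the PostgreSQL version, which only updates existing rows, this sketch also inserts a row when none exists):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
struct CommitterWatermark {
    checkpoint_hi_inclusive: u64,
    // epoch_hi_inclusive, tx_hi, timestamp_ms_hi_inclusive omitted for brevity
}

/// Illustrative in-memory watermark table, keyed by pipeline_task.
struct InMemoryWatermarks(HashMap<String, CommitterWatermark>);

impl InMemoryWatermarks {
    /// Mirrors the `set_committer_watermark` contract: returns true only
    /// when the stored watermark actually advanced.
    fn set_committer_watermark(&mut self, pipeline_task: &str, w: CommitterWatermark) -> bool {
        match self.0.get(pipeline_task) {
            // Refuse any update that does not strictly advance the checkpoint.
            Some(cur) if cur.checkpoint_hi_inclusive >= w.checkpoint_hi_inclusive => false,
            _ => {
                self.0.insert(pipeline_task.to_string(), w);
                true
            }
        }
    }
}

fn main() {
    let mut wm = InMemoryWatermarks(HashMap::new());
    assert!(wm.set_committer_watermark("tx_digests", CommitterWatermark { checkpoint_hi_inclusive: 10 }));
    // A stale commit must not move the watermark backwards.
    assert!(!wm.set_committer_watermark("tx_digests", CommitterWatermark { checkpoint_hi_inclusive: 5 }));
    assert!(wm.set_committer_watermark("tx_digests", CommitterWatermark { checkpoint_hi_inclusive: 11 }));
}
```

Whatever backend you choose, preserving this invariant is what lets the indexer resume safely after a crash or restart.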
Step 4: Use a manual indexer
Replace `IndexerCluster` with a manual `Indexer`:
use sui_indexer_alt_framework::{Indexer, IndexerArgs};
use sui_indexer_alt_framework::ingestion::{
ClientArgs, IngestionConfig,
ingestion_client::IngestionClientArgs,
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize your custom store
    let store = MyCustomStore::new(config).await?;
    // Configure the indexer manually
    let mut indexer = Indexer::new(
store,
IndexerArgs::default(),
ClientArgs {
ingestion: IngestionClientArgs {
remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
..Default::default()
},
..Default::default()
},
IngestionConfig::default(),
&prometheus::Registry::new(),
tokio_util::sync::CancellationToken::new(),
).await?;
// Add your pipelines
indexer.concurrent_pipeline(
YourHandler::default(),
ConcurrentConfig::default(),
).await?;
// Start the indexer
indexer.run().await?;
Ok(())
}
Example: ClickHouse implementation
For a complete working BYOS example that uses ClickHouse, a high-performance columnar database for analytics, see the example project in the Sui repo.
ClickHouse example README
ClickHouse Sui Indexer
A simple example of how to build a custom Sui indexer that writes transaction data to ClickHouse.
Quick Start
1. Start ClickHouse
docker run -d --name clickhouse-dev -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse/clickhouse-server
2. Set up database user
docker exec clickhouse-dev clickhouse-client --query "CREATE USER IF NOT EXISTS dev IDENTIFIED WITH no_password"
docker exec clickhouse-dev clickhouse-client --query "GRANT CREATE, INSERT, SELECT, ALTER, UPDATE, DELETE ON *.* TO dev"
3. Run the indexer
cargo run -- --remote-store-url https://checkpoints.testnet.sui.io --last-checkpoint=10
That’s it! The indexer will:
- Create the necessary tables automatically
- Fetch checkpoints from the Sui testnet
- Write transaction data to ClickHouse
Verify Data
Check that data was written:
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT COUNT(*) FROM transactions"
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT * FROM transactions LIMIT 5"
Clean Up
Stop and remove the ClickHouse container:
docker stop clickhouse-dev && docker rm clickhouse-dev
What This Example Shows
- Custom Store Implementation: How to implement the
Storetrait for ClickHouse - Concurrent Pipeline: Uses the concurrent pipeline for better pruning and watermark testing
- Watermark Management: Tracking indexer progress with committer, reader, and pruner watermarks
- Transaction Processing: Extracting and storing transaction digests from checkpoints
- Simple Setup: Minimal configuration for local development
Architecture
Sui Network → Checkpoints → Concurrent Pipeline → ClickHouse Store → ClickHouse DB
The indexer uses a concurrent pipeline that processes checkpoints out-of-order with separate reader, committer, and pruner components. This is ideal for testing watermark functionality and pruning behavior.
This example demonstrates:
- Custom store implementation using the ClickHouse Rust client.
- Watermark persistence with ClickHouse-specific SQL syntax.
- Transaction digest indexing similar to the built-in PostgreSQL handler.
The example includes three main components:
- `store.rs`: `ClickHouseStore`, which implements the `Store` and `Connection` traits.

use anyhow::Result;
use async_trait::async_trait;
use chrono::Utc;
use clickhouse::{Client, Row};
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use sui_indexer_alt_framework::store::{
CommitterWatermark, Connection, PrunerWatermark, ReaderWatermark, Store, TransactionalStore,
};
use url::Url;
#[derive(Clone)]
pub struct ClickHouseStore {
client: Client,
}
pub struct ClickHouseConnection {
pub client: Client,
}
/// Row structure for watermark table operations
#[derive(Row, Serialize, Deserialize, Debug, Default)]
struct WatermarkRow {
pipeline_task: String,
epoch_hi_inclusive: u64,
checkpoint_hi_inclusive: u64,
tx_hi: u64,
timestamp_ms_hi_inclusive: u64,
reader_lo: u64,
pruner_hi: u64,
pruner_timestamp: u64, // Unix timestamp in milliseconds
}
impl ClickHouseStore {
pub fn new(url: Url) -> Self {
let client = Client::default()
.with_url(url.as_str())
.with_user("dev") // Simple user for local development
.with_compression(clickhouse::Compression::Lz4);
Self { client }
}
/// Create tables if they don't exist
pub async fn create_tables_if_not_exists(&self) -> Result<()> {
// Create watermarks table for pipeline state management
self.client
.query(
"
CREATE TABLE IF NOT EXISTS watermarks
(
pipeline String,
epoch_hi_inclusive UInt64,
checkpoint_hi_inclusive UInt64,
tx_hi UInt64,
timestamp_ms_hi_inclusive UInt64,
reader_lo UInt64,
pruner_hi UInt64,
pruner_timestamp UInt64
)
ENGINE = MergeTree()
ORDER BY pipeline
",
)
.execute()
.await?;
// Create transactions table for the actual indexing data
self.client
.query(
"
CREATE TABLE IF NOT EXISTS transactions
(
checkpoint_sequence_number UInt64,
transaction_digest String,
indexed_at DateTime64(3, 'UTC') DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY checkpoint_sequence_number
",
)
.execute()
.await?;
Ok(())
}
}
#[async_trait]
impl Store for ClickHouseStore {
type Connection<'c> = ClickHouseConnection;
async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>> {
Ok(ClickHouseConnection {
client: self.client.clone(),
})
}
}
#[async_trait]
impl TransactionalStore for ClickHouseStore {
async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
where
R: Send + 'a,
F: Send + 'a,
F: for<'r> FnOnce(
&'r mut Self::Connection<'_>,
) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
{
let mut conn = self.connect().await?;
f(&mut conn).await
}
}
#[async_trait]
impl Connection for ClickHouseConnection {
async fn init_watermark(
&mut self,
pipeline_task: &str,
default_next_checkpoint: u64,
) -> anyhow::Result<Option<u64>> {
let existing = self.committer_watermark(pipeline_task).await?;
let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
return Ok(existing.map(|w| w.checkpoint_hi_inclusive));
};
if let Some(existing) = existing {
return Ok(Some(existing.checkpoint_hi_inclusive));
}
let mut inserter = self.client.inserter("watermarks")?;
inserter.write(&WatermarkRow {
pipeline_task: pipeline_task.to_string(),
epoch_hi_inclusive: 0,
checkpoint_hi_inclusive,
tx_hi: 0,
timestamp_ms_hi_inclusive: 0,
reader_lo: default_next_checkpoint,
pruner_hi: default_next_checkpoint,
pruner_timestamp: 0,
})?;
inserter.end().await?;
Ok(Some(checkpoint_hi_inclusive))
}
async fn committer_watermark(&mut self, pipeline: &str) -> Result<Option<CommitterWatermark>> {
let mut cursor = self
.client
.query(
"SELECT epoch_hi_inclusive, checkpoint_hi_inclusive, tx_hi, timestamp_ms_hi_inclusive
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1"
)
.bind(pipeline)
.fetch::<(u64, u64, u64, u64)>()?;
let row: Option<(u64, u64, u64, u64)> = cursor.next().await?;
Ok(row.map(
|(epoch_hi, checkpoint_hi, tx_hi, timestamp_hi)| CommitterWatermark {
epoch_hi_inclusive: epoch_hi,
checkpoint_hi_inclusive: checkpoint_hi,
tx_hi,
timestamp_ms_hi_inclusive: timestamp_hi,
},
))
}
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> Result<Option<ReaderWatermark>> {
let mut cursor = self
.client
.query(
"SELECT checkpoint_hi_inclusive, reader_lo
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1",
)
.bind(pipeline)
.fetch::<(u64, u64)>()?;
let row: Option<(u64, u64)> = cursor.next().await?;
Ok(row.map(|(checkpoint_hi, reader_lo)| ReaderWatermark {
checkpoint_hi_inclusive: checkpoint_hi,
reader_lo,
}))
}
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> Result<Option<PrunerWatermark>> {
// Follow PostgreSQL pattern: calculate wait_for_ms on database side
// We do this so that we can rely on the database to keep a consistent sense of time.
// Using own clocks can potentially be subject to some clock skew.
let delay_ms = delay.as_millis() as i64;
let mut cursor = self
.client
.query(
"SELECT reader_lo, pruner_hi,
toInt64(? + (pruner_timestamp - toUnixTimestamp64Milli(now64()))) as wait_for_ms
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1"
)
.bind(delay_ms)
.bind(pipeline)
.fetch::<(u64, u64, i64)>()?;
let row: Option<(u64, u64, i64)> = cursor.next().await?;
Ok(
row.map(|(reader_lo, pruner_hi, wait_for_ms)| PrunerWatermark {
wait_for_ms,
reader_lo,
pruner_hi,
}),
)
}
async fn set_committer_watermark(
&mut self,
pipeline: &str,
watermark: CommitterWatermark,
) -> Result<bool> {
// Follow PostgreSQL pattern: check if row exists, then UPDATE or INSERT accordingly
// First check if pipeline exists and get current checkpoint
let mut cursor = self
.client
.query("SELECT checkpoint_hi_inclusive FROM watermarks WHERE pipeline = ? LIMIT 1")
.bind(pipeline)
.fetch::<u64>()?;
let existing_checkpoint: Option<u64> = cursor.next().await?;
if let Some(existing_checkpoint) = existing_checkpoint {
// Row exists - only update if checkpoint advances
if existing_checkpoint < watermark.checkpoint_hi_inclusive {
self.client
.query(
"ALTER TABLE watermarks
UPDATE
epoch_hi_inclusive = ?,
checkpoint_hi_inclusive = ?,
tx_hi = ?,
timestamp_ms_hi_inclusive = ?
WHERE pipeline = ?",
)
.bind(watermark.epoch_hi_inclusive)
.bind(watermark.checkpoint_hi_inclusive)
.bind(watermark.tx_hi)
.bind(watermark.timestamp_ms_hi_inclusive)
.bind(pipeline)
.execute()
.await?;
}
} else {
// No existing row - insert new one
let mut inserter = self.client.inserter("watermarks")?;
inserter.write(&WatermarkRow {
pipeline_task: pipeline.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
tx_hi: watermark.tx_hi,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
reader_lo: 0, // Will be updated by reader
pruner_hi: 0, // Will be updated by pruner
pruner_timestamp: Utc::now().timestamp_millis() as u64,
})?;
inserter.end().await?;
}
Ok(true)
}
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> Result<bool> {
// Follow PostgreSQL pattern: simple UPDATE with timestamp update and advancement check
self.client
.query(
"ALTER TABLE watermarks
UPDATE reader_lo = ?, pruner_timestamp = toUnixTimestamp64Milli(now64())
WHERE pipeline = ? AND reader_lo < ?",
)
.bind(reader_lo)
.bind(pipeline)
.bind(reader_lo)
.execute()
.await?;
Ok(true)
}
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> Result<bool> {
// Follow PostgreSQL pattern: simple UPDATE statement
self.client
.query(
"ALTER TABLE watermarks
UPDATE pruner_hi = ?
WHERE pipeline = ?",
)
.bind(pruner_hi)
.bind(pipeline)
.execute()
.await?;
Ok(true)
}
}

- `handlers.rs`: The `TxDigests` handler that processes checkpoint data.

use anyhow::Result;
use clickhouse::Row;
use serde::Serialize;
use std::sync::Arc;
use sui_indexer_alt_framework::{
FieldCount,
pipeline::{
Processor,
concurrent::{BatchStatus, Handler},
},
store::Store,
types::full_checkpoint_content::Checkpoint,
};
use crate::store::ClickHouseStore;
/// Structure representing a transaction digest record in ClickHouse
/// Aligned with sui-indexer-alt's StoredTxDigest structure
#[derive(Row, Serialize, Clone, Debug, FieldCount)]
pub struct StoredTxDigest {
pub tx_sequence_number: i64,
pub tx_digest: Vec<u8>,
}
/// Handler that processes checkpoint data and extracts transaction digests
/// Named to align with sui-indexer-alt's TxDigests handler
#[derive(Clone, Default)]
pub struct TxDigests;
#[async_trait::async_trait]
impl Processor for TxDigests {
const NAME: &'static str = "tx_digests";
type Value = StoredTxDigest;
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let Checkpoint {
transactions,
summary,
..
} = checkpoint.as_ref();
let first_tx = summary.network_total_transactions as usize - transactions.len();
Ok(transactions
.iter()
.enumerate()
.map(|(i, tx)| StoredTxDigest {
tx_sequence_number: (first_tx + i) as i64,
tx_digest: tx.transaction.digest().inner().to_vec(),
})
.collect())
}
}
#[async_trait::async_trait]
impl Handler for TxDigests {
type Store = ClickHouseStore;
type Batch = Vec<Self::Value>;
fn batch(
&self,
batch: &mut Self::Batch,
values: &mut std::vec::IntoIter<Self::Value>,
) -> BatchStatus {
batch.extend(values);
BatchStatus::Pending
}
async fn commit<'a>(
&self,
values: &Self::Batch,
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
let row_count = values.len();
if row_count == 0 {
return Ok(0);
}
// Use ClickHouse inserter for efficient bulk inserts
let mut inserter = conn.client.inserter("tx_digests")?;
for tx_digest in values {
inserter.write(tx_digest)?;
}
inserter.end().await?;
Ok(row_count)
}
}

- `main.rs`: Manual indexer setup using the ClickHouse backend.

mod handlers;
mod store;
use anyhow::{Result, bail};
use clap::Parser;
use sui_indexer_alt_framework::{
Indexer, IndexerArgs,
ingestion::{ClientArgs, IngestionConfig},
pipeline::concurrent::ConcurrentConfig,
service::Error,
};
use url::Url;
use handlers::TxDigests;
use store::ClickHouseStore;
#[derive(clap::Parser, Debug, Clone)]
struct Args {
#[clap(flatten)]
pub indexer_args: IndexerArgs,
#[clap(flatten)]
pub client_args: ClientArgs,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize crypto provider for HTTPS connections (needed for remote checkpoint fetching)
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install crypto provider");
// Parse command-line arguments
let args = Args::parse();
// ClickHouse connection (uses 'dev' user by default for local development)
let clickhouse_url = "http://localhost:8123".parse::<Url>()?;
println!("Connecting to ClickHouse at: {}", clickhouse_url);
// Create our custom ClickHouse store
let store = ClickHouseStore::new(clickhouse_url);
// Ensure the database tables are created before starting the indexer
store.create_tables_if_not_exists().await?;
// Manually build the indexer with our custom ClickHouse store
// This is the key difference from basic-sui-indexer which uses IndexerCluster::builder()
let mut indexer = Indexer::new(
store.clone(),
args.indexer_args,
args.client_args,
IngestionConfig::default(),
None, // No metrics prefix
&Default::default(), // Empty prometheus registry
)
.await?;
// Register our concurrent pipeline handler (better for testing pruning)
// This processes checkpoints with separate reader and pruner components
indexer
.concurrent_pipeline(
TxDigests,
// ConcurrentConfig default comes with no pruning.
ConcurrentConfig::default(),
)
.await?;
println!("Starting ClickHouse Sui indexer...");
// Start the indexer and wait for it to complete
match indexer.run().await?.main().await {
Ok(()) | Err(Error::Terminated) => Ok(()),
Err(Error::Aborted) => {
bail!("Indexer aborted due to an unexpected error")
}
Err(Error::Task(e)) => {
bail!(e)
}
}
}
Deserializing Move events
When a Move smart contract executes on Sui, it can emit events using the sui::event module.
These events are stored inside checkpoints as BCS-serialized bytes, so an indexer must deserialize them to extract meaningful data.
Why deserialization is needed
A Move contract emits an event like this:
// Move smart contract
use sui::event;
public fun transfer_balance(...) {
event::emit(BalanceEvent {
balance_manager_id: id,
asset: asset_id,
amount: 100,
deposit: true
});
}
In the checkpoint data, these events arrive as raw BCS bytes that must be converted back into Rust structs before processing.
Step-by-step deserialization
1. Add the BCS dependency:

[dependencies]
bcs = "0.1.6"
serde = { version = "1.0", features = ["derive"] }

2. Define the event struct in Rust.

Define the same structure in Rust as you declared in Move. You can write it by hand, or generate it from the on-chain package using move-binding.

use serde::Deserialize;
use sui_indexer_alt_framework::types::base_types::ObjectID;
#[derive(Deserialize, Debug)]
struct BalanceEvent {
balance_manager_id: ObjectID,
asset: ObjectID,
amount: u64,
deposit: bool,
}

Important: The field order and types must exactly match the Move event.
3. Extract the event bytes in your processor:
#[async_trait::async_trait]
impl Processor for YourHandler {
    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
        let mut results = Vec::new();
        for transaction in &checkpoint.transactions {
            for event in &transaction.events {
                // Get the raw BCS bytes
                let event_bytes = &event.contents;
                // Deserialize into your Rust struct
                if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
                    // Do something with balance_event
                }
            }
        }
        Ok(results)
    }
}