
Bring your own store (BYOS)

IndexerCluster is a convenient way to get started with PostgreSQL, but you might want to use a different database or storage system. To do so, use a manual Indexer and implement the custom Store and Connection traits from sui-indexer-alt-framework-store-traits.


lib.rs in sui-indexer-alt-framework-store-traits

/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
#[async_trait]
pub trait Connection: Send {
    /// If no existing watermark record exists, initializes it with `default_next_checkpoint`.
    /// Returns the committer watermark `checkpoint_hi_inclusive`.
    async fn init_watermark(
        &mut self,
        pipeline_task: &str,
        default_next_checkpoint: u64,
    ) -> anyhow::Result<Option<u64>>;

    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
    /// checkpoint to resume processing from.
    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>>;

    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
    /// to determine the new `reader_lo` or inclusive lower bound of available data.
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>>;

    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
    /// minimizes the possibility for the pruner to delete data still expected by inflight read
    /// requests.
    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>>;

    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
    /// the watermark was actually updated or not.
    async fn set_committer_watermark(
        &mut self,
        pipeline_task: &str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool>;

    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
    /// will reference this as the inclusive lower bound of available data for the corresponding
    /// pipeline.
    ///
    /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
    /// store. If this is not possible, then it should come from some other common source of time
    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
    /// before beginning to prune.
    ///
    /// Returns a boolean indicating whether the watermark was actually updated or not.
    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool>;

    /// Update the pruner watermark, returns true if the watermark was actually updated.
    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool>;
}

/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermarks operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
    type Connection<'c>: Connection
    where
        Self: 'c;

    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
    /// committer watermark.
    const DELIMITER: &'static str = "@";

    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
}
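Several `Connection` methods take a `pipeline_task` key that is either a bare pipeline name or a pipeline and task joined by `Store::DELIMITER`. As a sketch of that convention, the hypothetical helper below (not part of the framework) splits such a key back into its parts, assuming the default delimiter `"@"` shown above:

```rust
// Hypothetical helper showing how a store implementation might parse a
// `pipeline_task` key, assuming the default `Store::DELIMITER` of "@".
fn split_pipeline_task(pipeline_task: &str) -> (&str, Option<&str>) {
    match pipeline_task.split_once('@') {
        // "tx_digests@backfill" -> pipeline "tx_digests", task "backfill"
        Some((pipeline, task)) => (pipeline, Some(task)),
        // A bare pipeline name has no associated task.
        None => (pipeline_task, None),
    }
}

fn main() {
    assert_eq!(split_pipeline_task("tx_digests"), ("tx_digests", None));
    assert_eq!(
        split_pipeline_task("tx_digests@backfill"),
        ("tx_digests", Some("backfill"))
    );
}
```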

When to use BYOS:

  • Different database: You want to use MongoDB, CouchDB, or another non-PostgreSQL database. This also applies if you want to use PostgreSQL without the default Diesel ORM.
  • Custom requirements: You need specialized storage logic, partitioning, or performance optimizations.

Core implementation requirements

To implement BYOS, you need to:

  1. Define a store struct and a connection struct to manage connections.
  2. Implement the Store trait for connection management.
  3. Implement the Connection trait for the watermark operations used in pipeline coordination.
  4. Use a manual Indexer instead of IndexerCluster.

###step Define your store structure

use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;

#[derive(Clone)]
pub struct MyCustomStore {
    // Database connection details
    connection_pool: MyDatabasePool,
    config: MyConfig,
}

pub struct MyCustomConnection<'a> {
    // Connection instance
    conn: MyDatabaseConnection<'a>,
}

###step Implement the Store trait

The Store trait manages the connection lifecycle:

#[async_trait]
impl Store for MyCustomStore {
    type Connection<'c> = MyCustomConnection<'c>;

    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
        // Your implementation here
        todo!("Implement for your storage system")
    }
}

###step Implement the Connection trait

The Connection trait handles the watermark operations used for pipeline coordination:

#[async_trait]
impl Connection for MyCustomConnection<'_> {
    // Get the highest checkpoint the pipeline has processed
    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>> {
        // Query your database for watermark data
        todo!("Implement for your storage system")
    }

    // Get the lowest checkpoint available to readers
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>> {
        // Implementation depends on your database schema
        todo!("Implement for your storage system")
    }

    // Implement the remaining required methods...
}
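The watermark setters share one invariant: a watermark only ever moves forward, and the return value reports whether an update actually happened. The self-contained sketch below (the type and method names are illustrative, not framework types) shows the semantics a `set_committer_watermark` implementation must uphold regardless of the backing store:

```rust
use std::collections::HashMap;

// Minimal in-memory sketch of the monotonic-update contract behind
// `set_committer_watermark`: stale updates are rejected, and the boolean
// reports whether the stored watermark advanced.
#[derive(Default)]
struct WatermarkTable {
    checkpoint_hi: HashMap<String, u64>,
}

impl WatermarkTable {
    fn set_committer_watermark(&mut self, pipeline: &str, checkpoint_hi: u64) -> bool {
        match self.checkpoint_hi.get(pipeline) {
            // Refuse to lower (or re-set) an existing watermark.
            Some(&existing) if existing >= checkpoint_hi => false,
            _ => {
                self.checkpoint_hi.insert(pipeline.to_string(), checkpoint_hi);
                true
            }
        }
    }
}

fn main() {
    let mut table = WatermarkTable::default();
    assert!(table.set_committer_watermark("tx_digests", 10));
    assert!(!table.set_committer_watermark("tx_digests", 5)); // stale update rejected
    assert!(table.set_committer_watermark("tx_digests", 11));
}
```

The sui-pg-db implementation below enforces the same rule in SQL with a `filter(checkpoint_hi_inclusive.lt(...))` clause on the upsert.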

For a complete reference, see the sui-pg-db implementation of Connection:

#[async_trait]
impl store::Connection for Connection<'_> {
    async fn init_watermark(
        &mut self,
        pipeline_task: &str,
        default_next_checkpoint: u64,
    ) -> anyhow::Result<Option<u64>> {
        let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
            // Do not create a watermark record with checkpoint_hi_inclusive = -1.
            return Ok(self
                .committer_watermark(pipeline_task)
                .await?
                .map(|w| w.checkpoint_hi_inclusive));
        };

        let stored_watermark = StoredWatermark {
            pipeline: pipeline_task.to_string(),
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
            tx_hi: 0,
            timestamp_ms_hi_inclusive: 0,
            reader_lo: default_next_checkpoint as i64,
            pruner_timestamp: Utc::now().naive_utc(),
            pruner_hi: default_next_checkpoint as i64,
        };

        use diesel::pg::upsert::excluded;
        let checkpoint_hi_inclusive: i64 = diesel::insert_into(watermarks::table)
            .values(&stored_watermark)
            // There is an existing entry, so only write the new `hi` values
            .on_conflict(watermarks::pipeline)
            // Use `do_update` instead of `do_nothing` to return the existing row with `returning`.
            .do_update()
            // When using `do_update`, at least one change needs to be set, so set the pipeline to
            // itself (nothing changes). `excluded` is a virtual table containing the existing row
            // that there was a conflict with.
            .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
            .returning(watermarks::checkpoint_hi_inclusive)
            .get_result(self)
            .await?;

        Ok(Some(checkpoint_hi_inclusive as u64))
    }

    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<store::CommitterWatermark>> {
        let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
            .select((
                watermarks::epoch_hi_inclusive,
                watermarks::checkpoint_hi_inclusive,
                watermarks::tx_hi,
                watermarks::timestamp_ms_hi_inclusive,
            ))
            .filter(watermarks::pipeline.eq(pipeline_task))
            .first(self)
            .await
            .optional()?;

        if let Some(watermark) = watermark {
            Ok(Some(store::CommitterWatermark {
                epoch_hi_inclusive: watermark.0 as u64,
                checkpoint_hi_inclusive: watermark.1 as u64,
                tx_hi: watermark.2 as u64,
                timestamp_ms_hi_inclusive: watermark.3 as u64,
            }))
        } else {
            Ok(None)
        }
    }

    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<store::ReaderWatermark>> {
        let watermark: Option<(i64, i64)> = watermarks::table
            .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
            .filter(watermarks::pipeline.eq(pipeline))
            .first(self)
            .await
            .optional()?;

        if let Some(watermark) = watermark {
            Ok(Some(store::ReaderWatermark {
                checkpoint_hi_inclusive: watermark.0 as u64,
                reader_lo: watermark.1 as u64,
            }))
        } else {
            Ok(None)
        }
    }

    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<store::PrunerWatermark>> {
        // |------------------------ + delay ------------------------|
        //                          |---------- wait_for ------------|
        // |------------------------|--------------------------------|
        // ^                        ^
        // pruner_timestamp         NOW()
        let wait_for = sql!(as BigInt,
            "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
            delay.as_millis() as i64,
        );

        let watermark: Option<(i64, i64, i64)> = watermarks::table
            .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
            .filter(watermarks::pipeline.eq(pipeline))
            .first(self)
            .await
            .optional()?;

        if let Some(watermark) = watermark {
            Ok(Some(store::PrunerWatermark {
                wait_for_ms: watermark.0,
                pruner_hi: watermark.1 as u64,
                reader_lo: watermark.2 as u64,
            }))
        } else {
            Ok(None)
        }
    }

    async fn set_committer_watermark(
        &mut self,
        pipeline_task: &str,
        watermark: store::CommitterWatermark,
    ) -> anyhow::Result<bool> {
        // Create a StoredWatermark directly from CommitterWatermark
        let stored_watermark = StoredWatermark {
            pipeline: pipeline_task.to_string(),
            epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
            checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
            tx_hi: watermark.tx_hi as i64,
            timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
            reader_lo: 0,
            pruner_timestamp: DateTime::UNIX_EPOCH.naive_utc(),
            pruner_hi: 0,
        };

        use diesel::query_dsl::methods::FilterDsl;
        Ok(diesel::insert_into(watermarks::table)
            .values(&stored_watermark)
            // There is an existing entry, so only write the new `hi` values
            .on_conflict(watermarks::pipeline)
            .do_update()
            .set((
                watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
                watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
                watermarks::tx_hi.eq(stored_watermark.tx_hi),
                watermarks::timestamp_ms_hi_inclusive
                    .eq(stored_watermark.timestamp_ms_hi_inclusive),
            ))
            .filter(
                watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
            )
            .execute(self)
            .await?
            > 0)
    }

    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool> {
        Ok(diesel::update(watermarks::table)
            .set((
                watermarks::reader_lo.eq(reader_lo as i64),
                watermarks::pruner_timestamp.eq(diesel::dsl::now),
            ))
            .filter(watermarks::pipeline.eq(pipeline))
            .filter(watermarks::reader_lo.lt(reader_lo as i64))
            .execute(self)
            .await?
            > 0)
    }

    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool> {
        Ok(diesel::update(watermarks::table)
            .set(watermarks::pruner_hi.eq(pruner_hi as i64))
            .filter(watermarks::pipeline.eq(pipeline))
            .execute(self)
            .await?
            > 0)
    }
}

###step Use manual indexer

Replace IndexerCluster with a manual Indexer:

use sui_indexer_alt_framework::{Indexer, IndexerArgs};
use sui_indexer_alt_framework::ingestion::{
    ClientArgs, IngestionConfig,
    ingestion_client::IngestionClientArgs,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize your custom store
    let store = MyCustomStore::new(config).await?;

    // Configure the indexer manually
    let mut indexer = Indexer::new(
        store,
        IndexerArgs::default(),
        ClientArgs {
            ingestion: IngestionClientArgs {
                remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
                ..Default::default()
            },
            ..Default::default()
        },
        IngestionConfig::default(),
        &prometheus::Registry::new(),
        tokio_util::sync::CancellationToken::new(),
    ).await?;

    // Add your pipelines
    indexer.concurrent_pipeline(
        YourHandler::default(),
        ConcurrentConfig::default(),
    ).await?;

    // Start the indexer
    indexer.run().await?;
    Ok(())
}

Example: ClickHouse implementation

For a complete working BYOS example that uses ClickHouse (a high-performance columnar database for analytics), see the example project in the Sui repo.


ClickHouse example README

ClickHouse Sui Indexer

A simple example of how to build a custom Sui indexer that writes transaction data to ClickHouse.

Quick Start

1. Start ClickHouse

docker run -d --name clickhouse-dev -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse/clickhouse-server

2. Set up database user

docker exec clickhouse-dev clickhouse-client --query "CREATE USER IF NOT EXISTS dev IDENTIFIED WITH no_password"
docker exec clickhouse-dev clickhouse-client --query "GRANT CREATE, INSERT, SELECT, ALTER, UPDATE, DELETE ON *.* TO dev"

3. Run the indexer

cargo run -- --remote-store-url https://checkpoints.testnet.sui.io --last-checkpoint=10

That’s it! The indexer will:

  • Create the necessary tables automatically
  • Fetch checkpoints from the Sui testnet
  • Write transaction data to ClickHouse

Verify Data

Check that data was written:

docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT COUNT(*) FROM transactions"
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT * FROM transactions LIMIT 5"

Clean Up

Stop and remove the ClickHouse container:

docker stop clickhouse-dev && docker rm clickhouse-dev

What This Example Shows

  • Custom Store Implementation: How to implement the Store trait for ClickHouse
  • Concurrent Pipeline: Uses the concurrent pipeline for better pruning and watermark testing
  • Watermark Management: Tracking indexer progress with committer, reader, and pruner watermarks
  • Transaction Processing: Extracting and storing transaction digests from checkpoints
  • Simple Setup: Minimal configuration for local development

Architecture

Sui Network → Checkpoints → Concurrent Pipeline → ClickHouse Store → ClickHouse DB

The indexer uses a concurrent pipeline that processes checkpoints out-of-order with separate reader, committer, and pruner components. This is ideal for testing watermark functionality and pruning behavior.
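Because checkpoints complete out of order, the committer watermark can only advance to the highest checkpoint for which every lower checkpoint has also been committed. The sketch below (illustrative, not framework code) captures that gating rule:

```rust
use std::collections::HashSet;

// Advance the committer watermark only through a contiguous run of
// committed checkpoints; a gap stops the watermark from moving further.
fn advance_watermark(current_hi: u64, committed: &HashSet<u64>) -> u64 {
    let mut hi = current_hi;
    while committed.contains(&(hi + 1)) {
        hi += 1;
    }
    hi
}

fn main() {
    // Checkpoints 6, 7, and 9 are done, but 8 is still in flight,
    // so the watermark stops at 7.
    let committed: HashSet<u64> = [6, 7, 9].into_iter().collect();
    assert_eq!(advance_watermark(5, &committed), 7);
}
```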

This example demonstrates:

  • Custom store implementation using the ClickHouse Rust client.
  • Watermark persistence with ClickHouse-specific SQL syntax.
  • Transaction digest indexing similar to the built-in PostgreSQL handler.

The example includes three main components:

  1. store.rs - The ClickHouseStore, which implements the Store and Connection traits.


    store.rs

    use anyhow::Result;
    use async_trait::async_trait;
    use chrono::Utc;
    use clickhouse::{Client, Row};
    use scoped_futures::ScopedBoxFuture;
    use serde::{Deserialize, Serialize};
    use std::time::Duration;
    use sui_indexer_alt_framework::store::{
        CommitterWatermark, Connection, PrunerWatermark, ReaderWatermark, Store, TransactionalStore,
    };
    use url::Url;

    #[derive(Clone)]
    pub struct ClickHouseStore {
        client: Client,
    }

    pub struct ClickHouseConnection {
        pub client: Client,
    }

    /// Row structure for watermark table operations
    #[derive(Row, Serialize, Deserialize, Debug, Default)]
    struct WatermarkRow {
        pipeline_task: String,
        epoch_hi_inclusive: u64,
        checkpoint_hi_inclusive: u64,
        tx_hi: u64,
        timestamp_ms_hi_inclusive: u64,
        reader_lo: u64,
        pruner_hi: u64,
        pruner_timestamp: u64, // Unix timestamp in milliseconds
    }

    impl ClickHouseStore {
        pub fn new(url: Url) -> Self {
            let client = Client::default()
                .with_url(url.as_str())
                .with_user("dev") // Simple user for local development
                .with_compression(clickhouse::Compression::Lz4);
            Self { client }
        }

        /// Create tables if they don't exist
        pub async fn create_tables_if_not_exists(&self) -> Result<()> {
            // Create watermarks table for pipeline state management
            self.client
                .query(
                    "
                    CREATE TABLE IF NOT EXISTS watermarks
                    (
                        pipeline String,
                        epoch_hi_inclusive UInt64,
                        checkpoint_hi_inclusive UInt64,
                        tx_hi UInt64,
                        timestamp_ms_hi_inclusive UInt64,
                        reader_lo UInt64,
                        pruner_hi UInt64,
                        pruner_timestamp UInt64
                    )
                    ENGINE = MergeTree()
                    ORDER BY pipeline
                    ",
                )
                .execute()
                .await?;

            // Create transactions table for the actual indexing data
            self.client
                .query(
                    "
                    CREATE TABLE IF NOT EXISTS transactions
                    (
                        checkpoint_sequence_number UInt64,
                        transaction_digest String,
                        indexed_at DateTime64(3, 'UTC') DEFAULT now()
                    )
                    ENGINE = MergeTree()
                    ORDER BY checkpoint_sequence_number
                    ",
                )
                .execute()
                .await?;

            Ok(())
        }
    }

    #[async_trait]
    impl Store for ClickHouseStore {
        type Connection<'c> = ClickHouseConnection;

        async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>> {
            Ok(ClickHouseConnection {
                client: self.client.clone(),
            })
        }
    }

    #[async_trait]
    impl TransactionalStore for ClickHouseStore {
        async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
        where
            R: Send + 'a,
            F: Send + 'a,
            F: for<'r> FnOnce(
                &'r mut Self::Connection<'_>,
            ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
        {
            let mut conn = self.connect().await?;
            f(&mut conn).await
        }
    }

    #[async_trait]
    impl Connection for ClickHouseConnection {
        async fn init_watermark(
            &mut self,
            pipeline_task: &str,
            default_next_checkpoint: u64,
        ) -> anyhow::Result<Option<u64>> {
            let existing = self.committer_watermark(pipeline_task).await?;

            let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
                return Ok(existing.map(|w| w.checkpoint_hi_inclusive));
            };

            if let Some(existing) = existing {
                return Ok(Some(existing.checkpoint_hi_inclusive));
            }

            let mut inserter = self.client.inserter("watermarks")?;
            inserter.write(&WatermarkRow {
                pipeline_task: pipeline_task.to_string(),
                epoch_hi_inclusive: 0,
                checkpoint_hi_inclusive,
                tx_hi: 0,
                timestamp_ms_hi_inclusive: 0,
                reader_lo: default_next_checkpoint,
                pruner_hi: default_next_checkpoint,
                pruner_timestamp: 0,
            })?;

            inserter.end().await?;
            Ok(Some(checkpoint_hi_inclusive))
        }

        async fn committer_watermark(&mut self, pipeline: &str) -> Result<Option<CommitterWatermark>> {
            let mut cursor = self
                .client
                .query(
                    "SELECT epoch_hi_inclusive, checkpoint_hi_inclusive, tx_hi, timestamp_ms_hi_inclusive
                     FROM watermarks
                     WHERE pipeline = ?
                     ORDER BY pruner_timestamp DESC
                     LIMIT 1",
                )
                .bind(pipeline)
                .fetch::<(u64, u64, u64, u64)>()?;

            let row: Option<(u64, u64, u64, u64)> = cursor.next().await?;
            Ok(row.map(
                |(epoch_hi, checkpoint_hi, tx_hi, timestamp_hi)| CommitterWatermark {
                    epoch_hi_inclusive: epoch_hi,
                    checkpoint_hi_inclusive: checkpoint_hi,
                    tx_hi,
                    timestamp_ms_hi_inclusive: timestamp_hi,
                },
            ))
        }

        async fn reader_watermark(
            &mut self,
            pipeline: &'static str,
        ) -> Result<Option<ReaderWatermark>> {
            let mut cursor = self
                .client
                .query(
                    "SELECT checkpoint_hi_inclusive, reader_lo
                     FROM watermarks
                     WHERE pipeline = ?
                     ORDER BY pruner_timestamp DESC
                     LIMIT 1",
                )
                .bind(pipeline)
                .fetch::<(u64, u64)>()?;

            let row: Option<(u64, u64)> = cursor.next().await?;
            Ok(row.map(|(checkpoint_hi, reader_lo)| ReaderWatermark {
                checkpoint_hi_inclusive: checkpoint_hi,
                reader_lo,
            }))
        }

        async fn pruner_watermark(
            &mut self,
            pipeline: &'static str,
            delay: Duration,
        ) -> Result<Option<PrunerWatermark>> {
            // Follow PostgreSQL pattern: calculate wait_for_ms on database side
            // We do this so that we can rely on the database to keep a consistent sense of time.
            // Using own clocks can potentially be subject to some clock skew.
            let delay_ms = delay.as_millis() as i64;
            let mut cursor = self
                .client
                .query(
                    "SELECT reader_lo, pruner_hi,
                            toInt64(? + (pruner_timestamp - toUnixTimestamp64Milli(now64()))) as wait_for_ms
                     FROM watermarks
                     WHERE pipeline = ?
                     ORDER BY pruner_timestamp DESC
                     LIMIT 1",
                )
                .bind(delay_ms)
                .bind(pipeline)
                .fetch::<(u64, u64, i64)>()?;

            let row: Option<(u64, u64, i64)> = cursor.next().await?;
            Ok(
                row.map(|(reader_lo, pruner_hi, wait_for_ms)| PrunerWatermark {
                    wait_for_ms,
                    reader_lo,
                    pruner_hi,
                }),
            )
        }

        async fn set_committer_watermark(
            &mut self,
            pipeline: &str,
            watermark: CommitterWatermark,
        ) -> Result<bool> {
            // Follow PostgreSQL pattern: check if row exists, then UPDATE or INSERT accordingly

            // First check if pipeline exists and get current checkpoint
            let mut cursor = self
                .client
                .query("SELECT checkpoint_hi_inclusive FROM watermarks WHERE pipeline = ? LIMIT 1")
                .bind(pipeline)
                .fetch::<u64>()?;

            let existing_checkpoint: Option<u64> = cursor.next().await?;

            if let Some(existing_checkpoint) = existing_checkpoint {
                // Row exists - only update if checkpoint advances
                if existing_checkpoint < watermark.checkpoint_hi_inclusive {
                    self.client
                        .query(
                            "ALTER TABLE watermarks
                             UPDATE
                                 epoch_hi_inclusive = ?,
                                 checkpoint_hi_inclusive = ?,
                                 tx_hi = ?,
                                 timestamp_ms_hi_inclusive = ?
                             WHERE pipeline = ?",
                        )
                        .bind(watermark.epoch_hi_inclusive)
                        .bind(watermark.checkpoint_hi_inclusive)
                        .bind(watermark.tx_hi)
                        .bind(watermark.timestamp_ms_hi_inclusive)
                        .bind(pipeline)
                        .execute()
                        .await?;
                }
            } else {
                // No existing row - insert new one
                let mut inserter = self.client.inserter("watermarks")?;
                inserter.write(&WatermarkRow {
                    pipeline_task: pipeline.to_string(),
                    epoch_hi_inclusive: watermark.epoch_hi_inclusive,
                    checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
                    tx_hi: watermark.tx_hi,
                    timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
                    reader_lo: 0, // Will be updated by reader
                    pruner_hi: 0, // Will be updated by pruner
                    pruner_timestamp: Utc::now().timestamp_millis() as u64,
                })?;
                inserter.end().await?;
            }

            Ok(true)
        }

        async fn set_reader_watermark(
            &mut self,
            pipeline: &'static str,
            reader_lo: u64,
        ) -> Result<bool> {
            // Follow PostgreSQL pattern: simple UPDATE with timestamp update and advancement check
            self.client
                .query(
                    "ALTER TABLE watermarks
                     UPDATE reader_lo = ?, pruner_timestamp = toUnixTimestamp64Milli(now64())
                     WHERE pipeline = ? AND reader_lo < ?",
                )
                .bind(reader_lo)
                .bind(pipeline)
                .bind(reader_lo)
                .execute()
                .await?;

            Ok(true)
        }

        async fn set_pruner_watermark(
            &mut self,
            pipeline: &'static str,
            pruner_hi: u64,
        ) -> Result<bool> {
            // Follow PostgreSQL pattern: simple UPDATE statement
            self.client
                .query(
                    "ALTER TABLE watermarks
                     UPDATE pruner_hi = ?
                     WHERE pipeline = ?",
                )
                .bind(pruner_hi)
                .bind(pipeline)
                .execute()
                .await?;

            Ok(true)
        }
    }
  2. handlers.rs - The TxDigests handler, which processes checkpoint data.


    handlers.rs

    use anyhow::Result;
    use clickhouse::Row;
    use serde::Serialize;
    use std::sync::Arc;

    use sui_indexer_alt_framework::{
        FieldCount,
        pipeline::{
            Processor,
            concurrent::{BatchStatus, Handler},
        },
        store::Store,
        types::full_checkpoint_content::Checkpoint,
    };

    use crate::store::ClickHouseStore;

    /// Structure representing a transaction digest record in ClickHouse
    /// Aligned with sui-indexer-alt's StoredTxDigest structure
    #[derive(Row, Serialize, Clone, Debug, FieldCount)]
    pub struct StoredTxDigest {
        pub tx_sequence_number: i64,
        pub tx_digest: Vec<u8>,
    }

    /// Handler that processes checkpoint data and extracts transaction digests
    /// Named to align with sui-indexer-alt's TxDigests handler
    #[derive(Clone, Default)]
    pub struct TxDigests;

    #[async_trait::async_trait]
    impl Processor for TxDigests {
        const NAME: &'static str = "tx_digests";
        type Value = StoredTxDigest;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
            let Checkpoint {
                transactions,
                summary,
                ..
            } = checkpoint.as_ref();

            let first_tx = summary.network_total_transactions as usize - transactions.len();

            Ok(transactions
                .iter()
                .enumerate()
                .map(|(i, tx)| StoredTxDigest {
                    tx_sequence_number: (first_tx + i) as i64,
                    tx_digest: tx.transaction.digest().inner().to_vec(),
                })
                .collect())
        }
    }

    #[async_trait::async_trait]
    impl Handler for TxDigests {
        type Store = ClickHouseStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            values: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> Result<usize> {
            let row_count = values.len();
            if row_count == 0 {
                return Ok(0);
            }

            // Use ClickHouse inserter for efficient bulk inserts
            let mut inserter = conn.client.inserter("tx_digests")?;
            for tx_digest in values {
                inserter.write(tx_digest)?;
            }
            inserter.end().await?;

            Ok(row_count)
        }
    }
  3. main.rs - Manual indexer setup using the ClickHouse backend.


    main.rs

    mod handlers;
    mod store;

    use anyhow::{Result, bail};
    use clap::Parser;
    use sui_indexer_alt_framework::{
        Indexer, IndexerArgs,
        ingestion::{ClientArgs, IngestionConfig},
        pipeline::concurrent::ConcurrentConfig,
        service::Error,
    };
    use url::Url;

    use handlers::TxDigests;
    use store::ClickHouseStore;

    #[derive(clap::Parser, Debug, Clone)]
    struct Args {
        #[clap(flatten)]
        pub indexer_args: IndexerArgs,

        #[clap(flatten)]
        pub client_args: ClientArgs,
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        // Initialize crypto provider for HTTPS connections (needed for remote checkpoint fetching)
        rustls::crypto::ring::default_provider()
            .install_default()
            .expect("Failed to install crypto provider");

        // Parse command-line arguments
        let args = Args::parse();

        // ClickHouse connection (uses 'dev' user by default for local development)
        let clickhouse_url = "http://localhost:8123".parse::<Url>()?;

        println!("Connecting to ClickHouse at: {}", clickhouse_url);

        // Create our custom ClickHouse store
        let store = ClickHouseStore::new(clickhouse_url);

        // Ensure the database tables are created before starting the indexer
        store.create_tables_if_not_exists().await?;

        // Manually build the indexer with our custom ClickHouse store
        // This is the key difference from basic-sui-indexer which uses IndexerCluster::builder()
        let mut indexer = Indexer::new(
            store.clone(),
            args.indexer_args,
            args.client_args,
            IngestionConfig::default(),
            None,                // No metrics prefix
            &Default::default(), // Empty prometheus registry
        )
        .await?;

        // Register our concurrent pipeline handler (better for testing pruning)
        // This processes checkpoints with separate reader and pruner components
        indexer
            .concurrent_pipeline(
                TxDigests,
                // ConcurrentConfig default comes with no pruning.
                ConcurrentConfig::default(),
            )
            .await?;

        println!("Starting ClickHouse Sui indexer...");

        // Start the indexer and wait for it to complete
        match indexer.run().await?.main().await {
            Ok(()) | Err(Error::Terminated) => Ok(()),
            Err(Error::Aborted) => {
                bail!("Indexer aborted due to an unexpected error")
            }
            Err(Error::Task(e)) => {
                bail!(e)
            }
        }
    }

Deserializing Move events

When a Move smart contract executes on Sui, it can emit events using the sui::event module. These events are stored inside checkpoints as BCS-serialized bytes, so your indexer must deserialize them to extract meaningful data.

Why deserialization is needed

A Move contract emits events like this:

// Move smart contract
use sui::event;

public fun transfer_balance(...) {
    event::emit(BalanceEvent {
        balance_manager_id: id,
        asset: asset_id,
        amount: 100,
        deposit: true
    });
}

In checkpoint data, these events arrive as raw BCS bytes, which you must convert back into Rust structs before processing.
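BCS encodes struct fields back-to-back in declaration order, with no field names or tags: a u64 becomes 8 little-endian bytes, and a bool becomes a single 0/1 byte. The simplified hand-decoder below (real code should use `bcs::from_bytes`; the field layout here is a cut-down stand-in for an event like BalanceEvent) shows why the Rust struct must mirror the Move declaration exactly:

```rust
// Hand-decode a struct of (amount: u64, deposit: bool) from BCS bytes to
// illustrate the positional, untagged layout BCS uses.
fn decode_amount_then_deposit(bytes: &[u8]) -> Option<(u64, bool)> {
    // First field: u64 as 8 little-endian bytes.
    let amount = u64::from_le_bytes(bytes.get(0..8)?.try_into().ok()?);
    // Second field: bool as a single byte, 0 or 1.
    let deposit = match *bytes.get(8)? {
        0 => false,
        1 => true,
        _ => return None, // invalid bool encoding
    };
    Some((amount, deposit))
}

fn main() {
    // amount = 100 (little-endian), deposit = true
    let bytes = [100, 0, 0, 0, 0, 0, 0, 0, 1];
    assert_eq!(decode_amount_then_deposit(&bytes), Some((100, true)));
}
```

Swapping the field order in the decoder (or in your `#[derive(Deserialize)]` struct) would silently misread the same bytes, which is why the field-order warning below matters.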

Step-by-step deserialization

  1. Add the BCS dependency.

    [dependencies]
    bcs = "0.1.6"
    serde = { version = "1.0", features = ["derive"] }
  2. Define the event struct in Rust.

    Define the same structure in Rust as it is declared in Move. You can write it by hand, or generate it automatically from the on-chain package using move-binding.

    use serde::Deserialize;
    use sui_indexer_alt_framework::types::base_types::ObjectID;

    #[derive(Deserialize, Debug)]
    struct BalanceEvent {
        balance_manager_id: ObjectID,
        asset: ObjectID,
        amount: u64,
        deposit: bool,
    }
    important

    Field order and types must match the Move event exactly.

  3. Extract the event bytes in your processor.

    impl Processor for YourHandler {
        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let mut results = Vec::new();

            for transaction in &checkpoint.transactions {
                for event in &transaction.events {
                    // Get the raw BCS bytes
                    let event_bytes = &event.contents;

                    // Deserialize into the Rust struct
                    if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
                        // Do something with the event, e.g. push a value into `results`
                    }
                }
            }

            Ok(results)
        }
    }
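In practice, a checkpoint interleaves events from many packages, so a processor should compare each event's Move type tag against the target type before attempting BCS deserialization. The self-contained sketch below models events as `(type_tag, contents)` pairs standing in for the framework's event fields, which are assumptions for illustration:

```rust
// Filter events down to one Move type before decoding; only matching
// events' BCS contents are returned for deserialization.
fn contents_of_type<'a>(
    events: &'a [(String, Vec<u8>)],
    wanted: &str,
) -> Vec<&'a [u8]> {
    events
        .iter()
        .filter(|(type_tag, _)| type_tag.as_str() == wanted)
        .map(|(_, contents)| contents.as_slice())
        .collect()
}

fn main() {
    let events = vec![
        // Hypothetical type tags for illustration only.
        ("0x2::coin::CoinEvent".to_string(), vec![1, 2]),
        ("0xabc::balances::BalanceEvent".to_string(), vec![3, 4]),
    ];
    let matches = contents_of_type(&events, "0xabc::balances::BalanceEvent");
    assert_eq!(matches, vec![&[3u8, 4u8][..]]);
}
```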