자체 스토어 사용하기 (BYOS)
IndexerCluster는 PostgreSQL로 시작하기에 편리한 방법을 제공하지만, 다른 database나 storage system을 사용하고 싶을 수도 있다.
이를 위해서는 수동 Indexer를 사용하고, sui-indexer-alt-framework-store-traits에 정의된 custom Store와 Connection trait를 직접 구현해야 한다.
lib.rs in sui-indexer-alt-framework-store-traits
/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
#[async_trait]
pub trait Connection: Send {
/// If no existing watermark record exists, initializes it with `default_next_checkpoint`.
/// Returns the committer watermark `checkpoint_hi_inclusive`.
async fn init_watermark(
&mut self,
pipeline_task: &str,
default_next_checkpoint: u64,
) -> anyhow::Result<Option<u64>>;
/// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
/// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
/// from the `Store`. The indexer fetches this value for each pipeline added to determine which
/// checkpoint to resume processing from.
async fn committer_watermark(
&mut self,
pipeline_task: &str,
) -> anyhow::Result<Option<CommitterWatermark>>;
/// Given a pipeline, return the reader watermark from the database. This is used by the indexer
/// to determine the new `reader_lo` or inclusive lower bound of available data.
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<ReaderWatermark>>;
/// Get the bounds for the region that the pruner is allowed to prune, and the time in
/// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
/// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
/// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
/// minimizes the possibility for the pruner to delete data still expected by inflight read
/// requests.
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> anyhow::Result<Option<PrunerWatermark>>;
/// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
/// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
/// long as it raises the watermark stored in the database. Returns a boolean indicating whether
/// the watermark was actually updated or not.
async fn set_committer_watermark(
&mut self,
pipeline_task: &str,
watermark: CommitterWatermark,
) -> anyhow::Result<bool>;
/// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
/// will reference this as the inclusive lower bound of available data for the corresponding
/// pipeline.
///
/// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
/// the watermark entry to the current time. Ideally, this would be from the perspective of the
/// store. If this is not possible, then it should come from some other common source of time
/// between the indexer and its readers. This timestamp is critical to the indexer's operations,
/// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
/// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
/// before beginning to prune.
///
/// Returns a boolean indicating whether the watermark was actually updated or not.
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> anyhow::Result<bool>;
/// Update the pruner watermark, returns true if the watermark was actually updated.
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> anyhow::Result<bool>;
}
/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermarks operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
type Connection<'c>: Connection
where
Self: 'c;
/// Delimiter used to separate pipeline names from task identifiers when reading or writing the
/// committer watermark.
const DELIMITER: &'static str = "@";
async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
}
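Store::DELIMITER가 pipeline_task 키를 만드는 규칙은 단순하다. trait 구현과 무관하게 그 규칙만 std로 요약한 설명용 스케치이며, task_key와 split_task라는 함수 이름은 설명을 위한 가정이다:

```rust
/// 설명용 스케치: pipeline 이름과 task를 Store::DELIMITER("@")로 잇는다.
const DELIMITER: &str = "@";

/// task가 없으면 pipeline 이름 그대로, 있으면 `{pipeline}{DELIMITER}{task}` 형태가 된다.
fn task_key(pipeline: &str, task: Option<&str>) -> String {
    match task {
        Some(task) => format!("{pipeline}{DELIMITER}{task}"),
        None => pipeline.to_string(),
    }
}

/// 반대로 키를 pipeline 이름과 task로 분해한다.
fn split_task(key: &str) -> (&str, Option<&str>) {
    match key.split_once(DELIMITER) {
        Some((pipeline, task)) => (pipeline, Some(task)),
        None => (key, None),
    }
}

fn main() {
    assert_eq!(task_key("tx_digests", None), "tx_digests");
    assert_eq!(task_key("tx_digests", Some("backfill")), "tx_digests@backfill");
    assert_eq!(split_task("tx_digests@backfill"), ("tx_digests", Some("backfill")));
    assert_eq!(split_task("tx_digests"), ("tx_digests", None));
}
```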
When to use BYOS:
- Different database: MongoDB, CouchDB, 또는 그 밖의 비-PostgreSQL database를 사용할 때이다. 기본 Diesel ORM 없이 PostgreSQL을 사용하고 싶을 때도 여기에 해당한다.
- Custom requirements: 특수한 storage logic, partitioning, 또는 성능 최적화가 필요할 때이다.
Core implementation requirements
BYOS를 구현하려면 다음이 필요하다:
- connection을 관리하는 Store와 Connection struct를 정의한다.
- connection 관리를 위해 Store trait를 구현한다.
- pipeline coordination을 위한 watermark 연산에 대해 Connection trait를 구현한다.
- IndexerCluster 대신 수동 Indexer를 사용한다.
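trait를 구현하기 전에, committer watermark upsert의 핵심 의미(저장된 값보다 높을 때만 갱신하고, 갱신 여부를 bool로 반환)를 저장소와 무관하게 확인해 두면 좋다. 다음은 std의 HashMap만 사용한 설명용 스케치로, WatermarkTable이라는 타입 이름은 가정이다:

```rust
use std::collections::HashMap;

/// 설명용 스케치: pipeline_task -> checkpoint_hi_inclusive 만 저장하는 watermark 테이블.
#[derive(Default)]
struct WatermarkTable {
    committer_hi: HashMap<String, u64>,
}

impl WatermarkTable {
    /// set_committer_watermark의 의미: 저장된 값보다 높을 때만 갱신하고,
    /// 실제로 갱신되었는지를 bool로 돌려준다.
    fn set_committer_watermark(&mut self, pipeline_task: &str, hi: u64) -> bool {
        match self.committer_hi.get(pipeline_task) {
            Some(&current) if current >= hi => false, // 역행하는 watermark는 무시한다
            _ => {
                self.committer_hi.insert(pipeline_task.to_string(), hi);
                true
            }
        }
    }
}

fn main() {
    let mut table = WatermarkTable::default();
    assert!(table.set_committer_watermark("tx_digests", 10)); // 신규 entry
    assert!(!table.set_committer_watermark("tx_digests", 5)); // 낮은 값은 거부
    assert!(table.set_committer_watermark("tx_digests", 11)); // 전진만 허용
}
```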
###step Define your store structure
use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;
#[derive(Clone)]
pub struct MyCustomStore {
// 데이터베이스 connection 상세 정보
connection_pool: MyDatabasePool,
config: MyConfig,
}
pub struct MyCustomConnection<'a> {
// connection instance
conn: MyDatabaseConnection<'a>,
}
###step Implement the Store trait
Store trait는 connection lifecycle을 관리한다:
#[async_trait]
impl Store for MyCustomStore {
type Connection<'c> = MyCustomConnection<'c>;
async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
// 구현 내용
todo!("스토리지 시스템에 맞게 구현")
}
}
###step Implement the Connection trait
Connection trait는 pipeline coordination을 위한 watermark 연산을 처리한다:
#[async_trait]
impl Connection for MyCustomConnection<'_> {
// pipeline이 처리한 가장 높은 checkpoint를 가져온다
async fn committer_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<CommitterWatermark>> {
// watermark 데이터를 위해 데이터베이스를 질의한다
todo!("스토리지 시스템에 맞게 구현")
}
// reader가 사용할 수 있는 가장 낮은 checkpoint를 가져온다
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<ReaderWatermark>> {
// 구현은 데이터베이스 스키마에 따라 달라진다
todo!("스토리지 시스템에 맞게 구현")
}
// 필요한 다른 method를 구현한다...
}
완전한 reference로는 sui-pg-db의 Connection 구현을 참고한다:
#[async_trait]
impl store::Connection for Connection<'_> {
async fn init_watermark(
&mut self,
pipeline_task: &str,
default_next_checkpoint: u64,
) -> anyhow::Result<Option<u64>> {
let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
// Do not create a watermark record with checkpoint_hi_inclusive = -1.
return Ok(self
.committer_watermark(pipeline_task)
.await?
.map(|w| w.checkpoint_hi_inclusive));
};
let stored_watermark = StoredWatermark {
pipeline: pipeline_task.to_string(),
epoch_hi_inclusive: 0,
checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
tx_hi: 0,
timestamp_ms_hi_inclusive: 0,
reader_lo: default_next_checkpoint as i64,
pruner_timestamp: Utc::now().naive_utc(),
pruner_hi: default_next_checkpoint as i64,
};
use diesel::pg::upsert::excluded;
let checkpoint_hi_inclusive: i64 = diesel::insert_into(watermarks::table)
.values(&stored_watermark)
// There is an existing entry, so only write the new `hi` values
.on_conflict(watermarks::pipeline)
// Use `do_update` instead of `do_nothing` to return the existing row with `returning`.
.do_update()
// When using `do_update`, at least one change needs to be set, so set the pipeline to itself (nothing changes).
// `excluded` is a virtual table containing the existing row that there was a conflict with.
.set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
.returning(watermarks::checkpoint_hi_inclusive)
.get_result(self)
.await?;
Ok(Some(checkpoint_hi_inclusive as u64))
}
async fn committer_watermark(
&mut self,
pipeline_task: &str,
) -> anyhow::Result<Option<store::CommitterWatermark>> {
let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
.select((
watermarks::epoch_hi_inclusive,
watermarks::checkpoint_hi_inclusive,
watermarks::tx_hi,
watermarks::timestamp_ms_hi_inclusive,
))
.filter(watermarks::pipeline.eq(pipeline_task))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::CommitterWatermark {
epoch_hi_inclusive: watermark.0 as u64,
checkpoint_hi_inclusive: watermark.1 as u64,
tx_hi: watermark.2 as u64,
timestamp_ms_hi_inclusive: watermark.3 as u64,
}))
} else {
Ok(None)
}
}
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<store::ReaderWatermark>> {
let watermark: Option<(i64, i64)> = watermarks::table
.select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::ReaderWatermark {
checkpoint_hi_inclusive: watermark.0 as u64,
reader_lo: watermark.1 as u64,
}))
} else {
Ok(None)
}
}
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> anyhow::Result<Option<store::PrunerWatermark>> {
// |---------- + delay ---------------------|
// |--- wait_for ---|
// |-----------------------|----------------|
// ^ ^
// pruner_timestamp NOW()
let wait_for = sql!(as BigInt,
"CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
delay.as_millis() as i64,
);
let watermark: Option<(i64, i64, i64)> = watermarks::table
.select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;
if let Some(watermark) = watermark {
Ok(Some(store::PrunerWatermark {
wait_for_ms: watermark.0,
pruner_hi: watermark.1 as u64,
reader_lo: watermark.2 as u64,
}))
} else {
Ok(None)
}
}
async fn set_committer_watermark(
&mut self,
pipeline_task: &str,
watermark: store::CommitterWatermark,
) -> anyhow::Result<bool> {
// Create a StoredWatermark directly from CommitterWatermark
let stored_watermark = StoredWatermark {
pipeline: pipeline_task.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
tx_hi: watermark.tx_hi as i64,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
reader_lo: 0,
pruner_timestamp: DateTime::UNIX_EPOCH.naive_utc(),
pruner_hi: 0,
};
use diesel::query_dsl::methods::FilterDsl;
Ok(diesel::insert_into(watermarks::table)
.values(&stored_watermark)
// There is an existing entry, so only write the new `hi` values
.on_conflict(watermarks::pipeline)
.do_update()
.set((
watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
watermarks::tx_hi.eq(stored_watermark.tx_hi),
watermarks::timestamp_ms_hi_inclusive
.eq(stored_watermark.timestamp_ms_hi_inclusive),
))
.filter(
watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
)
.execute(self)
.await?
> 0)
}
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set((
watermarks::reader_lo.eq(reader_lo as i64),
watermarks::pruner_timestamp.eq(diesel::dsl::now),
))
.filter(watermarks::pipeline.eq(pipeline))
.filter(watermarks::reader_lo.lt(reader_lo as i64))
.execute(self)
.await?
> 0)
}
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set(watermarks::pruner_hi.eq(pruner_hi as i64))
.filter(watermarks::pipeline.eq(pipeline))
.execute(self)
.await?
> 0)
}
}
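위 구현의 pruner_watermark가 SQL로 계산하는 대기 시간은 주석의 다이어그램 그대로 `delay + (pruner_timestamp - NOW())`이다. 같은 산식을 std만으로 요약한 설명용 스케치이며, 함수 이름은 가정이다:

```rust
/// 설명용 스케치: pruner가 pruning을 시작하기 전 대기해야 하는 시간(ms)을 계산한다.
/// wait_for_ms = delay_ms + (pruner_timestamp_ms - now_ms).
/// 결과가 음수이면 이미 대기가 끝나 즉시 pruning할 수 있다는 뜻이다.
fn pruner_wait_for_ms(delay_ms: i64, pruner_timestamp_ms: i64, now_ms: i64) -> i64 {
    delay_ms + (pruner_timestamp_ms - now_ms)
}

fn main() {
    // reader watermark가 300ms 전에 갱신되었고 delay가 1000ms라면 700ms 더 기다린다.
    assert_eq!(pruner_wait_for_ms(1000, 10_000, 10_300), 700);
    // delay가 이미 지났다면 결과는 음수가 된다.
    assert!(pruner_wait_for_ms(1000, 10_000, 12_000) < 0);
}
```

시간 계산을 indexer process의 clock이 아니라 database 쪽에서 수행하는 이유는 위 구현의 주석대로 clock skew를 피하기 위해서이다.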
###step Use manual indexer
IndexerCluster를 수동 Indexer로 교체한다:
use sui_indexer_alt_framework::{Indexer, IndexerArgs};
use sui_indexer_alt_framework::ingestion::{
ClientArgs, IngestionConfig,
ingestion_client::IngestionClientArgs,
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// custom store를 초기화한다
let store = MyCustomStore::new(config).await?;
// indexer를 수동으로 구성한다
let indexer = Indexer::new(
store,
IndexerArgs::default(),
ClientArgs {
ingestion: IngestionClientArgs {
remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
..Default::default()
},
..Default::default()
},
IngestionConfig::default(),
&prometheus::Registry::new(),
tokio_util::sync::CancellationToken::new(),
).await?;
// pipeline을 추가한다
indexer.concurrent_pipeline(
YourHandler::default(),
ConcurrentConfig::default(),
).await?;
// indexer를 시작한다
indexer.run().await?;
Ok(())
}
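indexer가 재시작 시 어느 checkpoint부터 처리를 재개할지는 pipeline마다 committer watermark를 조회해 결정된다(watermark가 있으면 그 다음 checkpoint, 없으면 설정된 첫 checkpoint). 그 규칙만 std로 요약한 설명용 스케치이며, resume_from이라는 함수 이름은 가정이다:

```rust
/// 설명용 스케치: committer watermark로부터 재개 지점을 결정한다.
/// checkpoint_hi_inclusive는 마지막으로 커밋된 checkpoint이므로 그 다음부터 처리한다.
fn resume_from(committer_hi: Option<u64>, first_checkpoint: u64) -> u64 {
    match committer_hi {
        Some(hi) => hi + 1,        // 마지막 커밋 직후부터 재개
        None => first_checkpoint,  // watermark가 없으면 처음부터 시작
    }
}

fn main() {
    // checkpoint 41까지 커밋된 pipeline은 42부터 재개한다.
    assert_eq!(resume_from(Some(41), 0), 42);
    // 새 pipeline은 설정된 첫 checkpoint에서 시작한다.
    assert_eq!(resume_from(None, 100), 100);
}
```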
Example: ClickHouse implementation
ClickHouse(분석용 고성능 columnar database)를 사용하는 완전한 BYOS working example은 example project in the Sui repo를 참조한다.
ClickHouse example README
ClickHouse Sui Indexer
A simple example of how to build a custom Sui indexer that writes transaction data to ClickHouse.
Quick Start
1. Start ClickHouse
docker run -d --name clickhouse-dev -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse/clickhouse-server
2. Set up database user
docker exec clickhouse-dev clickhouse-client --query "CREATE USER IF NOT EXISTS dev IDENTIFIED WITH no_password"
docker exec clickhouse-dev clickhouse-client --query "GRANT CREATE, INSERT, SELECT, ALTER, UPDATE, DELETE ON *.* TO dev"
3. Run the indexer
cargo run -- --remote-store-url https://checkpoints.testnet.sui.io --last-checkpoint=10
That’s it! The indexer will:
- Create the necessary tables automatically
- Fetch checkpoints from the Sui testnet
- Write transaction data to ClickHouse
Verify Data
Check that data was written:
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT COUNT(*) FROM transactions"
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT * FROM transactions LIMIT 5"
Clean Up
Stop and remove the ClickHouse container:
docker stop clickhouse-dev && docker rm clickhouse-dev
What This Example Shows
- Custom Store Implementation: How to implement the Store trait for ClickHouse
- Concurrent Pipeline: Uses the concurrent pipeline for better pruning and watermark testing
- Watermark Management: Tracking indexer progress with committer, reader, and pruner watermarks
- Transaction Processing: Extracting and storing transaction digests from checkpoints
- Simple Setup: Minimal configuration for local development
Architecture
Sui Network → Checkpoints → Concurrent Pipeline → ClickHouse Store → ClickHouse DB
The indexer uses a concurrent pipeline that processes checkpoints out-of-order with separate reader, committer, and pruner components. This is ideal for testing watermark functionality and pruning behavior.
이 예시는 다음을 보여준다:
- Custom store implementation using the ClickHouse Rust client.
- Watermark persistence with ClickHouse-specific SQL syntax.
- Transaction digest indexing similar to the built-in PostgreSQL handler.
예시에는 3개의 주요 구성 요소가 포함되어 있다:
- store.rs - Store와 Connection trait를 구현하는 ClickHouseStore.
use anyhow::Result;
use async_trait::async_trait;
use chrono::Utc;
use clickhouse::{Client, Row};
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use sui_indexer_alt_framework::store::{
CommitterWatermark, Connection, PrunerWatermark, ReaderWatermark, Store, TransactionalStore,
};
use url::Url;
#[derive(Clone)]
pub struct ClickHouseStore {
client: Client,
}
pub struct ClickHouseConnection {
pub client: Client,
}
/// Row structure for watermark table operations
#[derive(Row, Serialize, Deserialize, Debug, Default)]
struct WatermarkRow {
pipeline_task: String,
epoch_hi_inclusive: u64,
checkpoint_hi_inclusive: u64,
tx_hi: u64,
timestamp_ms_hi_inclusive: u64,
reader_lo: u64,
pruner_hi: u64,
pruner_timestamp: u64, // Unix timestamp in milliseconds
}
impl ClickHouseStore {
pub fn new(url: Url) -> Self {
let client = Client::default()
.with_url(url.as_str())
.with_user("dev") // Simple user for local development
.with_compression(clickhouse::Compression::Lz4);
Self { client }
}
/// Create tables if they don't exist
pub async fn create_tables_if_not_exists(&self) -> Result<()> {
// Create watermarks table for pipeline state management
self.client
.query(
"
CREATE TABLE IF NOT EXISTS watermarks
(
pipeline String,
epoch_hi_inclusive UInt64,
checkpoint_hi_inclusive UInt64,
tx_hi UInt64,
timestamp_ms_hi_inclusive UInt64,
reader_lo UInt64,
pruner_hi UInt64,
pruner_timestamp UInt64
)
ENGINE = MergeTree()
ORDER BY pipeline
",
)
.execute()
.await?;
// Create transactions table for the actual indexing data
self.client
.query(
"
CREATE TABLE IF NOT EXISTS transactions
(
checkpoint_sequence_number UInt64,
transaction_digest String,
indexed_at DateTime64(3, 'UTC') DEFAULT now()
)
ENGINE = MergeTree()
ORDER BY checkpoint_sequence_number
",
)
.execute()
.await?;
Ok(())
}
}
#[async_trait]
impl Store for ClickHouseStore {
type Connection<'c> = ClickHouseConnection;
async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>> {
Ok(ClickHouseConnection {
client: self.client.clone(),
})
}
}
#[async_trait]
impl TransactionalStore for ClickHouseStore {
async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
where
R: Send + 'a,
F: Send + 'a,
F: for<'r> FnOnce(
&'r mut Self::Connection<'_>,
) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
{
let mut conn = self.connect().await?;
f(&mut conn).await
}
}
#[async_trait]
impl Connection for ClickHouseConnection {
async fn init_watermark(
&mut self,
pipeline_task: &str,
default_next_checkpoint: u64,
) -> anyhow::Result<Option<u64>> {
let existing = self.committer_watermark(pipeline_task).await?;
let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
return Ok(existing.map(|w| w.checkpoint_hi_inclusive));
};
if let Some(existing) = existing {
return Ok(Some(existing.checkpoint_hi_inclusive));
}
let mut inserter = self.client.inserter("watermarks")?;
inserter.write(&WatermarkRow {
pipeline_task: pipeline_task.to_string(),
epoch_hi_inclusive: 0,
checkpoint_hi_inclusive,
tx_hi: 0,
timestamp_ms_hi_inclusive: 0,
reader_lo: default_next_checkpoint,
pruner_hi: default_next_checkpoint,
pruner_timestamp: 0,
})?;
inserter.end().await?;
Ok(Some(checkpoint_hi_inclusive))
}
async fn committer_watermark(&mut self, pipeline: &str) -> Result<Option<CommitterWatermark>> {
let mut cursor = self
.client
.query(
"SELECT epoch_hi_inclusive, checkpoint_hi_inclusive, tx_hi, timestamp_ms_hi_inclusive
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1"
)
.bind(pipeline)
.fetch::<(u64, u64, u64, u64)>()?;
let row: Option<(u64, u64, u64, u64)> = cursor.next().await?;
Ok(row.map(
|(epoch_hi, checkpoint_hi, tx_hi, timestamp_hi)| CommitterWatermark {
epoch_hi_inclusive: epoch_hi,
checkpoint_hi_inclusive: checkpoint_hi,
tx_hi,
timestamp_ms_hi_inclusive: timestamp_hi,
},
))
}
async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> Result<Option<ReaderWatermark>> {
let mut cursor = self
.client
.query(
"SELECT checkpoint_hi_inclusive, reader_lo
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1",
)
.bind(pipeline)
.fetch::<(u64, u64)>()?;
let row: Option<(u64, u64)> = cursor.next().await?;
Ok(row.map(|(checkpoint_hi, reader_lo)| ReaderWatermark {
checkpoint_hi_inclusive: checkpoint_hi,
reader_lo,
}))
}
async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> Result<Option<PrunerWatermark>> {
// Follow PostgreSQL pattern: calculate wait_for_ms on database side
// We do this so that we can rely on the database to keep a consistent sense of time.
// Using own clocks can potentially be subject to some clock skew.
let delay_ms = delay.as_millis() as i64;
let mut cursor = self
.client
.query(
"SELECT reader_lo, pruner_hi,
toInt64(? + (pruner_timestamp - toUnixTimestamp64Milli(now64()))) as wait_for_ms
FROM watermarks
WHERE pipeline = ?
ORDER BY pruner_timestamp DESC
LIMIT 1"
)
.bind(delay_ms)
.bind(pipeline)
.fetch::<(u64, u64, i64)>()?;
let row: Option<(u64, u64, i64)> = cursor.next().await?;
Ok(
row.map(|(reader_lo, pruner_hi, wait_for_ms)| PrunerWatermark {
wait_for_ms,
reader_lo,
pruner_hi,
}),
)
}
async fn set_committer_watermark(
&mut self,
pipeline: &str,
watermark: CommitterWatermark,
) -> Result<bool> {
// Follow PostgreSQL pattern: check if row exists, then UPDATE or INSERT accordingly
// First check if pipeline exists and get current checkpoint
let mut cursor = self
.client
.query("SELECT checkpoint_hi_inclusive FROM watermarks WHERE pipeline = ? LIMIT 1")
.bind(pipeline)
.fetch::<u64>()?;
let existing_checkpoint: Option<u64> = cursor.next().await?;
if let Some(existing_checkpoint) = existing_checkpoint {
// Row exists - only update if checkpoint advances
if existing_checkpoint < watermark.checkpoint_hi_inclusive {
self.client
.query(
"ALTER TABLE watermarks
UPDATE
epoch_hi_inclusive = ?,
checkpoint_hi_inclusive = ?,
tx_hi = ?,
timestamp_ms_hi_inclusive = ?
WHERE pipeline = ?",
)
.bind(watermark.epoch_hi_inclusive)
.bind(watermark.checkpoint_hi_inclusive)
.bind(watermark.tx_hi)
.bind(watermark.timestamp_ms_hi_inclusive)
.bind(pipeline)
.execute()
.await?;
}
} else {
// No existing row - insert new one
let mut inserter = self.client.inserter("watermarks")?;
inserter.write(&WatermarkRow {
pipeline_task: pipeline.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
tx_hi: watermark.tx_hi,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
reader_lo: 0, // Will be updated by reader
pruner_hi: 0, // Will be updated by pruner
pruner_timestamp: Utc::now().timestamp_millis() as u64,
})?;
inserter.end().await?;
}
Ok(true)
}
async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> Result<bool> {
// Follow PostgreSQL pattern: simple UPDATE with timestamp update and advancement check
self.client
.query(
"ALTER TABLE watermarks
UPDATE reader_lo = ?, pruner_timestamp = toUnixTimestamp64Milli(now64())
WHERE pipeline = ? AND reader_lo < ?",
)
.bind(reader_lo)
.bind(pipeline)
.bind(reader_lo)
.execute()
.await?;
Ok(true)
}
async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> Result<bool> {
// Follow PostgreSQL pattern: simple UPDATE statement
self.client
.query(
"ALTER TABLE watermarks
UPDATE pruner_hi = ?
WHERE pipeline = ?",
)
.bind(pruner_hi)
.bind(pipeline)
.execute()
.await?;
Ok(true)
}
}
- handlers.rs - checkpoint 데이터를 처리하는 TxDigests handler.
use anyhow::Result;
use clickhouse::Row;
use serde::Serialize;
use std::sync::Arc;
use sui_indexer_alt_framework::{
FieldCount,
pipeline::{
Processor,
concurrent::{BatchStatus, Handler},
},
store::Store,
types::full_checkpoint_content::Checkpoint,
};
use crate::store::ClickHouseStore;
/// Structure representing a transaction digest record in ClickHouse
/// Aligned with sui-indexer-alt's StoredTxDigest structure
#[derive(Row, Serialize, Clone, Debug, FieldCount)]
pub struct StoredTxDigest {
pub tx_sequence_number: i64,
pub tx_digest: Vec<u8>,
}
/// Handler that processes checkpoint data and extracts transaction digests
/// Named to align with sui-indexer-alt's TxDigests handler
#[derive(Clone, Default)]
pub struct TxDigests;
#[async_trait::async_trait]
impl Processor for TxDigests {
const NAME: &'static str = "tx_digests";
type Value = StoredTxDigest;
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let Checkpoint {
transactions,
summary,
..
} = checkpoint.as_ref();
let first_tx = summary.network_total_transactions as usize - transactions.len();
Ok(transactions
.iter()
.enumerate()
.map(|(i, tx)| StoredTxDigest {
tx_sequence_number: (first_tx + i) as i64,
tx_digest: tx.transaction.digest().inner().to_vec(),
})
.collect())
}
}
#[async_trait::async_trait]
impl Handler for TxDigests {
type Store = ClickHouseStore;
type Batch = Vec<Self::Value>;
fn batch(
&self,
batch: &mut Self::Batch,
values: &mut std::vec::IntoIter<Self::Value>,
) -> BatchStatus {
batch.extend(values);
BatchStatus::Pending
}
async fn commit<'a>(
&self,
values: &Self::Batch,
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
let row_count = values.len();
if row_count == 0 {
return Ok(0);
}
// Use ClickHouse inserter for efficient bulk inserts
let mut inserter = conn.client.inserter("tx_digests")?;
for tx_digest in values {
inserter.write(tx_digest)?;
}
inserter.end().await?;
Ok(row_count)
}
}
- main.rs - ClickHouse backend를 사용하는 수동 indexer 설정.
mod handlers;
mod store;
use anyhow::{Result, bail};
use clap::Parser;
use sui_indexer_alt_framework::{
Indexer, IndexerArgs,
ingestion::{ClientArgs, IngestionConfig},
pipeline::concurrent::ConcurrentConfig,
service::Error,
};
use url::Url;
use handlers::TxDigests;
use store::ClickHouseStore;
#[derive(clap::Parser, Debug, Clone)]
struct Args {
#[clap(flatten)]
pub indexer_args: IndexerArgs,
#[clap(flatten)]
pub client_args: ClientArgs,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize crypto provider for HTTPS connections (needed for remote checkpoint fetching)
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install crypto provider");
// Parse command-line arguments
let args = Args::parse();
// ClickHouse connection (uses 'dev' user by default for local development)
let clickhouse_url = "http://localhost:8123".parse::<Url>()?;
println!("Connecting to ClickHouse at: {}", clickhouse_url);
// Create our custom ClickHouse store
let store = ClickHouseStore::new(clickhouse_url);
// Ensure the database tables are created before starting the indexer
store.create_tables_if_not_exists().await?;
// Manually build the indexer with our custom ClickHouse store
// This is the key difference from basic-sui-indexer which uses IndexerCluster::builder()
let mut indexer = Indexer::new(
store.clone(),
args.indexer_args,
args.client_args,
IngestionConfig::default(),
None, // No metrics prefix
&Default::default(), // Empty prometheus registry
)
.await?;
// Register our concurrent pipeline handler (better for testing pruning)
// This processes checkpoints with separate reader and pruner components
indexer
.concurrent_pipeline(
TxDigests,
// ConcurrentConfig default comes with no pruning.
ConcurrentConfig::default(),
)
.await?;
println!("Starting ClickHouse Sui indexer...");
// Start the indexer and wait for it to complete
match indexer.run().await?.main().await {
Ok(()) | Err(Error::Terminated) => Ok(()),
Err(Error::Aborted) => {
bail!("Indexer aborted due to an unexpected error")
}
Err(Error::Task(e)) => {
bail!(e)
}
}
}
Deserializing Move events
Move smart contract가 Sui에서 실행되면 sui::event module을 사용해 event를 발생시킬 수 있다.
이 event는 checkpoint 안에 BCS-serialized bytes로 저장되므로, indexer는 의미 있는 데이터를 추출하기 위해 이를 deserialize해야 한다.
Why deserialization is needed
Move contract는 다음과 같은 event를 발생시킨다:
// Move smart contract
use sui::event;
public fun transfer_balance(...) {
event::emit(BalanceEvent {
balance_manager_id: id,
asset: asset_id,
amount: 100,
deposit: true
});
}
checkpoint 데이터에서 이 event는 raw BCS bytes로 도착하므로, 처리하려면 다시 Rust struct로 변환해야 한다.
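위 BalanceEvent가 BCS bytes로 어떤 layout을 갖는지 감을 잡아 두면 디버깅에 도움이 된다. BCS는 field 선언 순서대로, 고정폭 정수는 little-endian으로, bool은 1 byte로, ObjectID(32-byte address)는 길이 prefix 없이 32 bytes로 직렬화한다. 다음은 그 layout만 std로 흉내 낸 설명용 스케치로, bcs crate의 실제 deserialization을 대체하지 않는다:

```rust
/// 설명용 스케치: BalanceEvent의 BCS layout을 손으로 읽는다.
/// layout: ObjectID(32B) + ObjectID(32B) + u64(8B, little-endian) + bool(1B).
fn decode_balance_event(bytes: &[u8]) -> Option<([u8; 32], [u8; 32], u64, bool)> {
    if bytes.len() != 32 + 32 + 8 + 1 {
        return None;
    }
    let balance_manager_id: [u8; 32] = bytes[0..32].try_into().ok()?;
    let asset: [u8; 32] = bytes[32..64].try_into().ok()?;
    let amount = u64::from_le_bytes(bytes[64..72].try_into().ok()?);
    let deposit = match bytes[72] {
        0 => false,
        1 => true,
        _ => return None, // BCS에서 bool은 0 또는 1만 유효하다
    };
    Some((balance_manager_id, asset, amount, deposit))
}

fn main() {
    // amount = 100, deposit = true 인 가짜 event bytes를 만들어 확인한다.
    let mut bytes = vec![0u8; 73];
    bytes[64..72].copy_from_slice(&100u64.to_le_bytes());
    bytes[72] = 1;
    let (_, _, amount, deposit) = decode_balance_event(&bytes).unwrap();
    assert_eq!(amount, 100);
    assert!(deposit);
}
```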
Step-by-step deserialization
- BCS dependency를 추가한다.
[dependencies]
bcs = "0.1.6"
serde = { version = "1.0", features = ["derive"] }
- Rust에서 event struct를 정의한다. Move에 선언한 것과 같은 구조를 Rust에 정의한다. 수동으로 작성할 수도 있고, move-binding을 사용해 온체인 package에서 자동 생성할 수도 있다.
use serde::Deserialize;
use sui_indexer_alt_framework::types::base_types::ObjectID;
#[derive(Deserialize, Debug)]
struct BalanceEvent {
balance_manager_id: ObjectID,
asset: ObjectID,
amount: u64,
deposit: bool,
}
important: field 순서와 type은 Move event와 정확히 일치해야 한다.
- processor에서 event bytes를 추출한다.
#[async_trait::async_trait]
impl Processor for YourHandler {
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
let mut results = Vec::new();
for transaction in &checkpoint.transactions {
for event in &transaction.events {
// raw BCS bytes를 가져온다
let event_bytes = &event.contents;
// Rust struct로 deserialize한다
if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
// 작업을 수행한다
}
}
}
Ok(results)
}
}