커스텀 인덱서와 Walrus
Walrus는 큰 binary file을 위해 특별히 설계된 분산 storage 및 data availability protocol이다. content-addressable storage protocol이므로 데이터는 blob이라고 하는 고유 식별자를 사용해 식별되고 검색된다. blob은 파일 경로나 위치가 아니라 콘텐츠 자체에서 파생된다. 따라서 서로 다른 사용자가 동일한 콘텐츠를 업로드하면 Walrus는 새 blob을 생성하지 않고 기존 blob을 재사용한다.
고유성을 위해 Walrus에 업로드되는 각 blob은 고유 ID를 가진 대응 Blob NFT object on Sui도 함께 생성한다. 또한 연관된 Blob object에는 선택적으로 Metadata dynamic field가 있을 수 있다.
Metadata dynamic field는 on-chain object의 데이터를 runtime에 확장할 수 있게 해 주는 key-value 확장이다. 설정된 경우 이 dynamic field는 key-value attribute pairs의 mapping으로 동작한다.
커스텀 인덱싱 프레임워크를 사용해 Walrus의 기존 기능을 확장할 수 있다.
Walrus Foundation이 Walrus를 운영하고 관리한다. Walrus protocol에 대한 가장 정확하고 최신 정보를 얻으려면 공식 Walrus Docs를 참조한다.
Blog platform using Walrus
시스템은 dynamic field의 ID를 해당 field의 타입과 parent object의 ID에서 도출한다. 각 Metadata dynamic field ID 역시 고유하다. 이러한 고유한 특성과 Metadata attribute pair를 활용하면 사용자가 다음을 수행할 수 있는 blog platform을 구축할 수 있다:
- 제목이 있는 blog post를 업로드한다.
- 자신의 post와 metric을 조회한다.
- 자신이 생성한 post를 삭제한다.
- post 제목을 수정한다.
- 다른 publisher가 작성한 post를 탐색한다.
Walrus로의 업로드를 처리하는 blog platform 서비스가 이미 존재한다고 가정한다. 이 서비스가 Walrus에 blob과 그에 연관된 NFT object를 생성할 때 publisher(blob을 업로드한 Sui Address), view_count, title에 대한 key-value pair를 담은 Metadata dynamic field도 함께 부착한다. 서비스는 사용자가 publisher와 view_count pair를 수정하지 못하게 막지만 publisher가 title 값은 업데이트할 수 있도록 허용한다.
사용자가 post를 조회할 때 서비스는 index된 데이터베이스에서 해당 blog post의 Metadata를 가져온다. 그런 다음 Owner field를 사용해 full node에서 blob을 가져온다. Sui 상의 Blob object가 live 상태인지 여부를 통해 blog post가 사용 가능한지를 표현한다. Blob object가 wrapped되거나 삭제된 경우 Walrus에 해당 콘텐츠가 여전히 존재하더라도 서비스에서는 해당 blog post에 접근할 수 없다.
Data modeling
data modeling 방법 중 하나는 publisher address를 Metadata dynamic field에 매핑하는 단일 테이블을 사용하는 것이다. 이 방식에서는 테이블이 dynamic_field_id를 key로 사용하며, 이는 dApp 데이터의 식별자이면서 업로드된 각 blob의 콘텐츠를 고유하게 나타내는 역할을 모두 수행하기 때문이다.
예를 들어 이 테이블을 생성하는 up.sql 파일은 다음과 같을 수 있다.
-- This table maps a blob to its associated Sui Blob object and the latest dynamic field metadata
-- for traceability. The `view_count` is indexed to serve reads on the app.
CREATE TABLE IF NOT EXISTS blog_post (
-- ID of the Metadata dynamic field.
dynamic_field_id BYTEA NOT NULL,
-- Current version of the Metadata dynamic field.
df_version BIGINT NOT NULL,
-- Address that published the Walrus Blob.
publisher BYTEA NOT NULL,
-- ID of the Blob object on Sui, used during reads to fetch the actual blob content. If this
-- object has been wrapped or deleted, it will not be present on the live object set, which
-- means the corresponding content on Walrus is also not accessible.
blob_obj_id BYTEA NOT NULL,
view_count BIGINT NOT NULL,
title TEXT NOT NULL,
PRIMARY KEY (dynamic_field_id)
);
-- Index to support ordering and filtering by title
CREATE INDEX IF NOT EXISTS blog_post_by_title ON blog_post
(publisher, title);
Reads
특정 publisher의 blog post를 불러오려면 publisher와 LIMIT 값을 다음 쿼리 패턴에 전달한다:
SELECT *
FROM blog_post
WHERE publisher = $1
ORDER BY title
LIMIT $2;
Custom indexer implementation
이 예제는 sequential pipeline을 사용하며 각 checkpoint가 엄격한 순서로 한 번씩만 commit되고 단일 atomic operation으로 처리되도록 보장한다. sequential pipeline 아키텍처는 이 프로젝트에 필수는 아니지만 concurrent 아키텍처를 구현하는 것보다 더 단순한 선택지이다. 프로젝트가 필요로 하는 시점에는 언제든지 concurrent pipeline으로 확장할 수 있다.
이 구현은 checkpoint 경계에서 최신 object 상태를 추적한다. Metadata dynamic field가 생성, 변경, wrapped, 삭제 또는 unwrap될 때 해당 field는 object changes 아래의 transaction output에 나타난다. 이 field를 생성하는 example transaction을 Testnet에서 확인할 수 있다. 이러한 dynamic field의 타입은 0x2::dynamic_field::Field<vector<u8>, 0xabc...123::metadata::Metadata>이다.
Metadata dynamic field에 대한 object 변경 | input object에 포함됨 | live output object에 포함됨 | index 방법 |
|---|---|---|---|
| 생성(또는 unwrap) | ❌ | ✅ | 행 추가 |
| 변경 | ✅ | ✅ | 행 업데이트 |
| 삭제(또는 wrap) | ✅ | ❌ | 행 삭제 |
Processor
모든 pipeline은 ingestion task로부터 넘어온 checkpoint를 store에 commit할 중간 또는 최종 형태로 변환하는 logic을 정의하는 Processor trait를 동일하게 구현한다. 데이터는 processor 안으로 들어오고 밖으로 나가며 이때 순서가 뒤섞여 있을 수 있다.
process function
process 함수는 checkpoint에 진입하고 빠져나가는 object의 상태를 포착하기 위해 checkpoint_input_objects 집합과 latest_live_output_objects 집합을 계산한다. checkpoint_input_objects에는 있지만 latest_live_output_objects에는 없는 Metadata dynamic field는 해당 field가 wrapped되었거나 삭제되었음을 의미한다. 이러한 경우에는 commit 함수가 나중에 삭제를 처리할 수 있도록 dynamic field ID만 기록하면 된다. 생성, 변경, unwrap operation의 경우 object는 항상 최소한 latest_live_output_objects 집합에는 나타난다.
/// This pipeline operates on a checkpoint granularity to produce a set of values reflecting the
/// state of relevant Metadata dynamic fields at checkpoint boundary.
async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
let checkpoint_input_objects = checkpoint_input_objects(checkpoint)?;
let latest_live_output_objects = checkpoint_output_objects(checkpoint)?;
// Collect values to be passed to committer. This map is keyed on the dynamic field id.
let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
// Process relevant Metadata dynamic fields that were wrapped or deleted in this checkpoint.
for (object_id, object) in &checkpoint_input_objects {
// If an object appears in both maps, it is still alive at the end of the checkpoint.
if latest_live_output_objects.contains_key(object_id) {
continue;
}
// Check the checkpoint input state of the Metadata dynamic field to see if it's
// relevant to our indexing.
let Some((_, _)) = extract_content_from_metadata(&self.metadata_type, object)? else {
continue;
};
// Since the table is keyed on the dynamic field id, this is all the information we need
// to delete the correct entry in the commit fn.
values.insert(*object_id, ProcessedWalrusMetadata::Delete(*object_id));
}
for (object_id, object) in &latest_live_output_objects {
let Some((blog_post_metadata, blob_obj_id)) =
extract_content_from_metadata(&self.metadata_type, object)?
else {
continue;
};
values.insert(
*object_id,
ProcessedWalrusMetadata::Upsert {
df_version: object.version().into(),
dynamic_field_id: *object_id,
blog_post_metadata,
blob_obj_id,
},
);
}
Ok(values.into_values().collect())
}
Committer
sequential pipeline의 두 번째이자 마지막 구성 요소는 Committer이다. 데이터가 processor에서 committer로 넘어올 때 순서가 맞지 않을 수 있으므로, 변환된 데이터를 checkpoint 경계 기준으로 올바른 순서로 batch 처리해 store에 쓰는 책임은 committer에 있다.
batch
batch 함수는 다른 checkpoint에서 처리된 변환 데이터를 어떻게 batch로 묶을지 정의한다. 이 함수는 dynamic_field_id에서 처리된 Walrus Metadata로의 mapping을 유지한다. batch 함수는 다음에 batch할 checkpoint가 바로 다음 연속 checkpoint임을 보장하므로 기존 entry를 덮어써도 안전하다.
fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
for value in values {
match value {
ProcessedWalrusMetadata::Upsert {
dynamic_field_id, ..
} => {
batch.insert(dynamic_field_id, value);
}
ProcessedWalrusMetadata::Delete(dynamic_field_id) => {
batch.insert(dynamic_field_id, value);
}
}
}
}
commit
commit 함수는 store에 데이터를 쓰기 전에 처리된 데이터에 대한 최종 변환을 수행한다. 이 경우 로직은 처리된 데이터를 to_delete와 to_upsert로 분할한다.
async fn commit<'a>(
&self,
batch: &Self::Batch,
conn: &mut postgres::Connection<'a>,
) -> Result<usize> {
// Partition the batch into items to delete and items to upsert
let (to_delete, to_upsert): (Vec<_>, Vec<_>) = batch
.values()
.partition(|item| matches!(item, ProcessedWalrusMetadata::Delete(_)));
let to_upsert: Vec<StoredBlogPost> = to_upsert
.into_iter()
.map(|item| item.to_stored())
.collect::<Result<Vec<_>>>()?;
let to_delete: Vec<ObjectID> = to_delete
.into_iter()
.map(|item| Ok(item.dynamic_field_id()))
.collect::<Result<Vec<_>>>()?;
let mut total_affected = 0;
if !to_delete.is_empty() {
let deleted_count = diesel::delete(blog_post::table)
.filter(blog_post::dynamic_field_id.eq_any(to_delete.iter().map(|id| id.to_vec())))
.execute(conn)
.await?;
total_affected += deleted_count;
}
if !to_upsert.is_empty() {
let upserted_count = diesel::insert_into(blog_post::table)
.values(&to_upsert)
.on_conflict(blog_post::dynamic_field_id)
.do_update()
.set((
blog_post::df_version.eq(excluded(blog_post::df_version)),
blog_post::title.eq(excluded(blog_post::title)),
blog_post::view_count.eq(excluded(blog_post::view_count)),
blog_post::blob_obj_id.eq(excluded(blog_post::blob_obj_id)),
))
.filter(blog_post::df_version.lt(excluded(blog_post::df_version)))
.execute(conn)
.await?;
total_affected += upserted_count;
}
Ok(total_affected)
}
Putting it all together
서비스를 위한 main 함수
#[tokio::main]
async fn main() -> Result<()> {
let args = WalrusIndexerArgs::parse();
// The `IndexerClusterBuilder` offers a convenient way to quickly set up an `IndexerCluster`,
// which consists of the base indexer, metrics service, and a cancellation token.
let mut indexer = IndexerClusterBuilder::new()
.with_database_url(args.database_url)
.with_args(args.cluster_args)
.with_migrations(&MIGRATIONS)
.build()
.await?;
let blog_post_pipeline = BlogPostPipeline::new(METADATA_DYNAMIC_FIELD_TYPE).unwrap();
// Other pipelines can be easily added with `.sequential_pipeline()` or
// `.concurrent_pipeline()`.
indexer
.sequential_pipeline(blog_post_pipeline, SequentialConfig::default())
.await?;
match indexer.run().await?.main().await {
Ok(()) | Err(Error::Terminated) => Ok(()),
Err(Error::Aborted) => {
bail!("Indexer aborted due to an unexpected error")
}
Err(Error::Task(e)) => {
bail!(e)
}
}
}
publisher가 작성한 post 목록을 사용자에게 제공하려면 서비스가 먼저 publisher를 기준으로 데이터베이스를 쿼리하여 다음과 같은 결과를 얻는다. 그런 다음 서비스는 blob_obj_id를 사용해 Blob NFT 콘텐츠를 가져온다. 그 이후 실제 Walrus 콘텐츠를 가져올 수 있다.
dynamic_field_id | df_version | publisher | blob_obj_id | view_count | title
--------------------------------------------------------------------+------------+--------------------------------------------------------------------+--------------------------------------------------------------------+------------+------------------
\x40b5ae12e780ae815d7b0956281291253c02f227657fe2b7a8ccf003a5f597f7 | 608253371 | \xfe9c7a465f63388e5b95c8fd2db857fad4356fc873f96900f4d8b6e7fc1e760e | \xcfb3d474c9a510fde93262d4b7de66cad62a2005a54f31a63e96f3033f465ed3 | 10 | Blog Post Module
Additional considerations
모든 Walrus blob에는 관련된 lifetime이 있으므로 만료 변경 사항을 추적해야 한다. Metadata dynamic field가 변경될 때마다 parent Sui Blob object도 output changes에 나타나야 한다. blob의 lifetime은 Blob object의 내용에서 직접 읽을 수 있다. 그러나 lifetime 변경은 보통 Blob object 자체에서 발생한다. parent object에 대한 업데이트는 child dynamic field에 영향을 주지 않기 때문에 child를 직접 수정하지 않는 한 이러한 lifetime 변경은 현재 indexing 설정에서는 드러나지 않은 상태로 남는다. 이 문제는 여러 가지 방식으로 해결할 수 있다:
Blobobject의 모든 변경을 감시한다.- 모든
BlobCertifiedevent를 감시한다. - blob lifetime을 관리하는 호출과
Metadatadynamic field를 ping하는 호출을 동일 transaction 안에서 수행하는 PTB를 구성한다.
write 측에서 추가 작업을 수행하고 싶지 않다면 선택지는 첫 번째와 두 번째 방법으로 제한된다. 이를 위해서는 앞에서 설명한 metadata indexing 작업을 수행하는 pipeline 하나와 BlobCertified event(또는 Blob object 변경)를 index하는 또 다른 pipeline, 이렇게 두 개의 pipeline이 필요하다.