Sui RPC용 GraphQL (베타)
GraphQL은 Sui 네트워크를 유연하게 쿼리하는 방법을 제공한다. 이 페이지는 요청 헤더, variables 및 fragments를 사용한 쿼리 구성, pagination 전략, 쿼리 범위, 서비스 제한을 포함하여 Sui RPC에서 GraphQL을 사용할 때의 핵심 개념을 다룬다.
실용적인 예시는 GraphQL RPC 사용하기을 참조하라. GraphQL의 전반적인 기초는 GraphQL 및 GitHub의 소개 문서를 참조하라.
JSON-RPC is deprecated. Migrate to either gRPC or GraphQL RPC by July 2026.
Refer to the list of RPC or data providers that have enabled gRPC on their full nodes or offer GraphQL RPC. Contact a provider directly to request access. If your RPC or data provider doesn’t yet support these data access methods, ask them to enable support or contact the Sui Foundation team on Discord, Telegram, or Slack for help.
GraphQL RPC Service는 General-purpose Indexer의 Postgres-compatible database, Archival Store and Service, full node에서 데이터를 읽는다. GraphQL RPC는 gRPC API의 대안이다. General-purpose Indexer는 custom indexing framework의 확장 가능한 구현이다. 이 프레임워크는 remote checkpoint store와 full node RPCs를 사용해 데이터를 수집한 다. 이를 통해 서로 다른 타입의 Sui 네트워크 데이터를 Postgres 테이블에 병렬로 적재하도록 구성할 수 있어 데이터 수집 성능이 향상된다. 또한 서로 다른 테이블에 대해 pruning을 구성해 성능과 비용의 균형을 맞출 수 있다.
High-level release timeline
표시된 목표 시점은 잠정적이며 프로젝트 진행 상황과 사용자 피드백에 따라 업데이트될 수 있다.
| Tentative time | Milestone | Description |
|---|---|---|
| ✔️ September 2025 | GraphQL RPC Server와 General-purpose Indexer의 Beta 릴리스 | GraphQL RPC Server를 테스트하면서 indexed된 Sui 데이터에 접근하고, General-purpose Indexer 설정 검증을 시작할 수 있다. 또한 프로덕션이 아닌 환경에서 애플리케이션 마이그레이션을 시작하고 원하는 개선 사항에 대한 피드백을 공유할 수 있다. |
| ✔️ September-October 2025 | JSON-RPC의 deprecation | 이 시점에 JSON-RPC는 deprecated되며 마이그레이션 고지 기간이 시작된다. |
| February-March 2026 | GraphQL RPC Server와 General-purpose Indexer의 GA 릴리스 | 프로덕션 환경에서 애플리케이션의 마이그레이션과 컷오버를 시작한다. |
| July 2026 | 마이그레이션 타임라인 종료 | 이 시점에 JSON-RPC는 완전히 비활성화된다. 이 타임라인은 약 7개월의 마이그레이션 고지 기간을 가정한다. |
Refer to Access Sui Data for an overview of options to access Sui network data.
The GraphQL RPC release stage is currently in beta. Refer to the high-level timeline for releases.
Components
GraphQL과 General-purpose Indexer 스택의 핵심 구성 요소는 다음을 포함한다:
-
General-purpose Indexer: 구성 가능하고 병렬적인 pipelines를 사용해 Sui checkpoint 데이터를 수집하고 변환한 뒤 Postgres-compatible database에 기록한다. 데이터 소스로 Sui remote checkpoint store와 full node를 사용하도록 구성할 수 있다.
-
Postgres-compatible database: GraphQL queries를 위한 indexed 데이터를 저장한다. GCP AlloyDB로 테스트되었지만 어떤 Postgres-compatible database든 실행할 수 있다. 대체 데이터베이스를 테스트하고 성능, 비용, 운영 특성에 대한 피드백을 공유하라.
-
GraphQL service: indexed 데이터에 대해 구조화된 queries를 제공한다. GraphQL specification을 따르며 지원되는 schema는 Sui RPC용 GraphQL (베타)에 문서화되어 있다.
-
Archival Service: key-value store에서 과거 데이터에 대한 point lookup을 가능하게 한다. 사용할 수 없는 경우 GraphQL service는 lookups를 위해 Postgres-compatible database로 폴백하며, 이 경우 해당 데이터베이스의 retention policy에 의해 제한될 수 있다. 자세한 내용은 아카이브 서비스 사용하기를 참조하라.
-
Consistent Store: 최근 1시간 이내 네트워크의 최신 상태(objects owned by addresses, objects by type, balances by address and type)에 대한 queries에 응답한다. 일관성은 queries를 특정 최근 checkpoint에 고정함으로써 보장된다.
-
Full node: transaction 실행과 simulation을 가능하게 한다.
When to use
풍부한 dashboards, explorers, 데이터 중심 앱을 구축하는 유연하고 인체공학적인 데이터 API로 GraphQL RPC with General-purpose Indexer를 사용하라. 이 API는 custom indexing framework로 생성된 indexer로 구동된다.
애플리케이션이 다음에 해당한다면 GraphQL을 사용하라:
-
address가 보낸 모든 transactions와 같이 구성 가능한 retention 또는 필터링된 데이터 접근을 갖는 과거 데이터가 필요하다.
-
wallets와 dashboards와 같이 frontend에서 구조화된 결과를 표시해야 한다.
-
overfetching을 줄이는 유연하고 composable한 queries의 이점을 얻는다.
-
transactions, objects, events와 같은 여러 데이터 엔티티에 단일 요청에서 의존하거나, 일부 checkpoint의 스냅샷에서 온 응답처럼 여러 요청에 걸쳐 일관된 방식으로 의존한다.
How GraphQL RPC and General-purpose Indexer fit into the application stack
애플리케이션에서 deprecated된 JSON-RPC를 사용하고 있다면, General-purpose Indexer, Postgres-compatible database, GraphQL RPC server의 결합 스택을 직접 운영하거나 RPC provider 또는 indexer operator의 서비스로 활용하여 GraphQL RPC로 마이그레이션할 수 있다.
GraphQL 및 Indexer 데이터 스택은 다음 구성으로 실행하거나 사용할 수 있다.
Fully managed service
개발자는 전체 스택을 백그라운드에서 실행하고 운영하는 indexer operator 또는 data provider가 제공하는 서비스로 GraphQL에 접근할 수 있다. 이미 이 서비스를 제공하는지 또는 제공할 계획이 있는지 data provider에 문의하라.
Partial self-managed
개발자는 다음을 할 수 있다:
-
Archival Service와 full node는 RPC provider 또는 indexer operator의 것을 사용하면서, Indexer pipelines와 GraphQL service를 실행한다.
-
Postgres-compatible database(local Postgres, AlloyDB 등)를 기본 데이터 스토어로 구성하고 관리한다.
-
self-managed 구성 요소를 cloud infrastructure 또는 baremetal에 배포한다.
Fully self-managed
개발자, indexer operator, 또는 RPC provider는 다음을 할 수 있다:
-
complete stack, 즉 Indexer pipelines, GraphQL service, Postgres-compatible database, Archival Service, Consistent Store, full node를 cloud infrastructure 또는 bare metal에서 실행한다.
-
자신의 애플리케이션 또는 다른 빌더와 third-party services에 GraphQL을 제공한다.
Working with the GraphQL service
GraphQL service는 GraphQL 개념에 부합하는 query surface를 노출한다. 이 서비스는 pagination, filtering, consistent snapshot queries를 허용한다. 또한 schema, query cost limits, logging에 대한 런타임 구성도 지원한다. GraphQL schema는 Sui RPC용 GraphQL (베타)에 정의되어 있다. 여기에서 지원되는 타입과 fields를 살펴보고, GraphiQL IDE를 사용해 queries를 테스트하고, 최신 schema에 대한 문서를 읽을 수 있다.
GraphQL service는 stateless하고 수평 확장이 가능한 단일 binary로 배포된다. Queries는 필요에 따라 Postgres-compatible database(과거 데이터에 대한 filters), Archival Service(point lookups), Consistent Store(실시간 데이터), full node(실행 및 simulation) 중 하나 이상에서 제공되는 데이터로 응답한다. 이러한 stores에 대한 접근은 서비스 시작 시 서비스와 함께 구성해야 하며, 그렇지 않으면 서비스가 요청에 올바르게 응답하지 못할 수 있다. 설정, 구성, 실행 방법에 대한 자세한 내용은 README를 참조하라.
GraphQL에 대한 요청은 여러 제한을 받으며, 이는 리소스가 클라이언트 간에 공정하게 공유되도록 하기 위함이다. 각 제한은 구성 가능하며 인스턴스에 구성된 값은 Query.serviceConfig를 통해 조회할 수 있다. 제한을 충족하지 않는 요청은 오류를 반환한다. 현재 적용되는 제한은 다음과 같다:
-
Request size: 요청은 일정 바이트 크기를 초과할 수 없다. 제한은 transaction signing, execution, simulation fields의 파라미터인 모든 values와 variable bindings에 적용되는 transaction payload limit(기본값: 175KB)과 쿼리의 다른 모든 부분에 적용되는 query payload limit(기본값: 5KB)으로 나뉜다.
-
Request timeout: 각 요청에 소요되는 시간은 제한되며 execution(기본값: 74s)과 일반 읽기(기본값: 40s)에 대해 서로 다른 제한을 둔다.
-
Query input nodes and depth: query는 너무 복잡할 수 없으며, 너무 많은 input nodes 또는 field names(기본값: 300)를 포함하거나 너무 깊게 중첩될 수 없다(기본값: 20).
-
Output nodes: 서비스는 요청된 모든 field가 존재하고, 모든 paginated field가 전체 페이지를 반환하며, 모든 multi-get이 요청된 모든 keys를 찾는다고 가정해 query가 생성할 수 있는 최대 output nodes 수를 추정한다. 이 추정치는 제한되어야 한다(기본값: 1,000,000).
-
Page and multi-get size: 각 paginated field(기본값: 50)와 multi-get(기본값: 200)은 최대 크기 제한을 받는다. 특정 paginated fields는 더 높거나 더 낮은 최대값을 제공하기 위해 이를 재정의할 수 있다.
-
(TBD) Rich queries: 요청은 데이터베이스에 대한 전용 접근이 필요한 queries를 제한된 수(기본값: 5)만 포함할 수 있다(다른 요청과 그룹화할 수 없다).
Working with General-purpose Indexer
General-purpose Indexer는 remote object store, local files, full node RPC 중 하나에서 checkpoints 데이터를 가져오고, 특화된 pipelines 집합을 통해 여러 데이터베이스 테이블에 데이터를 인덱싱한다. 각 pipeline은 특정 데이터를 추출하여 대상 테이블에 기록하는 역할을 한다.
Full list of tables and their schemas
// @generated automatically by Diesel CLI.
diesel::table! {
coin_balance_buckets (object_id, cp_sequence_number) {
object_id -> Bytea,
cp_sequence_number -> Int8,
owner_kind -> Nullable<Int2>,
owner_id -> Nullable<Bytea>,
coin_type -> Nullable<Bytea>,
coin_balance_bucket -> Nullable<Int2>,
}
}
diesel::table! {
coin_balance_buckets_deletion_reference (cp_sequence_number, object_id) {
object_id -> Bytea,
cp_sequence_number -> Int8,
}
}
diesel::table! {
cp_bloom_blocks (cp_block_index, bloom_block_index) {
cp_block_index -> Int8,
bloom_block_index -> Int2,
bloom_filter -> Bytea,
}
}
diesel::table! {
cp_blooms (cp_sequence_number) {
cp_sequence_number -> Int8,
bloom_filter -> Bytea,
}
}
diesel::table! {
cp_sequence_numbers (cp_sequence_number) {
cp_sequence_number -> Int8,
tx_lo -> Int8,
epoch -> Int8,
}
}
diesel::table! {
ev_emit_mod (package, module, tx_sequence_number) {
package -> Bytea,
module -> Text,
tx_sequence_number -> Int8,
sender -> Bytea,
}
}
diesel::table! {
ev_struct_inst (package, module, name, instantiation, tx_sequence_number) {
package -> Bytea,
module -> Text,
name -> Text,
instantiation -> Bytea,
tx_sequence_number -> Int8,
sender -> Bytea,
}
}
diesel::table! {
kv_checkpoints (sequence_number) {
sequence_number -> Int8,
checkpoint_contents -> Bytea,
checkpoint_summary -> Bytea,
validator_signatures -> Bytea,
}
}
diesel::table! {
kv_epoch_ends (epoch) {
epoch -> Int8,
cp_hi -> Int8,
tx_hi -> Int8,
end_timestamp_ms -> Int8,
safe_mode -> Bool,
total_stake -> Nullable<Int8>,
storage_fund_balance -> Nullable<Int8>,
storage_fund_reinvestment -> Nullable<Int8>,
storage_charge -> Nullable<Int8>,
storage_rebate -> Nullable<Int8>,
stake_subsidy_amount -> Nullable<Int8>,
total_gas_fees -> Nullable<Int8>,
total_stake_rewards_distributed -> Nullable<Int8>,
leftover_storage_fund_inflow -> Nullable<Int8>,
epoch_commitments -> Bytea,
}
}
diesel::table! {
kv_epoch_starts (epoch) {
epoch -> Int8,
protocol_version -> Int8,
cp_lo -> Int8,
start_timestamp_ms -> Int8,
reference_gas_price -> Int8,
system_state -> Bytea,
}
}
diesel::table! {
kv_feature_flags (protocol_version, flag_name) {
protocol_version -> Int8,
flag_name -> Text,
flag_value -> Bool,
}
}
diesel::table! {
kv_genesis (genesis_digest) {
genesis_digest -> Bytea,
initial_protocol_version -> Int8,
}
}
diesel::table! {
kv_objects (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
serialized_object -> Nullable<Bytea>,
}
}
diesel::table! {
kv_packages (package_id, package_version) {
package_id -> Bytea,
package_version -> Int8,
original_id -> Bytea,
is_system_package -> Bool,
serialized_object -> Bytea,
cp_sequence_number -> Int8,
}
}
diesel::table! {
kv_protocol_configs (protocol_version, config_name) {
protocol_version -> Int8,
config_name -> Text,
config_value -> Nullable<Text>,
}
}
diesel::table! {
kv_transactions (tx_digest) {
tx_digest -> Bytea,
cp_sequence_number -> Int8,
timestamp_ms -> Int8,
raw_transaction -> Bytea,
raw_effects -> Bytea,
events -> Bytea,
user_signatures -> Bytea,
}
}
diesel::table! {
obj_info (object_id, cp_sequence_number) {
object_id -> Bytea,
cp_sequence_number -> Int8,
owner_kind -> Nullable<Int2>,
owner_id -> Nullable<Bytea>,
package -> Nullable<Bytea>,
module -> Nullable<Text>,
name -> Nullable<Text>,
instantiation -> Nullable<Bytea>,
}
}
diesel::table! {
obj_info_deletion_reference (cp_sequence_number, object_id) {
object_id -> Bytea,
cp_sequence_number -> Int8,
}
}
diesel::table! {
obj_versions (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
object_digest -> Nullable<Bytea>,
cp_sequence_number -> Int8,
}
}
diesel::table! {
sum_displays (object_type) {
object_type -> Bytea,
display_id -> Bytea,
display_version -> Int2,
display -> Bytea,
}
}
diesel::table! {
tx_affected_addresses (affected, tx_sequence_number) {
affected -> Bytea,
tx_sequence_number -> Int8,
sender -> Bytea,
}
}
diesel::table! {
tx_affected_objects (affected, tx_sequence_number) {
tx_sequence_number -> Int8,
affected -> Bytea,
sender -> Bytea,
}
}
diesel::table! {
tx_balance_changes (tx_sequence_number) {
tx_sequence_number -> Int8,
balance_changes -> Bytea,
}
}
diesel::table! {
tx_calls (package, module, function, tx_sequence_number) {
package -> Bytea,
module -> Text,
function -> Text,
tx_sequence_number -> Int8,
sender -> Bytea,
}
}
diesel::table! {
tx_digests (tx_sequence_number) {
tx_sequence_number -> Int8,
tx_digest -> Bytea,
}
}
diesel::table! {
tx_kinds (tx_kind, tx_sequence_number) {
tx_kind -> Int2,
tx_sequence_number -> Int8,
}
}
diesel::table! {
watermarks (pipeline) {
pipeline -> Text,
epoch_hi_inclusive -> Int8,
checkpoint_hi_inclusive -> Int8,
tx_hi -> Int8,
timestamp_ms_hi_inclusive -> Int8,
reader_lo -> Int8,
pruner_timestamp -> Timestamp,
pruner_hi -> Int8,
}
}
diesel::allow_tables_to_appear_in_same_query!(
coin_balance_buckets,
coin_balance_buckets_deletion_reference,
cp_bloom_blocks,
cp_blooms,
cp_sequence_numbers,
ev_emit_mod,
ev_struct_inst,
kv_checkpoints,
kv_epoch_ends,
kv_epoch_starts,
kv_feature_flags,
kv_genesis,
kv_objects,
kv_packages,
kv_protocol_configs,
kv_transactions,
obj_info,
obj_info_deletion_reference,
obj_versions,
sum_displays,
tx_affected_addresses,
tx_affected_objects,
tx_balance_changes,
tx_calls,
tx_digests,
tx_kinds,
watermarks,
);
아래는 처리하는 데이터 타입에 따라 다양한 pipeline 범주를 간략히 설명한 것이다:
Blockchain raw content pipelines
이 pipelines는 핵심 blockchain 데이터를 원시 형태로 캡처하여 complete checkpoint 정보, full transaction 및 objects contents, Move package bytecode와 metadata를 보존한다. 이를 통해 object ID와 version, transaction digest, checkpoint sequence number 같은 key로 직접 조회할 수 있도록 complete blockchain state를 사용할 수 있게 한다. 일부 프로덕션 배포는 해당 kv_ 테이블 대신 Archival Store를 사용해 checkpoints, transactions, objects contents를 조회한다.
다음 pipelines는 object owner, transaction type, affected addresses, event type 같은 서로 다른 속성에 따라 효율적으로 filtering 및 querying할 수 있는 indexed views를 생성한다. 이러한 indexes는 관심 있는 keys를 식별하는 데 도움을 주며, 그런 다음 raw content kv_ 테이블에서 상세 content를 가져올 수 있다:
Tables: kv_checkpoints, kv_transactions, kv_objects, kv_packages
Transaction pipelines
이 pipelines는 효율적인 filtering과 querying를 지원하기 위해 핵심 transaction 속성을 추출하고 인덱싱한다. tx_kinds, tx_calls, tx_affected_addresses, tx_affected_objects는 types, function calls, sender 및 receiver addresses, 변경된 objects를 기준으로 transactions를 빠르게 조회할 수 있게 한다. tx_digests는 kv_ 테이블에서 digests로 transactions를 조회하는 데 필요한 transaction sequence numbers와 transaction digests 간의 변환을 가능하게 하고, tx_balance_changes는 각 transaction의 balance changes 정보를 저장한다.
Tables : tx_digests, tx_kinds, tx_calls, tx_affected_addresses, tx_affected_objects, tx_balance_changes
Object pipelines
이 pipelines는 현재 및 과거 object 정보를 관리한다. active object metadata를 저장하고, 각 object의 version history를 유지하며, balances로 정렬된 효율적인 coin queries를 위해 coin balances를 buckets로 분류한다. obj_versions 테이블은 GraphQL service에 특히 중요하다. 이 테이블은 모든 blockchain objects의 version history를 추적하며 object ID, version number, digest, checkpoint sequence number를 저장한다. GraphQL service는 전체 object 데이터를 불러오지 않고도 version bounds, checkpoint bounds, exact versions 기준으로 object queries를 해결하는 효율적인 index로 이 테이블을 사용하여 version pagination과 temporal consistency 같은 기능을 가능하게 한다.
Pruning policies는 obj_info와 coin_balance_buckets에 대해 구성할 수 있으며, 이를 통해 지정된 시간 범위 내에서 과거 데이터를 유지하여 query 요구사항과 storage 관리를 균형 있게 맞출 수 있다. 이는 모든 과거 데이터를 무기한 보관하지 않고 최근 object history를 조회해야 하는 사용 사례를 지원할 수 있게 한다.
Tables: obj_info, obj_versions, coin_balance_buckets
Epoch information pipelines
이 pipelines는 protocol upgrades와 epoch transition points를 캡처한다. 각 epoch의 system state, reward distribution, validator 위원회, protocol configurations를 추적하여 네트워크 진화의 과거 기록을 제공한다.
Tables: kv_epoch_starts, kv_epoch_ends, kv_feature_flags, kv_protocol_configs
Event processing pipelines
이 pipelines는 sender, emitting module, event type 기준의 효율적인 querying를 위해 blockchain events를 인덱싱한다.
Tables: ev_emit_mod, ev_struct_inst
Utility and support pipelines
이 pipelines는 pruning을 위한 checkpoint sequence number tracking과 GraphQL query에서 서로 다른 테이블 간의 일관된 읽기를 보장하는 watermark tracking과 같은 지원 인프라를 제공한다.
Tables: cp_sequence_numbers, watermarks
Other pipelines
sum_displays 테이블은 각 object type에 대한 최신 버전의 Display object를 저장하며, 이는 the off-chain representation (display) for a type를 렌더링하는 데 사용된다.
Working with Consistent Store
Consistent Store는 온체인의 실시간 데이터를 인덱싱하고 최근 checkpoints에 대한 queries를 제공하는 combined indexer 및 RPC service이다. Retention은 구성 가능하며 일반적으로 분 또는 시간 단위로 측정된다. 이 service의 indexer는 General-purpose Indexer와 동일한 소스에서 checkpoints를 가져오고, 요청은 gRPC를 통해 제공되면서 데이터를 embedded RocksDB store에 기록하며, 다음 queries에 응답한다:
-
최근 checkpoint에서 owner의 실시간 objects, 필요에 따라 type으로 필터링
-
최근 checkpoint에서 주어진 type의 실시간 objects
-
최근 checkpoint에서의 address balance
이 service는 자체 데이터베이스를 유지하므로 stateless하지 않다. 새로운 인스턴스는 genesis부터 동기화하거나, 형식 스냅샷에서 복원하는 방식으로 indexer와 유사하게 띄울 수 있다.
For RPC providers and data operators
GraphQL RPC와 General-purpose Indexer 스택을 서비스로 운영하는 경우, 빌더에게 성능이 좋고 비용 효율적인 경험을 제공하도록 구성을 맞추기 위한 몇 가지 핵심 고려 사항은 다음과 같다. 단계별 설정 및 운영 지침은 GraphQL 및 General-purpose Indexer를 참조하라.
How much data to index and retain
Postgres-compatible database에는 최근 checkpoint 데이터를 30일에서 90일 정도 유지하는 것이 좋다. 이는 전체 과거 인덱싱의 높은 storage 비용을 들이지 않으면서 대부분의 앱에 강력한 기본값을 제공한다.
-
30일은 최근 활동과 assets가 필요한 dashboards와 explorers를 위한 기준선 역할을 한다.
-
90일은 더 긴 범위의 pagination, 과거 조회, 참여 주기가 더 느린 앱에 대한 지원을 개선한다.
events, objects, transactions와 같이 포함할 데이터를 인덱싱 pipelines에서 범위 지정할 수 있으며, 필요하지 않은 구성 요소는 비활성화할 수 있다.
특정 앱에 필요하지 않는 한 Postgres에 장기 과거 데이터를 보관하는 것은 권장되지 않는다.
Use the Archival Service and Store for historical lookups
모든 프로덕션 배포에서는 Postgres와 Archival Service를 함께 사용하여 관련 데이터가 Postgres에 존재하지 않을 때 transactions, objects, checkpoints의 point lookup을 지원하라. Archival Service는 과거 버전과 checkpoint 데이터의 백엔드 역할을 하여 Postgres 인스턴스에 가해지는 부담을 줄인다. 엄밀히 필수는 아니지만, 높은 retention의 GraphQL 또는 gRPC 워크로드를 지원하려는 모든 프로덕션 구성에서는 Archival Service를 사용하는 것이 좋다.
현재 구현은 매우 확장 가능하고 성능 좋은 데이터 스토어인 GCP Bigtable을 지원한다. 자체 archival store를 운영할 계획이라면 indexer 설정과 RPC service 구현을 위해 각각 sui-kvstore와 sui-kv-rpc를 참조하라. indexer 설정 시에는 반드시 커스텀 인덱싱 프레임워크를 사용하라. 다른 확장형 데이터 스토어 지원에 기여하고 싶다면 GitHub에서 새 이슈를 만들어 문의하라.
lib.rs in sui-kvstore
lib.rs in sui-kvstoremod bigtable;
pub mod config;
mod handlers;
mod rate_limiter;
pub mod tables;
use std::sync::Arc;
use std::sync::OnceLock;
use anyhow::Result;
use async_trait::async_trait;
use prometheus::Registry;
use serde::Deserialize;
use serde::Serialize;
use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_framework::IndexerArgs;
use sui_indexer_alt_framework::ingestion::ClientArgs;
use sui_indexer_alt_framework::pipeline::CommitterConfig;
use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
use crate::rate_limiter::CompositeRateLimiter;
use crate::rate_limiter::RateLimiter;
use sui_protocol_config::Chain;
use sui_types::balance_change::BalanceChange;
use sui_types::base_types::ObjectID;
use sui_types::committee::EpochId;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
use sui_types::digests::CheckpointDigest;
use sui_types::digests::TransactionDigest;
use sui_types::effects::TransactionEffects;
use sui_types::effects::TransactionEvents;
use sui_types::event::Event;
use sui_types::messages_checkpoint::CheckpointContents;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::messages_checkpoint::CheckpointSummary;
use sui_types::object::Object;
use sui_types::storage::ObjectKey;
use sui_types::transaction::Transaction;
pub use crate::bigtable::client::BigTableClient;
pub use crate::bigtable::store::BigTableConnection;
pub use crate::bigtable::store::BigTableStore;
pub use crate::handlers::BigTableHandler;
pub use crate::handlers::CheckpointsByDigestPipeline;
pub use crate::handlers::CheckpointsPipeline;
pub use crate::handlers::EpochEndPipeline;
pub use crate::handlers::EpochLegacyBatch;
pub use crate::handlers::EpochLegacyPipeline;
pub use crate::handlers::EpochStartPipeline;
pub use crate::handlers::ObjectsPipeline;
pub use crate::handlers::PackagesByCheckpointPipeline;
pub use crate::handlers::PackagesByIdPipeline;
pub use crate::handlers::PackagesPipeline;
pub use crate::handlers::PrevEpochUpdate;
pub use crate::handlers::ProtocolConfigsPipeline;
pub use crate::handlers::SystemPackagesPipeline;
pub use crate::handlers::TransactionsPipeline;
pub use config::CommitterLayer;
pub use config::ConcurrentLayer;
pub use config::IndexerConfig;
pub use config::IngestionConfig;
pub use config::PipelineLayer;
pub const CHECKPOINTS_PIPELINE: &str =
<BigTableHandler<CheckpointsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const CHECKPOINTS_BY_DIGEST_PIPELINE: &str =
<BigTableHandler<CheckpointsByDigestPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const TRANSACTIONS_PIPELINE: &str =
<BigTableHandler<TransactionsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const OBJECTS_PIPELINE: &str =
<BigTableHandler<ObjectsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const EPOCH_START_PIPELINE: &str =
<BigTableHandler<EpochStartPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const EPOCH_END_PIPELINE: &str =
<BigTableHandler<EpochEndPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const PROTOCOL_CONFIGS_PIPELINE: &str =
<BigTableHandler<ProtocolConfigsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const EPOCH_LEGACY_PIPELINE: &str =
<EpochLegacyPipeline as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const PACKAGES_PIPELINE: &str =
<BigTableHandler<PackagesPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const PACKAGES_BY_ID_PIPELINE: &str =
<BigTableHandler<PackagesByIdPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const PACKAGES_BY_CHECKPOINT_PIPELINE: &str =
<BigTableHandler<PackagesByCheckpointPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const SYSTEM_PACKAGES_PIPELINE: &str =
<BigTableHandler<SystemPackagesPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
/// All pipeline names registered by the indexer. Used by `LegacyWatermarkTracker`
/// to know when all pipelines have reported.
pub const ALL_PIPELINE_NAMES: [&str; 12] = [
CHECKPOINTS_PIPELINE,
CHECKPOINTS_BY_DIGEST_PIPELINE,
TRANSACTIONS_PIPELINE,
OBJECTS_PIPELINE,
EPOCH_START_PIPELINE,
EPOCH_END_PIPELINE,
PROTOCOL_CONFIGS_PIPELINE,
EPOCH_LEGACY_PIPELINE,
PACKAGES_PIPELINE,
PACKAGES_BY_ID_PIPELINE,
PACKAGES_BY_CHECKPOINT_PIPELINE,
SYSTEM_PACKAGES_PIPELINE,
];
/// Non-legacy pipeline names used for the default `get_watermark` implementation.
const WATERMARK_PIPELINES: [&str; 11] = [
CHECKPOINTS_PIPELINE,
CHECKPOINTS_BY_DIGEST_PIPELINE,
TRANSACTIONS_PIPELINE,
OBJECTS_PIPELINE,
EPOCH_START_PIPELINE,
EPOCH_END_PIPELINE,
PROTOCOL_CONFIGS_PIPELINE,
PACKAGES_PIPELINE,
PACKAGES_BY_ID_PIPELINE,
PACKAGES_BY_CHECKPOINT_PIPELINE,
SYSTEM_PACKAGES_PIPELINE,
];
static WRITE_LEGACY_DATA: OnceLock<bool> = OnceLock::new();
/// Set whether to write legacy data (legacy watermark row, epoch DEFAULT_COLUMN, tx column).
/// Must be called before creating any pipelines. Panics if called more than once.
pub fn set_write_legacy_data(value: bool) {
WRITE_LEGACY_DATA
.set(value)
.expect("write_legacy_data already set");
}
pub fn write_legacy_data() -> bool {
*WRITE_LEGACY_DATA.get_or_init(|| false)
}
pub struct BigTableIndexer {
pub indexer: Indexer<BigTableStore>,
}
#[derive(Clone, Debug)]
pub struct CheckpointData {
pub summary: CheckpointSummary,
pub contents: CheckpointContents,
pub signatures: AuthorityStrongQuorumSignInfo,
}
#[derive(Clone, Debug)]
pub struct TransactionData {
pub transaction: Transaction,
pub effects: TransactionEffects,
pub events: Option<TransactionEvents>,
pub checkpoint_number: CheckpointSequenceNumber,
pub timestamp: u64,
pub balance_changes: Vec<BalanceChange>,
pub unchanged_loaded_runtime_objects: Vec<ObjectKey>,
}
/// Partial transaction and events for when we only need transaction content for events
#[derive(Clone, Debug)]
pub struct TransactionEventsData {
pub events: Vec<Event>,
pub timestamp_ms: u64,
}
/// Epoch data returned by reader methods.
/// All fields are optional to support partial column queries.
#[derive(Clone, Debug, Default)]
pub struct EpochData {
pub epoch: Option<u64>,
pub protocol_version: Option<u64>,
pub start_timestamp_ms: Option<u64>,
pub start_checkpoint: Option<u64>,
pub reference_gas_price: Option<u64>,
pub system_state: Option<sui_types::sui_system_state::SuiSystemState>,
pub end_timestamp_ms: Option<u64>,
pub end_checkpoint: Option<u64>,
pub cp_hi: Option<u64>,
pub tx_hi: Option<u64>,
pub safe_mode: Option<bool>,
pub total_stake: Option<u64>,
pub storage_fund_balance: Option<u64>,
pub storage_fund_reinvestment: Option<u64>,
pub storage_charge: Option<u64>,
pub storage_rebate: Option<u64>,
pub stake_subsidy_amount: Option<u64>,
pub total_gas_fees: Option<u64>,
pub total_stake_rewards_distributed: Option<u64>,
pub leftover_storage_fund_inflow: Option<u64>,
pub epoch_commitments: Option<Vec<u8>>,
}
/// Package metadata returned by reader methods.
/// The actual serialized object is stored in the `objects` table.
#[derive(Clone, Debug)]
pub struct PackageData {
pub package_id: Vec<u8>,
pub package_version: u64,
pub original_id: Vec<u8>,
pub is_system_package: bool,
pub cp_sequence_number: u64,
}
/// Protocol config data returned by reader methods.
#[derive(Clone, Debug, Default)]
pub struct ProtocolConfigData {
pub configs: std::collections::BTreeMap<String, Option<String>>,
pub flags: std::collections::BTreeMap<String, bool>,
}
/// Serializable watermark for per-pipeline tracking in BigTable.
/// Mirrors the framework's CommitterWatermark type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Watermark {
pub epoch_hi_inclusive: u64,
pub checkpoint_hi_inclusive: u64,
pub tx_hi: u64,
pub timestamp_ms_hi_inclusive: u64,
}
#[async_trait]
pub trait KeyValueStoreReader {
async fn get_objects(&mut self, objects: &[ObjectKey]) -> Result<Vec<Object>>;
async fn get_transactions(
&mut self,
transactions: &[TransactionDigest],
) -> Result<Vec<TransactionData>>;
async fn get_checkpoints(
&mut self,
sequence_numbers: &[CheckpointSequenceNumber],
) -> Result<Vec<CheckpointData>>;
async fn get_checkpoint_by_digest(
&mut self,
digest: CheckpointDigest,
) -> Result<Option<CheckpointData>>;
/// Return the minimum watermark across the given pipelines, selecting the whole
/// watermark with the lowest `checkpoint_hi_inclusive`. Returns `None` if any
/// pipeline is missing a watermark.
async fn get_watermark_for_pipelines(
&mut self,
pipelines: &[&str],
) -> Result<Option<Watermark>>;
/// Return the minimum watermark across all non-legacy pipelines.
async fn get_watermark(&mut self) -> Result<Option<Watermark>> {
self.get_watermark_for_pipelines(&WATERMARK_PIPELINES).await
}
async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>>;
async fn get_epoch(&mut self, epoch_id: EpochId) -> Result<Option<EpochData>>;
async fn get_latest_epoch(&mut self) -> Result<Option<EpochData>>;
async fn get_protocol_configs(
&mut self,
protocol_version: u64,
) -> Result<Option<ProtocolConfigData>>;
async fn get_events_for_transactions(
&mut self,
keys: &[TransactionDigest],
) -> Result<Vec<(TransactionDigest, TransactionEventsData)>>;
/// Resolve package_ids to their original_ids.
async fn get_package_original_ids(
&mut self,
package_ids: &[ObjectID],
) -> Result<Vec<(ObjectID, ObjectID)>>;
/// Get packages by (original_id, version) pairs.
async fn get_packages_by_version(
&mut self,
keys: &[(ObjectID, u64)],
) -> Result<Vec<PackageData>>;
/// Get the latest version of a package at or before `cp_bound`.
async fn get_package_latest(
&mut self,
original_id: ObjectID,
cp_bound: u64,
) -> Result<Option<PackageData>>;
/// Paginate package versions for an original_id, filtered by cp_bound.
async fn get_package_versions(
&mut self,
original_id: ObjectID,
cp_bound: u64,
after_version: Option<u64>,
before_version: Option<u64>,
limit: usize,
descending: bool,
) -> Result<Vec<PackageData>>;
/// Get packages created in a checkpoint range, ordered by checkpoint.
async fn get_packages_by_checkpoint_range(
&mut self,
cp_after: Option<u64>,
cp_before: Option<u64>,
limit: usize,
descending: bool,
) -> Result<Vec<PackageData>>;
/// Get all system packages with their latest version at or before `cp_bound`.
async fn get_system_packages(
&mut self,
cp_bound: u64,
after_original_id: Option<ObjectID>,
limit: usize,
) -> Result<Vec<PackageData>>;
}
impl BigTableIndexer {
pub async fn new(
store: BigTableStore,
indexer_args: IndexerArgs,
client_args: ClientArgs,
ingestion_config: IngestionConfig,
committer: CommitterConfig,
config: IndexerConfig,
pipeline: PipelineLayer,
chain: Chain,
registry: &Registry,
) -> Result<Self> {
let mut indexer = Indexer::new(
store,
indexer_args,
client_args,
ingestion_config.into(),
None,
registry,
)
.await?;
let global = config.total_max_rows_per_second.map(RateLimiter::new);
let base_rps = config.max_rows_per_second;
fn build_rate_limiter(
layer: &ConcurrentLayer,
base_rps: Option<u64>,
global: &Option<Arc<RateLimiter>>,
) -> Arc<CompositeRateLimiter> {
let mut limiters = Vec::new();
if let Some(rps) = layer.max_rows_per_second.or(base_rps) {
limiters.push(RateLimiter::new(rps));
}
if let Some(g) = global {
limiters.push(g.clone());
}
Arc::new(CompositeRateLimiter::new(limiters))
}
let base = ConcurrentConfig {
committer,
pruner: None,
..Default::default()
};
indexer
.concurrent_pipeline(
BigTableHandler::new(
CheckpointsPipeline,
&pipeline.checkpoints,
build_rate_limiter(&pipeline.checkpoints, base_rps, &global),
),
pipeline.checkpoints.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
CheckpointsByDigestPipeline,
&pipeline.checkpoints_by_digest,
build_rate_limiter(&pipeline.checkpoints_by_digest, base_rps, &global),
),
pipeline.checkpoints_by_digest.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
TransactionsPipeline,
&pipeline.transactions,
build_rate_limiter(&pipeline.transactions, base_rps, &global),
),
pipeline.transactions.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
ObjectsPipeline,
&pipeline.objects,
build_rate_limiter(&pipeline.objects, base_rps, &global),
),
pipeline.objects.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
EpochStartPipeline,
&pipeline.epoch_start,
build_rate_limiter(&pipeline.epoch_start, base_rps, &global),
),
pipeline.epoch_start.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
EpochEndPipeline,
&pipeline.epoch_end,
build_rate_limiter(&pipeline.epoch_end, base_rps, &global),
),
pipeline.epoch_end.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
ProtocolConfigsPipeline(chain),
&pipeline.protocol_configs,
build_rate_limiter(&pipeline.protocol_configs, base_rps, &global),
),
pipeline.protocol_configs.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
PackagesPipeline,
&pipeline.packages,
build_rate_limiter(&pipeline.packages, base_rps, &global),
),
pipeline.packages.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
PackagesByIdPipeline,
&pipeline.packages_by_id,
build_rate_limiter(&pipeline.packages_by_id, base_rps, &global),
),
pipeline.packages_by_id.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
PackagesByCheckpointPipeline,
&pipeline.packages_by_checkpoint,
build_rate_limiter(&pipeline.packages_by_checkpoint, base_rps, &global),
),
pipeline.packages_by_checkpoint.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
SystemPackagesPipeline,
&pipeline.system_packages,
build_rate_limiter(&pipeline.system_packages, base_rps, &global),
),
pipeline.system_packages.finish(base.clone()),
)
.await?;
if write_legacy_data() {
indexer
.concurrent_pipeline(EpochLegacyPipeline, pipeline.epoch_legacy.finish(base))
.await?;
}
Ok(Self { indexer })
}
pub fn pipeline_names(&self) -> Vec<&'static str> {
self.indexer.pipelines().collect()
}
}
impl From<sui_indexer_alt_framework_store_traits::CommitterWatermark> for Watermark {
fn from(w: sui_indexer_alt_framework_store_traits::CommitterWatermark) -> Self {
Self {
epoch_hi_inclusive: w.epoch_hi_inclusive,
checkpoint_hi_inclusive: w.checkpoint_hi_inclusive,
tx_hi: w.tx_hi,
timestamp_ms_hi_inclusive: w.timestamp_ms_hi_inclusive,
}
}
}
impl From<Watermark> for sui_indexer_alt_framework_store_traits::CommitterWatermark {
fn from(w: Watermark) -> Self {
Self {
epoch_hi_inclusive: w.epoch_hi_inclusive,
checkpoint_hi_inclusive: w.checkpoint_hi_inclusive,
tx_hi: w.tx_hi,
timestamp_ms_hi_inclusive: w.timestamp_ms_hi_inclusive,
}
}
}
main.rs in sui-kv-rpc
main.rs in sui-kv-rpcuse anyhow::Result;
use axum::Router;
use axum::routing::get;
use clap::Parser;
use mysten_network::callback::CallbackLayer;
use prometheus::Registry;
use std::sync::Arc;
use std::time::Duration;
use sui_kv_rpc::KvRpcServer;
use sui_rpc_api::{RpcMetrics, RpcMetricsMakeCallbackHandler, ServerVersion};
use telemetry_subscribers::TelemetryConfig;
use tonic::transport::{Identity, Server, ServerTlsConfig};
bin_version::bin_version!();
#[derive(Parser)]
struct App {
/// Path to GCP service account JSON key file. If not provided, uses Application Default
/// Credentials (GOOGLE_APPLICATION_CREDENTIALS or metadata server).
#[clap(long)]
credentials: Option<String>,
instance_id: String,
#[clap(default_value = "[::1]:8000")]
address: String,
#[clap(default_value = "127.0.0.1")]
metrics_host: String,
#[clap(default_value_t = 9184)]
metrics_port: usize,
#[clap(long = "tls-cert", default_value = "")]
tls_cert: String,
#[clap(long = "tls-key", default_value = "")]
tls_key: String,
/// GCP project ID for the BigTable instance (defaults to the token provider's project)
#[clap(long = "bigtable-project")]
bigtable_project: Option<String>,
#[clap(long = "app-profile-id")]
app_profile_id: Option<String>,
#[clap(long = "checkpoint-bucket")]
checkpoint_bucket: Option<String>,
/// Channel-level timeout in milliseconds for BigTable gRPC calls (default: 60000)
#[clap(long = "bigtable-channel-timeout-ms")]
bigtable_channel_timeout_ms: Option<u64>,
}
async fn health_check() -> &'static str {
"OK"
}
#[tokio::main]
async fn main() -> Result<()> {
let _guard = TelemetryConfig::new().with_env().init();
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install CryptoProvider");
let app = App::parse();
let server_version = Some(ServerVersion::new("sui-kv-rpc", VERSION));
let registry_service = mysten_metrics::start_prometheus_server(
format!("{}:{}", app.metrics_host, app.metrics_port).parse()?,
);
let registry: Registry = registry_service.default_registry();
mysten_metrics::init_metrics(®istry);
let channel_timeout = app.bigtable_channel_timeout_ms.map(Duration::from_millis);
let server = KvRpcServer::new(
app.instance_id,
app.bigtable_project,
app.app_profile_id,
app.checkpoint_bucket,
channel_timeout,
server_version,
®istry,
app.credentials,
)
.await?;
let addr = app.address.parse()?;
let mut builder = Server::builder();
if !app.tls_cert.is_empty() && !app.tls_key.is_empty() {
let identity =
Identity::from_pem(std::fs::read(app.tls_cert)?, std::fs::read(app.tls_key)?);
let tls_config = ServerTlsConfig::new().identity(identity);
builder = builder.tls_config(tls_config)?;
}
let reflection_v1 = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
sui_rpc_api::proto::google::protobuf::FILE_DESCRIPTOR_SET,
)
.register_encoded_file_descriptor_set(sui_rpc_api::proto::google::rpc::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET)
.build_v1()?;
let reflection_v1alpha = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
sui_rpc_api::proto::google::protobuf::FILE_DESCRIPTOR_SET,
)
.register_encoded_file_descriptor_set(sui_rpc_api::proto::google::rpc::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET)
.build_v1alpha()?;
tokio::spawn(async {
let web_server = Router::new().route("/health", get(health_check));
let listener = tokio::net::TcpListener::bind("0.0.0.0:8081")
.await
.expect("can't bind to the healthcheck port");
axum::serve(listener, web_server.into_make_service())
.await
.expect("healh check service failed");
});
builder
.layer(CallbackLayer::new(RpcMetricsMakeCallbackHandler::new(
Arc::new(RpcMetrics::new(®istry)),
)))
.add_service(
sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(server),
)
.add_service(reflection_v1)
.add_service(reflection_v1alpha)
.serve(addr)
.await?;
Ok(())
}
Deployment strategies and trade-offs
신뢰할 수 있고 성능 좋은 GraphQL RPC service를 제공하기 위해 모든 것을 인덱싱할 필요는 없다. 실제로 많은 개발자는 최신 object 및 transaction 데이터와 몇 주에서 몇 개월의 이력만 필요할 수 있다. 다음과 같은 방식으로 운영 오버헤드를 줄이고 query 성능을 개선할 수 있다:
-
Postgres에서 30일에서 90일과 같은 명확한 retention window를 구성한다.
-
모든 버전을 Postgres에 보관하는 대신 Archival Service를 사용하여 깊은 과거 queries를 처리한다.
배포를 설계할 때는 비용, 신뢰성, 기능 완전성 간의 절충을 고려하라:
-
짧은 retention의 Postgres-only는 storage 비용이 낮고 성능이 빠르지만 과거 범위가 제한된다.
-
높은 retention의 Postgres-only는 더 넓은 데이터 범위를 제공하지만 상대적으로 더 높은 storage 비용과 대규모에서 더 느린 성능을 초래한다.
-
짧은 retention의 Postgres와 Archival Service 조합은 비용과 완전성을 모두 최적화하여 프로덕션 배포에 적합하다.
Best practices
성능과 신뢰성을 개선하려면 다음 운영 best practices도 고려하라:
-
지연 시간을 최소화하려면 database, indexing pipelines, GraphQL RPC service, archival service를 사용자와 같은 region에 co-locate하도록 시도하라.
-
업그레이드 또는 실패 시 SLA를 보장하기 위해 replication과 staged deployments를 사용하라.
-
서로 다른 개발자 요구를 충족하기 위해 서로 다른 service tiers를 제공하는 것을 고려하라. 예를 들면:
-
GraphQL RPC 또는 gRPC를 통해 최근 데이터(예: 30일)를 제공하는 기본 tier.
-
과거 조회가 필요한 앱에 적합한 전체 GraphQL 또는 gRPC와 Archival Service 접근을 제공하는 premium tier.
-
선택적으로, 다양한 클라이언트 footprints를 지원하기 위해 region-specific instances 또는 throughput 기반 가격 책정을 제공한다.
-