인덱서 파이프라인 아키텍처
sui-indexer-alt-framework는 두 가지 고유한 파이프라인 아키텍처를 제공한다. 올바른 접근 방식을 선택하려면 이 차이점을 이해하는 것이 중요하다.
Sequential versus concurrent pipelines
Sequential pipelines은 complete checkpoints를 순서대로 commit한다. 각 checkpoint는 다음 checkpoint보다 먼저 완전히 commit되므로 단순하고 일관된 reads를 보장한다.
Concurrent pipelines은 순서와 무관하게 commit하며 개별 checkpoints를 부분적으로 commit할 수 있다. 이를 통해 더 높은 throughput을 위해 여러 checkpoints를 동시에 처리할 수 있지만, 일관성을 보장하려면 reads가 어떤 데이터가 완전히 commit되었는지 확인해야 한다.
When to use each pipeline
두 파이프라인 유형 모두 in-place updates, aggregations, complex business logic를 처리할 수 있다. Sequential pipelines는 concurrent pipelines에 비해 throughput 제한이 있지만, 둘 중 하나를 선택하는 결정은 성능 요구 사항보다 엔지니어링 복잡성에 더 가깝다.
Recommended: Sequential pipeline
대부분의 사용 사례에서는 여기서 시작한다. 더 직접적인 구현과 유지보수를 제공한다.
- ✓ 직접적인 commits와 단순한 queries를 갖는 직관적인 구현을 원한다.
- ✓ 팀이 예측 가능하고 디버깅하기 쉬운 동작을 선호한다.
- ✓ 현재 성능이 요구 사항을 충족한다.
- ✓ 운영 단순성을 중요하게 여긴다.
Concurrent pipeline
다음과 같은 경우에는 concurrent pipeline 구현을 고려한다:
- ✓ 성능 최적화가 필수적이다.
- ✓ sequential processing이 데이터 볼륨을 따라갈 수 없다.
- ✓ 팀이 성능 이점을 위해 추가적인 구현 복잡성을 감수할 의향이 있다.
순서와 무관한 commits를 지원하면 파이프라인에 몇 가지 추가 복잡성이 생긴다:
- Watermark-aware queries: 모든 reads는 어떤 데이터가 완전히 commit되었는지 확인해야 한다. 자세한 내용은 the watermark system 섹션을 참조한다.
- Complex application logic: complete checkpoints를 처리하는 대신 data commits를 조각 단위로 처리해야 한다.
Decision framework
프로젝트에 어떤 파이프라인을 선택해야 할지 확신이 없다면 구현과 디버깅이 더 쉬운 sequential pipeline으로 시작한다. 그런 다음 현실적인 부하에서 성능을 측정한다. Sequential pipeline이 프로젝트 요구 사항을 충족할 수 없다면 concurrent pipeline으로 전환한다.
전체 목록은 아니지만 sequential pipeline이 요구 사항을 충족하지 못할 수 있는 구체적인 시나리오에는 다음이 포함된다:
- 파이프라인이 각 checkpoint에서 chunking과 out-of-order commits의 이점을 얻는 데이터를 생성한다. 개별 checkpoints는 많은 데이터나 latency를 유발할 수 있는 개별 writes를 생성할 수 있다.
- pruning이 필요한 많은 데이터를 생성하고 있다. 이 경우 concurrent pipeline을 사용해야 한다.
어떤 파이프라인을 사용할지 결정하는 것 외에도 scaling도 고려해야 한다. 여러 종류의 데이터를 indexing한다면 여러 pipelines와 watermarks를 사용하는 것을 고려한다.
Common pipeline components
Sequential pipelines와 concurrent pipelines는 공통된 components와 concepts를 공유한다. 이러한 공통 요소를 이해하면 두 아키텍처가 어떻게 다른지 더 명확해진다.
Processor component
The Processor is the concurrent processing engine, handling multiple tasks running at the same time for maximum throughput. Its primary responsibility is to convert raw checkpoint data into database-ready rows using parallel workers.
The component handles this task by spawning FANOUT worker tasks (default: 10) for parallel processing. The FANOUT is the key configuration as it controls parallel processing capacity.
Variable not found. If code is formatted correctly, consider using code comments instead.
Each worker calls your Handler::process() method independently.
Variable not found. If code is formatted correctly, consider using code comments instead.
Each of these workers can process different checkpoints simultaneously and in any order. The workers send their processed data to the Collector with checkpoint metadata.
Processor component는 sequential pipelines와 concurrent pipelines에서 동일하게 동작한다. 이는 Broadcaster로부터 checkpoint data를 받아 custom logic으로 변환하고, 처리된 결과를 파이프라인의 다음 단계로 전달한다.
Watermark concepts summary
파이프라인별 아키텍처를 살펴보기 전에 coordination에 사용되는 세 가지 watermarks를 이해한다:
| Watermark | Purpose | Used by |
|---|---|---|
checkpoint_hi_inclusive | 갭 없이 모든 data가 commit된 가장 높은 checkpoint | 복구와 진행 상황 추적을 위한 양쪽 pipelines |
reader_lo | queries에 사용할 수 있음이 보장되는 가장 낮은 checkpoint | pruning이 활성화된 concurrent pipelines |
pruner_hi | pruning된(deleted) 가장 높은 checkpoint | pruning이 활성화된 concurrent pipelines |
이 watermarks들은 함께 안전한 out-of-order processing, 자동 data cleanup, failures로부터의 recovery를 가능하게 한다.
The watermark system
각 pipeline에 대해 indexer는 최소한 해당 지점까지의 모든 data가 commit된 가장 높은 checkpoint를 추적한다. 추적은 committer watermark인 checkpoint_hi_inclusive를 통해 수행된다. Concurrent pipelines와 sequential pipelines 모두 restart 시 처리를 재개할 위치를 이해하기 위해 checkpoint_hi_inclusive에 의존한다.
선택적으로 pipeline은 pruning이 활성화된 경우 읽기와 pruning 작업의 안전한 하한을 정의하는 reader_lo와 pruner_hi를 추적한다. 이러한 watermarks는 data integrity를 유지하면서 out-of-order processing을 가능하게 하므로 concurrent pipelines에 특히 중요하다.
Safe pruning
Watermark system은 강력한 data lifecycle management system을 만든다:
- Guaranteed data availability: checkpoint data availability rules를 적용하여 readers가 안전한 queries를 수행하도록 보장한다.
- Automatic cleanup process: pipeline은 retention 보장을 유지하면서 storage가 무한히 증가하지 않도록 unpruned checkpoints를 자주 정리한다. Pruning process는 race conditions를 피하기 위해 safety delay와 함께 실행된다.
- Balanced approach: system은 안전성과 효율성 사이의 균형을 맞춘다.
- Storage efficiency: 오래된 데이터는 자동으로 삭제된다.
- Data availability: 항상 retention 양만큼의 complete data를 유지한다.
- Safety guarantees: readers는 누락된 data gaps를 만나지 않는다.
- Performance: out-of-order processing이 throughput을 극대화한다.
이 watermark system은 concurrent pipelines를 고성능이면서도 신뢰할 수 있게 만들어 주며, 강력한 data availability guarantees와 자동 storage management를 유지하면서 대규모 throughput을 가능하게 한다.
Scenario 1: Basic watermark (no pruning)
Pruning이 비활성화된 경우 indexer는 각 pipeline committer의 checkpoint_hi_inclusive만 보고한다. 다음 타임라인을 생각해 보자. 여기서는 여러 checkpoints가 처리되고 있으며 일부는 순서와 무관하게 commit된다.
Checkpoint Processing Timeline:
[1000] [1001] [1002] [1003] [1004] [1005]
✓ ✓ ✗ ✓ ✗ ✗
^
checkpoint_hi_inclusive = 1001
✓ = Committed (all data written)
✗ = Not Committed (processing or failed)
이 시나리오에서 checkpoint 1003이 commit되었더라도 1002에 여전히 gap이 있기 때문에 checkpoint_hi_inclusive는 1001이다. Indexer는 start부터 checkpoint_hi_inclusive까지의 모든 data를 사용할 수 있다는 보장을 만족하기 위해 high watermark를 1001로 보고해야 한다.
Checkpoint 1002가 commit된 후에는 1003까지의 데이터를 안전하게 읽을 수 있다.
[1000] [1001] [1002] [1003] [1004] [1005]
✓ ✓ ✓ ✓ ✗ ✗
[---- SAFE TO READ -------]
(start → checkpoint_hi_inclusive at 1003)
Scenario 2: Pruning enabled
Pruning은 retention policy로 구성된 pipelines에 대해 활성화된다. 예를 들어 table이 너무 커져 마지막 4개의 checkpoints만 유지하려면 retention = 4이다. 이는 indexer가 checkpoint_hi_inclusive와 구성된 retention의 차이로 reader_lo를 주기적으로 업데이트한다는 뜻이다. 별도의 pruning task는 [pruner_hi, reader_lo] 사이의 data를 pruning하는 역할을 한다.
[998] [999] [1000] [1001] [1002] [1003] [1004] [1005] [1006]
🗑️ 🗑️ ✓ ✓ ✓ ✓ ✗ ✗ ✗
^ ^
reader_lo = 1000 checkpoint_hi_inclusive = 1003
🗑️ = Pruned (deleted)
✓ = Committed
✗ = Not Committed
Current watermarks:
-
checkpoint_hi_inclusive= 1003:- start부터 1003까지의 모든 data가 complete하다(gaps 없음).
- 1004가 아직 commit되지 않았기 때문에(gap) 1005로 advance할 수 없다.
-
reader_lo= 1000:- availability가 보장되는 가장 낮은 checkpoint이다.
- 다음과 같이 계산된다:
reader_lo = checkpoint_hi_inclusive - retention + 1. reader_lo= 1003 - 4 + 1 = 1000.
-
pruner_hi= 1000:- 삭제된 가장 높은 exclusive checkpoint이다.
- checkpoints 998과 999는 공간 절약을 위해 삭제되었다.
Clear safe zones:
[998] [999] [1000] [1001] [1002] [1003] [1004] [1005] [1006]
🗑️ 🗑️ ✓ ✓ ✓ ✓ ✗ ✗ ✓
[--PRUNED--][--- Safe Reading Zone ---] [--- Processing ---]
How watermarks progress over time
Step 1: Checkpoint 1004가 완료된다.
[999] [1000] [1001] [1002] [1003] [1004] [1005] [1006] [1007]
🗑️ ✓ ✓ ✓ ✓ ✓ ✗ ✓ ✗
^ ^
reader_lo = 1000 checkpoint_hi_inclusive = 1004 (advanced by 1)
pruner_hi = 1000
이제 checkpoint 1004가 commit되었으므로 1004까지 gaps가 없어서 checkpoint_hi_inclusive는 1003에서 1004로 advance할 수 있다. reader_lo와 pruner_hi는 아직 바뀌지 않았다는 점에 유의한다.
Step 2: Reader watermark가 주기적으로 업데이트된다.
[999] [1000] [1001] [1002] [1003] [1004] [1005] [1006] [1007]
🗑️ ✓ ✓ ✓ ✓ ✓ ✗ ✓ ✗
^ ^
reader_lo = 1001 checkpoint_hi_inclusive = 1004
(1004 - 4 + 1 = 1001)
pruner_hi = 1000 (unchanged as pruner hasn't run yet)
별도의 reader watermark update task가 주기적으로 실행되며(구성 가능), retention policy에 따라 reader_lo를 1001로 advance한다(1004 - 4 + 1 = 1001로 계산). 그러나 pruner는 아직 실행되지 않았으므로 pruner_hi는 1000으로 유지된다.
Step 3: Pruner가 safety delay 이후 실행된다.
[999] [1000] [1001] [1002] [1003] [1004] [1005] [1006] [1007]
🗑️ 🗑️ ✓ ✓ ✓ ✓ ✗ ✓ ✗
^ ^
reader_lo = 1001 checkpoint_hi_inclusive = 1004
pruner_hi = 1001
pruner_hi (1000) < reader_lo (1001)이므로 pruner는 일부 checkpoints가 retention window 밖에 있음을 감지한다. 이는 reader_lo까 지의 모든 요소를 정리하고(checkpoint 1000 삭제) pruner_hi를 reader_lo(1001)로 업데이트한다.
reader_lo보다 오래된 checkpoints도 다음 이유 때문에 일시적으로는 여전히 사용 가능할 수 있다:
- in-flight queries를 보호하기 위한 의도적인 delay
- pruner가 cleanup을 아직 완료하지 않음
Sequential pipeline architecture
Sequential pipelines는 ordered processing을 우선시하는 보다 직접적이면서도 강력한 indexing 아키텍처를 제공한다. Concurrent pipelines에 비해 일부 throughput을 희생하지만 더 강한 guarantees를 제공하며, 대체로 reasoning하기 더 쉽다.
Architecture overview
Sequential pipeline은 두 개의 주요 components만으로 구성되어 concurrent pipeline의 여섯 component 아키텍처보다 훨씬 단순하다.

Broadcaster와 Processor components는 concurrent pipeline과 동일한 backpressure mechanisms, adaptive parallel processing, processor() implementations를 사용한다. Processor component는 Common pipeline components 섹션에서 자세히 설명한다.
핵심 차이점은 ordering, batching, database commits를 처리하는 단 하나의 Committer component만 있는 극적으로 단순화된 pipeline core이다. 반대로 concurrent pipelines는 Processor 외에 Collector, Committer, CommitterWatermark, ReaderWatermark, Pruner라는 다섯 개의 별도 components를 가진다.
Sequential pipeline components
Sequential pipelines는 shared Processor 외에 pipeline-specific component 하나, 즉 Committer를 가진다.
Committer
Sequential Committer는 pipeline의 주요 component이자 주된 customization 지점이다. 높은 수준에서 Committer는 다음 작업을 수행한다:
- Receives processor로부터 순서와 무관하게 처리된 data를 받는다.
- Orders data를 checkpoint sequence 기준으로 정렬한다.
- Batches 여러 checkpoints를 사용자의 logic으로 함께 batching한다.
- Commits batch를 데이터베이스에 원자적으로 commit한다.
- Signals ingestion layer에 progress를 다시 전달한다.
이를 customize하기 위해 사용자의 코드는 committer가 호출하는 두 개의 핵심 함수를 사용한다:
batch(): data merging logic.
fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>);
commit(): database write logic.
async fn commit<'a>(
&self,
batch: &Self::Batch,
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> anyhow::Result<usize>;
Sequential pipeline backpressure mechanisms
Sequential pipelines는 memory overflow와 ordering-related deadlocks를 방지하기 위해 두 계층의 backpressure를 사용한다:
