# Storage and Querying in Polycentric Core
Polycentric Core's storage and querying system handles data ingestion, indexing, and real-time updates. For details on how data is persisted, see the Persistence section.
## Data Ingestion
Every signed event passes through a fixed sequence of ingestion stages before it becomes queryable. The Store class manages this process: each stage returns the writes it needs, and the Store commits them all in a single batch:
```typescript
class Store {
    private readonly level: BinaryAbstractLevel;
    private readonly stages: readonly HasIngest[];
    // ... index field declarations omitted

    constructor(level: BinaryAbstractLevel) {
        this.level = level;

        // Initialize indexes (the `registerSublevel` helper is not shown
        // in this excerpt)
        this.indexEvents = new IndexEvents(registerSublevel);
        this.indexSystemStates = new IndexSystemState(registerSublevel);
        this.indexProcessStates = new IndexProcessState(registerSublevel);
        // ... more indexes

        // Register all stages for ingestion
        this.stages = [
            this.indexEvents,
            this.indexSystemStates,
            this.indexProcessStates,
            // ... more stages
        ];
    }

    public async ingest(signedEvent: SignedEvent): Promise<void> {
        const actions: BinaryUpdateLevel[] = [];

        // Run through all ingestion stages, collecting their writes
        for (const stage of this.stages) {
            actions.push(...(await stage.ingest(signedEvent)));
        }

        // Write all collected changes in one batch
        await this.level.batch(actions);
    }
}
```
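To make the stage-then-batch pattern concrete, here is a minimal in-memory sketch. It is not Polycentric's implementation: `MemoryLevel`, `PutAction`, the two stages, the key layout, and the simplified `SignedEvent` shape are all hypothetical stand-ins chosen for illustration.

```typescript
// Hypothetical, simplified stand-ins for the types used by Store.
type SignedEvent = { system: string; process: string; logicalClock: number };
type PutAction = { type: 'put'; key: string; value: string };

interface HasIngest {
    ingest(signedEvent: SignedEvent): Promise<PutAction[]>;
}

// In-memory stand-in for the underlying level database.
class MemoryLevel {
    public readonly data = new Map<string, string>();

    // Applies every collected action in one call, mirroring level.batch().
    public async batch(actions: PutAction[]): Promise<void> {
        for (const action of actions) {
            this.data.set(action.key, action.value);
        }
    }
}

// Stage 1: stores the event itself under a per-process key.
const eventStage: HasIngest = {
    ingest: async (e) => [
        {
            type: 'put',
            key: `events/${e.system}/${e.process}/${e.logicalClock}`,
            value: JSON.stringify(e),
        },
    ],
};

// Stage 2: records the logical clock of the ingested event for its process.
const processStateStage: HasIngest = {
    ingest: async (e) => [
        {
            type: 'put',
            key: `processState/${e.system}/${e.process}`,
            value: String(e.logicalClock),
        },
    ],
};

// Collects the writes of every stage, then commits them together.
async function ingest(
    level: MemoryLevel,
    stages: readonly HasIngest[],
    signedEvent: SignedEvent,
): Promise<void> {
    const actions: PutAction[] = [];
    for (const stage of stages) {
        actions.push(...(await stage.ingest(signedEvent)));
    }
    await level.batch(actions);
}
```

Because no stage writes on its own, an event is either reflected in every index or in none of them, provided the underlying batch operation is atomic.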
## Indexes
The system maintains multiple specialized indexes for efficient querying:
- Events Index - Core event storage
- System States - Tracks system-wide state
- Process States - Individual process states
- Feed Index - Social feed organization
- CRDT Element Sets - Conflict-free replicated data
- Opinion Index - User reactions and opinions
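The events index is looked up by system, process, and logical clock (see `getSignedEvent` in the disk query in the next section). How Polycentric lays out its keys is not shown on this page, so the following is only a sketch of one common approach for ordered key-value stores: append the clock as a fixed-width big-endian integer so that byte order matches numeric order. The function names and the key layout are assumptions.

```typescript
// Hypothetical key layout: <system bytes><process bytes><8-byte big-endian clock>.
// Assumes system and process identifiers have fixed lengths.
function encodeClock(logicalClock: number): Uint8Array {
    const bytes = new Uint8Array(8);
    const view = new DataView(bytes.buffer);
    // Split the (non-negative, safe) integer into high and low 32-bit halves.
    view.setUint32(0, Math.floor(logicalClock / 0x100000000), false);
    view.setUint32(4, logicalClock >>> 0, false);
    return bytes;
}

function makeEventKey(
    system: Uint8Array,
    processId: Uint8Array,
    logicalClock: number,
): Uint8Array {
    const key = new Uint8Array(system.length + processId.length + 8);
    key.set(system, 0);
    key.set(processId, system.length);
    key.set(encodeClock(logicalClock), system.length + processId.length);
    return key;
}

// Lexicographic byte comparison, the ordering an ordered key-value store
// would typically apply to binary keys.
function compareKeys(a: Uint8Array, b: Uint8Array): number {
    const n = Math.min(a.length, b.length);
    for (let i = 0; i < n; i++) {
        if (a[i] !== b[i]) {
            return a[i] - b[i];
        }
    }
    return a.length - b.length;
}
```

With this layout, all events of one process sit next to each other in clock order, so a range scan can walk a process's history. A decimal string encoding would not have this property, since "10" sorts before "2".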
## Real-time Querying
Queries combine multiple data sources using RxJS observables:
- Disk Storage: Query local data
```typescript
private loadFromDisk(
    system: Models.PublicKey.PublicKey,
): RXJS.Observable<Batch> {
    const loadProcessHead = (processProto: Protocol.Process) => {
        const process = Models.Process.fromProto(processProto);

        return RXJS.from(
            this.processHandle
                .store()
                .indexProcessStates.getProcessState(system, process),
        ).pipe(
            RXJS.switchMap((processState) =>
                RXJS.from(
                    this.processHandle
                        .store()
                        .indexEvents.getSignedEvent(
                            system,
                            process,
                            processState.logicalClock,
                        ),
                ).pipe(
                    RXJS.switchMap((potentialEvent) =>
                        potentialEvent
                            ? RXJS.of(potentialEvent)
                            : RXJS.NEVER,
                    ),
                ),
            ),
        );
    };

    return RXJS.from(
        this.processHandle.store().indexSystemStates.getSystemState(system),
    ).pipe(
        RXJS.switchMap((systemState) =>
            systemState.processes.length > 0
                ? RXJS.combineLatest(
                      systemState.processes.map(loadProcessHead),
                  )
                : RXJS.of([]),
        ),
        RXJS.switchMap((signedEvents) =>
            RXJS.of({
                source: 'disk',
                signedEvents: signedEvents,
            }),
        ),
    );
}
```
- Network Updates: Stream from remote servers
```typescript
private loadFromNetwork(
    system: Models.PublicKey.PublicKey,
): RXJS.Observable<Batch> {
    const loadFromServer = async (server: string) => {
        return {
            source: server,
            signedEvents: (await APIMethods.getHead(server, system)).events,
        };
    };

    return Util.taskPerServerObservable(
        this.queryServers,
        system,
        (server: string) => {
            return Util.fromPromiseExceptionToNever(loadFromServer(server));
        },
    );
}
```
- Live Updates: Changes are propagated through observable streams:
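```typescript
import { distinctUntilChanged, of, switchMap } from 'rxjs';

// Example of a CRDT query with live updates
const observable = queryCRDTObservable(queryCRDT, system, contentType).pipe(
    switchMap((value) => {
        if (value.missingData) {
            // Local data is incomplete: load from network
            return loadFromNetwork(system, contentType);
        }
        return of(value);
    }),
    // Skip emissions that equal the previous value
    distinctUntilChanged(callbackValuesEqual),
);
```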
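The `distinctUntilChanged(callbackValuesEqual)` step keeps subscribers from being notified when a refresh yields the same result as before. The definition of `callbackValuesEqual` is not shown on this page; the sketch below reproduces the idea without RxJS, and both `CallbackValue` and `valuesEqual` are hypothetical.

```typescript
// Hypothetical shape of a query result; the real type is not shown here.
type CallbackValue = { missingData: boolean; value?: string };

// Hypothetical comparator standing in for callbackValuesEqual.
function valuesEqual(a: CallbackValue, b: CallbackValue): boolean {
    return a.missingData === b.missingData && a.value === b.value;
}

// Wraps a callback so it fires only when the value differs from the
// previously delivered one. The first value is always delivered.
function distinct<T>(
    equal: (previous: T, current: T) => boolean,
    callback: (value: T) => void,
): (value: T) => void {
    let hasPrevious = false;
    let previous: T | undefined;

    return (value: T) => {
        if (hasPrevious && equal(previous as T, value)) {
            return;
        }
        hasPrevious = true;
        previous = value;
        callback(value);
    };
}
```

Only consecutive duplicates are dropped: a value that reappears after a different one is delivered again, which matches the behavior of RxJS's `distinctUntilChanged`.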
## Update Flow
- Data Ingestion
  - New events are received
  - Events run through the ingestion stages
  - Events are indexed in the appropriate stores
- State Updates
  - Changes trigger observable updates
  - Queries receive new data
  - UI components react to changes
- Synchronization
  - Network updates are merged
  - Conflicts are resolved (using CRDTs)
  - State is propagated to observers
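The conflict-resolution step can be illustrated with a last-writer-wins register, one of the simplest CRDTs. This page does not spell out which CRDT types Polycentric applies to which content, so the type, its fields, and the tie-breaking rule below are assumptions made for illustration.

```typescript
// Hypothetical last-writer-wins register: the newest timestamp wins.
type LWWRegister = { value: string; unixMilliseconds: number; writer: string };

// Merge is commutative, associative, and idempotent, so replicas converge
// no matter in which order they receive updates.
function merge(a: LWWRegister, b: LWWRegister): LWWRegister {
    if (a.unixMilliseconds !== b.unixMilliseconds) {
        return a.unixMilliseconds > b.unixMilliseconds ? a : b;
    }
    // Equal timestamps: break the tie deterministically on the writer id,
    // then on the value, so every replica picks the same winner.
    if (a.writer !== b.writer) {
        return a.writer > b.writer ? a : b;
    }
    return a.value >= b.value ? a : b;
}
```

For example, merging a local `{ value: 'old name', unixMilliseconds: 1000, writer: 'p1' }` with a remote `{ value: 'new name', unixMilliseconds: 2000, writer: 'p2' }` yields the remote value, whichever side performs the merge.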
## Example: Live Query Flow
```typescript
// Create a query for user data
const query = new QueryHead(store, queryServers);

// Subscribe to updates
query.subscribe(system, {
    callback: (value) => {
        if (!value.missingData) {
            // Handle new data
            updateUI(value.head);
        }
    },
    cancelContext: new CancelContext(),
});

// Data flow:
// 1. Check local storage
// 2. Stream updates from network
// 3. Ingest new data
// 4. Trigger observable updates
// 5. Notify subscribers
```
This architecture ensures that:
- Data is indexed before it can be queried
- Updates reach subscribers as they arrive
- Queries combine local and remote data
- Conflicting updates are resolved with CRDTs, so state stays consistent across the system