# Storage and Querying in Polycentric Core

Polycentric Core's storage layer handles data ingestion, indexing, and real-time query updates. For details on how data is persisted, see the Persistence section.

# Data Ingestion

Every signed event passes through a staged ingestion process before it becomes queryable. The `Store` class runs each event through all registered stages and writes the resulting updates in a single batch:

```typescript
class Store {
    private readonly level: BinaryAbstractLevel;
    private readonly stages: readonly HasIngest[];

    public readonly indexEvents: IndexEvents;
    public readonly indexSystemStates: IndexSystemState;
    public readonly indexProcessStates: IndexProcessState;
    // ... more indexes

    constructor(level: BinaryAbstractLevel) {
        this.level = level;

        // Initialize indexes (`registerSublevel` is defined elsewhere and
        // elided from this excerpt)
        this.indexEvents = new IndexEvents(registerSublevel);
        this.indexSystemStates = new IndexSystemState(registerSublevel);
        this.indexProcessStates = new IndexProcessState(registerSublevel);
        // ... more indexes

        // Register all stages for ingestion
        this.stages = [
            this.indexEvents,
            this.indexSystemStates,
            this.indexProcessStates,
            // ... more stages
        ];
    }

    public async ingest(signedEvent: SignedEvent): Promise<void> {
        const actions: BinaryUpdateLevel[] = [];

        // Collect the pending writes produced by every ingestion stage
        for (const stage of this.stages) {
            actions.push(...(await stage.ingest(signedEvent)));
        }

        // Write all changes in a single batch
        await this.level.batch(actions);
    }
}
```
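The staged pattern above can be illustrated without a real database. The following is a minimal, synchronous sketch, not Polycentric's implementation: `SketchEvent`, `BatchOp`, `Stage`, `SketchStore`, and the key formats are hypothetical, an in-memory `Map` stands in for the Level database, and the real stages are asynchronous.

```typescript
// Hypothetical, simplified types; not Polycentric's actual API.
interface SketchEvent {
    system: string;
    process: string;
    logicalClock: number;
    content: string;
}

// One pending write, analogous to an entry in a Level batch.
interface BatchOp {
    key: string;
    value: string;
}

// Every index implements the same ingest contract.
interface Stage {
    ingest(event: SketchEvent): BatchOp[];
}

// Stores each event under a key built from its identity.
const eventsStage: Stage = {
    ingest: (e) => [
        {
            key: `events/${e.system}/${e.process}/${e.logicalClock}`,
            value: e.content,
        },
    ],
};

// Records the logical clock of the most recently ingested event per process.
const processHeadStage: Stage = {
    ingest: (e) => [
        {
            key: `heads/${e.system}/${e.process}`,
            value: String(e.logicalClock),
        },
    ],
};

class SketchStore {
    // An in-memory map stands in for the on-disk key-value store.
    public readonly data = new Map<string, string>();
    private readonly stages: readonly Stage[];

    constructor(stages: readonly Stage[]) {
        this.stages = stages;
    }

    public ingest(event: SketchEvent): void {
        // Collect writes from every stage before touching storage, so a
        // stage that throws leaves the store unchanged.
        const ops: BatchOp[] = [];
        for (const stage of this.stages) {
            ops.push(...stage.ingest(event));
        }
        // Apply all collected writes together.
        for (const op of ops) {
            this.data.set(op.key, op.value);
        }
    }
}

const sketchStore = new SketchStore([eventsStage, processHeadStage]);
sketchStore.ingest({
    system: 'alice',
    process: 'p1',
    logicalClock: 1,
    content: 'hello',
});
```

Collecting every stage's writes before applying any of them is what lets a single event update several indexes without leaving them out of step if one stage fails.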

# Indexes

The system maintains multiple specialized indexes for efficient querying:

  1. Events Index - Core event storage
  2. System States - Tracks system-wide state
  3. Process States - Individual process states
  4. Feed Index - Social feed organization
  5. CRDT Element Sets - Conflict-free replicated data
  6. Opinion Index - User reactions and opinions
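
This section does not describe how each index lays out its keys. As one illustration of why a dedicated index makes lookups efficient, the sketch below assumes a hypothetical key of the form `system/process/clock` with a zero-padded clock, so that lexicographic key order matches logical-clock order. `padClock`, `eventKey`, and `rangeForProcess` are invented for this example, and the real indexes may encode keys differently.

```typescript
// Hypothetical key layout; the real indexes may encode keys differently.
const CLOCK_WIDTH = 12;

// Left-pad the clock with zeros so that string order equals numeric order.
function padClock(clock: number): string {
    const digits = String(clock);
    return '0'.repeat(Math.max(0, CLOCK_WIDTH - digits.length)) + digits;
}

// Build the key under which an event would be stored.
function eventKey(system: string, process: string, clock: number): string {
    return [system, process, padClock(clock)].join('/');
}

// Return the keys belonging to one process, in logical-clock order.
function rangeForProcess(
    keys: readonly string[],
    system: string,
    process: string,
): string[] {
    const prefix = `${system}/${process}/`;
    return keys.filter((k) => k.startsWith(prefix)).sort();
}

const sketchKeys = [
    eventKey('alice', 'p1', 10),
    eventKey('alice', 'p1', 2),
    eventKey('alice', 'p2', 1),
];
const ordered = rangeForProcess(sketchKeys, 'alice', 'p1');
```

Without the padding, the key for clock 10 would sort before the key for clock 2, and an ordered scan over a process's events would return them out of sequence.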

# Real-time Querying

Queries combine multiple data sources using RxJS observables:

  1. Disk storage: `loadFromDisk` reads the system state from the local store, then loads the head event of each known process.

```typescript
private loadFromDisk(
    system: Models.PublicKey.PublicKey,
): RXJS.Observable<Batch> {
    // Load the most recent event of a single process from the local store
    const loadProcessHead = (processProto: Protocol.Process) => {
        const process = Models.Process.fromProto(processProto);

        return RXJS.from(
            this.processHandle
                .store()
                .indexProcessStates.getProcessState(system, process),
        ).pipe(
            RXJS.switchMap((processState) =>
                RXJS.from(
                    this.processHandle
                        .store()
                        .indexEvents.getSignedEvent(
                            system,
                            process,
                            processState.logicalClock,
                        ),
                ).pipe(
                    // A missing head event emits nothing; combineLatest
                    // below then waits and no disk batch is produced
                    RXJS.switchMap((potentialEvent) =>
                        potentialEvent
                            ? RXJS.of(potentialEvent)
                            : RXJS.NEVER,
                    ),
                ),
            ),
        );
    };

    return RXJS.from(
        this.processHandle.store().indexSystemStates.getSystemState(system),
    ).pipe(
        RXJS.switchMap((systemState) =>
            systemState.processes.length > 0
                ? RXJS.combineLatest(
                      systemState.processes.map(loadProcessHead),
                  )
                : RXJS.of([]),
        ),
        // Tag the result so consumers can tell where the batch came from
        RXJS.map((signedEvents) => ({
            source: 'disk',
            signedEvents: signedEvents,
        })),
    );
}
```
  2. Network updates: `loadFromNetwork` requests the head from each query server and emits one batch per server.

```typescript
private loadFromNetwork(
    system: Models.PublicKey.PublicKey,
): RXJS.Observable<Batch> {
    // Fetch the head events for this system from one server
    const loadFromServer = async (server: string) => {
        return {
            source: server,
            signedEvents: (await APIMethods.getHead(server, system)).events,
        };
    };

    // Run one task per query server
    return Util.taskPerServerObservable(
        this.queryServers,
        system,
        (server: string) => {
            return Util.fromPromiseExceptionToNever(loadFromServer(server));
        },
    );
}
```
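  3. Live updates: changes are propagated through observable streams.

```typescript
import { distinctUntilChanged, of, switchMap } from 'rxjs';

// Example of a CRDT query with live updates.
// Illustrative only: this assumes a loadFromNetwork variant that also
// takes the content type, unlike the method shown above.
const observable = queryCRDTObservable(queryCRDT, system, contentType).pipe(
    switchMap((value) => {
        if (value.missingData) {
            // Local data is incomplete: load from network
            return loadFromNetwork(system, contentType);
        }
        return of(value);
    }),
    // Suppress consecutive emissions that compare equal
    distinctUntilChanged(callbackValuesEqual)
);
```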
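
Both loaders above emit batches tagged with their source, which a query then has to reconcile. The sketch below shows one plausible way to do that in plain TypeScript, with no RxJS dependency. It is not the actual query implementation: `HeadEvent`, `SketchBatch`, and `mergeHeads` are hypothetical names, and the merge rule (keep the event with the highest logical clock per process) is an assumption.

```typescript
// Hypothetical, simplified shapes modeled on the Batch objects above.
interface HeadEvent {
    process: string;
    logicalClock: number;
}

interface SketchBatch {
    source: string; // 'disk' or a server address
    signedEvents: HeadEvent[];
}

// Merge batches from any number of sources, keeping for each process the
// event with the highest logical clock. This merge rule is an assumption.
function mergeHeads(batches: readonly SketchBatch[]): HeadEvent[] {
    const newest = new Map<string, HeadEvent>();
    for (const batch of batches) {
        for (const event of batch.signedEvents) {
            const current = newest.get(event.process);
            if (
                current === undefined ||
                event.logicalClock > current.logicalClock
            ) {
                newest.set(event.process, event);
            }
        }
    }
    // Sort by process so the result does not depend on arrival order.
    return Array.from(newest.values()).sort((a, b) =>
        a.process < b.process ? -1 : a.process > b.process ? 1 : 0,
    );
}

const mergedHeads = mergeHeads([
    { source: 'disk', signedEvents: [{ process: 'p1', logicalClock: 3 }] },
    {
        source: 'https://server.example',
        signedEvents: [
            { process: 'p1', logicalClock: 5 },
            { process: 'p2', logicalClock: 1 },
        ],
    },
]);
```

In this example the server's copy of `p1` replaces the older disk copy, and `p2`, which exists only on the server, is added to the result.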

# Update Flow

  1. Data Ingestion
     - New events are received
     - Run through ingestion stages
     - Indexed in appropriate stores
  2. State Updates
     - Changes trigger observable updates
     - Queries receive new data
     - UI components react to changes
  3. Synchronization
     - Network updates are merged
     - Conflicts are resolved (using CRDTs)
     - State is propagated to observers
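
The three phases above can be sketched in a few lines. The example uses a last-writer-wins register, one of the simplest CRDTs, purely as an illustration; the text does not say which CRDT types back each index, and `Update`, `wins`, and `LwwRegister` are hypothetical names.

```typescript
interface Update {
    value: string;
    clock: number;
}

type Listener = (state: Update) => void;

// Deterministic ordering: higher clock wins, ties are broken by value so
// that every replica picks the same winner regardless of arrival order.
function wins(candidate: Update, current: Update): boolean {
    return (
        candidate.clock > current.clock ||
        (candidate.clock === current.clock && candidate.value > current.value)
    );
}

class LwwRegister {
    private state: Update | undefined = undefined;
    private readonly listeners: Listener[] = [];

    // Register a listener and return a function that unregisters it.
    public subscribe(listener: Listener): () => void {
        this.listeners.push(listener);
        return () => {
            const i = this.listeners.indexOf(listener);
            if (i >= 0) {
                this.listeners.splice(i, 1);
            }
        };
    }

    // Phase 1: ingest an update from any source (local or network).
    public ingest(update: Update): void {
        // Phase 3: resolve conflicts; stale or duplicate updates are dropped.
        if (this.state !== undefined && !wins(update, this.state)) {
            return;
        }
        this.state = update;
        // Phase 2: propagate the new state to every observer.
        for (const listener of this.listeners.slice()) {
            listener(update);
        }
    }
}

const register = new LwwRegister();
const seen: string[] = [];
const unsubscribe = register.subscribe((state) => {
    seen.push(state.value);
});
register.ingest({ value: 'a', clock: 1 });
register.ingest({ value: 'b', clock: 3 });
register.ingest({ value: 'c', clock: 2 }); // stale: ignored, no notification
unsubscribe();
register.ingest({ value: 'd', clock: 4 }); // applied, but no longer observed
```

Because stale updates are dropped before listeners run, observers only see changes that actually alter the state.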

# Example: Live Query Flow

```typescript
// Create a query for user data
const query = new QueryHead(store, queryServers);

// Subscribe to updates
query.subscribe(system, {
    callback: (value) => {
        if (!value.missingData) {
            // Handle new data
            updateUI(value.head);
        }
    },
    cancelContext: new CancelContext()
});

// Data flow:
// 1. Check local storage
// 2. Stream updates from network
// 3. Ingest new data
// 4. Trigger observable updates
// 5. Notify subscribers
```
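
The subscription above is passed a `CancelContext`, whose behavior is not shown here. As a rough sketch of how such a context could stop callbacks once a consumer goes away, the example below uses a hypothetical `SketchCancelContext` and `guardCallback`; the real class may expose a different interface.

```typescript
// Hypothetical stand-in for CancelContext; the real class may differ.
class SketchCancelContext {
    private isCancelled = false;

    public cancel(): void {
        this.isCancelled = true;
    }

    public cancelled(): boolean {
        return this.isCancelled;
    }
}

// Wrap a callback so that it becomes a no-op after cancellation.
function guardCallback<T>(
    callback: (value: T) => void,
    context: SketchCancelContext,
): (value: T) => void {
    return (value) => {
        if (!context.cancelled()) {
            callback(value);
        }
    };
}

const ctx = new SketchCancelContext();
const received: string[] = [];
const guarded = guardCallback<string>((value) => {
    received.push(value);
}, ctx);

guarded('first'); // delivered
ctx.cancel();
guarded('second'); // dropped: the context was cancelled
```

Guarding the callback this way means a late network response cannot update a UI component that has already been torn down.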

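This architecture provides the following properties:

  - Events are indexed during ingestion, before they become visible to queries
  - Updates reach subscribers through observable streams as they arrive
  - Queries combine local (disk) and remote (network) data in a single stream
  - Conflicting updates are reconciled with CRDTs, so state stays consistent across the system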