Observable Architecture in Polycentric Core
Polycentric Core uses RxJS Observables as a core abstraction for handling asynchronous data streams, state management, and real-time updates. This reactive approach is particularly well-suited for a distributed social network where data can change frequently and come from multiple sources.
Core Concepts
Query Observables
The query system extensively uses observables to provide real-time data streams. Here's a basic example:
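import * as RXJS from 'rxjs';
import * as Models from '@polycentric/polycentric-core/models';
import { queryCRDTObservable } from '@polycentric/polycentric-core/queries';

// `queryCRDT` and `system` are assumed to be in scope already.
const observable = queryCRDTObservable(
    queryCRDT,
    system,
    Models.ContentType.ContentTypeUsername
).pipe(
    // Placeholder transform: passes each value through unchanged.
    // Swap RXJS.of(value) for a dependent observable as needed.
    RXJS.switchMap((value) => {
        return RXJS.of(value);
    })
);

const subscription = observable.subscribe({
    next: (value) => {
        console.log('New value:', value);
    },
    error: (err) => {
        console.error('Error:', err);
    }
});

// Later, once updates are no longer needed, release the subscription.
subscription.unsubscribe();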
Data Sources
Observables in Polycentric Core combine multiple data sources:
- Local Storage: Query and observe changes to local data
- Network Responses: Stream real-time updates from remote servers
- Cache Updates: Monitor and react to cache state changes
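The fan-in of several sources into one stream can be sketched as follows. This is a minimal, dependency-free illustration and not the library's implementation: `Source`, `Update`, `fromArray`, and `mergeSources` are hypothetical names invented for the example, and the three sources just replay fixed values. In RxJS, the same idea is usually expressed with the `merge` function.

```typescript
// Hypothetical names throughout: Origin, Update, Source, fromArray, and
// mergeSources are illustrative and not part of Polycentric Core or RxJS.
type Origin = 'local' | 'network' | 'cache';

interface Update {
    origin: Origin;
    value: string;
}

// A source pushes updates to a listener and returns a function that stops it.
type Source = (listener: (update: Update) => void) => () => void;

// Builds a source that synchronously replays a fixed list of values.
function fromArray(origin: Origin, values: string[]): Source {
    return (listener) => {
        for (const value of values) {
            listener({ origin, value });
        }
        // Nothing left to cancel once the replay has finished.
        return () => {};
    };
}

// Fans several sources into one; stopping the result stops all of them.
function mergeSources(...sources: Source[]): Source {
    return (listener) => {
        const stops = sources.map((source) => source(listener));
        return () => stops.forEach((stopOne) => stopOne());
    };
}

const received: Update[] = [];
const stop = mergeSources(
    fromArray('local', ['alice']),
    fromArray('network', ['alice_v2']),
    fromArray('cache', ['alice']),
)((update) => {
    received.push(update);
});
stop();
```

A consumer registers one listener and does not need to know which source produced a given update, although the `origin` tag keeps that information available when it matters.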
Common Patterns
State Synchronization
Observables help manage state synchronization between different parts of the system:
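import { QueryCRDT } from '@polycentric/polycentric-core/queries';

// `store`, `queryServers`, `system`, `contentType`, and `updateUI`
// are assumed to be defined elsewhere.
const queryCRDT = new QueryCRDT(store, queryServers);

const subscription = queryCRDT.subscribe(system, contentType, {
    callback: (value) => {
        // Push each new value into the UI.
        updateUI(value);
    }
});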
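To illustrate how a callback-based subscription keeps several consumers in step, here is a small self-contained sketch. `KeyedState` is a hypothetical class written for this example and does not reflect how `QueryCRDT` is implemented; it assumes that a new subscriber is handed the current value immediately and that subscribing returns a function which removes the callback.

```typescript
// Hypothetical sketch: KeyedState is not a Polycentric Core class.
type Callback<T> = (value: T) => void;

class KeyedState<T> {
    private values = new Map<string, T>();
    private listeners = new Map<string, Set<Callback<T>>>();

    // Registers a callback for a key, replays the current value if there is
    // one, and returns a function that removes the callback again.
    subscribe(key: string, callback: Callback<T>): () => void {
        let set = this.listeners.get(key);
        if (set === undefined) {
            set = new Set<Callback<T>>();
            this.listeners.set(key, set);
        }
        set.add(callback);
        const current = this.values.get(key);
        if (current !== undefined) {
            callback(current);
        }
        const registered = set;
        return () => {
            registered.delete(callback);
        };
    }

    // Stores a new value and notifies every callback registered for the key.
    update(key: string, value: T): void {
        this.values.set(key, value);
        this.listeners.get(key)?.forEach((callback) => callback(value));
    }
}

const state = new KeyedState<string>();
const header: string[] = [];
const sidebar: string[] = [];

const stopHeader = state.subscribe('username', (v) => {
    header.push(v);
});
state.update('username', 'alice');
// A late subscriber is brought up to date with the current value.
const stopSidebar = state.subscribe('username', (v) => {
    sidebar.push(v);
});
state.update('username', 'alice2');
stopHeader();
// Only the sidebar is still subscribed at this point.
state.update('username', 'alice3');
stopSidebar();
```

Both consumers observe the same sequence of values for as long as they stay subscribed, which is the property the state synchronization pattern relies on.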
Real-time Updates
Handle live data streams with automatic updates. In this example the head is processed only when the update is not flagged with missingData:
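import { QueryHead } from '@polycentric/polycentric-core/queries';

// `store`, `queryServers`, `system`, and `handleHeadUpdate`
// are assumed to be defined elsewhere.
const queryHead = new QueryHead(store, queryServers);

queryHead.subscribe(system, {
    callback: (value) => {
        // Skip updates that are still missing data.
        if (!value.missingData) {
            handleHeadUpdate(value.head);
        }
    }
});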
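The `missingData` check can be factored into a reusable wrapper, sketched below. The `HeadUpdate` shape is an assumption based only on the two fields used above, and `onCompleteHead` is a hypothetical helper. It also skips a head equal to the last one delivered, which is an extra assumption about what callers want and not something the library is known to do. With RxJS, the equivalent would typically combine `filter` and `distinctUntilChanged`.

```typescript
// Assumed shape, based on the fields used in the example above;
// the real type in Polycentric Core may differ.
interface HeadUpdate<H> {
    missingData: boolean;
    head: H;
}

// Hypothetical helper: wraps a handler so that it only runs for complete
// updates whose head differs from the last one delivered.
// `isSame` is a caller-supplied equality check.
function onCompleteHead<H>(
    handler: (head: H) => void,
    isSame: (a: H, b: H) => boolean,
): (update: HeadUpdate<H>) => void {
    let hasLast = false;
    let last: H | undefined;
    return (update) => {
        if (update.missingData) {
            return;
        }
        if (hasLast && isSame(last as H, update.head)) {
            return;
        }
        hasLast = true;
        last = update.head;
        handler(update.head);
    };
}

const handled: number[] = [];
const headCallback = onCompleteHead<number>(
    (head) => {
        handled.push(head);
    },
    (a, b) => a === b,
);

headCallback({ missingData: true, head: 1 });  // skipped: incomplete
headCallback({ missingData: false, head: 1 }); // handled
headCallback({ missingData: false, head: 1 }); // skipped: unchanged
headCallback({ missingData: false, head: 2 }); // handled
```

A wrapper like this could be passed wherever a `callback` is expected, keeping the completeness check in one place.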