# Observable Architecture in Polycentric Core
Polycentric Core uses RxJS Observables as a core abstraction for asynchronous data streams, state management, and real-time updates. This reactive approach is well suited to a distributed social network, where data changes frequently and arrives from multiple sources.
## Core Concepts
### Query Observables
The query system extensively uses observables to provide real-time data streams. Here's a basic example:
```typescript
import * as RXJS from 'rxjs';
import * as Models from '@polycentric/polycentric-core/models';
import { queryCRDTObservable } from '@polycentric/polycentric-core/queries';

// `queryCRDT` and `system` are assumed to have been created elsewhere.

// Create an observable stream for a CRDT query
const observable = queryCRDTObservable(
    queryCRDT,
    system,
    Models.ContentType.ContentTypeUsername
).pipe(
    RXJS.switchMap((value) => {
        // Transform or filter the value
        return RXJS.of(value);
    })
);

// Subscribe to updates
const subscription = observable.subscribe({
    next: (value) => {
        // Handle new value
        console.log('New value:', value);
    },
    error: (err) => {
        // Handle error
        console.error('Error:', err);
    }
});

// Later, once updates are no longer needed, unsubscribe to release the query
subscription.unsubscribe();
```
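The `switchMap` step becomes useful when the inner stream depends on an input that can change, for instance which system is currently being viewed. The following is a minimal sketch using plain RxJS `Subject`s as stand-ins for per-system query streams. The names `selectedSystem`, `aliceNames`, `bobNames`, and `usernameStream` are hypothetical and not part of Polycentric Core; in real code the inner stream would presumably come from something like `queryCRDTObservable`.

```typescript
import * as RXJS from 'rxjs';

// Hypothetical stand-ins for two per-system username streams.
const aliceNames = new RXJS.Subject<string>();
const bobNames = new RXJS.Subject<string>();

// Hypothetical lookup from a system key to its username stream.
function usernameStream(system: string): RXJS.Observable<string> {
    return system === 'alice' ? aliceNames : bobNames;
}

// Emits the key of the system the UI is currently showing.
const selectedSystem = new RXJS.Subject<string>();

const seen: string[] = [];

const subscription = selectedSystem
    .pipe(
        // Subscribe to the stream for the latest selection and
        // unsubscribe from the stream of the previous selection.
        RXJS.switchMap((system) => usernameStream(system)),
    )
    .subscribe((username) => seen.push(username));

selectedSystem.next('alice');
aliceNames.next('alice-v1');

selectedSystem.next('bob');
aliceNames.next('alice-v2'); // ignored: alice is no longer selected
bobNames.next('bob-v1');

subscription.unsubscribe();
bobNames.next('bob-v2'); // ignored: nothing is subscribed any more

console.log(seen); // [ 'alice-v1', 'bob-v1' ]
```

Because `switchMap` tears down the previous inner subscription on every new selection, stale updates for a system that is no longer displayed never reach the subscriber.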
### Data Sources
Observables in Polycentric Core combine multiple data sources:
- Local Storage: Query and observe changes to local data
- Network Responses: Stream real-time updates from remote servers
- Cache Updates: Monitor and react to cache state changes
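One way to picture how several sources feed a single stream is RxJS `merge`. The sketch below is only an illustration under stated assumptions: the three `Subject`s stand in for local storage, network, and cache updates, and the `SourcedValue` type and `tag` helper are hypothetical names, not Polycentric Core APIs.

```typescript
import * as RXJS from 'rxjs';

// Hypothetical shape used only for this illustration.
type Source = 'local' | 'network' | 'cache';

interface SourcedValue {
    source: Source;
    username: string;
}

// Stand-ins for the three kinds of data source listed above.
const localUpdates = new RXJS.Subject<string>();
const networkUpdates = new RXJS.Subject<string>();
const cacheUpdates = new RXJS.Subject<string>();

// Labels each value with the source it came from.
function tag(source: Source): RXJS.OperatorFunction<string, SourcedValue> {
    return RXJS.map((username: string) => ({ source, username }));
}

// merge forwards values from every source in the order they arrive.
const combined = RXJS.merge(
    localUpdates.pipe(tag('local')),
    networkUpdates.pipe(tag('network')),
    cacheUpdates.pipe(tag('cache')),
);

const received: SourcedValue[] = [];
const subscription = combined.subscribe((value) => received.push(value));

localUpdates.next('alice');
networkUpdates.next('alice_updated');
cacheUpdates.next('alice_updated');

subscription.unsubscribe();

console.log(received.map((v) => `${v.source}:${v.username}`));
// [ 'local:alice', 'network:alice_updated', 'cache:alice_updated' ]
```

A consumer subscribed to `combined` does not need to know which source produced a value, although the tag makes it possible to tell when that matters.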
## Common Patterns
### State Synchronization
Observables help manage state synchronization between different parts of the system:
```typescript
import { QueryCRDT } from '@polycentric/polycentric-core/queries';

// `store`, `queryServers`, `system`, `contentType`, and `updateUI`
// are assumed to be defined elsewhere.

// Create a query manager
const queryCRDT = new QueryCRDT(store, queryServers);

// Subscribe to state changes
const subscription = queryCRDT.subscribe(system, contentType, {
    callback: (value) => {
        // Handle state update
        updateUI(value);
    }
});
```
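A callback-based subscription can be bridged into an Observable so that several parts of an application share the same updates and clean up independently. The sketch below shows one way to do that with the RxJS `Observable` constructor. `CallbackSource`, `InMemorySource`, and `toObservable` are hypothetical names invented for this illustration, and nothing here is a claim about how `queryCRDTObservable` is actually implemented.

```typescript
import * as RXJS from 'rxjs';

// Hypothetical minimal callback API: registering returns an unregister function.
interface CallbackSource<T> {
    subscribe(callback: (value: T) => void): () => void;
}

// In-memory stand-in so the sketch runs without a store or servers.
class InMemorySource<T> implements CallbackSource<T> {
    private callbacks = new Set<(value: T) => void>();

    subscribe(callback: (value: T) => void): () => void {
        this.callbacks.add(callback);
        return () => {
            this.callbacks.delete(callback);
        };
    }

    publish(value: T): void {
        this.callbacks.forEach((callback) => callback(value));
    }

    get listenerCount(): number {
        return this.callbacks.size;
    }
}

// Wraps the callback API in an Observable with proper teardown.
function toObservable<T>(source: CallbackSource<T>): RXJS.Observable<T> {
    return new RXJS.Observable<T>((subscriber) => {
        const unregister = source.subscribe((value) => subscriber.next(value));
        // Returned function runs when the consumer unsubscribes.
        return unregister;
    });
}

const source = new InMemorySource<string>();
const uiState: string[] = [];
const logState: string[] = [];

const uiSubscription = toObservable(source).subscribe((v) => uiState.push(v));
const logSubscription = toObservable(source).subscribe((v) => logState.push(v));

source.publish('alice');
uiSubscription.unsubscribe(); // the UI stops listening; the log keeps going
source.publish('alice_updated');
logSubscription.unsubscribe();

console.log(uiState); // [ 'alice' ]
console.log(logState); // [ 'alice', 'alice_updated' ]
console.log(source.listenerCount); // 0
```

Returning the unregister function from the `Observable` constructor ties the lifetime of the underlying callback to the RxJS subscription, so no listener is left behind after `unsubscribe()`.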
### Real-time Updates
Handle live data streams with automatic updates:
```typescript
import { QueryHead } from '@polycentric/polycentric-core/queries';

// `store`, `queryServers`, `system`, and `handleHeadUpdate`
// are assumed to be defined elsewhere.

// Create a head query to track latest events
const queryHead = new QueryHead(store, queryServers);

// Subscribe to head updates
queryHead.subscribe(system, {
    callback: (value) => {
        // Skip updates that are still missing data
        if (!value.missingData) {
            // Process new head state
            handleHeadUpdate(value.head);
        }
    }
});
```
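If head updates are exposed as an Observable, the same `missingData` guard can be written as a pipeline of operators. The following sketch assumes a hypothetical `HeadUpdate` shape with a `missingData` flag and a `head` map; the real types in Polycentric Core may differ, and the array of updates stands in for a live stream.

```typescript
import * as RXJS from 'rxjs';

// Hypothetical shape, modelled on the fields used in the example above.
interface HeadUpdate {
    missingData: boolean;
    head: ReadonlyMap<string, number>;
}

// Stand-in for a live stream of head updates.
const updates: HeadUpdate[] = [
    { missingData: true, head: new Map() },
    { missingData: false, head: new Map([['process-a', 3]]) },
    { missingData: false, head: new Map([['process-a', 4], ['process-b', 1]]) },
];

const completeHeads: ReadonlyMap<string, number>[] = [];

RXJS.from(updates)
    .pipe(
        // Drop updates that are still waiting on data.
        RXJS.filter((update) => !update.missingData),
        // Pass only the head state downstream.
        RXJS.map((update) => update.head),
    )
    .subscribe((head) => completeHeads.push(head));

console.log(completeHeads.length); // 2
console.log(completeHeads[1].get('process-b')); // 1
```

Keeping the guard in a `filter` operator means downstream handlers only ever see complete head states.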