# Observable Architecture in Polycentric Core

Polycentric Core uses RxJS Observables as a core abstraction for handling asynchronous data streams, state management, and real-time updates. This reactive approach is particularly well-suited for a distributed social network where data can change frequently and come from multiple sources.

## Core Concepts

### Query Observables

The query system extensively uses observables to provide real-time data streams. Here's a basic example:

```typescript
import * as RXJS from 'rxjs';
import * as Models from '@polycentric/polycentric-core/models';
import { queryCRDTObservable } from '@polycentric/polycentric-core/queries';

// `queryCRDT` and `system` are assumed to already be in scope.

// Create an observable stream for a CRDT query
const observable = queryCRDTObservable(
  queryCRDT,
  system,
  Models.ContentType.ContentTypeUsername
).pipe(
  RXJS.switchMap((value) => {
    // Transform or filter the value
    return RXJS.of(value);
  })
);

// Subscribe to updates
const subscription = observable.subscribe({
  next: (value) => {
    // Handle new value
    console.log('New value:', value);
  },
  error: (err) => {
    // Handle error
    console.error('Error:', err);
  }
});

// Don't forget to unsubscribe once the stream is no longer needed
subscription.unsubscribe();
```

### Data Sources

Observables in Polycentric Core combine multiple data sources:

  1. Local Storage: Query and observe changes to local data
  2. Network Responses: Stream real-time updates from remote servers
  3. Cache Updates: Monitor and react to cache state changes
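
As a rough illustration of that fan-in, the sketch below merges three push-based sources into a single listener. It is dependency-free, and every name in it (`createSource`, `mergeSources`, `Update`) is hypothetical rather than part of Polycentric Core; with RxJS itself, the `merge` function plays the same role for Observables.

```typescript
// Hypothetical stand-ins for illustration; none of these names come from Polycentric Core.
type Listener<T> = (value: T) => void;
type Unsubscribe = () => void;

// A source is anything that can push values to a listener and be detached again.
type Source<T> = (listener: Listener<T>) => Unsubscribe;

// Minimal push-based source with a manual `emit`, used to simulate each origin.
function createSource<T>(): { source: Source<T>; emit: Listener<T> } {
  const listeners = new Set<Listener<T>>();
  return {
    source: (listener) => {
      listeners.add(listener);
      return () => {
        listeners.delete(listener);
      };
    },
    emit: (value) => {
      listeners.forEach((l) => l(value));
    },
  };
}

// Fan-in: forward every value from every source to one listener.
function mergeSources<T>(...sources: Source<T>[]): Source<T> {
  return (listener) => {
    const unsubscribes = sources.map((s) => s(listener));
    return () => {
      unsubscribes.forEach((u) => u());
    };
  };
}

// Assumed update shape: which origin produced the value, plus a payload.
interface Update {
  origin: 'local' | 'network' | 'cache';
  payload: string;
}

const local = createSource<Update>();
const network = createSource<Update>();
const cache = createSource<Update>();

const received: Update[] = [];
const unsubscribe = mergeSources(local.source, network.source, cache.source)(
  (update) => {
    received.push(update);
  }
);

local.emit({ origin: 'local', payload: 'alice' });
network.emit({ origin: 'network', payload: 'alice_v2' });
cache.emit({ origin: 'cache', payload: 'alice_v2' });

// Detach from all three sources at once.
unsubscribe();
network.emit({ origin: 'network', payload: 'ignored' });
```

After `unsubscribe()` runs, later emissions are no longer delivered, so `received` holds only the three updates emitted while the subscription was active.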

## Common Patterns

### State Synchronization

Observables help manage state synchronization between different parts of the system:

```typescript
import { QueryCRDT } from '@polycentric/polycentric-core/queries';

// `store`, `queryServers`, `system`, `contentType`, and `updateUI`
// are assumed to already be in scope.

// Create a query manager
const queryCRDT = new QueryCRDT(store, queryServers);

// Subscribe to state changes
const subscription = queryCRDT.subscribe(system, contentType, {
  callback: (value) => {
    // Handle state update
    updateUI(value);
  }
});
```

### Real-time Updates

Handle live data streams with automatic updates:

```typescript
import { QueryHead } from '@polycentric/polycentric-core/queries';

// `store`, `queryServers`, `system`, and `handleHeadUpdate`
// are assumed to already be in scope.

// Create a head query to track latest events
const queryHead = new QueryHead(store, queryServers);

// Subscribe to head updates
queryHead.subscribe(system, {
  callback: (value) => {
    // Skip updates that are still missing data
    if (!value.missingData) {
      // Process new head state
      handleHeadUpdate(value.head);
    }
  }
});
```
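
The `missingData` guard in the head callback can be factored into a small wrapper, which also makes the handler easy to exercise without a store or servers. This is a minimal sketch: the `HeadUpdate` shape is inferred only from the two fields used above, and `onCompleteHead` is a hypothetical helper, not part of Polycentric Core.

```typescript
// Hypothetical shape inferred from the fields used above (`missingData`, `head`);
// the real type in Polycentric Core may differ.
interface HeadUpdate<H> {
  missingData: boolean;
  head: H;
}

// Wrap a handler so it only runs for updates whose data is complete.
function onCompleteHead<H>(
  handler: (head: H) => void
): (value: HeadUpdate<H>) => void {
  return (value) => {
    if (!value.missingData) {
      handler(value.head);
    }
  };
}

// Simulate a sequence of updates; strings stand in for real head state.
const processedHeads: string[] = [];
const headCallback = onCompleteHead<string>((head) => {
  processedHeads.push(head);
});

headCallback({ missingData: true, head: 'partial' });
headCallback({ missingData: false, head: 'head-1' });
headCallback({ missingData: false, head: 'head-2' });
```

Only the two complete updates reach the handler; the first is dropped because it is still flagged as missing data. A wrapper like this could be passed as the `callback` option, assuming the real update type matches the shape sketched here.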