# Observable Architecture in Polycentric Core

Polycentric Core uses RxJS Observables as a core abstraction for handling asynchronous data streams, state management, and real-time updates. This reactive approach is particularly well-suited for a distributed social network where data can change frequently and come from multiple sources.

## Core Concepts

### Query Observables

The query system extensively uses observables to provide real-time data streams. Here's a basic example:

```typescript
import * as RXJS from 'rxjs';
import * as Models from '@polycentric/polycentric-core/models';
import { queryCRDTObservable } from '@polycentric/polycentric-core/queries';

// `queryCRDT` and `system` are assumed to be defined elsewhere in your application.

// Create an observable stream for a CRDT query
const observable = queryCRDTObservable(
    queryCRDT,
    system,
    Models.ContentType.ContentTypeUsername
).pipe(
    RXJS.switchMap((value) => {
        // Transform or filter the value; switchMap expects an observable back
        return RXJS.of(value);
    })
);

// Subscribe to updates
const subscription = observable.subscribe({
    next: (value) => {
        // Handle new value
        console.log('New value:', value);
    },
    error: (err) => {
        // Handle error
        console.error('Error:', err);
    }
});

// Unsubscribe once the stream is no longer needed, to avoid leaking the subscription
subscription.unsubscribe();
```

### Data Sources

Observables in Polycentric Core combine multiple data sources:

  1. Local Storage: Query and observe changes to local data
  2. Network Responses: Stream real-time updates from remote servers
  3. Cache Updates: Monitor and react to cache state changes
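
As a rough illustration of how several sources can feed one stream, the sketch below merges three stand-in observables with RxJS `merge`. The `SourceUpdate` type, the `local$`, `network$`, and `cache$` streams, and their values are hypothetical placeholders rather than Polycentric Core APIs; only `merge`, `of`, and `subscribe` are actual RxJS calls.

```typescript
import { merge, of, Observable } from 'rxjs';

// Hypothetical shape for an update; not a Polycentric Core type.
interface SourceUpdate {
    source: 'local' | 'network' | 'cache';
    value: string;
}

const localUpdate: SourceUpdate = { source: 'local', value: 'alice' };
const networkUpdate: SourceUpdate = { source: 'network', value: 'alice-v2' };
const cacheUpdate: SourceUpdate = { source: 'cache', value: 'alice-v2' };

// Stand-ins for the three kinds of sources listed above.
const local$: Observable<SourceUpdate> = of(localUpdate);
const network$: Observable<SourceUpdate> = of(networkUpdate);
const cache$: Observable<SourceUpdate> = of(cacheUpdate);

// merge() forwards values from every source as they arrive.
const combined$ = merge(local$, network$, cache$);

const received: SourceUpdate[] = [];
const subscription = combined$.subscribe((update) => {
    received.push(update);
});

// Release the subscription once the consumer is done.
subscription.unsubscribe();
```

With real sources the interleaving depends on when each one emits, so consumers should not rely on a fixed order between sources.
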

## Common Patterns

### State Synchronization

Observables help manage state synchronization between different parts of the system:

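```typescript
import { QueryCRDT } from '@polycentric/polycentric-core/queries';

// `store`, `queryServers`, `system`, `contentType`, and `updateUI` are assumed
// to be defined elsewhere in your application.

// Create a query manager
const queryCRDT = new QueryCRDT(store, queryServers);

// Subscribe to state changes; keep the returned handle so the
// subscription can be released later
const subscription = queryCRDT.subscribe(system, contentType, {
    callback: (value) => {
        // Handle state update
        updateUI(value);
    }
});
```
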
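
One way to bridge a callback-style subscription into an RxJS stream is to wrap it in `new Observable(...)` and return a teardown function. The sketch below shows that pattern under stated assumptions: `Handle`, `CallbackSource`, `toObservable`, and `FakeSource` are hypothetical names for illustration, and the real `QueryCRDT` signature may differ.

```typescript
import { Observable } from 'rxjs';

// Hypothetical handle and source types; the real QueryCRDT API may differ.
interface Handle {
    unsubscribe(): void;
}
interface CallbackSource<T> {
    subscribe(options: { callback: (value: T) => void }): Handle;
}

// Wrap a callback-style source so consumers can use RxJS operators, and so
// unsubscribing from the observable releases the underlying handle.
function toObservable<T>(source: CallbackSource<T>): Observable<T> {
    return new Observable<T>((subscriber) => {
        const handle = source.subscribe({
            callback: (value) => subscriber.next(value),
        });
        // Teardown: runs when the consumer unsubscribes.
        return () => handle.unsubscribe();
    });
}

// In-memory stand-in used only to exercise the wrapper.
class FakeSource implements CallbackSource<string> {
    private listeners = new Set<(value: string) => void>();

    subscribe(options: { callback: (value: string) => void }): Handle {
        this.listeners.add(options.callback);
        return {
            unsubscribe: () => {
                this.listeners.delete(options.callback);
            },
        };
    }

    emit(value: string): void {
        this.listeners.forEach((listener) => listener(value));
    }

    get listenerCount(): number {
        return this.listeners.size;
    }
}

const source = new FakeSource();
const seen: string[] = [];
const sub = toObservable(source).subscribe((value) => {
    seen.push(value);
});
source.emit('alice');
source.emit('alice-v2');
sub.unsubscribe();
source.emit('ignored'); // no listener remains, so nothing is recorded
```

Returning the teardown function ties the lifetime of the underlying subscription to the observable, so a single `unsubscribe()` call cleans up both.
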
### Real-time Updates

Handle live data streams with automatic updates:

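```typescript
import { QueryHead } from '@polycentric/polycentric-core/queries';

// `store`, `queryServers`, `system`, and `handleHeadUpdate` are assumed
// to be defined elsewhere in your application.

// Create a head query to track latest events
const queryHead = new QueryHead(store, queryServers);

// Subscribe to head updates
queryHead.subscribe(system, {
    callback: (value) => {
        // Skip updates that are still waiting on missing data
        if (!value.missingData) {
            // Process new head state
            handleHeadUpdate(value.head);
        }
    }
});
```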
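
The same "skip incomplete updates" check can also be expressed as an operator pipeline. This is a minimal sketch, assuming a hypothetical `HeadUpdate` shape that mirrors only the `missingData` and `head` fields used above; the sample data is invented for illustration and the real head type may differ.

```typescript
import { filter, from, map } from 'rxjs';

// Hypothetical update shape mirroring the two fields used above.
interface HeadUpdate {
    missingData: boolean;
    head: string[];
}

// Invented sample updates: the first is still incomplete.
const updates: HeadUpdate[] = [
    { missingData: true, head: [] },
    { missingData: false, head: ['event-1'] },
    { missingData: false, head: ['event-1', 'event-2'] },
];

const completeHeads: string[][] = [];
from(updates)
    .pipe(
        // Drop updates that are still waiting on missing data.
        filter((update) => !update.missingData),
        // Pass only the head state downstream.
        map((update) => update.head),
    )
    .subscribe((head) => {
        completeHeads.push(head);
    });
```

Keeping the check in a `filter` step means downstream handlers only ever see complete head states.
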