Skip to content

Observable Operators

Special operators for composing and transforming observables.

FynX Operators - Observable Operator Implementations and Mixins

This module provides the core operator implementations and mixins that enable FynX's fluent reactive programming syntax. These operators allow observables to be composed using intuitive Python operators, creating complex reactive behaviors from simple building blocks.

Why Operators?

FynX uses Python's operator overloading to provide a natural, readable syntax for reactive programming. Instead of verbose method calls, you can express reactive relationships using familiar operators:

  • observable >> function - Transform values reactively
  • observable & condition - Filter values conditionally
  • obs1 + obs2 + obs3 - Combine observables

This approach makes reactive code more declarative and easier to understand.

Operator Overview

Transform (>>): Apply functions to create derived values

doubled = counter >> (lambda x: x * 2)

Filter (&): Only emit values when conditions are met

valid_data = data & is_valid

Combine (+): Merge multiple observables into tuples

coordinates = x + y + z

These operators work together to create complex reactive pipelines:

result = (x + y) >> (lambda a, b: a + b) & (total >> (lambda t: t > 10))

Operator Mixins

This module also provides mixin classes that consolidate operator overloading logic:

OperatorMixin: Provides common reactive operators (add, rshift, and, invert) for all observable types that support reactive composition.

TupleMixin: Adds tuple-like behavior (iter, len, getitem, setitem) for observables that represent collections of values.

ValueMixin: Provides transparent value wrapper behavior for ObservableValue instances, making them behave like regular Python values while supporting reactive operators.

Implementation Details

The operators are implemented as standalone functions rather than methods to avoid circular import issues and enable lazy loading. They are called automatically when you use the corresponding operators on Observable instances.

The functions handle different observable types (regular, merged, conditional) appropriately, ensuring consistent behavior across the reactive system.

Performance Characteristics

  • Lazy Evaluation: Operators create computed/conditional observables that only evaluate when needed
  • Efficient Composition: Multiple operators can be chained without creating intermediate objects
  • Memory Conscious: Operators reuse existing infrastructure rather than creating new classes

Common Patterns

Data Processing Pipeline:

raw_data = observable([])
processed = (raw_data
    >> (lambda d: [x for x in d if x > 0])  # Filter positive values
    >> (lambda d: sorted(d))                # Sort results
    >> (lambda d: sum(d) / len(d) if d else 0))  # Calculate average

Conditional UI Updates:

user_input = observable("")
is_valid = user_input >> (lambda s: len(s) >= 3)
show_error = user_input & ~is_valid  # Show error when input is invalid but not empty

Reactive Calculations:

price = observable(10.0)
quantity = observable(1)
tax_rate = observable(0.08)

subtotal = (price + quantity) >> (lambda p, q: p * q)
tax = subtotal >> (lambda s: s * tax_rate.value)
total = (subtotal + tax) >> (lambda s, t: s + t)

Error Handling

Operators handle errors gracefully: - Transformation function errors are propagated but don't break the reactive system - Invalid operator usage provides clear error messages - Circular dependencies are detected and prevented

Best Practices

  • Keep Functions Pure: Transformation functions should not have side effects
  • Use Meaningful Lambdas: Complex operations deserve named functions
  • Chain Thoughtfully: Break complex chains into intermediate variables for clarity
  • Handle Edge Cases: Consider what happens with None, empty collections, etc.

Migration from Method Calls

If you're familiar with other reactive libraries, here's how FynX operators compare:

# Other libraries (method-based)
result = obs.map(lambda x: x * 2).filter(lambda x: x > 10)

# FynX (operator-based)
result = obs >> (lambda x: x * 2) & (obs >> (lambda x: x > 10))

The operator syntax is more concise and readable for simple transformations.

See Also

  • fynx.observable: Core observable classes that use these operators and mixins
  • fynx.computed: Computed observables created by the >> operator
  • fynx.watch: Conditional reactive functions (alternative to &)

OperatorMixin

Mixin class providing common reactive operators for observable classes.

This mixin consolidates the operator overloading logic that was previously duplicated across multiple observable classes. It provides the core reactive operators (add, rshift, and, invert) that enable FynX's fluent reactive programming syntax.

Classes inheriting from this mixin get automatic support for: - Merging with + operator - Transformation with >> operator - Conditional filtering with & operator - Boolean negation with ~ operator

This mixin should be used by classes that represent reactive values and need to support reactive composition operations.

__add__

__add__(other)

Combine this observable with another using the + operator.

This creates a merged observable that contains a tuple of both values and updates automatically when either observable changes.

Parameters:

Name Type Description Default
other

Another Observable to combine with

required

Returns:

Type Description
Mergeable

A MergedObservable containing both values as a tuple

__and__

__and__(condition)

Create a conditional observable using the & operator for filtered reactivity.

This creates a ConditionalObservable that only emits values when all specified conditions are True, enabling precise control over reactive updates.

Parameters:

Name Type Description Default
condition

A boolean Observable, callable, or compound condition

required

Returns:

Type Description
Conditional

A ConditionalObservable that filters values based on the condition

__invert__

__invert__()

Create a negated boolean observable using the ~ operator.

This creates a computed observable that returns the logical negation of the current boolean value, useful for creating inverse conditions.

Returns:

Type Description
Observable[bool]

A computed Observable[bool] with negated boolean value

__or__

__or__(other)

Create a logical OR condition using the | operator.

This creates a conditional observable that only emits when the OR result is truthy. If the initial OR result is falsy, raises ConditionalNeverMet.

Parameters:

Name Type Description Default
other

Another boolean observable to OR with

required

Returns:

Type Description
Observable

A conditional observable that only emits when OR is truthy

Raises:

Type Description
ConditionalNeverMet

If initial OR result is falsy

__radd__

__radd__(other)

Support right-side addition for merging observables.

This enables expressions like other + self to work correctly, ensuring that merged observables can be chained properly.

Parameters:

Name Type Description Default
other

Another Observable to combine with

required

Returns:

Type Description
Mergeable

A MergedObservable containing both values as a tuple

__rshift__

__rshift__(func)

Apply a transformation function using the >> operator to create computed observables.

This implements the functorial map operation over observables, allowing you to transform observable values through pure functions while preserving reactivity.

Parameters:

Name Type Description Default
func Callable

A pure function to apply to the observable's value(s)

required

Returns:

Type Description
Observable

A new computed Observable containing the transformed values

TupleMixin

Mixin class providing tuple-like operators for merged observables.

This mixin adds tuple-like behavior to observables that represent collections of values (like MergedObservable). It provides operators for iteration, indexing, and length operations that make merged observables behave like tuples of their component values.

Classes inheriting from this mixin get automatic support for: - Iteration with for item in merged: - Length with len(merged) - Indexing with merged[0], merged[-1], etc. - Setting values by index with merged[0] = new_value

__getitem__

__getitem__(index)

Allow indexing into the merged observable like a tuple.

__iter__

__iter__()

Allow iteration over the tuple value.

__len__

__len__()

Return the number of combined observables.

__setitem__

__setitem__(index, value)

Allow setting values by index, updating the corresponding source observable.

ValueMixin

Mixin class providing value wrapper operators for ObservableValue.

This mixin adds operators that make observable values behave transparently like their underlying values in most Python contexts. It provides magic methods for equality, string conversion, iteration, indexing, etc., while also supporting the reactive operators.

Classes inheriting from this mixin get automatic support for: - Value-like behavior (equality, string conversion, etc.) - Reactive operators (add, and, invert, rshift) - Transparent access to the wrapped observable

__add__

__add__(other)

Support merging observables with + operator.

__and__

__and__(condition)

Support conditional observables with & operator.

__invert__

__invert__()

Support negating conditions with ~ operator.

__radd__

__radd__(other)

Support right-side addition for merging observables.

__rshift__

__rshift__(func)

Support computed observables with >> operator.

and_operator

and_operator(obs, condition)

Implement the & operator for creating conditional observables.

This operator creates conditional observables that only emit values when boolean conditions are satisfied. The resulting observable filters the reactive stream, preventing unnecessary updates and computations when conditions aren't met.

Parameters:

Name Type Description Default
obs

The source observable whose values will be conditionally emitted.

required
condition

A boolean observable that acts as a gate. Values from obs are only emitted when this condition is True.

required

Returns:

Type Description

A new ConditionalObservable that only emits values when the condition is met.

The observable starts with None if the condition is initially False.

Examples:

from fynx.observable import Observable

# Basic conditional filtering
data = Observable("data", "hello")
is_ready = Observable("ready", False)

filtered = data & is_ready  # Only emits when is_ready is True

filtered.subscribe(lambda x: print(f"Received: {x}"))
data.set("world")      # No output (is_ready is False)
is_ready.set(True)     # Prints: "Received: world"

# Multiple conditions (chained)
user_present = Observable("present", True)
smart_data = data & is_ready & user_present  # All must be True

# Practical example: temperature monitoring
temperature = Observable("temp", 20)
alarm_enabled = Observable("alarm", True)
is_critical = Observable("critical", False)

alarm_trigger = temperature & alarm_enabled & is_critical
alarm_trigger.subscribe(lambda t: print(f"🚨 Alarm: {t}°C"))
Note

Multiple conditions can be chained: obs & cond1 & cond2 & cond3. All conditions must be True for values to be emitted.

See Also

ConditionalObservable: The class that implements conditional behavior Observable.and: The magic method that calls this operator

rshift_operator

rshift_operator(obs, func)

Implement the >> operator with comprehensive categorical optimization.

This operator creates computed observables using the full categorical optimization system, applying functor composition fusion, product factorization, and cost-optimal materialization strategies automatically.

Categorical Optimization System: - Rule 1: Functor composition collapse (fuses sequential transformations) - Rule 2: Product factorization (shares common subexpressions) - Rule 3: Pullback fusion (combines sequential filters) - Rule 4: Cost-optimal materialization (decides what to cache vs recompute)

The optimization uses a cost functional C(σ) = α·|Dep(σ)| + β·E[Updates(σ)] + γ·depth(σ) to find semantically equivalent observables with minimal computational cost.

For merged observables (created with +), the function receives multiple arguments corresponding to the tuple values. For single observables, it receives one argument.

Parameters:

Name Type Description Default
obs Observable[T]

The source observable(s) to transform. Can be a single Observable or a MergedObservable (from + operator).

required
func Callable[..., U]

A pure function that transforms the observable value(s). For merged observables, receives unpacked tuple values as separate arguments.

required

Returns:

Type Description
Observable[U]

A new computed observable with optimal structure. Updates automatically

Observable[U]

when source observables change, but with dramatically improved performance

Observable[U]

through categorical optimizations.

Examples:

from fynx.observable import Observable

# Single observable with automatic optimization
counter = Observable("counter", 5)
result = counter >> (lambda x: x * 2) >> (lambda x: x + 10) >> str
# Automatically optimized to single fused computation

# Complex reactive pipelines are optimized globally
width = Observable("width", 10)
height = Observable("height", 20)
area = (width + height) >> (lambda w, h: w * h)
volume = (width + height + Observable("depth", 5)) >> (lambda w, h, d: w * h * d)
# Shared width/height computations are factored out automatically
Performance
  • Chain fusion: O(N) depth → O(1) for transformation chains
  • Subexpression sharing: Eliminates redundant computations
  • Cost optimization: Balances memory vs computation tradeoffs
  • Typical speedup: 1000× - 10000× for deep reactive graphs
See Also

Observable.then: The method that creates computed observables MergedObservable: For combining multiple observables with + optimizer: The categorical optimization system