Confluence

Unified Fan-in and Fan-out Event Processing

Confluence is Happen's mechanism for working with multiple events and multiple nodes through a minimal API surface. It covers both event batching (fan-in) and multi-node distribution (fan-out) with a single container pattern: the array.

Core Idea

Confluence embodies a simple concept: When something is in a container, it represents a collection. This principle applies consistently whether you're working with nodes or events:

  • An array of nodes means "multiple receivers"

  • An array of events means "a batch of events"

  • Handlers naturally work with both individual items and collections

This symmetry creates a powerful system with virtually no new API surface.
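One way to picture the container principle is a small normalization helper that treats a lone item as a collection of one. The sketch below is not Happen's implementation; `asCollection` and `describeSend` are hypothetical names used only to show that the same rule applies to nodes and to events.

```javascript
// Hypothetical helper: a lone item is treated as a collection of one.
function asCollection(itemOrItems) {
  return Array.isArray(itemOrItems) ? itemOrItems : [itemOrItems];
}

// Hypothetical inspector: reports how a send call would be interpreted
// under the container principle. It does not deliver anything.
function describeSend(nodeOrNodes, eventOrEvents) {
  return {
    receivers: asCollection(nodeOrNodes).length,
    batchSize: asCollection(eventOrEvents).length,
    fanOut: Array.isArray(nodeOrNodes),    // array of nodes = multiple receivers
    batched: Array.isArray(eventOrEvents), // array of events = a batch
  };
}

// One node, one event
const single = describeSend("order-node", { type: "update" });

// Two nodes, three events
const combined = describeSend(
  ["order-node", "payment-node"],
  [{ type: "update" }, { type: "update" }, { type: "update" }]
);

console.log(single, combined);
```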

API Surface

The entire Confluence system introduces zero new methods, instead extending Happen's existing API to work with arrays:

Fan-out: One Event, Multiple Nodes

// Register a handler across multiple nodes using an array
[orderNode, paymentNode, inventoryNode].on("update", (event, context) => {
  // Handler runs for events delivered to any of these nodes
  console.log(`Processing update in ${context.node.id}`);
});

// Send an event to multiple nodes using an array
[orderNode, shippingNode, notificationNode].send({
  type: "order-completed",
  payload: { orderId: "ORD-123" }
});

Fan-in: Multiple Events, One Handler

// Send multiple events as a batch using an array
node.send(targetNode, [
  { type: "data-point", payload: { value: 10 } },
  { type: "data-point", payload: { value: 20 } },
  { type: "data-point", payload: { value: 30 } }
]);

// Handler naturally works with both individual events and batches
node.on("data-point", (eventOrEvents, context) => {
  // Simple array check tells you what you received
  if (Array.isArray(eventOrEvents)) {
    console.log(`Processing batch of ${eventOrEvents.length} events`);
    return processBatch(eventOrEvents);
  } else {
    console.log(`Processing single event: ${eventOrEvents.payload.value}`);
    return processSingle(eventOrEvents);
  }
});

The Event Continuum and Divergent Flows

When an event is sent to multiple nodes using Confluence, each node processes it through its own independent flow chain, creating "divergent flows" - parallel processing paths that naturally extend the Event Continuum model:

// Register with multiple nodes
[orderNode, inventoryNode, notificationNode].on("order-updated", function validateUpdate(event, context) {
  // This function runs independently for each node
  console.log(`Validating update in ${context.node.id}`);
  
  // Each node can return a different next function
  if (context.node.id === "order-service") {
    return updateOrderRecord;
  } else if (context.node.id === "inventory-service") {
    return updateInventoryLevels;
  } else {
    return sendNotifications;
  }
});

// Node-specific continuation functions
function updateOrderRecord(event, context) {
  // Only runs in the order node's flow
  console.log("Updating order records");
  return { updated: true };
}

function updateInventoryLevels(event, context) {
  // Only runs in the inventory node's flow
  console.log("Updating inventory levels");
  return { inventoryUpdated: true };
}

function sendNotifications(event, context) {
  // Only runs in the notification node's flow
  console.log("Sending notifications");
  return { notificationsSent: true };
}

This creates a causal tree structure where:

  • Each node has its own isolated context

  • Flow paths can diverge based on node-specific logic

  • Return values are tracked per node

  • The complete causal history is preserved
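The divergent-flow behaviour listed above can be approximated with a small runner that keeps calling whatever function a step returns until a non-function value comes back. This is only a sketch under that assumption; `runFlow`, `fanOut`, and the plain-object nodes are hypothetical and say nothing about Happen's internals.

```javascript
// Follow a flow chain: while a step returns a function, call it next.
function runFlow(firstStep, event, context) {
  let step = firstStep;
  let result;
  const path = []; // names of the steps that ran, in order
  while (typeof step === "function") {
    path.push(step.name || "(anonymous)");
    result = step(event, context);
    step = result;
  }
  return { result, path };
}

// Run the same entry handler once per node, each with its own context object.
function fanOut(nodes, entry, event) {
  const outcomes = {};
  for (const node of nodes) {
    const context = { node }; // isolated per node
    outcomes[node.id] = runFlow(entry, event, context);
  }
  return outcomes;
}

function validate(event, context) {
  // Each node picks its own continuation
  return context.node.id === "order-service" ? updateOrder : notify;
}
function updateOrder() { return { updated: true }; }
function notify() { return { notificationsSent: true }; }

const outcomes = fanOut(
  [{ id: "order-service" }, { id: "notification-service" }],
  validate,
  { type: "order-updated" }
);

console.log(outcomes);
```

Each entry in `outcomes` records both the final return value and the path that node took, which is the per-node branch of the causal tree in miniature.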

When you need the results from multiple nodes:

// Send to multiple nodes and await results using the standard return method
const results = await sender.send([node1, node2, node3], event).return();

// results is an object keyed by node ID
console.log("Node1 result:", results[node1.id]);
console.log("Node2 result:", results[node2.id]);
console.log("Node3 result:", results[node3.id]);

No Config Needed

Since batching is fully explicit with arrays, there's no magical configuration needed. Batches are simply arrays of events that you create and send directly:

// Send a single event
node.send(targetNode, {
  type: "data-point",
  payload: { value: 42 }
});

// Send a batch as an explicit array of events
node.send(targetNode, [
  { type: "data-point", payload: { value: 10 } },
  { type: "data-point", payload: { value: 20 } },
  { type: "data-point", payload: { value: 30 } }
]);

This direct approach ensures complete predictability with no behind-the-scenes magic.

Causality Preservation

Even with batches and multi-node processing, Confluence maintains Happen's causality guarantees:

  • The causal context is preserved for each event in a batch

  • Each event maintains its position in the causal chain

  • Batch processing still records each event's causal history

  • Divergent flows are tracked as branches in the causal tree

This means you can always trace the complete history of events, even when they've been processed in batches or across multiple nodes.
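To make the tracing claim concrete, here is a minimal sketch that walks causation links back to the root of a chain. The in-memory `eventLog`, the `id` field, and `traceHistory` are assumptions made for illustration; only the `causationId` name is taken from this page, and Happen's actual storage and lookup may differ.

```javascript
// Hypothetical event log: each record names the event that caused it.
// evt-3 and evt-4 are two branches that diverge from evt-2.
const eventLog = new Map([
  ["evt-1", { id: "evt-1", type: "order-submitted", causationId: null }],
  ["evt-2", { id: "evt-2", type: "order-updated", causationId: "evt-1" }],
  ["evt-3", { id: "evt-3", type: "inventory-adjusted", causationId: "evt-2" }],
  ["evt-4", { id: "evt-4", type: "notification-sent", causationId: "evt-2" }],
]);

// Walk causationId links from an event back to the root of its chain.
// The `seen` set guards against malformed logs that contain cycles.
function traceHistory(log, eventId) {
  const chain = [];
  const seen = new Set();
  let current = log.get(eventId);
  while (current && !seen.has(current.id)) {
    seen.add(current.id);
    chain.unshift(current.type); // oldest event ends up first
    current = current.causationId ? log.get(current.causationId) : undefined;
  }
  return chain;
}

const inventoryBranch = traceHistory(eventLog, "evt-3");
const notificationBranch = traceHistory(eventLog, "evt-4");

console.log(inventoryBranch.join(" -> "));
console.log(notificationBranch.join(" -> "));
```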

Context in Multi-Node Operations

When working with multiple nodes, context handling is structured to maintain node-specific isolation while providing clear access to relevant information:

Node-Specific Context Structure

When a handler receives an event in a multi-node operation, the context structure clearly identifies which node is processing it:

[orderNode, inventoryNode, shippingNode].on("order-updated", (event, context) => {
  // The context.node property identifies the specific node
  console.log(`Processing in: ${context.node.id}`);
  
  // Each node maintains its own isolated context
  context.nodeState = context.nodeState || {};
  context.nodeState.lastProcessed = Date.now();
});

Results Collection

When collecting results from multiple nodes, the returned object has a clear structure with node IDs as keys:

// Send to multiple nodes and collect results
const results = await sender.send([orderNode, inventoryNode, shippingNode], {
  type: "verify-order",
  payload: { orderId: "ORD-123" }
}).return();

// Results object is structured by node ID
console.log(`Order verification: ${results[orderNode.id].verified}`);
console.log(`Inventory status: ${results[inventoryNode.id].available}`);
console.log(`Shipping estimate: ${results[shippingNode.id].estimatedDays}`);
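The keyed result shape can be reproduced outside Happen with `Promise.all` and `Object.fromEntries`. The following is a sketch, not the library's dispatch logic: `collectResults`, the stub nodes, and their `handle` methods are hypothetical stand-ins.

```javascript
// Hypothetical stand-in for multi-node dispatch: call each node's handler
// concurrently and gather the return values into an object keyed by node ID.
async function collectResults(nodes, event) {
  const entries = await Promise.all(
    nodes.map(async (node) => [node.id, await node.handle(event)])
  );
  return Object.fromEntries(entries);
}

// Stub nodes for illustration only
const orderStub = {
  id: "order-service",
  handle: async () => ({ verified: true }),
};
const inventoryStub = {
  id: "inventory-service",
  handle: async () => ({ available: 3 }),
};

collectResults([orderStub, inventoryStub], { type: "verify-order" })
  .then((results) => {
    console.log(results[orderStub.id].verified);
    console.log(results[inventoryStub.id].available);
  });
```

Note that `Promise.all` rejects as soon as any handler rejects; `Promise.allSettled` is the standard alternative when partial results are acceptable.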

Batch Context Structure

For batch operations (multiple events to a single node), the context provides batch-level information at the root, with individual event contexts in an array:

node.on("data-point", (events, context) => {
  if (Array.isArray(events)) {
    // Batch-level information at the root
    console.log(`Processing batch of ${events.length} events`);
    console.log(`Batch received at: ${context.receivedAt}`);
    
    // Each event has its own context in the events array
    events.forEach((event, index) => {
      // context.events contains the individual event contexts
      const eventContext = context.events[index];
      console.log(`Event ${index} causation: ${eventContext.causal.causationId}`);
    });
  }
});
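For testing a batch handler in isolation, it can help to build a context object with the shape described above. This is a sketch of that shape only: `buildBatchContext` and its parameters are hypothetical, and a real Happen context is likely to carry more fields.

```javascript
// Hypothetical builder mirroring the described shape: batch-level fields
// at the root, one per-event context in `events` (same order as the batch).
function buildBatchContext(batch, causationIds, receivedAt = Date.now()) {
  return {
    receivedAt,
    events: batch.map((_event, index) => ({
      causal: { causationId: causationIds[index] },
    })),
  };
}

const batch = [
  { type: "data-point", payload: { value: 10 } },
  { type: "data-point", payload: { value: 20 } },
];

const batchContext = buildBatchContext(batch, ["evt-a", "evt-b"], 1700000000000);

// Per-event context is looked up by the event's index in the batch
batch.forEach((event, index) => {
  const eventContext = batchContext.events[index];
  console.log(event.payload.value, eventContext.causal.causationId);
});
```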

Combined Multi-Node and Batch Operations

In the rare case of both batching and multi-node operations, the context structure maintains clear separation:

[nodeA, nodeB].on("process-batch", (eventsOrEvent, context) => {
  // First determine which node we are
  const nodeId = context.node.id;
  
  // Then check if we're processing a batch
  if (Array.isArray(eventsOrEvent)) {
    // We're in a specific node, processing a batch
    console.log(`Node ${nodeId} processing batch of ${eventsOrEvent.length} events`);
    
    // Access node-specific batch processing logic
    return nodeProcessingStrategies[nodeId].processBatch(eventsOrEvent, context);
  } else {
    // We're in a specific node, processing a single event
    console.log(`Node ${nodeId} processing single event`);
    
    // Access node-specific individual processing logic
    return nodeProcessingStrategies[nodeId].processSingle(eventsOrEvent, context);
  }
});

This context structure ensures that no matter how complex the operation, each node maintains its own isolated processing environment while still providing clear access to all necessary information.
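The handler above relies on a `nodeProcessingStrategies` lookup that is not defined on this page. A minimal sketch of what such a table might look like follows; the node IDs `node-a` and `node-b`, the strategy bodies, and the `dispatch` wrapper are all assumptions made for illustration.

```javascript
// Hypothetical strategy table keyed by node ID.
const nodeProcessingStrategies = {
  "node-a": {
    // node-a only counts what it receives
    processBatch: (events) => ({ handledBy: "node-a", count: events.length }),
    processSingle: () => ({ handledBy: "node-a", count: 1 }),
  },
  "node-b": {
    // node-b sums payload values
    processBatch: (events) => ({
      handledBy: "node-b",
      total: events.reduce((sum, e) => sum + e.payload.value, 0),
    }),
    processSingle: (event) => ({ handledBy: "node-b", total: event.payload.value }),
  },
};

// Same branching as the handler above, extracted so it can run without Happen.
function dispatch(eventsOrEvent, context) {
  const strategy = nodeProcessingStrategies[context.node.id];
  if (!strategy) {
    throw new Error(`No strategy registered for node ${context.node.id}`);
  }
  return Array.isArray(eventsOrEvent)
    ? strategy.processBatch(eventsOrEvent, context)
    : strategy.processSingle(eventsOrEvent, context);
}

const batchResult = dispatch(
  [{ payload: { value: 1 } }, { payload: { value: 2 } }],
  { node: { id: "node-b" } }
);
const singleResult = dispatch({ payload: { value: 5 } }, { node: { id: "node-a" } });

console.log(batchResult, singleResult);
```

Failing loudly on an unknown node ID keeps a missing table entry from surfacing later as an opaque "cannot read properties of undefined" error.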

Examples

Explicit Batch Creation and Processing

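// Efficiently handle high-volume sensor data
const telemetryNode = createNode("telemetry-processor");
// Sending node used by sendSensorData below; the ID "sensor-gateway" is an illustrative assumption
const sensorNode = createNode("sensor-gateway");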

// Explicitly create batches - no magic behind the scenes
function sendSensorData(readings) {
  // Create batches of appropriate size
  const batchSize = 50;
  
  for (let i = 0; i < readings.length; i += batchSize) {
    // Explicitly create a batch as an array of events
    const batch = readings.slice(i, i + batchSize).map(reading => ({
      type: "sensor-reading",
      payload: { 
        sensorId: reading.sensorId, 
        value: reading.value 
      }
    }));
    
    // Send the batch explicitly as an array
    sensorNode.send(telemetryNode, batch);
  }
}

// Generate some sample readings
const readings = Array.from({ length: 1000 }, (_, i) => ({
  sensorId: `sensor-${i % 10}`,
  value: Math.random() * 100
}));

// Send readings in explicit batches
sendSensorData(readings);

// Handler processes either single events or explicitly sent batches
telemetryNode.on("sensor-reading", (eventOrEvents, context) => {
  if (Array.isArray(eventOrEvents)) {
    // Process an explicitly sent batch
    const readings = eventOrEvents.map(e => e.payload.value);
    const average = readings.reduce((sum, v) => sum + v, 0) / readings.length;
    
    console.log(`Processed batch of ${eventOrEvents.length} readings, avg: ${average}`);
    return { processed: readings.length };
  } else {
    // Process single event
    processSingleReading(eventOrEvents.payload);
    return { processed: 1 };
  }
});

Distributed Notification System with Divergent Flows

// System monitor that alerts multiple subsystems
const monitorNode = createNode("system-monitor");
const alertNode = createNode("alert-service");
const loggingNode = createNode("logging-service");
const adminNode = createNode("admin-dashboard");

// Send critical alerts to multiple services
function reportCriticalIssue(issue) {
  [alertNode, loggingNode, adminNode].send({
    type: "critical-alert",
    payload: {
      issue,
      timestamp: Date.now(),
      severity: "critical"
    }
  });
}

// Each node processes alerts through its own flow
[alertNode, loggingNode, adminNode].on("critical-alert", function processAlert(event, context) {
  // Common processing logic
  console.log(`Alert received in ${context.node.id}`);
  
  // Node-specific next step
  if (context.node.id === "alert-service") {
    return sendUserAlerts;
  } else if (context.node.id === "logging-service") {
    return createLogEntry;
  } else {
    return updateDashboard;
  }
});

// Node-specific continuation functions
function sendUserAlerts(event, context) {
  // Alert service sends notifications to users
  return { alertsSent: 10 };
}

function createLogEntry(event, context) {
  // Logging service creates permanent record
  return { loggedAt: Date.now() };
}

function updateDashboard(event, context) {
  // Admin dashboard updates UI
  return { dashboardUpdated: true };
}

Manual Batch Processing

// Explicitly create and send a batch of events
const orderBatch = [
  { type: "process-order", payload: { orderId: "ORD-001", total: 125.00 } },
  { type: "process-order", payload: { orderId: "ORD-002", total: 75.50 } },
  { type: "process-order", payload: { orderId: "ORD-003", total: 240.00 } }
];

// Send the batch
node.send(processingNode, orderBatch);

// Handle the batch on the receiving side
processingNode.on("process-order", (eventOrEvents, context) => {
  if (Array.isArray(eventOrEvents)) {
    // Batch processing
    const totalValue = eventOrEvents.reduce((sum, e) => sum + e.payload.total, 0);
    return { totalProcessed: eventOrEvents.length, totalValue };
  } else {
    // Individual processing
    return { processed: eventOrEvents.payload.orderId };
  }
});

Performance Considerations

Confluence is designed to be explicit and predictable while still providing performance benefits:

When to Use Batching

  • High-volume event streams with similar event types

  • Processing that benefits from aggregation (like analytics)

  • Networks with significant latency where reducing round-trips helps

  • Calculations that can be optimized when performed on multiple items together

When to Use Multi-node Operations

  • Broadcast notifications that multiple subsystems need to process

  • Commands that affect multiple services simultaneously

  • Cross-cutting concerns like logging, monitoring, or security

  • Redundant processing for critical operations

Batch Processing Efficiency

For maximum efficiency when processing batches:

  1. Use Specialized Algorithms: Many operations are more efficient on batches (like bulk database inserts)

  2. Minimize Per-event Overhead: Amortize setup costs across multiple events

  3. Leverage Memory Locality: Process related data together to improve cache efficiency

  4. Prefer Single Passes: Process the entire batch in one pass rather than multiple iterations
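As an illustration of the single-pass guideline, the sketch below computes several statistics for a batch in one loop instead of running separate `map`, `reduce`, and `Math.min`/`Math.max` passes. `summarizeBatch` is a hypothetical helper, and the `payload.value` field follows the event shape used in the examples on this page.

```javascript
// Summarize a batch in one pass: count, sum, min, and max together.
function summarizeBatch(events) {
  let count = 0;
  let sum = 0;
  let min = Infinity;
  let max = -Infinity;

  for (const event of events) {
    const value = event.payload.value;
    count += 1;
    sum += value;
    if (value < min) min = value;
    if (value > max) max = value;
  }

  // Avoid dividing by zero for an empty batch
  const average = count === 0 ? null : sum / count;
  return { count, sum, min, max, average };
}

const summary = summarizeBatch([
  { type: "data-point", payload: { value: 10 } },
  { type: "data-point", payload: { value: 20 } },
  { type: "data-point", payload: { value: 30 } },
]);

console.log(summary);
```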

Memory Management

Batch processing can also help with memory efficiency:

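// Process high-volume data with controlled memory usage.
// Note: `yield` is only valid inside a generator function, and arrow functions
// cannot be generators, so the handler is written with `function*`.
node.on("high-volume-data", function* (eventOrEvents, context) {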
  if (Array.isArray(eventOrEvents)) {
    // Process incrementally to avoid memory spikes
    let results = [];
    const batchSize = 100;
    
    for (let i = 0; i < eventOrEvents.length; i += batchSize) {
      const chunk = eventOrEvents.slice(i, i + batchSize);
      const chunkResults = processChunk(chunk);
      results = results.concat(chunkResults);
      
      // Allow event loop to run between chunks if needed
      if (i + batchSize < eventOrEvents.length) {
        yield { progress: Math.round((i + batchSize) / eventOrEvents.length * 100) };
      }
    }
    
    return { results };
  }
});

Conclusion

Confluence provides powerful capabilities for handling multiple events and multiple nodes while maintaining Happen's commitment to radical simplicity. Through a single intuitive container pattern - using arrays for both nodes and events - it enables sophisticated batch processing and multi-node communication without introducing special methods or complex APIs.

The system offers:

  1. Pure Symmetry: The same pattern works for both nodes and events

  2. Explicit Control: No "magic" - batching and multi-node operations are explicit

  3. Divergent Flows: Natural extension of the Event Continuum to parallel processing

  4. Zero New Methods: Works entirely through existing APIs with array support

  5. Powerful Capabilities: Enables sophisticated patterns with minimal complexity

By recognizing that arrays naturally represent collections, Confluence stays true to Happen's core philosophy of simplicity and enables a powerful system with virtually no learning curve.
