Confluence
Unified Fan-in and Fan-out Event Processing
Confluence is Happen's system for handling multiple events and multiple nodes with minimal API surface. It provides powerful capabilities for both event batching (fan-in) and multi-node distribution (fan-out) through a single, intuitive container pattern.
Core Idea
Confluence embodies a simple concept: When something is in a container, it represents a collection. This principle applies consistently whether you're working with nodes or events:
An array of nodes means "multiple receivers"
An array of events means "a batch of events"
Handlers naturally work with both individual items and collections
This symmetry creates a powerful system with virtually no new API surface.
API Surface
The entire Confluence system introduces zero new methods, instead extending Happen's existing API to work with arrays:
Fan-out: One Event, Multiple Nodes
// Register a handler across multiple nodes using an array
[orderNode, paymentNode, inventoryNode].on("update", (event, context) => {
// Handler receives events from any of these nodes
console.log(`Processing update in ${context.node.id}`);
});
// Send an event to multiple nodes using an array
[orderNode, shippingNode, notificationNode].send({
type: "order-completed",
payload: { orderId: "ORD-123" }
});
Fan-in: Multiple Events, One Handler
// Send multiple events as a batch using an array
node.send(targetNode, [
{ type: "data-point", payload: { value: 10 } },
{ type: "data-point", payload: { value: 20 } },
{ type: "data-point", payload: { value: 30 } }
]);
// Handler naturally works with both individual events and batches
node.on("data-point", (eventOrEvents, context) => {
// Simple array check tells you what you received
if (Array.isArray(eventOrEvents)) {
console.log(`Processing batch of ${eventOrEvents.length} events`);
return processBatch(eventOrEvents);
} else {
console.log(`Processing single event: ${eventOrEvents.payload.value}`);
return processSingle(eventOrEvents);
}
});
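The `processBatch` and `processSingle` helpers above are left undefined; as a rough sketch (the aggregation logic here is purely illustrative), they might look like this:
// Hypothetical helpers for the handler above
function processBatch(events) {
  // Aggregate the whole batch in a single pass
  const total = events.reduce((sum, e) => sum + e.payload.value, 0);
  return { processed: events.length, total };
}
function processSingle(event) {
  return { processed: 1, total: event.payload.value };
}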
The Event Continuum and Divergent Flows
When an event is sent to multiple nodes using Confluence, each node processes it through its own independent flow chain, creating "divergent flows": parallel processing paths that naturally extend the Event Continuum model:
// Register with multiple nodes
[orderNode, inventoryNode, notificationNode].on("order-updated", function validateUpdate(event, context) {
// This function runs independently for each node
console.log(`Validating update in ${context.node.id}`);
// Each node can return a different next function
if (context.node.id === "order-service") {
return updateOrderRecord;
} else if (context.node.id === "inventory-service") {
return updateInventoryLevels;
} else {
return sendNotifications;
}
});
// Node-specific continuation functions
function updateOrderRecord(event, context) {
// Only runs in the order node's flow
console.log("Updating order records");
return { updated: true };
}
function updateInventoryLevels(event, context) {
// Only runs in the inventory node's flow
console.log("Updating inventory levels");
return { inventoryUpdated: true };
}
function sendNotifications(event, context) {
// Only runs in the notification node's flow
console.log("Sending notifications");
return { notificationsSent: true };
}
This creates a causal tree structure where:
Each node has its own isolated context
Flow paths can diverge based on node-specific logic
Return values are tracked per node
The complete causal history is preserved
When you need the results from multiple nodes:
// Send to multiple nodes and await results using the standard return method
const results = await sender.send([node1, node2, node3], event).return();
// Results contains node-specific returns, keyed by node ID
console.log(`Node1 result: ${results[node1.id]}`);
console.log(`Node2 result: ${results[node2.id]}`);
console.log(`Node3 result: ${results[node3.id]}`);
No Config Needed
Since batching is fully explicit with arrays, there's no magical configuration needed. Batches are simply arrays of events that you create and send directly:
// Send a single event
node.send(targetNode, {
type: "data-point",
payload: { value: 42 }
});
// Send a batch as an explicit array of events
node.send(targetNode, [
{ type: "data-point", payload: { value: 10 } },
{ type: "data-point", payload: { value: 20 } },
{ type: "data-point", payload: { value: 30 } }
]);
This direct approach ensures complete predictability with no behind-the-scenes magic.
Causality Preservation
Even with batches and multi-node processing, Confluence maintains Happen's causality guarantees:
The causal context is preserved for each event in a batch
Each event maintains its position in the causal chain
Batch processing still records each event's causal history
Divergent flows are tracked as branches in the causal tree
This means you can always trace the complete history of events, even when they've been processed in batches or across multiple nodes.
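To make that concrete, here is a conceptual sketch of the causal tree produced by the fan-out example above, together with how a batch handler can read each event's causal link (the tree drawing is illustrative; the field access follows the "Batch Context Structure" section below):
// Conceptual causal tree for one fan-out event (illustrative only):
//
// order-updated
// ├── order-service branch         → { updated: true }
// ├── inventory-service branch     → { inventoryUpdated: true }
// └── notification-service branch  → { notificationsSent: true }
//
// Within a batch, each event keeps its own causal link:
node.on("data-point", (eventOrEvents, context) => {
  if (Array.isArray(eventOrEvents)) {
    eventOrEvents.forEach((event, index) => {
      const eventContext = context.events[index];
      console.log(`Event ${index} caused by: ${eventContext.causal.causationId}`);
    });
  }
});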
Context in Multi-Node Operations
When working with multiple nodes, context handling is structured to maintain node-specific isolation while providing clear access to relevant information:
Node-Specific Context Structure
When a handler receives an event in a multi-node operation, the context structure clearly identifies which node is processing it:
[orderNode, inventoryNode, shippingNode].on("order-updated", (event, context) => {
// The context.node property identifies the specific node
console.log(`Processing in: ${context.node.id}`);
// Each node maintains its own isolated context
context.nodeState = context.nodeState || {};
context.nodeState.lastProcessed = Date.now();
});
Results Collection
When collecting results from multiple nodes, the returned object has a clear structure with node IDs as keys:
// Send to multiple nodes and collect results
const results = await sender.send([orderNode, inventoryNode, shippingNode], {
type: "verify-order",
payload: { orderId: "ORD-123" }
}).return();
// Results object is structured by node ID
console.log(`Order verification: ${results[orderNode.id].verified}`);
console.log(`Inventory status: ${results[inventoryNode.id].available}`);
console.log(`Shipping estimate: ${results[shippingNode.id].estimatedDays}`);
Batch Context Structure
For batch operations (multiple events to a single node), the context provides batch-level information at the root, with individual event contexts in an array:
node.on("data-point", (events, context) => {
if (Array.isArray(events)) {
// Batch-level information at the root
console.log(`Processing batch of ${events.length} events`);
console.log(`Batch received at: ${context.receivedAt}`);
// Each event has its own context in the events array
events.forEach((event, index) => {
// context.events contains the individual event contexts
const eventContext = context.events[index];
console.log(`Event ${index} causation: ${eventContext.causal.causationId}`);
});
}
});
Combined Multi-Node and Batch Operations
In the rare case of both batching and multi-node operations, the context structure maintains clear separation:
[nodeA, nodeB].on("process-batch", (eventsOrEvent, context) => {
// First determine which node we are
const nodeId = context.node.id;
// Then check if we're processing a batch
if (Array.isArray(eventsOrEvent)) {
// We're in a specific node, processing a batch
console.log(`Node ${nodeId} processing batch of ${eventsOrEvent.length} events`);
// Access node-specific batch processing logic
return nodeProcessingStrategies[nodeId].processBatch(eventsOrEvent, context);
} else {
// We're in a specific node, processing a single event
console.log(`Node ${nodeId} processing single event`);
// Access node-specific individual processing logic
return nodeProcessingStrategies[nodeId].processSingle(eventsOrEvent, context);
}
});
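The `nodeProcessingStrategies` object referenced above is not defined in the example; one hypothetical shape is a plain object mapping node IDs to batch and single-event strategies:
// Hypothetical strategy map, keyed by node ID (IDs assumed for illustration)
const nodeProcessingStrategies = {
  "node-a": {
    processBatch: (events, context) => ({ processed: events.length }),
    processSingle: (event, context) => ({ processed: 1 })
  },
  "node-b": {
    processBatch: (events, context) => ({ archived: events.length }),
    processSingle: (event, context) => ({ archived: 1 })
  }
};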
This context structure ensures that no matter how complex the operation, each node maintains its own isolated processing environment while still providing clear access to all necessary information.
Examples
Explicit Batch Creation and Processing
// Efficiently handle high-volume sensor data
const telemetryNode = createNode("telemetry-processor");
// Sender node used below (id chosen for this example)
const sensorNode = createNode("sensor-gateway");
// Explicitly create batches - no magic behind the scenes
function sendSensorData(readings) {
// Create batches of appropriate size
const batchSize = 50;
for (let i = 0; i < readings.length; i += batchSize) {
// Explicitly create a batch as an array of events
const batch = readings.slice(i, i + batchSize).map(reading => ({
type: "sensor-reading",
payload: {
sensorId: reading.sensorId,
value: reading.value
}
}));
// Send the batch explicitly as an array
sensorNode.send(telemetryNode, batch);
}
}
// Generate some sample readings
const readings = Array.from({ length: 1000 }, (_, i) => ({
sensorId: `sensor-${i % 10}`,
value: Math.random() * 100
}));
// Send readings in explicit batches
sendSensorData(readings);
// Handler processes either single events or explicitly sent batches
telemetryNode.on("sensor-reading", (eventOrEvents, context) => {
if (Array.isArray(eventOrEvents)) {
// Process an explicitly sent batch
const readings = eventOrEvents.map(e => e.payload.value);
const average = readings.reduce((sum, v) => sum + v, 0) / readings.length;
console.log(`Processed batch of ${eventOrEvents.length} readings, avg: ${average}`);
return { processed: readings.length };
} else {
// Process single event
processSingleReading(eventOrEvents.payload);
return { processed: 1 };
}
});
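The `processSingleReading` helper used in the single-event branch is assumed; a minimal placeholder might be:
// Hypothetical helper for a single reading
function processSingleReading(payload) {
  console.log(`Reading from ${payload.sensorId}: ${payload.value}`);
}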
Distributed Notification System with Divergent Flows
// System monitor that alerts multiple subsystems
const monitorNode = createNode("system-monitor");
const alertNode = createNode("alert-service");
const loggingNode = createNode("logging-service");
const adminNode = createNode("admin-dashboard");
// Send critical alerts to multiple services
function reportCriticalIssue(issue) {
[alertNode, loggingNode, adminNode].send({
type: "critical-alert",
payload: {
issue,
timestamp: Date.now(),
severity: "critical"
}
});
}
// Each node processes alerts through its own flow
[alertNode, loggingNode, adminNode].on("critical-alert", function processAlert(event, context) {
// Common processing logic
console.log(`Alert received in ${context.node.id}`);
// Node-specific next step
if (context.node.id === "alert-service") {
return sendUserAlerts;
} else if (context.node.id === "logging-service") {
return createLogEntry;
} else {
return updateDashboard;
}
});
// Node-specific continuation functions
function sendUserAlerts(event, context) {
// Alert service sends notifications to users
return { alertsSent: 10 };
}
function createLogEntry(event, context) {
// Logging service creates permanent record
return { loggedAt: Date.now() };
}
function updateDashboard(event, context) {
// Admin dashboard updates UI
return { dashboardUpdated: true };
}
Manual Batch Processing
// Explicitly create and send a batch of events
const orderBatch = [
{ type: "process-order", payload: { orderId: "ORD-001", total: 125.00 } },
{ type: "process-order", payload: { orderId: "ORD-002", total: 75.50 } },
{ type: "process-order", payload: { orderId: "ORD-003", total: 240.00 } }
];
// Send the batch
node.send(processingNode, orderBatch);
// Handle the batch on the receiving side
processingNode.on("process-order", (eventOrEvents, context) => {
if (Array.isArray(eventOrEvents)) {
// Batch processing
const totalValue = eventOrEvents.reduce((sum, e) => sum + e.payload.total, 0);
return { totalProcessed: eventOrEvents.length, totalValue };
} else {
// Individual processing
return { processed: eventOrEvents.payload.orderId };
}
});
Performance Considerations
Confluence is designed to be explicit and predictable while still providing performance benefits:
When to Use Batching
High-volume event streams with similar event types
Processing that benefits from aggregation (like analytics)
Networks with significant latency, where reducing round-trips helps (see the sketch after this list)
Calculations that can be optimized when performed on multiple items together
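To illustrate the round-trip point, compare three individual sends with one batched send; the batch crosses the network once (assuming the nodes communicate over a network boundary):
// Three individual sends: three round-trips
node.send(targetNode, { type: "data-point", payload: { value: 10 } });
node.send(targetNode, { type: "data-point", payload: { value: 20 } });
node.send(targetNode, { type: "data-point", payload: { value: 30 } });
// One batched send: a single round-trip carrying all three events
node.send(targetNode, [
  { type: "data-point", payload: { value: 10 } },
  { type: "data-point", payload: { value: 20 } },
  { type: "data-point", payload: { value: 30 } }
]);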
When to Use Multi-node Operations
Broadcast notifications that multiple subsystems need to process
Commands that affect multiple services simultaneously
Cross-cutting concerns like logging, monitoring, or security
Redundant processing for critical operations
Batch Processing Efficiency
For maximum efficiency when processing batches (a single-pass sketch follows this list):
Use Specialized Algorithms: Many operations are more efficient on batches (like bulk database inserts)
Minimize Per-event Overhead: Amortize setup costs across multiple events
Leverage Memory Locality: Process related data together to improve cache efficiency
Prefer Single Passes: Process the entire batch in one pass rather than multiple iterations
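For instance, a single pass can compute several aggregates at once instead of iterating the batch separately for each; a minimal sketch:
// One pass over the batch computes count, average, and max together
function summarizeBatch(events) {
  let sum = 0;
  let max = -Infinity;
  for (const event of events) {
    const value = event.payload.value;
    sum += value;
    if (value > max) max = value;
  }
  return { count: events.length, average: sum / events.length, max };
}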
Memory Management
Batch processing can also help with memory efficiency:
// Process high-volume data with controlled memory usage
node.on("high-volume-data", (eventOrEvents, context) => {
if (Array.isArray(eventOrEvents)) {
// Process incrementally to avoid memory spikes
let results = [];
const batchSize = 100;
for (let i = 0; i < eventOrEvents.length; i += batchSize) {
const chunk = eventOrEvents.slice(i, i + batchSize);
const chunkResults = processChunk(chunk);
results = results.concat(chunkResults);
// Allow event loop to run between chunks if needed
if (i + batchSize < eventOrEvents.length) {
yield { progress: Math.round((i + batchSize) / eventOrEvents.length * 100) };
}
}
return { results };
}
});
Conclusion
Confluence provides powerful capabilities for handling multiple events and multiple nodes while maintaining Happen's commitment to radical simplicity. Through a single intuitive container pattern, using arrays for both nodes and events, it enables sophisticated batch processing and multi-node communication without introducing special methods or complex APIs.
The system offers:
Pure Symmetry: The same pattern works for both nodes and events
Explicit Control: No "magic"; batching and multi-node operations are explicit
Divergent Flows: Natural extension of the Event Continuum to parallel processing
Zero New Methods: Works entirely through existing APIs with array support
Powerful Capabilities: Enables sophisticated patterns with minimal complexity
By staying true to Happen's core philosophy of simplicity and recognizing that arrays naturally represent collections, Confluence delivers a powerful system with virtually no learning curve.