Asynchronous Processing
Asynchronous processing is a cornerstone of Happen's design, allowing the framework to handle concurrent events efficiently while maintaining its commitment to radical simplicity. This document explains how Happen approaches asynchronous processing through multiple strategies: concurrent event handling, direct runtime capabilities, function-based flows, and streaming results.
Core Approach
In Happen, multiple events arriving at a node are processed concurrently without blocking each other, leveraging JavaScript's asynchronous capabilities while preserving event isolation. This happens as an implementation detail rather than an explicit API feature.
When events arrive at a node:
Each event gets its own independent execution context
Events are processed concurrently using JavaScript's natural Promise mechanism
One event processing never blocks another, even within the same node
This approach ensures that slow-running event handlers don't create bottlenecks for unrelated events.
The Functional Asynchronous Model
Happen embraces JavaScript's native asynchronous capabilities while providing a clean, functional interface through the Event Continuum model:
// Register an asynchronous handler
orderNode.on("process-order", async (event, context) => {
// Perform asynchronous validation
const validationResult = await validateOrderData(event.payload);
// Store in context
context.validation = validationResult;
if (!validationResult.valid) {
return {
success: false,
reason: "validation-failed",
errors: validationResult.errors
};
}
// Return the next function to execute
return processPayment;
});
// Asynchronous function in the flow
async function processPayment(event, context) {
// Process payment asynchronously
const paymentResult = await processTransaction(event.payload.payment);
// Store in context
context.payment = paymentResult;
if (!paymentResult.success) {
return handlePaymentFailure;
}
// Return next function
return createShipment;
}
Happen's event execution engine handles the async nature transparently:
When a function returns a Promise, Happen awaits it automatically
Async functions are fully supported throughout the flow
Error handling works seamlessly with async/await
Automatic Queue Management
For high-volume scenarios, Happen includes internal queue management to prevent overwhelming the system:
// Internal queue management - not exposed to users
class Node {
// Configuration for concurrent execution
#concurrencySettings = {
maxConcurrent: 100, // Maximum concurrent executions
queueLimit: 10000 // Maximum queued events
};
#activeCount = 0; // Currently executing handlers
#queue = []; // Queue of pending events
// Process with queue management
process(event) {
// If under concurrent limit, execute immediately
if (this.#activeCount < this.#concurrencySettings.maxConcurrent) {
return this.executeImmediately(event);
}
// Otherwise, queue the event
return this.queueEvent(event);
}
// Execute immediately
executeImmediately(event) {
this.#activeCount++;
// Start the event flow
return this.executeFlow(event)
.finally(() => {
this.#activeCount--;
this.processNextFromQueue();
});
}
// Queue for later execution
queueEvent(event) {
// Check queue limits
if (this.#queue.length >= this.#concurrencySettings.queueLimit) {
return Promise.reject(new Error("Event queue limit exceeded"));
}
// Create deferred promise
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
// Add to queue with resolvers
this.#queue.push({ event, resolve, reject });
return promise;
}
// Process next from queue
processNextFromQueue() {
// If queue empty or at concurrency limit, do nothing
if (this.#queue.length === 0 ||
this.#activeCount >= this.#concurrencySettings.maxConcurrent) {
return;
}
// Get next event from queue
const next = this.#queue.shift();
// Process it
this.executeImmediately(next.event)
.then(result => next.resolve(result))
.catch(error => next.reject(error));
}
}
This queue management:
Limits maximum concurrent event handling to prevent resource exhaustion
Preserves ordered processing within each event type
Provides backpressure for high-volume scenarios
Manages memory usage by controlling queue size
Node Configuration
Happen manages concurrency through the system configuration. This approach keeps infrastructure concerns separated from domain logic and provides consistent behavior across your application.
System-Level Concurrency Configuration
Concurrency settings are defined during framework initialization:
// Initialization with system-wide defaults
const happen = initializeHappen({
// Core security configuration
security: {
// Security settings...
},
// System-wide concurrency configuration
concurrency: {
// Default settings for all nodes
default: {
maxConcurrent: 100, // Maximum concurrent executions
queueLimit: 10000, // Maximum queued events
queueStrategy: "fifo", // First-in-first-out processing
backpressureThreshold: 0.8 // Apply backpressure at 80% capacity
},
// Node-specific overrides based on node ID patterns
patterns: {
"api-*": {
maxConcurrent: 200, // Higher concurrency for API nodes
queueLimit: 20000 // Larger queue for API nodes
},
"worker-*": {
maxConcurrent: 50, // Lower concurrency for worker nodes
queueStrategy: "priority" // Priority-based queuing
},
"realtime-*": {
maxConcurrent: 300, // Higher concurrency for realtime nodes
queueLimit: 5000, // Smaller queue to avoid stale data
backpressureThreshold: 0.6 // Earlier backpressure
}
}
},
transport: {
// Transport settings...
}
});
// The initialized framework provides the node creation function
const { createNode } = happen;
This hierarchical approach ensures consistent behavior while allowing for customization where needed.
Generator-Based Streaming
Building on the foundation of asynchronous processing, Happen supports generator-based streaming for incremental data processing.
Instead of introducing separate API methods for streaming, Happen detects generator functions automatically:
// Register a handler that happens to be a generator function
node.on("process-large-dataset", async function* processLargeDataset(event, context) {
const { dataset } = event.payload;
// Process in chunks
for (let i = 0; i < dataset.length; i += 100) {
const chunk = dataset.slice(i, i + 100);
const result = await processChunk(chunk);
// Yield intermediate results
yield {
progress: Math.min(100, Math.round((i + chunk.length) / dataset.length * 100)),
results: result
};
}
// Final result
return { status: "complete" };
});
When interacting with a generator-based handler, Happen automatically provides an AsyncIterator interface:
// Request handling of a large dataset using the flow mechanism
clientNode.on("process-dataset", async function* handleDatasetProcessing(event, context) {
// Send the request to the processor node
const processingEvent = {
type: "process-large-dataset",
payload: { dataset: event.payload.dataset }
};
// Use the send method without chaining
const processingStream = await clientNode.send(processorNode, processingEvent);
// Process results incrementally using the event continuum
for await (const update of processingStream) {
// Log progress
console.log(`Progress: ${update.progress}%`);
// Display intermediate results
displayIntermediateResults(update.results);
// Yield progress updates to the caller if needed
yield {
status: "processing",
progress: update.progress,
resultsCount: update.results.length
};
}
// Return final result
return {
status: "complete",
message: "Dataset processing complete"
};
});
// To use this handler:
const result = await userNode.send(clientNode, {
type: "process-dataset",
payload: {
dataset: largeDataset
}
});
// The result will be the final return value from the handler
Generator functions also work with broadcasting, enabling publish-subscribe patterns for streams:
// Continuous system monitoring with generator
monitorNode.on("system-metrics", async function* generateMetrics() {
while (true) {
const metrics = await collectSystemMetrics();
yield metrics;
await sleep(5000); // Update every 5 seconds
}
});
// Start broadcasting metrics
monitorNode.broadcast({ type: "system-metrics" });
// Subscribers receive the stream
dashboardNode.on("system-metrics", async function processMetrics(metricsStream, context) {
// Iterate through metrics stream
for await (const metrics of metricsStream) {
// Update dashboard
updateDashboard(metrics);
}
});
Use Cases for Streaming
Generator-based streaming in Happen addresses several key use cases:
1. Large Dataset Processing
Break down large datasets into manageable chunks:
dataProcessorNode.on("process-batch", async function* processBatch(event, context) {
const { items } = event.payload;
// Process in chunks to avoid memory issues
for (let i = 0; i < items.length; i += 100) {
const chunk = items.slice(i, i + 100);
const results = await processChunk(chunk);
yield {
progress: Math.round((i + chunk.length) / items.length * 100),
processedItems: chunk.length,
results
};
}
});
2. Real-time Progress Updates
Provide incremental feedback for long-running operations:
importNode.on("import-file", async function* importFile(event, context) {
const { filePath } = event.payload;
const fileSize = await getFileSize(filePath);
const reader = createFileReader(filePath);
let bytesProcessed = 0;
// Process file in chunks
while (!reader.eof()) {
const chunk = await reader.readChunk(1024 * 1024); // 1MB chunks
const records = parseRecords(chunk);
await saveRecords(records);
bytesProcessed += chunk.length;
// Yield progress update
yield {
progress: Math.round(bytesProcessed / fileSize * 100),
recordsProcessed: records.length
};
}
// Final result
return { status: "complete", totalRecords: getTotalRecords() };
});
3. Continuous Data Streams
Handle data from sources that produce continuous updates:
sensorNode.on("monitor-sensors", async function* monitorSensors(event, context) {
const { sensorIds, duration } = event.payload;
const endTime = Date.now() + duration;
// Connect to sensors
const sensors = sensorIds.map(id => connectToSensor(id));
try {
// Process sensor data until duration expires
while (Date.now() < endTime) {
// Collect readings from all sensors
const readings = await Promise.all(
sensors.map(sensor => sensor.getReading())
);
// Process this batch of readings
const analysis = analyzeReadings(readings);
// Emit real-time analysis
yield {
timestamp: Date.now(),
readings,
analysis
};
// Wait before next collection
await sleep(1000); // 1 second interval
}
} finally {
// Clean up resources
sensors.forEach(sensor => sensor.disconnect());
}
});
4. Paginated API Results
Fetch and process paginated data from external APIs:
apiNode.on("fetch-all-records", async function* fetchAllRecords(event, context) {
const { endpoint, pageSize = 100 } = event.payload;
let page = 1;
let hasMore = true;
let totalRecords = 0;
// Fetch pages until complete
while (hasMore) {
// Fetch this page
const response = await fetch(`${endpoint}?page=${page}&size=${pageSize}`);
const data = await response.json();
// Process records
processRecords(data.records);
totalRecords += data.records.length;
// Yield progress
yield {
page,
records: data.records,
totalRecords,
hasMore: data.hasMore
};
// Update for next iteration
hasMore = data.hasMore;
page++;
// Respect rate limits
if (hasMore) {
await sleep(200); // Small delay between requests
}
}
});
Benefits of Happen's Async Processing Model
The combination of concurrent event processing and generator-based streaming delivers several key benefits:
Maximized Throughput: Multiple events process concurrently without blocking
Resource Efficiency: Memory usage is controlled through incremental processing
Responsive System: Long-running operations don't block other processing
Natural Backpressure: Consumers process stream results at their own pace
Minimal API Surface: Power and flexibility without additional primitives
Clean Isolation: Each processing path maintains its own context and error boundaries
Choosing the Right Concurrency Approach
With multiple approaches to concurrency available in Happen, here's a quick guide to choosing the right one for different scenarios:
Functional Flows
Most use cases
Default approach for all event handling
Async/Await
Asynchronous operations
When operations need to wait for external results
Generator-based Streaming
Incremental processing
For progress updates and chunked processing
Worker Threads
CPU-intensive tasks
When tasks would block the main thread
Distributed Processing
Horizontal scaling
When single machine capacity is exceeded
Key Takeaways
The Event Continuum model naturally supports asynchronous processing through JavaScript's native async/await
Runtime transparency allows direct use of platform-specific concurrency features
Generator-based streaming provides powerful incremental processing with minimal API additions
Internal queue management ensures system stability under high load
This multi-faceted approach delivers flexible asynchronous capabilities by providing multiple concurrency strategies that compose seamlessly.
Last updated