Asynchronous Processing

Asynchronous processing is a cornerstone of Happen's design, allowing the framework to handle concurrent events efficiently while maintaining its commitment to radical simplicity. This document explains how Happen approaches asynchronous processing through multiple strategies: concurrent event handling, direct runtime capabilities, function-based flows, and streaming results.

Core Approach

In Happen, multiple events arriving at a node are processed concurrently without blocking each other, leveraging JavaScript's asynchronous capabilities while preserving event isolation. This happens as an implementation detail rather than an explicit API feature.

When events arrive at a node:

  1. Each event gets its own independent execution context

  2. Events are processed concurrently using JavaScript's natural Promise mechanism

  3. Processing one event never blocks another, even within the same node

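This approach ensures that handlers waiting on I/O, timers, or other asynchronous work don't create bottlenecks for unrelated events. CPU-bound handlers still occupy the JavaScript thread while they run.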
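As a minimal sketch of this behavior in plain JavaScript (not Happen's internals), the snippet below dispatches two events without awaiting one before starting the other. The names `sleep`, `dispatchAll`, and `demoConcurrentDispatch` are hypothetical helpers introduced only for illustration.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Start every handler invocation right away; each one runs as its own
// Promise chain, so a slow event does not delay a fast one.
function dispatchAll(events, handler) {
  return Promise.all(events.map((event) => handler(event)));
}

async function demoConcurrentDispatch() {
  const completionOrder = [];

  const handler = async (event) => {
    await sleep(event.delay); // stands in for I/O or other async work
    completionOrder.push(event.type);
    return event.type;
  };

  const results = await dispatchAll(
    [
      { type: "slow-report", delay: 50 },
      { type: "fast-ping", delay: 5 },
    ],
    handler
  );

  return { results, completionOrder };
}
```

Here `results` keeps the dispatch order because `Promise.all` preserves input order, while `completionOrder` lists `fast-ping` first because it finished without waiting for `slow-report`.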

The Functional Asynchronous Model

Happen embraces JavaScript's native asynchronous capabilities while providing a clean, functional interface through the Event Continuum model:

// Register an asynchronous handler
orderNode.on("process-order", async (event, context) => {
  // Perform asynchronous validation
  const validationResult = await validateOrderData(event.payload);
  
  // Store in context
  context.validation = validationResult;
  
  if (!validationResult.valid) {
    return {
      success: false,
      reason: "validation-failed",
      errors: validationResult.errors
    };
  }
  
  // Return the next function to execute
  return processPayment;
});

// Asynchronous function in the flow
async function processPayment(event, context) {
  // Process payment asynchronously
  const paymentResult = await processTransaction(event.payload.payment);
  
  // Store in context
  context.payment = paymentResult;
  
  if (!paymentResult.success) {
    return handlePaymentFailure;
  }
  
  // Return next function
  return createShipment;
}

Happen's event execution engine handles the async nature transparently:

  1. When a function returns a Promise, Happen awaits it automatically

  2. Async functions are fully supported throughout the flow

  3. Error handling works seamlessly with async/await
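The three points above can be illustrated with a small flow runner. This is a sketch under assumptions, not Happen's actual engine: `runFlow`, `validateOrder`, `chargeOrder`, and `failingStep` are hypothetical names. The runner keeps calling whatever function a step returns and stops at the first non-function value.

```javascript
// Hypothetical flow runner (an assumption, not Happen's real implementation).
async function runFlow(firstHandler, event, context = {}) {
  let current = firstHandler;
  let result;
  while (typeof current === "function") {
    // `await` accepts plain values and Promises alike, so synchronous and
    // async steps can be mixed in one flow.
    result = await current(event, context);
    current = result;
  }
  return result;
}

// Illustrative async step: returns either a final result or the next step.
async function validateOrder(event, context) {
  context.validated = event.payload.amount > 0;
  if (!context.validated) {
    return { success: false, reason: "validation-failed" };
  }
  return chargeOrder;
}

// Illustrative synchronous step: ends the flow with a plain object.
function chargeOrder(event, context) {
  context.charged = event.payload.amount;
  return { success: true, charged: context.charged };
}

// A step that fails: the rejection propagates to whoever awaits runFlow.
async function failingStep() {
  throw new Error("payment gateway unavailable");
}
```

Because every step is awaited, an error thrown in any step rejects the Promise returned by `runFlow`, so a caller can handle it with an ordinary `try`/`catch`.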

Automatic Queue Management

For high-volume scenarios, Happen includes internal queue management to prevent overwhelming the system:

// Internal queue management - not exposed to users
class Node {
  // Configuration for concurrent execution
  #concurrencySettings = {
    maxConcurrent: 100, // Maximum concurrent executions
    queueLimit: 10000   // Maximum queued events
  };
  
  #activeCount = 0;     // Currently executing handlers
  #queue = [];          // Queue of pending events
  
  // Process with queue management
  process(event) {
    // If under concurrent limit, execute immediately
    if (this.#activeCount < this.#concurrencySettings.maxConcurrent) {
      return this.executeImmediately(event);
    }
    
    // Otherwise, queue the event
    return this.queueEvent(event);
  }
  
  // Execute immediately
  executeImmediately(event) {
    this.#activeCount++;
    
    // Start the event flow
    return this.executeFlow(event)
      .finally(() => {
        this.#activeCount--;
        this.processNextFromQueue();
      });
  }
  
  // Queue for later execution
  queueEvent(event) {
    // Check queue limits
    if (this.#queue.length >= this.#concurrencySettings.queueLimit) {
      return Promise.reject(new Error("Event queue limit exceeded"));
    }
    
    // Create deferred promise
    let resolve, reject;
    const promise = new Promise((res, rej) => {
      resolve = res;
      reject = rej;
    });
    
    // Add to queue with resolvers
    this.#queue.push({ event, resolve, reject });
    
    return promise;
  }
  
  // Process next from queue
  processNextFromQueue() {
    // If queue empty or at concurrency limit, do nothing
    if (this.#queue.length === 0 ||
        this.#activeCount >= this.#concurrencySettings.maxConcurrent) {
      return;
    }
    
    // Get next event from queue
    const next = this.#queue.shift();
    
    // Process it
    this.executeImmediately(next.event)
      .then(result => next.resolve(result))
      .catch(error => next.reject(error));
  }
}

This queue management:

  1. Limits maximum concurrent event handling to prevent resource exhaustion

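  2. Dequeues events in arrival (FIFO) order, so events of the same type start in the order they arrived, although concurrent handlers may finish out of order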

  3. Provides backpressure for high-volume scenarios

  4. Manages memory usage by controlling queue size
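To observe these properties with small limits, the sketch below condenses the same idea into a standalone function. `createLimiter` is a hypothetical name, and the code is an illustration of the technique rather than Happen's internal class.

```javascript
// Hypothetical concurrency limiter with a bounded FIFO queue.
function createLimiter({ maxConcurrent, queueLimit }) {
  let active = 0;
  const queue = [];

  const startNext = () => {
    if (active >= maxConcurrent || queue.length === 0) return;
    const { task, resolve, reject } = queue.shift();
    active++;
    // Promise.resolve().then(task) also turns synchronous throws into rejections.
    Promise.resolve()
      .then(task)
      .then(resolve, reject)
      .finally(() => {
        active--;
        startNext();
      });
  };

  return function run(task) {
    // Reject only when all slots are busy and the queue is already full.
    if (active >= maxConcurrent && queue.length >= queueLimit) {
      return Promise.reject(new Error("Event queue limit exceeded"));
    }
    return new Promise((resolve, reject) => {
      queue.push({ task, resolve, reject });
      startNext();
    });
  };
}
```

With `maxConcurrent: 2`, submitting five tasks at once runs at most two at a time; the rest wait in the queue and start as slots free up. A caller that receives the "Event queue limit exceeded" rejection can treat it as a backpressure signal and slow down or retry later.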

Node Configuration

Happen manages concurrency through the system configuration. This approach keeps infrastructure concerns separated from domain logic and provides consistent behavior across your application.

System-Level Concurrency Configuration

Concurrency settings are defined during framework initialization:

// Initialization with system-wide defaults
const happen = initializeHappen({
  // Core security configuration
  security: {
    // Security settings...
  },
  // System-wide concurrency configuration
  concurrency: {
    // Default settings for all nodes
    default: {
      maxConcurrent: 100,     // Maximum concurrent executions
      queueLimit: 10000,      // Maximum queued events
      queueStrategy: "fifo",  // First-in-first-out processing
      backpressureThreshold: 0.8  // Apply backpressure at 80% capacity
    },
    // Node-specific overrides based on node ID patterns
    patterns: {
      "api-*": {
        maxConcurrent: 200,    // Higher concurrency for API nodes
        queueLimit: 20000      // Larger queue for API nodes
      },
      "worker-*": {
        maxConcurrent: 50,     // Lower concurrency for worker nodes
        queueStrategy: "priority"  // Priority-based queuing
      },
      "realtime-*": {
        maxConcurrent: 300,    // Higher concurrency for realtime nodes
        queueLimit: 5000,      // Smaller queue to avoid stale data
        backpressureThreshold: 0.6  // Earlier backpressure
      }
    }
  },
  transport: {
    // Transport settings...
  }
});

// The initialized framework provides the node creation function
const { createNode } = happen;

This hierarchical approach ensures consistent behavior while allowing for customization where needed.
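One way such a hierarchy could be resolved for a given node ID is sketched below. The function names `globToRegExp` and `resolveConcurrency` are hypothetical, and the merge rule (defaults first, then every matching pattern in declaration order) is an assumption; the source does not specify how Happen treats overlapping patterns.

```javascript
// Convert a simple "*" glob such as "api-*" into an anchored RegExp.
function globToRegExp(glob) {
  const escaped = glob
    .replace(/[.+?^${}()|[\]\\]/g, "\\$&") // escape regex metacharacters except "*"
    .replace(/\*/g, ".*");                 // "*" matches any run of characters
  return new RegExp(`^${escaped}$`);
}

// Assumed merge rule: start from defaults, then apply each matching pattern.
function resolveConcurrency(nodeId, concurrency) {
  const resolved = { ...concurrency.default };
  for (const [pattern, overrides] of Object.entries(concurrency.patterns ?? {})) {
    if (globToRegExp(pattern).test(nodeId)) {
      Object.assign(resolved, overrides);
    }
  }
  return resolved;
}

// Example configuration mirroring part of the settings shown above.
const exampleConcurrency = {
  default: { maxConcurrent: 100, queueLimit: 10000, queueStrategy: "fifo", backpressureThreshold: 0.8 },
  patterns: {
    "api-*": { maxConcurrent: 200, queueLimit: 20000 },
    "worker-*": { maxConcurrent: 50, queueStrategy: "priority" },
  },
};
```

Under these assumptions, `resolveConcurrency("worker-7", exampleConcurrency)` keeps the default `queueLimit` and `backpressureThreshold` while taking `maxConcurrent: 50` and `queueStrategy: "priority"` from the `worker-*` override. A node ID that matches no pattern receives the defaults unchanged.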

Generator-Based Streaming

Building on the foundation of asynchronous processing, Happen supports generator-based streaming for incremental data processing.

Instead of introducing separate API methods for streaming, Happen detects generator functions automatically:

// Register a handler that happens to be a generator function
node.on("process-large-dataset", async function* processLargeDataset(event, context) {
  const { dataset } = event.payload;
  
  // Process in chunks
  for (let i = 0; i < dataset.length; i += 100) {
    const chunk = dataset.slice(i, i + 100);
    const result = await processChunk(chunk);
    
    // Yield intermediate results
    yield {
      progress: Math.min(100, Math.round((i + chunk.length) / dataset.length * 100)),
      results: result
    };
  }
  
  // Final result
  return { status: "complete" };
});
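How Happen performs this detection is not shown here. In plain JavaScript, two common techniques are checking the handler against the async generator function constructor, or checking whether a returned value is async-iterable. The helper names below are hypothetical.

```javascript
// Constructor shared by every `async function*` (it is not a global name,
// so it has to be obtained from an instance).
const AsyncGeneratorFunction = Object.getPrototypeOf(async function* () {}).constructor;

// Technique 1: inspect the handler itself before calling it.
function isAsyncGeneratorFunction(fn) {
  return fn instanceof AsyncGeneratorFunction;
}

// Technique 2: inspect what the handler returned. This also covers
// handlers that return a hand-written async iterable.
function isAsyncIterable(value) {
  return value != null && typeof value[Symbol.asyncIterator] === "function";
}
```

The second check is the more permissive of the two, since it accepts any object that implements `Symbol.asyncIterator`, not only values produced by `async function*`.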

When interacting with a generator-based handler, Happen automatically provides an AsyncIterator interface:

// Request handling of a large dataset using the flow mechanism
clientNode.on("process-dataset", async function* handleDatasetProcessing(event, context) {
  // Send the request to the processor node
  const processingEvent = {
    type: "process-large-dataset",
    payload: { dataset: event.payload.dataset }
  };
  
  // Use the send method without chaining
  const processingStream = await clientNode.send(processorNode, processingEvent);
  
  // Process results incrementally using the event continuum
  for await (const update of processingStream) {
    // Log progress
    console.log(`Progress: ${update.progress}%`);
    
    // Display intermediate results
    displayIntermediateResults(update.results);
    
    // Yield progress updates to the caller if needed
    yield {
      status: "processing",
      progress: update.progress,
      resultsCount: update.results.length
    };
  }
  
  // Return final result
  return {
    status: "complete",
    message: "Dataset processing complete"
  };
});

// To use this handler:
const result = await userNode.send(clientNode, {
  type: "process-dataset",
  payload: { 
    dataset: largeDataset 
  }
});

// The result will be the final return value from the handler
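One detail of plain JavaScript is worth keeping in mind here: a `for await...of` loop consumes the yielded values but discards the generator's `return` value. How Happen surfaces the final value to the sender is not shown in this section. The sketch below, using the hypothetical helpers `drainStream` and `exampleJob`, shows how both can be read by driving the iterator manually.

```javascript
// Drive an async iterable by hand so the value passed to `return` is not lost.
async function drainStream(stream, onUpdate) {
  const iterator = stream[Symbol.asyncIterator]();
  while (true) {
    const { value, done } = await iterator.next();
    if (done) return value; // value carried by the generator's `return`
    onUpdate(value);        // an intermediate yielded value
  }
}

// Illustrative generator: two progress updates, then a final result.
async function* exampleJob() {
  yield { progress: 50 };
  yield { progress: 100 };
  return { status: "complete" };
}
```

Calling `drainStream(exampleJob(), update => console.log(update.progress))` reports both progress updates and resolves with the final `{ status: "complete" }` object.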

Generator functions also work with broadcasting, enabling publish-subscribe patterns for streams:

// Continuous system monitoring with generator
monitorNode.on("system-metrics", async function* generateMetrics() {
  while (true) {
    const metrics = await collectSystemMetrics();
    yield metrics;
    await sleep(5000); // Update every 5 seconds
  }
});

// Start broadcasting metrics
monitorNode.broadcast({ type: "system-metrics" });

// Subscribers receive the stream
dashboardNode.on("system-metrics", async function processMetrics(metricsStream, context) {
  // Iterate through metrics stream
  for await (const metrics of metricsStream) {
    // Update dashboard
    updateDashboard(metrics);
  }
});

Use Cases for Streaming

Generator-based streaming in Happen addresses several key use cases:

1. Large Dataset Processing

Break down large datasets into manageable chunks:

dataProcessorNode.on("process-batch", async function* processBatch(event, context) {
  const { items } = event.payload;
  
  // Process in chunks to avoid memory issues
  for (let i = 0; i < items.length; i += 100) {
    const chunk = items.slice(i, i + 100);
    const results = await processChunk(chunk);
    
    yield {
      progress: Math.round((i + chunk.length) / items.length * 100),
      processedItems: chunk.length,
      results
    };
  }
});

2. Real-time Progress Updates

Provide incremental feedback for long-running operations:

importNode.on("import-file", async function* importFile(event, context) {
  const { filePath } = event.payload;
  const fileSize = await getFileSize(filePath);
  const reader = createFileReader(filePath);
  
  let bytesProcessed = 0;
  let totalRecords = 0;
  
  // Process file in chunks
  while (!reader.eof()) {
    const chunk = await reader.readChunk(1024 * 1024); // 1 MB chunks
    const records = parseRecords(chunk);
    await saveRecords(records);
    
    bytesProcessed += chunk.length;
    totalRecords += records.length;
    
    // Yield progress update (an empty file reports 100% immediately)
    yield {
      progress: fileSize > 0 ? Math.round(bytesProcessed / fileSize * 100) : 100,
      recordsProcessed: records.length
    };
  }
  
  // Final result
  return { status: "complete", totalRecords };
});

3. Continuous Data Streams

Handle data from sources that produce continuous updates:

sensorNode.on("monitor-sensors", async function* monitorSensors(event, context) {
  const { sensorIds, duration } = event.payload;
  const endTime = Date.now() + duration;
  
  // Connect to sensors
  const sensors = sensorIds.map(id => connectToSensor(id));
  
  try {
    // Process sensor data until duration expires
    while (Date.now() < endTime) {
      // Collect readings from all sensors
      const readings = await Promise.all(
        sensors.map(sensor => sensor.getReading())
      );
      
      // Process this batch of readings
      const analysis = analyzeReadings(readings);
      
      // Emit real-time analysis
      yield {
        timestamp: Date.now(),
        readings,
        analysis
      };
      
      // Wait before next collection
      await sleep(1000); // 1 second interval
    }
  } finally {
    // Clean up resources
    sensors.forEach(sensor => sensor.disconnect());
  }
});
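The `try`/`finally` in this handler matters because a consumer may stop reading early. In standard JavaScript, leaving a `for await...of` loop with `break` calls the generator's `return()` method, which runs its `finally` block. The sketch below demonstrates this with hypothetical names (`sensorReadings`, `takeFirst`) and a log array standing in for a real disconnect.

```javascript
// Endless generator whose cleanup is recorded in `log`.
async function* sensorReadings(log) {
  try {
    let reading = 0;
    while (true) {
      yield reading++;
    }
  } finally {
    log.push("disconnected"); // stands in for sensor.disconnect()
  }
}

// Read only the first `count` values, then stop consuming.
async function takeFirst(stream, count) {
  const taken = [];
  if (count <= 0) return taken;
  for await (const value of stream) {
    taken.push(value);
    if (taken.length >= count) break; // triggers stream.return() and the `finally` block
  }
  return taken;
}
```

After `await takeFirst(sensorReadings(log), 3)`, the cleanup entry is already in `log`, even though the generator's loop never terminated on its own.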

4. Paginated API Results

Fetch and process paginated data from external APIs:

apiNode.on("fetch-all-records", async function* fetchAllRecords(event, context) {
  const { endpoint, pageSize = 100 } = event.payload;
  let page = 1;
  let hasMore = true;
  let totalRecords = 0;
  
  // Fetch pages until complete
  while (hasMore) {
    // Fetch this page
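    const response = await fetch(`${endpoint}?page=${page}&size=${pageSize}`);
    // fetch only rejects on network errors, so check the HTTP status explicitly
    if (!response.ok) {
      throw new Error(`Request for page ${page} failed with status ${response.status}`);
    }
    const data = await response.json();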
    
    // Process records
    processRecords(data.records);
    totalRecords += data.records.length;
    
    // Yield progress
    yield {
      page,
      records: data.records,
      totalRecords,
      hasMore: data.hasMore
    };
    
    // Update for next iteration
    hasMore = data.hasMore;
    page++;
    
    // Respect rate limits
    if (hasMore) {
      await sleep(200); // Small delay between requests
    }
  }
});

Benefits of Happen's Async Processing Model

The combination of concurrent event processing and generator-based streaming delivers several key benefits:

  1. Maximized Throughput: Multiple events process concurrently without blocking

  2. Resource Efficiency: Memory usage is controlled through incremental processing

  3. Responsive System: Long-running operations don't block other processing

  4. Natural Backpressure: Consumers process stream results at their own pace

  5. Minimal API Surface: Power and flexibility without additional primitives

  6. Clean Isolation: Each processing path maintains its own context and error boundaries
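The "Natural Backpressure" point follows from how async generators work in JavaScript: they are pull-based, so the producer only advances when the consumer asks for the next value. The sketch below, with the hypothetical names `produceItems` and `consumeSlowly`, records the interleaving in a log.

```javascript
// Producer: logs each item just before yielding it.
async function* produceItems(log) {
  for (let i = 1; i <= 3; i++) {
    log.push(`produced ${i}`);
    yield i;
  }
}

// Consumer: deliberately slow. The generator stays suspended at `yield`
// until the loop body finishes and the next value is requested.
async function consumeSlowly(log) {
  for await (const item of produceItems(log)) {
    await new Promise((resolve) => setTimeout(resolve, 5));
    log.push(`consumed ${item}`);
  }
  return log;
}
```

The resulting log strictly alternates between "produced" and "consumed" entries: the producer never runs ahead of the slow consumer, so no intermediate buffer builds up.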

Choosing the Right Concurrency Approach

With multiple approaches to concurrency available in Happen, here's a quick guide to choosing the right one for different scenarios:

| Approach | Best For | When To Use |
| --- | --- | --- |
| Functional Flows | Most use cases | Default approach for all event handling |
| Async/Await | Asynchronous operations | When operations need to wait for external results |
| Generator-based Streaming | Incremental processing | For progress updates and chunked processing |
| Worker Threads | CPU-intensive tasks | When tasks would block the main thread |
| Distributed Processing | Horizontal scaling | When single machine capacity is exceeded |

Key Takeaways

  1. The Event Continuum model naturally supports asynchronous processing through JavaScript's native async/await

  2. Runtime transparency allows direct use of platform-specific concurrency features

  3. Generator-based streaming provides powerful incremental processing with minimal API additions

  4. Internal queue management ensures system stability under high load

Together, these strategies give Happen flexible asynchronous capabilities that compose with one another without adding new API primitives.
