Mastering C++ Cooperative Cancellation: Building Robust Async Systems

January 15, 2025
5 min read
By __init__abs

Introduction

In modern C++ systems, managing the lifecycle of asynchronous operations is critical. Whether you’re building high-performance servers, real-time data pipelines, or responsive GUI applications, you need a way to gracefully stop long-running operations. This is where cooperative cancellation comes in—a pattern that allows tasks to voluntarily check for cancellation requests and clean up properly.

C++20 introduced a powerful set of primitives for cooperative cancellation: std::stop_token, std::stop_source, and std::jthread. These tools revolutionize how we write cancellable code, moving away from brittle flag-based approaches to a standardized, composable solution. In this comprehensive guide, we’ll explore these mechanisms and build production-ready cancellation patterns.

The Problem: Why We Need Cooperative Cancellation

Consider a typical scenario: you’re processing a large dataset, making network requests, or running a background computation. Suddenly, the user wants to cancel the operation, or your system needs to shut down gracefully. Without proper cancellation support, you face several challenges:

  1. Resource Leaks: Threads continue running, holding onto memory, file handles, or network connections
  2. Blocking Shutdowns: Your application hangs during termination, waiting for operations to complete
  3. Wasted CPU Cycles: Cancelled operations continue consuming resources unnecessarily
  4. Poor User Experience: Unresponsive applications that can’t be interrupted

Traditional approaches using atomic flags or condition variables work but lack standardization and composability. C++20’s cooperative cancellation provides a unified, type-safe solution.
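For contrast, a hand-rolled version of the flag-based approach might look like the sketch below. The function name run_with_manual_flag is hypothetical and the timings are arbitrary; the point is only that the flag, its lifetime, and the join are each the caller's responsibility.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hand-rolled cancellation: the flag, its lifetime, and the join are all
// managed manually. run_with_manual_flag is a hypothetical name.
int run_with_manual_flag() {
    std::atomic<bool> cancel{false};
    int iterations = 0;

    std::thread worker([&cancel, &iterations] {
        while (!cancel.load()) {
            ++iterations;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    cancel.store(true);          // the "request stop" step
    worker.join();               // skipping this makes ~thread() call std::terminate
    assert(!worker.joinable());
    return iterations;           // safe to read: join() synchronizes with the worker
}
```

Every project ends up with its own variant of this flag, which is what makes such code hard to compose across libraries.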

Understanding std::stop_token Architecture

The C++20 cancellation mechanism consists of three interconnected components:

Figure 1 - C++20 Cancellation Architecture

std::stop_source

The stop_source is the control center for cancellation. It maintains the shared cancellation state and provides the ability to request cancellation:

#include <stop_token>
#include <thread>
#include <chrono>
#include <iostream>
 
void long_running_task(std::stop_token stoken) {
    int iterations = 0;
    
    while (!stoken.stop_requested()) {
        // Simulate work
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        iterations++;
        
        std::cout << "Iteration " << iterations << std::endl;
    }
    
    std::cout << "Task cancelled after " << iterations 
              << " iterations" << std::endl;
}
 
int main() {
    // Create the stop source
    std::stop_source stop_src;
    
    // Launch thread with stop token
    std::jthread worker(long_running_task, stop_src.get_token());
    
    // Let it run for 500ms
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    
    // Request cancellation
    stop_src.request_stop();
    
    // jthread automatically joins on destruction
    return 0;
}

Key properties of std::stop_source:

  • Shared Ownership: Multiple stop tokens can reference the same stop source
  • Thread-Safe: request_stop() can be called from any thread
  • One-Way Operation: Once stopped, the state cannot be reset
  • Copyable: Copies share the same underlying state

std::stop_token

The stop_token is a lightweight, read-only view of the cancellation state. It’s designed to be passed by value to functions that need to check for cancellation:

void process_items(std::stop_token stoken, 
                   const std::vector<int>& items) {
    for (const auto& item : items) {
        // Check before expensive operation
        if (stoken.stop_requested()) {
            std::cout << "Cancellation detected, cleaning up..." 
                      << std::endl;
            return;
        }
        
        // Process item
        expensive_computation(item);
    }
}

Important characteristics:

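  • Cheap to Copy: Designed to be passed by value (like string_view)
  • Shared State: A token shares ownership of the stop state with its sources, but it cannot request a stop itself
  • Safe to Use: A token stays valid after every stop source is destroyed; stop_requested() keeps reporting the state that was reached (false if no stop was ever requested), and stop_possible() tells you whether a stop can still happen
  • No Side Effects: Checking doesn’t modify state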

std::jthread: The RAII Thread

The std::jthread (joining thread) is a drop-in replacement for std::thread with automatic joining and built-in cancellation support:

#include <iostream>
#include <chrono>
#include <stop_token>
#include <thread>
 
void cancellable_worker(std::stop_token stoken, int id) {
    std::cout << "Worker " << id << " started" << std::endl;
    
    int work_count = 0;
    while (!stoken.stop_requested() && work_count < 10) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        work_count++;
        std::cout << "Worker " << id << " completed task " 
                  << work_count << std::endl;
    }
    
    if (stoken.stop_requested()) {
        std::cout << "Worker " << id << " cancelled" << std::endl;
    } else {
        std::cout << "Worker " << id << " completed normally" 
                  << std::endl;
    }
}
 
int main() {
    // No need to manually join - RAII handles it
    {
        std::jthread worker1(cancellable_worker, 1);
        std::jthread worker2(cancellable_worker, 2);
        
        // Do some work
        std::this_thread::sleep_for(std::chrono::seconds(1));
        
        // Request stop on worker1
        worker1.request_stop();
        
        // At scope exit each jthread destructor requests stop and joins,
        // so worker2 is cancelled here too if it is still running
    }
    
    std::cout << "All workers finished" << std::endl;
    return 0;
}

Benefits over std::thread:

  • Automatic Joining: The destructor requests stop and then joins, so no explicit join() or detach() is needed
  • Built-in Stop Source: Each jthread has its own stop source
  • Exception Safe: Properly joins even if exceptions occur
  • Interruptible: Can request stop via request_stop() method

Advanced Patterns: Cancellation Callbacks

One of the most powerful features is std::stop_callback - a mechanism to execute cleanup code when cancellation is requested:

Figure 2 - Callback Execution Model

#include <stop_token>
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
 
class NetworkConnection {
private:
    std::atomic<bool> is_connected{true};
    
public:
    void close() {
        if (is_connected.exchange(false)) {
            std::cout << "Closing network connection..." << std::endl;
            // Actual cleanup logic
        }
    }
    
    bool is_open() const { return is_connected; }
};
 
void network_operation(std::stop_token stoken) {
    NetworkConnection conn;
    
    // Register callback for automatic cleanup
    std::stop_callback cleanup(stoken, [&conn]() {
        std::cout << "Cancellation requested, cleaning up..." 
                  << std::endl;
        conn.close();
    });
    
    // Simulate network operations
    int packet_count = 0;
    while (conn.is_open() && !stoken.stop_requested()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        packet_count++;
        std::cout << "Sent packet " << packet_count << std::endl;
    }
    
    std::cout << "Operation completed. Total packets: " 
              << packet_count << std::endl;
}
 
int main() {
    std::jthread worker(network_operation);
    
    // Let it run briefly
    std::this_thread::sleep_for(std::chrono::milliseconds(350));
    
    // Request stop - callback will be invoked
    worker.request_stop();
    
    return 0;
}

Critical Callback Behaviors

1. Synchronous Execution

Callbacks execute synchronously in the thread that calls request_stop():

std::stop_source source;
 
std::stop_callback cb(source.get_token(), []() {
    std::cout << "Callback executing in thread: " 
              << std::this_thread::get_id() << std::endl;
});
 
std::cout << "Main thread: " << std::this_thread::get_id() << std::endl;
 
// Callback runs in this thread
source.request_stop();  
// All callbacks have completed when this returns

2. Registration Race Conditions

If stop is already requested when registering a callback, it executes immediately:

std::stop_source source;
source.request_stop();  // Stop already requested
 
// Callback executes immediately in this thread
std::stop_callback cb(source.get_token(), []() {
    std::cout << "Executed immediately!" << std::endl;
});

3. Multiple Callbacks

The standard does not guarantee any particular order when several callbacks are registered on the same stop state, so don’t rely on one:

std::stop_source source;
 
std::stop_callback cb1(source.get_token(), []() {
    std::cout << "First registered" << std::endl;
});
 
std::stop_callback cb2(source.get_token(), []() {
    std::cout << "Second registered" << std::endl;
});
 
source.request_stop();
// Both callbacks run on this thread before request_stop() returns,
// but their relative order is unspecified

Production Pattern: Hierarchical Cancellation

In real-world systems, you often need to cancel a hierarchy of operations. Here’s a robust pattern:

#include <stop_token>
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>
 
class TaskCoordinator {
private:
    std::stop_source master_source_;
    std::vector<std::jthread> workers_;
    
public:
    void start_subtask(int id, std::stop_token parent_token) {
        // Workers watch the master token rather than jthread's own token,
        // so every subtask shares one cancellation state
        workers_.emplace_back([this, id, parent_token,
                               token = master_source_.get_token()] {
            // Forward parent cancellation to the master source
            std::stop_callback forward_cancel(parent_token, [this]() {
                master_source_.request_stop();
            });
            
            // A stop from the parent or from request_stop_all() ends the loop
            int count = 0;
            while (!token.stop_requested() && count < 5) {
                std::this_thread::sleep_for(
                    std::chrono::milliseconds(200));
                count++;
                std::cout << "Subtask " << id << " iteration " 
                          << count << std::endl;
            }
            
            std::cout << "Subtask " << id << " finished" << std::endl;
        });
    }
    
    void request_stop_all() {
        master_source_.request_stop();
    }
    
    std::stop_token get_token() {
        return master_source_.get_token();
    }
    
    ~TaskCoordinator() {
        request_stop_all();
        // All jthreads automatically joined
    }
};
 
void coordinated_operation(std::stop_token parent_token) {
    TaskCoordinator coordinator;
    
    // Launch multiple subtasks
    coordinator.start_subtask(1, parent_token);
    coordinator.start_subtask(2, parent_token);
    coordinator.start_subtask(3, parent_token);
    
    // Wait or do other work
    std::this_thread::sleep_for(std::chrono::milliseconds(600));
    
    // Coordinator automatically stops all on destruction
}
 
int main() {
    std::jthread main_operation(coordinated_operation);
    
    // Let it run for a bit
    std::this_thread::sleep_for(std::chrono::milliseconds(400));
    
    // Cancel the entire hierarchy
    main_operation.request_stop();
    
    return 0;
}

Integrating with Condition Variables

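Cooperative cancellation works well with blocking waits: std::condition_variable_any (but not std::condition_variable) has wait overloads that take a stop_token and wake up when a stop is requested: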

#include <stop_token>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <iostream>
#include <thread>
#include <chrono>
#include <functional>
 
template<typename T>
class CancellableQueue {
private:
    std::mutex mutex_;
    std::condition_variable_any cv_;
    std::queue<T> queue_;
    
public:
    // Returns false if cancelled, true if item retrieved
    bool wait_and_pop(T& result, std::stop_token stoken) {
        std::unique_lock lock(mutex_);
        
        // Wait with cancellation support
        bool cancelled = !cv_.wait(lock, stoken, [this] { 
            return !queue_.empty(); 
        });
        
        if (cancelled) {
            return false;  // Cancelled
        }
        
        result = std::move(queue_.front());
        queue_.pop();
        return true;
    }
    
    void push(T value) {
        {
            std::lock_guard lock(mutex_);
            queue_.push(std::move(value));
        }
        cv_.notify_one();
    }
};
 
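// std::jthread passes its stop_token as the first argument,
// so the token parameter has to come first
void consumer(std::stop_token stoken, CancellableQueue<int>& queue) {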
    std::cout << "Consumer started" << std::endl;
    
    while (true) {
        int value;
        if (!queue.wait_and_pop(value, stoken)) {
            std::cout << "Consumer cancelled" << std::endl;
            break;
        }
        
        std::cout << "Consumed: " << value << std::endl;
    }
}
 
int main() {
    CancellableQueue<int> queue;
    
    std::jthread consumer_thread(consumer, std::ref(queue));
    
    // Produce some items
    for (int i = 0; i < 3; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        queue.push(i);
    }
    
    // Cancel consumer
    consumer_thread.request_stop();
    
    return 0;
}

Performance Considerations

Understanding the performance characteristics is crucial for production code:

Operation Costs

Operation                      | Typical Cost        | Notes
stop_requested()               | ~5-10 nanoseconds   | Single atomic load - extremely fast
Empty token check              | ~1-2 nanoseconds    | Null pointer check - negligible
Callback registration          | ~20-50 nanoseconds  | Lock + linked list insertion
request_stop() with callbacks  | Varies              | Depends on callback cost - can block!

Checking Cost

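// Benchmark: stop_requested() check
void benchmark_check(std::stop_token stoken, int iterations) {
    // steady_clock is monotonic, which suits interval measurements
    auto start = std::chrono::steady_clock::now();
    
    for (int i = 0; i < iterations; ++i) {
        // volatile keeps the compiler from optimizing the check away
        [[maybe_unused]] volatile bool result = stoken.stop_requested();
    }
    
    auto end = std::chrono::steady_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>
                    (end - start);
    
    // Divide as floating point: integer division would truncate
    // a sub-nanosecond average to zero
    std::cout << "Average check time: " 
              << static_cast<double>(duration.count()) / iterations 
              << " ns" << std::endl;
}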

Typical Performance (indicative only; results vary with hardware, compiler, and standard library):

  • stop_requested(): ~5-10 nanoseconds (atomic load)
  • Empty token (no stop source): ~1-2 nanoseconds (null check)
  • Callback registration: ~20-50 nanoseconds

Error Handling and Edge Cases

Handling Stop Source Destruction

std::stop_token orphaned_token;
 
{
    std::stop_source source;
    orphaned_token = source.get_token();
    // source destroyed here
}
 
// Safe to use - no stop was requested before the source was destroyed
bool stopped = orphaned_token.stop_requested();  // false

Race Conditions with Shared State

class SafeResource {
private:
    std::mutex mutex_;
    bool is_valid_ = true;
    
public:
    void use(std::stop_token stoken) {
        std::unique_lock lock(mutex_);
        
        if (!is_valid_) return;
        
        // Check after acquiring lock
        if (stoken.stop_requested()) {
            cleanup();
            return;
        }
        
        // Use resource safely
        do_work();
    }
    
    void cleanup() {
        // Precondition: the caller already holds mutex_, as use() does
        is_valid_ = false;
        // Release resources
    }
};

Real-World Example: HTTP Server Graceful Shutdown

Here’s a complete example of a cancellable HTTP server worker:

#include <stop_token>
#include <thread>
#include <iostream>
#include <chrono>
#include <atomic>
#include <memory>
#include <vector>
 
class HTTPWorker {
private:
    int worker_id_;
    std::atomic<int> requests_processed_{0};
    
    void process_request(int request_id) {
        // Simulate request processing
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        requests_processed_++;
        std::cout << "Worker " << worker_id_ 
                  << " processed request " << request_id << std::endl;
    }
    
public:
    explicit HTTPWorker(int id) : worker_id_(id) {}
    
    void run(std::stop_token stoken) {
        std::cout << "Worker " << worker_id_ << " started" << std::endl;
        
        // Register shutdown callback
        std::stop_callback on_shutdown(stoken, [this]() {
            std::cout << "Worker " << worker_id_ 
                      << " shutting down gracefully..." << std::endl;
        });
        
        int request_id = 0;
        while (!stoken.stop_requested()) {
            // Simulate accepting request
            request_id++;
            process_request(request_id);
            
            // Small delay between requests
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        
        std::cout << "Worker " << worker_id_ 
                  << " stopped. Total requests: " 
                  << requests_processed_ << std::endl;
    }
    
    int get_requests_processed() const {
        return requests_processed_;
    }
};
 
class HTTPServer {
private:
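    // Declared before workers_: members are destroyed in reverse order, so
    // the threads are joined before the worker objects they use are freed
    std::vector<std::unique_ptr<HTTPWorker>> worker_objects_;
    std::vector<std::jthread> workers_;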
    
public:
    void start(int num_workers) {
        std::cout << "Starting HTTP server with " << num_workers 
                  << " workers..." << std::endl;
        
        for (int i = 0; i < num_workers; ++i) {
            auto worker = std::make_unique<HTTPWorker>(i);
            auto* worker_ptr = worker.get();
            worker_objects_.push_back(std::move(worker));
            
            workers_.emplace_back([worker_ptr](std::stop_token stoken) {
                worker_ptr->run(stoken);
            });
        }
    }
    
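    void shutdown() {
        std::cout << "\nInitiating graceful shutdown..." << std::endl;
        
        // Request stop on all workers first so they wind down in parallel
        for (auto& worker : workers_) {
            worker.request_stop();
        }
        
        // Then wait for each one: jthread only joins automatically in its
        // destructor, and the joinable() check makes a second call harmless
        for (auto& worker : workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }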
    
    void print_statistics() {
        std::cout << "\n=== Server Statistics ===" << std::endl;
        int total = 0;
        for (size_t i = 0; i < worker_objects_.size(); ++i) {
            int processed = worker_objects_[i]->get_requests_processed();
            std::cout << "Worker " << i << ": " << processed 
                      << " requests" << std::endl;
            total += processed;
        }
        std::cout << "Total: " << total << " requests" << std::endl;
    }
    
    ~HTTPServer() {
        shutdown();
    }
};
 
int main() {
    HTTPServer server;
    server.start(4);  // 4 worker threads
    
    // Let server run for 2 seconds
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    // Graceful shutdown
    server.shutdown();
    
    server.print_statistics();
    
    return 0;
}

Key Takeaways

Understanding cooperative cancellation in C++ is essential for building robust systems:

  • Standardization: C++20 provides a unified, composable cancellation mechanism
  • Thread Safety: request_stop() and stop_requested() can be called concurrently from any thread without data races
  • RAII Integration: Works seamlessly with std::jthread and modern C++ patterns
  • Performance: Minimal overhead when designed correctly
  • Composability: Integrates with std::condition_variable_any waits, stop callbacks, and custom primitives
  • Graceful Shutdown: Enable proper resource cleanup and responsive applications

Best Practices Summary

  1. Always check cancellation in long-running loops
  2. Pass stop_token by value - it’s designed for it
  3. Keep callbacks fast - they block request_stop()
  4. Use hierarchical patterns for complex systems
  5. Combine with RAII for automatic resource management
  6. Check after acquiring locks to avoid races
  7. Balance check frequency with performance needs

Conclusion

Cooperative cancellation transforms how we build asynchronous C++ systems. By leveraging std::stop_token, std::stop_source, and std::jthread, we can create responsive, maintainable code that handles cancellation gracefully. These patterns scale from simple background tasks to complex distributed systems, providing a solid foundation for production-grade C++ applications.

As you integrate these patterns into your codebase, remember that cooperative cancellation is about collaboration: the canceller requests, and the task cooperates. This contract, when respected by both parties, creates reliable systems that can start, stop, and clean up resources predictably—exactly what production systems demand.

Have questions or insights about cooperative cancellation? I’d love to hear your experiences implementing these patterns in production systems. Feel free to reach out!