
Why Async Code Can Be Slower: Common Pitfalls and Performance Solutions

🤔 Curiosity: Why Is My Async Code Slower Than Synchronous Code?

You’ve refactored your Python code to use async/await, expecting significant performance improvements. But instead of getting faster, your application is actually slower. What went wrong?

Curiosity: If async code is supposed to be faster, why does it sometimes perform worse than synchronous code? And what are the hidden pitfalls that make async code slower?

The reality: Async code doesn’t automatically make everything faster. In fact, misusing asyncio can introduce overhead, blocking operations, and performance bottlenecks that make your code slower than the synchronous version.

As someone who’s debugged production async applications, I’ve seen this pattern repeatedly: developers assume async/await is a magic performance button, but without understanding the underlying mechanics, they create code that’s slower, harder to debug, and more resource-intensive.

The question: What are the specific mistakes that make async code slower, and how do we fix them?


📚 Retrieve: Understanding Async Performance Fundamentals

Core Concepts: Event Loop, Coroutines, and Futures

Before diving into performance issues, let’s understand the building blocks of asyncio:

graph TB
    A[asyncio.run] --> B[Event Loop]
    B --> C[Coroutines]
    C --> D[Tasks]
    D --> E[Futures]

    B --> F[Scheduler]
    F --> G[I/O Operations]
    F --> H[CPU Operations]

    style B fill:#4ecdc4,stroke:#0a9396,stroke-width:3px,color:#fff
    style C fill:#ff6b6b,stroke:#c92a2a,stroke-width:2px,color:#fff
    style F fill:#ffe66d,stroke:#f4a261,stroke-width:2px,color:#000

1. Event Loop

The Event Loop is the core orchestrator of all asynchronous operations in an asyncio application. When you call asyncio.run(), a single-threaded event loop starts and schedules all coroutines and tasks.

Key Characteristics:

  • Single-threaded execution
  • Cooperative multitasking (tasks yield control voluntarily)
  • I/O-bound operations are non-blocking
  • CPU-bound operations block the entire loop
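
To see cooperative multitasking in action, here is a minimal sketch (the worker coroutine is my own illustration, not from the original article): both coroutines share a single thread, and the total runtime is roughly the longest delay, not the sum:

import asyncio

async def worker(name, delay):
    await asyncio.sleep(delay)  # yields control back to the event loop
    print(f"{name} done")

async def main():
    # Runs concurrently on one thread: total ~0.2s, not 0.3s
    await asyncio.gather(worker("a", 0.1), worker("b", 0.2))

asyncio.run(main())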

2. Coroutines

Functions defined with async def are coroutines. When a coroutine encounters await, it pauses execution and returns control to the event loop, allowing other tasks to run.

Example:

async def fetch_data():
    # This pauses and yields control
    response = await http_client.get('https://api.example.com/data')
    return response.json()

3. Futures

Futures are low-level objects that represent the eventual result of an asynchronous operation. When a coroutine completes, its result is stored in a Future.
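
A minimal sketch of the Future lifecycle (illustrative, using only low-level asyncio APIs): some callback fills in the result later, and awaiting the Future suspends until then:

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # placeholder for a result that doesn't exist yet

    # Schedule a callback to fill in the result 0.1s from now
    loop.call_later(0.1, future.set_result, "done")

    print(await future)  # suspends here until set_result() is called

asyncio.run(main())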

The Five Major Performance Pitfalls

1. Sequential Awaits Causing Bottlenecks

The Problem:

When you await multiple coroutines sequentially, each operation waits for the previous one to complete, eliminating any performance benefit from async.

# ❌ BAD: Sequential execution
async def get_user_dashboard():
    notifications = await get_user_notifications()  # Wait 100ms
    activity = await get_recent_activity()          # Wait 150ms
    messages = await get_unread_messages()          # Wait 200ms
    return {
        'notifications': notifications,
        'activity': activity,
        'messages': messages
    }
# Total time: 450ms (sequential)

Why It’s Slow:

sequenceDiagram
    participant Client
    participant EventLoop
    participant API1
    participant API2
    participant API3

    Client->>EventLoop: await get_user_notifications()
    EventLoop->>API1: Request
    API1-->>EventLoop: Response (100ms)
    EventLoop-->>Client: Result

    Client->>EventLoop: await get_recent_activity()
    EventLoop->>API2: Request
    API2-->>EventLoop: Response (150ms)
    EventLoop-->>Client: Result

    Client->>EventLoop: await get_unread_messages()
    EventLoop->>API3: Request
    API3-->>EventLoop: Response (200ms)
    EventLoop-->>Client: Result

    Note over Client,API3: Total: 450ms

The Solution: Parallel Execution

# ✅ GOOD: Parallel execution
async def get_user_dashboard():
    notifications, activity, messages = await asyncio.gather(
        get_user_notifications(),    # All three start
        get_recent_activity(),        # simultaneously
        get_unread_messages()         # and run in parallel
    )
    return {
        'notifications': notifications,
        'activity': activity,
        'messages': messages
    }
# Total time: 200ms (longest operation)

Performance Improvement:

| Approach | Execution Time | Improvement |
| --- | --- | --- |
| Sequential | 450ms | Baseline |
| Parallel (gather) | 200ms | 55% faster |

2. Using Synchronous (Blocking) Libraries

The Problem:

Using blocking libraries inside async code stops the entire event loop, making all other tasks wait.

Common Blocking Libraries:

| Library | Blocking Operation | Async Alternative |
| --- | --- | --- |
| requests | HTTP requests | aiohttp, httpx |
| pathlib, file I/O | File operations | aiofiles |
| time.sleep() | Sleep | asyncio.sleep() |
| sqlite3 | Database queries | aiosqlite |
| urllib | URL operations | aiohttp |

Example:

# ❌ BAD: Blocking library in async code
import requests

async def fetch_multiple_urls(urls):
    results = []
    for url in urls:
        # This blocks the entire event loop!
        response = requests.get(url)  # Synchronous, blocking
        results.append(response.json())
    return results

# Even wrapped in coroutines and run with asyncio.gather, these calls
# execute one at a time, because each requests.get() blocks the event loop.

Why It’s Slow:

graph TB
    A[Event Loop] --> B[Task 1: requests.get]
    B --> C[Blocks Event Loop]
    C --> D[All Other Tasks Wait]
    D --> E[Task 2: requests.get]
    E --> C

    style C fill:#ff6b6b,stroke:#c92a2a,stroke-width:3px,color:#fff
    style D fill:#ffe66d,stroke:#f4a261,stroke-width:2px,color:#000

The Solution: Use Async Libraries

# ✅ GOOD: Async library
import asyncio
import aiohttp

async def fetch_multiple_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()
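
When no async alternative exists for a blocking call, asyncio.to_thread() (Python 3.9+) offers a middle ground: it runs the call in a worker thread so the event loop stays free. A minimal sketch, still using the blocking requests library:

import asyncio
import requests  # still blocking, but now confined to a worker thread

async def fetch_url_blocking(url):
    # to_thread() runs the blocking call off the event loop
    response = await asyncio.to_thread(requests.get, url)
    return response.json()

async def fetch_multiple_urls(urls):
    return await asyncio.gather(*(fetch_url_blocking(u) for u in urls))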

3. CPU-Bound Tasks Blocking the Event Loop

The Problem:

Asyncio is designed for I/O-bound operations. CPU-intensive tasks block the single-threaded event loop, preventing other tasks from running.

Example:

# ❌ BAD: CPU-bound work blocks event loop
async def process_data(data_list):
    results = []
    for data in data_list:
        # This blocks the event loop!
        result = heavy_computation(data)  # CPU-bound, blocking
        results.append(result)
    return results

def heavy_computation(data):
    # CPU-intensive work
    return sum(x**2 for x in range(1000000))

Why It’s Slow:

graph LR
    A[Event Loop] --> B[CPU Task 1]
    B --> C[Blocks 500ms]
    C --> D[CPU Task 2]
    D --> E[Blocks 500ms]
    E --> F[I/O Tasks Wait]

    style C fill:#ff6b6b,stroke:#c92a2a,stroke-width:3px,color:#fff
    style E fill:#ff6b6b,stroke:#c92a2a,stroke-width:3px,color:#fff
    style F fill:#ffe66d,stroke:#f4a261,stroke-width:2px,color:#000

The Solution: Use Executors

# ✅ GOOD: CPU work in executor
import asyncio

async def process_data(data_list):
    loop = asyncio.get_running_loop()

    # Run CPU-bound work in the loop's default ThreadPoolExecutor
    # (the first argument, None, selects the default executor)
    tasks = [
        loop.run_in_executor(None, heavy_computation, data)
        for data in data_list
    ]

    return await asyncio.gather(*tasks)

def heavy_computation(data):
    # CPU-intensive work (runs in a worker thread, off the event loop)
    return sum(x**2 for x in range(1000000))

Performance Comparison:

| Approach | Execution Model | Event Loop Blocked? |
| --- | --- | --- |
| Direct call | Single thread | ✅ Yes (blocks everything) |
| ThreadPoolExecutor | Multiple threads | ❌ No (I/O tasks continue) |
| ProcessPoolExecutor | Multiple processes | ❌ No (best for CPU-heavy work) |

4. Awaiting Non-Critical Tasks

The Problem:

Awaiting low-priority tasks (like logging) delays the main response.

Example:

# ❌ BAD: Logging delays response
async def handle_user_request(user_id):
    profile = await get_user_profile(user_id)

    # This delays the response!
    await send_logs_to_external_service(profile)  # 50ms delay

    return profile  # User waits extra 50ms

The Solution: Background Tasks

# ✅ GOOD: Fire and forget for non-critical work
async def handle_user_request(user_id):
    profile = await get_user_profile(user_id)

    # Don't await - run in background
    asyncio.create_task(send_logs_to_external_service(profile))

    return profile  # Immediate response
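
One caveat with fire-and-forget: the event loop holds only a weak reference to tasks, so a task with no other reference can be garbage-collected before it finishes. A common pattern (sketched here with the same illustrative function names) is to keep a strong reference until the task completes:

background_tasks = set()

async def handle_user_request(user_id):
    profile = await get_user_profile(user_id)

    task = asyncio.create_task(send_logs_to_external_service(profile))
    background_tasks.add(task)                        # keep a strong reference
    task.add_done_callback(background_tasks.discard)  # drop it when done

    return profile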

When to Use Background Tasks:

| Task Type | Priority | Should Await? |
| --- | --- | --- |
| User response data | High | ✅ Yes |
| Logging | Low | ❌ No (use create_task) |
| Analytics | Low | ❌ No (use create_task) |
| Cache updates | Medium | ⚠️ Depends on requirements |
| Email notifications | Low | ❌ No (use create_task) |

5. Excessive Task Creation

The Problem:

Creating too many small tasks causes context switching overhead.

Example:

# ❌ BAD: Too many small tasks
async def fetch_all_items(item_ids):
    tasks = [fetch_item(item_id) for item_id in item_ids]  # 10,000 tasks!
    results = await asyncio.gather(*tasks)
    return results

Why It’s Slow:

  • Context switching overhead
  • Memory usage for task objects
  • Scheduler overhead

The Solution: Batching and Semaphores

# ✅ GOOD: Limit concurrency with Semaphore
async def fetch_all_items(item_ids, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(item_id):
        async with semaphore:
            return await fetch_item(item_id)

    tasks = [fetch_with_limit(item_id) for item_id in item_ids]
    results = await asyncio.gather(*tasks)
    return results

Alternative: Batching

# ✅ GOOD: Process in batches
async def fetch_all_items(item_ids, batch_size=100):
    results = []
    for i in range(0, len(item_ids), batch_size):
        batch = item_ids[i:i + batch_size]
        batch_results = await asyncio.gather(*[fetch_item(item_id) for item_id in batch])
        results.extend(batch_results)
    return results

Parallel Execution Tools Comparison

| Tool | Use Case | Pros | Cons |
| --- | --- | --- | --- |
| asyncio.gather | Multiple independent tasks | Simple, intuitive | First exception propagates; sibling tasks are not cancelled |
| asyncio.create_task | Background tasks | Individual control | Manual management required |
| asyncio.TaskGroup (Python 3.11+) | Structured concurrency | Safe, automatic cleanup | Python 3.11+ only |

asyncio.gather Example:

# All tasks run in parallel
results = await asyncio.gather(
    task1(),
    task2(),
    task3()
)
# If any task raises, the first exception propagates to the caller,
# but the remaining tasks are NOT cancelled
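
If you need every result even when some tasks fail, gather can collect exceptions instead of raising:

# Collect exceptions as results instead of raising the first one
results = await asyncio.gather(
    task1(), task2(), task3(),
    return_exceptions=True
)
errors = [r for r in results if isinstance(r, Exception)]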

asyncio.create_task Example:

# Create tasks individually
task1 = asyncio.create_task(coro1())
task2 = asyncio.create_task(coro2())

# Tasks can be cancelled individually
task1.cancel()

# Tasks must be awaited manually; awaiting a cancelled task
# raises asyncio.CancelledError, so handle it explicitly
try:
    result1 = await task1
except asyncio.CancelledError:
    result1 = None
result2 = await task2

asyncio.TaskGroup Example (Python 3.11+):

# Structured concurrency - automatic cleanup
async with asyncio.TaskGroup() as tg:
    task1 = tg.create_task(coro1())
    task2 = tg.create_task(coro2())
    task3 = tg.create_task(coro3())

# All tasks automatically awaited when exiting block
# If any task fails, all others are cancelled
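
Once the block exits, each task’s value can be read with Task.result():

# After the block, all tasks are guaranteed to be done
print(task1.result(), task2.result(), task3.result())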

💡 Innovation: Debugging and Performance Analysis

Debugging Tools

asyncio Debug Mode

Enable debug mode to identify common issues:

# Enable debug mode
asyncio.run(main(), debug=True)
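
Debug mode can also be switched on without touching code, via an environment variable or Python’s dev mode:

# Either of these enables asyncio debug mode from the outside
PYTHONASYNCIODEBUG=1 python app.py
python -X dev app.py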

What Debug Mode Detects:

  • Coroutines that were created but never awaited
  • Callbacks that block the loop for too long (more than 100ms by default)
  • asyncio APIs called from the wrong thread
  • Task exceptions that were never retrieved
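
The slow-callback threshold is configurable on the running loop (this must run inside a coroutine or callback):

# Flag any callback that blocks the loop for more than 50ms (default is 100ms)
loop = asyncio.get_running_loop()
loop.slow_callback_duration = 0.05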

Example Output:

Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<slow_operation()>>

Performance Profiling

Using cProfile with asyncio:

import cProfile
import pstats
import asyncio

async def main():
    # Your async code here
    pass

# Profile async code
profiler = cProfile.Profile()
profiler.enable()
asyncio.run(main())
profiler.disable()

stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)  # Top 20 functions

Timing Coroutines Manually:

import asyncio
import time

async def measure_time(coro):
    start = time.perf_counter()  # monotonic clock, preferred for elapsed-time measurement
    result = await coro
    elapsed = time.perf_counter() - start
    print(f"Execution time: {elapsed:.2f}s")
    return result

# Usage (from inside another coroutine)
result = await measure_time(my_async_function())

Choosing the Right Concurrency Model

Decision Tree:

graph TB
    A[Need Concurrency?] --> B{I/O Bound?}
    B -->|Yes| C[Use asyncio]
    B -->|No| D{CPU Bound?}
    D -->|Yes| E{Heavy Computation?}
    E -->|Yes| F[Use ProcessPoolExecutor]
    E -->|No| G[Use ThreadPoolExecutor]
    D -->|Mixed| H[Hybrid: asyncio + Executors]

    C --> I[Single Thread<br/>Event Loop]
    F --> J[Multiple Processes<br/>True Parallelism]
    G --> K[Multiple Threads<br/>GIL Limited]
    H --> L[asyncio + run_in_executor]

    style C fill:#4ecdc4,stroke:#0a9396,stroke-width:2px,color:#fff
    style F fill:#ff6b6b,stroke:#c92a2a,stroke-width:2px,color:#fff
    style G fill:#ffe66d,stroke:#f4a261,stroke-width:2px,color:#000

Comparison Table:

| Model | Best For | Threads | Processes | GIL Impact |
| --- | --- | --- | --- | --- |
| asyncio | I/O-bound tasks | 1 | 1 | N/A (single thread) |
| Threads | I/O with some CPU | Multiple | 1 | Limited by GIL |
| Processes | CPU-bound tasks | 1 per process | Multiple | No GIL contention (separate processes) |

When to Use Each:

Use asyncio when:

  • ✅ Network I/O (HTTP, WebSockets, databases)
  • ✅ File I/O (with aiofiles)
  • ✅ Many concurrent connections
  • ✅ Single-threaded execution is acceptable

Use Threads when:

  • ✅ I/O-bound with some CPU work
  • ✅ Need to call blocking libraries
  • ✅ GUI applications (keep UI responsive)

Use Processes when:

  • ✅ CPU-intensive computations
  • ✅ Need true parallelism
  • ✅ Can tolerate process overhead
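
For the CPU-bound case, here is a minimal sketch of offloading work to a process pool (heavy_computation is the same illustrative function used earlier):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_computation(n):
    # Runs in a separate process, so it doesn't share the GIL with the loop
    return sum(x**2 for x in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, heavy_computation, 1_000_000)
              for _ in range(4))
        )
    print(results)

if __name__ == "__main__":  # required for process pools on some platforms
    asyncio.run(main())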

Real-World Performance Optimization Example

Before (Slow):

# ❌ Multiple performance issues
async def generate_user_report(user_id):
    # Sequential awaits
    profile = await get_user_profile(user_id)  # 100ms
    orders = await get_user_orders(user_id)    # 150ms
    reviews = await get_user_reviews(user_id)  # 120ms

    # Blocking library
    data = requests.get(f'https://api.example.com/analytics/{user_id}').json()

    # CPU-bound work blocking event loop
    processed_data = heavy_data_processing(data)  # 200ms

    # Unnecessary await
    await log_report_generation(user_id)  # 50ms

    return {
        'profile': profile,
        'orders': orders,
        'reviews': reviews,
        'analytics': processed_data
    }
# Total: ~620ms + blocking time

After (Optimized):

# ✅ Optimized version
import asyncio
import aiohttp

async def generate_user_report(user_id):
    loop = asyncio.get_running_loop()

    # Parallel data fetching
    profile, orders, reviews = await asyncio.gather(
        get_user_profile(user_id),
        get_user_orders(user_id),
        get_user_reviews(user_id)
    )

    # Async HTTP client
    async with aiohttp.ClientSession() as session:
        async with session.get(f'https://api.example.com/analytics/{user_id}') as resp:
            data = await resp.json()

    # CPU work in executor (non-blocking)
    processed_data = await loop.run_in_executor(
        None, heavy_data_processing, data
    )

    # Background logging (don't await)
    asyncio.create_task(log_report_generation(user_id))

    return {
        'profile': profile,
        'orders': orders,
        'reviews': reviews,
        'analytics': processed_data
    }
# Total: ~150ms for the parallel fetches + the analytics round-trip;
# CPU work and logging no longer block the event loop

Performance Improvement:

| Metric | Before | After | Improvement |
| --- | --- | --- | --- |
| Sequential I/O | 370ms | 150ms (parallel) | 59% faster |
| Blocking HTTP | Blocks event loop | Non-blocking | No blocking |
| CPU Processing | Blocks 200ms | Non-blocking | Event loop stays free |
| Logging Delay | 50ms | 0ms (background) | Immediate response |

🎯 Key Takeaways

| Insight | Implication | Action Item |
| --- | --- | --- |
| Async doesn’t mean automatic speed | Understanding is crucial | Learn event loop mechanics |
| Sequential awaits kill performance | Always parallelize independent operations | Use gather or TaskGroup |
| Blocking libraries break async | Use async alternatives | Replace requests with aiohttp |
| CPU work blocks the event loop | Use executors for CPU tasks | Use run_in_executor() |
| Don’t await non-critical tasks | Fire and forget for logging/analytics | Use create_task() |

Why This Matters

Understanding async performance pitfalls is critical because:

  1. False Expectations: Developers assume async = fast, leading to disappointment
  2. Production Issues: Slow async code causes real user-facing problems
  3. Resource Waste: Inefficient async code uses more resources than sync code
  4. Debugging Difficulty: Async bugs are harder to diagnose without proper tools

The Challenge: Async code requires understanding the event loop, I/O vs CPU operations, and when to use different concurrency models. But with the right knowledge, async code can deliver significant performance improvements.


🤔 New Questions This Raises

  1. How do we measure async performance in production? What metrics matter most?

  2. When should we use asyncio vs threads vs processes? What’s the decision framework?

  3. How do we debug async performance issues? What tools and techniques work best?

  4. What’s the overhead of asyncio itself? When does the overhead outweigh benefits?

  5. How do we handle async in mixed codebases? What’s the migration strategy?

Next experiment: Build a benchmark comparing sequential vs parallel async operations, measure the performance difference, and identify the break-even point where async overhead is justified.
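
As a starting point for that benchmark, here is a minimal sketch (fake_io simulates I/O with asyncio.sleep, so it measures only scheduling behavior, not real network latency):

import asyncio
import time

async def fake_io(delay):
    await asyncio.sleep(delay)

async def sequential(n, delay):
    for _ in range(n):
        await fake_io(delay)

async def parallel(n, delay):
    await asyncio.gather(*(fake_io(delay) for _ in range(n)))

async def main():
    for name, fn in [("sequential", sequential), ("parallel", parallel)]:
        start = time.perf_counter()
        await fn(50, 0.01)
        print(f"{name}: {time.perf_counter() - start:.3f}s")

asyncio.run(main())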



This post is licensed under CC BY 4.0 by the author.