async-cache
===========

May 28, 2026

.. image:: https://img.shields.io/pypi/v/async-cache.svg
   :target: https://pypi.python.org/pypi/async-cache
.. image:: https://img.shields.io/readthedocs/async-cache/latest.svg
   :target: https://async-cache.readthedocs.io/en/latest/
.. image:: https://www.codetriage.com/iamsinghrajat/async-cache/badges/users.svg
   :target: https://pypi.python.org/pypi/async-cache
.. image:: https://static.pepy.tech/personalized-badge/async-cache?period=total&units=international_system&left_color=black&right_color=blue&left_text=Downloads
   :target: https://pepy.tech/project/async-cache

Production-ready asyncio cache with thundering herd protection, batch loading, disk persistence, distributed Redis caching, and comprehensive metrics.

Installation
------------

.. code-block:: shell

    pip install async-cache

Requires Python 3.8+. No external dependencies.

See full documentation at https://async-cache.readthedocs.io/

Core Usage: Function API for Microservices
------------------------------------------

Use ``AsyncCache`` for flexible caching:

.. code-block:: python

    from cache import AsyncCache

    cache = AsyncCache(maxsize=1000, default_ttl=300)  # TTL in seconds

    async def get_data(key):
        return await cache.get(
            key,
            loader=lambda: db_query(key),  # auto-caches on miss
        )

    # Warmup hot keys at startup
    await cache.warmup({"hot:key": lambda: preload_hot()})

    # Metrics for observability
    print(cache.get_metrics())  # hits, misses, size, hit_rate

Key Features & Examples
-----------------------

**Thundering Herd Protection**

Prevents duplicate work under concurrent load (e.g., popular keys). Without it, 100 concurrent misses mean 100 DB hits; with it, they mean 1.

.. code-block:: python

    import asyncio
    from cache import AsyncCache

    cache = AsyncCache()

    async def loader():
        return await db_query()  # expensive

    # 100 concurrent gets -> 1 loader call
    results = await asyncio.gather(*[cache.get('key', loader=loader) for _ in range(100)])
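
To illustrate the idea independently of the library, here is a minimal single-flight sketch. It is not async-cache's implementation; the ``SingleFlight`` class and ``slow_loader`` are hypothetical names. Concurrent callers for the same key share one in-progress task, so the loader runs once.

.. code-block:: python

    import asyncio

    class SingleFlight:
        """Share one in-progress load per key among concurrent callers."""

        def __init__(self):
            self._values = {}    # key -> cached value
            self._inflight = {}  # key -> Task for a load in progress

        async def get(self, key, loader):
            if key in self._values:
                return self._values[key]
            task = self._inflight.get(key)
            if task is None:
                # First caller for this key starts the load.
                task = asyncio.ensure_future(loader())
                self._inflight[key] = task
                task.add_done_callback(lambda _: self._inflight.pop(key, None))
            # Every caller, first or not, awaits the same task.
            value = await task
            self._values[key] = value
            return value

    calls = 0

    async def slow_loader():
        global calls
        calls += 1
        await asyncio.sleep(0.01)  # stand-in for an expensive query
        return "value"

    async def main():
        cache = SingleFlight()
        return await asyncio.gather(*[cache.get("key", slow_loader) for _ in range(100)])

    results = asyncio.run(main())
    print(calls, len(results))  # 1 100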

**DataLoader-Style Batching**

Groups concurrent gets into one batch call (reduces DB load; configurable window/size).

.. code-block:: python

    async def batch_loader(keys):
        # One DB query for the whole batch. db_batch_query is assumed to
        # accept the list of keys and return a {key: value} mapping.
        return await db_batch_query(keys)
    # auto-groups within 5ms window
    await asyncio.gather(
        cache.get(1, batch_loader=batch_loader),
        cache.get(2, batch_loader=batch_loader)
    )
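
The batching window can be pictured with the following sketch. It is an assumption about the general technique, not the library's code: the ``Batcher`` class is hypothetical, and the 5 ms default mirrors the window mentioned in the comment above. Keys requested while the window is open are collected and passed to the batch loader in a single call.

.. code-block:: python

    import asyncio

    class Batcher:
        """Collect keys requested within a short window and load them in one call."""

        def __init__(self, batch_loader, window=0.005):
            self._batch_loader = batch_loader
            self._window = window
            self._pending = {}       # key -> Future awaiting the batch result
            self._flush_task = None

        async def get(self, key):
            if key not in self._pending:
                self._pending[key] = asyncio.get_running_loop().create_future()
            future = self._pending[key]
            if self._flush_task is None:
                # The first request in a window schedules the flush.
                self._flush_task = asyncio.ensure_future(self._flush())
            return await future

        async def _flush(self):
            await asyncio.sleep(self._window)
            pending, self._pending = self._pending, {}
            self._flush_task = None
            try:
                loaded = await self._batch_loader(list(pending))
            except Exception as exc:
                for future in pending.values():
                    if not future.done():
                        future.set_exception(exc)
                return
            for key, future in pending.items():
                if not future.done():
                    future.set_result(loaded.get(key))

    batches = []

    async def batch_loader(keys):
        batches.append(keys)
        return {key: key * 10 for key in keys}

    async def main():
        batcher = Batcher(batch_loader)
        return await asyncio.gather(batcher.get(1), batcher.get(2), batcher.get(1))

    results = asyncio.run(main())
    print(results, batches)  # [10, 20, 10] [[1, 2]]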

**Cache Warmup**

Preload at startup to avoid cold misses.

.. code-block:: python

    await cache.warmup({
        "user:1": lambda: load_user(1),
        "config:global": lambda: load_config(),
    })
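
One way to picture warmup, as a sketch rather than the library's implementation, is to call every loader concurrently and store the results before traffic arrives. The ``warmup`` helper, the plain ``dict`` store, and the two loaders below are hypothetical stand-ins.

.. code-block:: python

    import asyncio

    async def warmup(store, loaders):
        """Run all loaders concurrently and store each result under its key."""
        keys = list(loaders)
        values = await asyncio.gather(*(loaders[key]() for key in keys))
        store.update(zip(keys, values))

    async def load_user(user_id):
        return {"id": user_id}

    async def load_config():
        return {"debug": False}

    store = {}
    asyncio.run(warmup(store, {
        "user:1": lambda: load_user(1),
        "config:global": lambda: load_config(),
    }))
    print(sorted(store))  # ['config:global', 'user:1']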

**Metrics**

Observability for hit rate, size, etc. (global or per-function).

.. code-block:: python

    metrics = cache.get_metrics()  # or func.get_metrics()
    # {'hits': 950, 'misses': 50, 'size': 200, 'hit_rate': 0.95}
    # Use for Prometheus/monitoring
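
The sample output is consistent with ``hit_rate = hits / (hits + misses)``. That formula is inferred from the numbers shown, not taken from the library's documentation; the ``hit_rate`` helper below is hypothetical.

.. code-block:: python

    def hit_rate(hits, misses):
        """Fraction of lookups served from cache; 0.0 when there were no lookups."""
        total = hits + misses
        return hits / total if total else 0.0

    print(hit_rate(950, 50))          # 0.95
    print(round(hit_rate(12, 5), 3))  # 0.706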

**TTL & Invalidation**

Per-key TTL control plus size-based eviction.

.. code-block:: python

    await cache.set('key', value, ttl=60)  # override
    await cache.delete('key')  # or func.invalidate_cache(args)
    cache.clear()
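
How a per-key TTL and size-based eviction can coexist is sketched below. This assumes a least-recently-used eviction policy and is not the library's code; the ``TTLLRU`` class and its injectable ``clock`` are hypothetical, the latter only so the example does not have to sleep.

.. code-block:: python

    import time
    from collections import OrderedDict

    class TTLLRU:
        """Tiny synchronous cache with per-key expiry and LRU eviction."""

        def __init__(self, maxsize, default_ttl, clock=time.monotonic):
            self._maxsize = maxsize
            self._default_ttl = default_ttl
            self._clock = clock
            self._data = OrderedDict()  # key -> (value, expires_at)

        def set(self, key, value, ttl=None):
            ttl = self._default_ttl if ttl is None else ttl
            self._data[key] = (value, self._clock() + ttl)
            self._data.move_to_end(key)
            while len(self._data) > self._maxsize:
                self._data.popitem(last=False)  # evict the least recently used key

        def get(self, key, default=None):
            item = self._data.get(key)
            if item is None:
                return default
            value, expires_at = item
            if self._clock() >= expires_at:
                del self._data[key]  # expired entries are dropped on read
                return default
            self._data.move_to_end(key)
            return value

    now = [0.0]
    cache = TTLLRU(maxsize=2, default_ttl=300, clock=lambda: now[0])
    cache.set("a", 1)
    cache.set("b", 2, ttl=60)
    cache.get("a")              # touch "a" so "b" becomes least recently used
    cache.set("c", 3)           # over maxsize: "b" is evicted
    evicted = cache.get("b")    # None
    kept = cache.get("c")       # 3
    now[0] = 301.0
    expired = cache.get("a")    # None, the default TTL has passed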

Decorator Convenience
---------------------

For simple/readable code (uses core API under the hood):

.. code-block:: python

    from cache import AsyncLRU, AsyncTTL

    @AsyncLRU(maxsize=128)
    async def func(*args):
        ...

    @AsyncTTL(time_to_live=60, skip_args=1)  # e.g. skip 'self'
    async def method(self, arg):
        ...

Distributed Redis Cache (NEW)
-----------------------------

Share cached data across multiple application instances with the built-in pure-Python Redis client, with no external dependencies required:

.. code-block:: python

    from cache import AsyncCache, RedisBackend

    redis = RedisBackend(host="localhost", port=6379)
    cache = AsyncCache(maxsize=1000, default_ttl=300, remote_cache=redis)

    async def get_user(user_id):
        return await cache.get(
            f"user:{user_id}",
            loader=lambda: fetch_from_database(user_id),
        )

    # Read: L1 (local memory) → L2 (Redis) → loader
    # Write: L1 sync + L2 async (background, non-blocking)
    # If Redis is down: degrades to L1 only (app never crashes)
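
The read and write paths described in the comments above can be sketched without Redis. This is a simplified illustration under stated assumptions, not the library's code: ``FakeRemote`` is an in-memory stand-in for the L2 tier, ``TwoTier`` is a hypothetical name, and a ``None`` result from L2 is treated as a miss.

.. code-block:: python

    import asyncio

    class FakeRemote:
        """In-memory stand-in for a remote (L2) cache that can be marked down."""

        def __init__(self):
            self.store = {}
            self.available = True

        async def get(self, key):
            if not self.available:
                raise ConnectionError("remote cache unavailable")
            return self.store.get(key)

        async def set(self, key, value):
            if not self.available:
                raise ConnectionError("remote cache unavailable")
            self.store[key] = value

    class TwoTier:
        def __init__(self, remote):
            self.local = {}           # L1: process-local memory
            self.remote = remote      # L2: shared remote cache
            self._background = set()  # keeps background write tasks alive

        async def get(self, key, loader):
            if key in self.local:                   # L1 hit
                return self.local[key]
            try:
                value = await self.remote.get(key)  # L2 lookup
            except ConnectionError:
                value = None                        # degrade to L1 only
            if value is None:
                value = await loader()              # miss in both tiers
                task = asyncio.ensure_future(self._write_remote(key, value))
                self._background.add(task)
                task.add_done_callback(self._background.discard)
            self.local[key] = value                 # L1 is written synchronously
            return value

        async def _write_remote(self, key, value):
            try:
                await self.remote.set(key, value)
            except ConnectionError:
                pass  # a failed L2 write must not break the caller

    async def load_from_db():
        return "from-db"

    async def main():
        remote = FakeRemote()
        remote.store["user:1"] = "from-remote"
        cache = TwoTier(remote)
        first = await cache.get("user:1", load_from_db)   # served by L2
        remote.available = False
        second = await cache.get("user:2", load_from_db)  # L2 down: loader is used
        await asyncio.sleep(0)  # let the background write attempt finish
        return first, second, dict(cache.local)

    first, second, local = asyncio.run(main())
    print(first, second)  # from-remote from-db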

Works with decorators:

.. code-block:: python

    from cache import AsyncLRU, AsyncTTL, RedisBackend

    redis = RedisBackend(host="redis.example.com", port=6379)

    @AsyncLRU(maxsize=128, remote_cache=redis)
    async def get_product(product_id):
        return await db.query_product(product_id)

    @AsyncTTL(time_to_live=60, remote_cache=redis)
    async def get_session(session_id):
        return await db.get_session(session_id)

Install with: ``pip install async-cache[redis]``

Disk Persistence
----------------

Survive process restarts with zero new dependencies:

.. code-block:: python

    from cache import AsyncCache, DiskBackend

    backup = DiskBackend("/tmp/my_cache.pkl")
    cache = AsyncCache(maxsize=10000, default_ttl=3600, backup=backup)
    # Cache loads from disk on init

    # On shutdown, persist to disk
    cache.save_to_backup()
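
The ``.pkl`` extension suggests a pickle snapshot; assuming that, the save and load cycle might look like the sketch below. ``save_snapshot`` and ``load_snapshot`` are hypothetical helpers, not part of the library. The write goes to a temporary file that is then renamed over the target, so a crash mid-write does not leave a truncated snapshot.

.. code-block:: python

    import os
    import pickle
    import tempfile

    def save_snapshot(path, data):
        """Write to a temp file in the same directory, then atomically replace."""
        directory = os.path.dirname(path) or "."
        fd, tmp_path = tempfile.mkstemp(dir=directory)
        with os.fdopen(fd, "wb") as handle:
            pickle.dump(data, handle)
        os.replace(tmp_path, path)

    def load_snapshot(path):
        """Return the stored dict, or an empty dict when no snapshot exists."""
        try:
            with open(path, "rb") as handle:
                return pickle.load(handle)
        except FileNotFoundError:
            return {}

    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "my_cache.pkl")
        empty = load_snapshot(path)       # {} on first start
        save_snapshot(path, {"user:1": {"name": "Alice"}})
        restored = load_snapshot(path)    # same data after a "restart"

    print(empty, restored)

Unpickling can execute arbitrary code, so a snapshot file should only be loaded from a location the application itself controls.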

Works with decorators too:

.. code-block:: python

    from cache import AsyncLRU, DiskBackend

    @AsyncLRU(maxsize=1000, backup=DiskBackend("/tmp/products.pkl"))
    async def get_product(product_id):
        return await db.query_product(product_id)

    # On shutdown
    get_product.save_to_backup()

Custom backends can be built by subclassing ``CacheBackend`` (disk) or ``RemoteCacheBackend`` (remote/async).

Invalidation Decorators (NEW)
-----------------------------

Tie write/mutation functions to automatically invalidate cached reads. Invalidation happens before the mutation executes, so stale data is never served even if the mutation raises:

.. code-block:: python

    from cache import AsyncLRU, AsyncLRUInvalidator

    @AsyncLRU(maxsize=128)
    async def get_user(user_id):
        return await db.get_user(user_id)

    @AsyncLRUInvalidator(get_user)
    async def update_user(user_id, data):
        await db.update_user(user_id, data)

    await get_user(1)              # miss -> loads
    await get_user(1)              # hit
    await update_user(1, {"name": "Alice"})  # invalidates get_user(1), then runs mutation
    await get_user(1)              # miss -> reloads
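
The invalidate-then-mutate ordering can be demonstrated with two small stand-in decorators. These are hypothetical (``simple_cache`` and ``invalidate_before`` are not library names) and key only on positional arguments; the point is that the cached entry is dropped even when the mutation fails.

.. code-block:: python

    import asyncio
    import functools

    def simple_cache(func):
        """Tiny stand-in for a caching decorator, keyed by positional args."""
        store = {}

        @functools.wraps(func)
        async def wrapper(*args):
            if args not in store:
                store[args] = await func(*args)
            return store[args]

        wrapper.invalidate = lambda *args: store.pop(args, None)
        return wrapper

    def invalidate_before(cached_func):
        """Drop the cached entry first, then run the mutation."""
        def decorator(mutation):
            @functools.wraps(mutation)
            async def wrapper(*args):
                cached_func.invalidate(*args)
                return await mutation(*args)
            return wrapper
        return decorator

    db = {1: "alice"}
    loads = 0

    @simple_cache
    async def get_user(user_id):
        global loads
        loads += 1
        return db[user_id]

    @invalidate_before(get_user)
    async def deactivate_user(user_id):
        raise RuntimeError("write failed")  # simulate a failing mutation

    async def main():
        await get_user(1)      # miss, loads == 1
        await get_user(1)      # hit, loads == 1
        try:
            await deactivate_user(1)
        except RuntimeError:
            pass
        await get_user(1)      # entry was dropped before the failure, loads == 2

    asyncio.run(main())
    print(loads)  # 2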

Also available: ``AsyncTTLInvalidator`` for ``@AsyncTTL`` cached functions.

When the mutation's args don't match the cached function's args, use ``key_fn``:

.. code-block:: python

    @AsyncLRUInvalidator(get_user, key_fn=lambda args, kw: (args[:1], {}))
    async def update_user_profile(user_id, name, email):
        await db.update_profile(user_id, name=name, email=email)
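
To see what this ``key_fn`` produces, it can be called directly. The snippet assumes, based on the lambda's signature, that the library passes the mutation's positional tuple and keyword dict as ``args`` and ``kw``; that calling convention is an inference, not a documented fact.

.. code-block:: python

    key_fn = lambda args, kw: (args[:1], {})

    # update_user_profile(1, "Alice", "alice@example.com")
    positional = key_fn((1, "Alice", "alice@example.com"), {})
    print(positional)  # ((1,), {})

    # update_user_profile(1, name="Alice", email="alice@example.com")
    mixed = key_fn((1,), {"name": "Alice", "email": "alice@example.com"})
    print(mixed)  # ((1,), {})

    # update_user_profile(user_id=1, ...): user_id is no longer in args
    keyword_only = key_fn((), {"user_id": 1, "name": "Alice", "email": "alice@example.com"})
    print(keyword_only)  # ((), {})

The last case shows a pitfall of slicing ``args``: if callers pass ``user_id`` by keyword, this particular ``key_fn`` yields an empty key, so a mapping that also reads from ``kw`` may be safer.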

.. note::

    ``skip_args`` has been removed from invalidators to prevent silent key mismatches. Use ``key_fn`` instead (see docs for migration guide).

Benchmarks
----------

Run the built-in benchmark suite:

.. code-block:: shell

    python benchmarks/run.py

Sample results (Python 3.12, 10K iterations):

- Hit latency: ~0.8µs median (>1M ops/s)
- Miss latency: ~3.2µs median
- Thundering herd: 100 coroutines, 1 loader call (verified)
- Batch throughput: ~60K keys/s

Supports comparison with ``aiocache`` and ``cachetools`` if installed.

Agent Cache: AI Agent-Aware Caching
-----------------------------------

``AgentCache`` extends async-cache into an AI-agent-aware caching layer with tool execution caching, resource-based invalidation, session scoping, and loop detection.

.. code-block:: python

    from agent_cache import AgentCache, AgentCacheInvalidator, AgentCacheSession

Read Tool Caching
~~~~~~~~~~~~~~~~~

Cache results of read-only agent tools with resource tagging and TTL:

.. code-block:: python

    @AgentCache(resource="cart", scope="global", ttl=60)
    async def get_cart(user_id):
        return await db.fetch_cart(user_id)

    # First call fetches from DB; subsequent calls return cached result
    cart = await get_cart("user_1")
    cart = await get_cart("user_1")  # cache hit, no DB call

Write/Mutation Invalidation
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Automatically invalidate related cached reads when mutations occur. Invalidation fires before the mutation, ensuring no stale reads even on mutation failure:

.. code-block:: python

    @AgentCacheInvalidator(resource="cart", scope="global")
    async def add_to_cart(user_id, item):
        await db.add_item(user_id, item)

    await get_cart("user_1")       # cached
    await add_to_cart("user_1", "laptop")  # invalidates all "cart" entries, then runs mutation
    await get_cart("user_1")       # re-fetched from DB
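
Invalidating "all cart entries" implies some index from resource name to cache keys. A minimal sketch of such an index follows; ``ResourceIndex`` and its methods are hypothetical and say nothing about how the library stores its tags.

.. code-block:: python

    from collections import defaultdict

    class ResourceIndex:
        """Track which cache keys belong to which resource tag."""

        def __init__(self):
            self.values = {}                     # key -> cached value
            self.by_resource = defaultdict(set)  # resource -> keys tagged with it

        def put(self, resource, key, value):
            self.values[key] = value
            self.by_resource[resource].add(key)

        def invalidate(self, resource):
            """Drop every entry tagged with the resource; return how many were dropped."""
            keys = self.by_resource.pop(resource, set())
            for key in keys:
                self.values.pop(key, None)
            return len(keys)

    index = ResourceIndex()
    index.put("cart", ("get_cart", "user_1"), ["laptop"])
    index.put("cart", ("get_cart", "user_2"), [])
    index.put("product", ("get_product", "laptop"), {"price": 999})

    dropped = index.invalidate("cart")
    print(dropped, list(index.values))  # 2 [('get_product', 'laptop')]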

Session Scoping
~~~~~~~~~~~~~~~

Isolate cache state per agent session. Session-scoped caches are cleared when the session ends, so different agents or users never share stale data:

.. code-block:: python

    @AgentCache(resource="ticket", scope="session")
    async def get_ticket(ticket_id):
        return await api.get_ticket(ticket_id)

    async with AgentCacheSession(session_id="agent-A") as session:
        await get_ticket("T-100")  # fetched and cached for this session
        await get_ticket("T-100")  # cache hit

    # Session ended: cache cleared. A new session starts fresh.
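
One common way to scope state to an ``async with`` block is a context variable. The sketch below uses that approach as an assumption; it is not taken from the library, and ``Session`` and ``session_cached`` are hypothetical names. Each session gets its own dict, which is discarded when the block exits.

.. code-block:: python

    import asyncio
    import contextvars

    _current_store = contextvars.ContextVar("session_store", default=None)

    class Session:
        """Give the enclosed block a private cache dict."""

        async def __aenter__(self):
            self._token = _current_store.set({})
            return self

        async def __aexit__(self, *exc_info):
            _current_store.reset(self._token)  # the session's dict is dropped here
            return False

    def session_cached(func):
        async def wrapper(*args):
            store = _current_store.get()
            if store is None:
                return await func(*args)  # no active session: call through
            key = (func.__name__, args)
            if key not in store:
                store[key] = await func(*args)
            return store[key]
        return wrapper

    fetches = 0

    @session_cached
    async def get_ticket(ticket_id):
        global fetches
        fetches += 1
        return {"id": ticket_id}

    async def main():
        async with Session():
            await get_ticket("T-100")  # fetched
            await get_ticket("T-100")  # session cache hit
        async with Session():
            await get_ticket("T-100")  # new session: fetched again

    asyncio.run(main())
    print(fetches)  # 2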

Loop Detection
~~~~~~~~~~~~~~

Detect and halt infinite agent tool loops (oscillations, recursive cycles, retry storms):

.. code-block:: python

    async with AgentCacheSession(
        loop_detection=True,
        max_tool_repeats=5,       # max identical (tool+args) calls
        max_execution_depth=50,   # max total tool calls
        on_loop="raise",          # "raise", "warn", or "short_circuit"
    ) as session:
        # If the agent enters A -> B -> A -> B, raises AgentLoopDetectedError
        # If a tool is called > 5 times with same args, raises
        # If total calls exceed 50, raises
        ...  # agent workflow goes here
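
The three checks listed in the comments can be approximated with a call counter and a short history. This is a sketch under assumptions, not the library's detector: ``LoopGuard`` and ``LoopDetected`` are hypothetical names, and the oscillation check compares tool names only, over the last four calls.

.. code-block:: python

    from collections import Counter

    class LoopDetected(Exception):
        """Raised when the call history looks like a runaway loop."""

    class LoopGuard:
        def __init__(self, max_tool_repeats=5, max_execution_depth=50):
            self.max_tool_repeats = max_tool_repeats
            self.max_execution_depth = max_execution_depth
            self.counts = Counter()  # (tool, args) -> number of calls
            self.history = []        # tool names in call order

        def record(self, tool, *args):
            self.history.append(tool)
            self.counts[(tool, args)] += 1
            if len(self.history) > self.max_execution_depth:
                raise LoopDetected("execution depth exceeded")
            if self.counts[(tool, args)] > self.max_tool_repeats:
                raise LoopDetected(f"{tool} repeated with identical arguments")
            last = self.history[-4:]
            if len(last) == 4 and last[:2] == last[2:] and last[0] != last[1]:
                raise LoopDetected(f"cycle detected: {last[0]} -> {last[1]}")

    def run(guard, calls):
        """Feed calls to the guard; return (calls accepted, error message or None)."""
        for index, (tool, args) in enumerate(calls):
            try:
                guard.record(tool, *args)
            except LoopDetected as exc:
                return index, str(exc)
        return len(calls), None

    retry_storm = run(LoopGuard(max_tool_repeats=3), [("search_web", ("q",))] * 10)
    oscillation = run(
        LoopGuard(max_tool_repeats=100),
        [("check_stock", ("SKU-42",)), ("update_stock", ("SKU-42", -1))] * 2,
    )
    print(retry_storm)  # (3, 'search_web repeated with identical arguments')
    print(oscillation)  # (3, 'cycle detected: check_stock -> update_stock')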

Observability
~~~~~~~~~~~~~

Expose metrics at session and global level:

.. code-block:: python

    from agent_cache import get_metrics

    async with AgentCacheSession() as session:
        # ... agent workflow ...
        print(session.get_metrics())
        # {'hits': 12, 'misses': 5, 'invalidations': 2, 'loop_detections': 0, 'hit_rate': 0.706}

    print(get_metrics())  # global aggregate

Real-World Use Cases


**1. E-commerce Shopping Agent**

An AI agent assists users with online shopping: browsing products, managing carts,
and checking out. Without caching, the agent redundantly fetches the same cart on every
step. Without invalidation, stale cart data causes incorrect pricing.

.. code-block:: python

    @AgentCache(resource="cart", scope="global", ttl=120)
    async def get_cart(user_id):
        return await shop_api.get_cart(user_id)

    @AgentCache(resource="product", scope="global", ttl=300)
    async def get_product(product_id):
        return await shop_api.get_product(product_id)

    @AgentCacheInvalidator(resource="cart", scope="global")
    async def add_to_cart(user_id, product_id):
        return await shop_api.add_to_cart(user_id, product_id)

    @AgentCacheInvalidator(resource="cart", scope="global")
    async def checkout(user_id):
        return await shop_api.checkout(user_id)

    async with AgentCacheSession(loop_detection=False) as session:
        cart = await get_cart("user_1")        # DB fetch
        product = await get_product("laptop")  # DB fetch
        await add_to_cart("user_1", "laptop")  # invalidates cart cache
        cart = await get_cart("user_1")        # re-fetched (correct total)
        await checkout("user_1")               # invalidates cart cache

**2. Customer Support Agent (Session-Scoped)**

Multiple support agents handle tickets concurrently. Each agent's session caches
are isolated — Agent A updating a ticket doesn't serve stale data to Agent B.

.. code-block:: python

    @AgentCache(resource="ticket", scope="session")
    async def get_ticket(ticket_id):
        return await support_api.get_ticket(ticket_id)

    @AgentCacheInvalidator(resource="ticket", scope="session")
    async def update_ticket(ticket_id, status):
        return await support_api.update_ticket(ticket_id, status)

    # Agent A
    async with AgentCacheSession(session_id="agent-A"):
        ticket = await get_ticket("T-100")          # fetched
        await update_ticket("T-100", "in_progress") # invalidates session cache
        ticket = await get_ticket("T-100")          # re-fetched

    # Agent B — independent session, no stale data from Agent A
    async with AgentCacheSession(session_id="agent-B"):
        ticket = await get_ticket("T-100")  # fresh fetch

**3. Research Agent — Preventing Search Loops**

A research agent searches the web and synthesizes results. Without loop detection,
a confused agent can endlessly repeat the same search or oscillate between search
and summarize.

.. code-block:: python

    @AgentCache(resource="search", scope="global", ttl=600)
    async def search_web(query):
        return await search_api.search(query)

    @AgentCache(resource="page", scope="global", ttl=600)
    async def fetch_page(url):
        return await http.get(url)

    async with AgentCacheSession(
        max_tool_repeats=3,
        max_execution_depth=20,
        on_loop="raise",
    ) as session:
        results = await search_web("async python caching")
        results = await search_web("async python caching")  # cache hit
        # If the agent calls search_web("async python caching") 4 times
        # → AgentLoopDetectedError raised, halting the runaway loop

**4. Inventory Management Agent — Oscillation Detection**

An inventory agent checks stock and updates quantities. A buggy planning loop
can oscillate between checking and updating endlessly.

.. code-block:: python

    @AgentCache(resource="stock", scope="global")
    async def check_stock(item_id):
        return await inventory_api.get_stock(item_id)

    @AgentCacheInvalidator(resource="stock", scope="global")
    async def update_stock(item_id, delta):
        return await inventory_api.adjust(item_id, delta)

    async with AgentCacheSession(max_tool_repeats=100, on_loop="raise") as session:
        await check_stock("SKU-42")
        await update_stock("SKU-42", -1)
        await check_stock("SKU-42")
        await update_stock("SKU-42", -1)
        # → AgentLoopDetectedError: Cycle detected: check_stock -> update_stock

**5. Multi-API Orchestrator — Retry Storm Detection**

An agent calling flaky external APIs can get stuck retrying. Loop detection catches
retry storms before they exhaust rate limits or budgets.

.. code-block:: python

    @AgentCacheInvalidator(resource="payment", scope="global")
    async def charge_payment(order_id, amount):
        return await payment_api.charge(order_id, amount)

    async with AgentCacheSession(max_tool_repeats=3, on_loop="raise") as session:
        for attempt in range(10):
            try:
                await charge_payment("ORD-1", 99.99)
                break
            except PaymentError:
                continue  # retry
        # The 4th identical call raises AgentLoopDetectedError, stopping the retry storm

Testing
-------

Run the full test suite:

.. code-block:: shell

    python -m unittest discover tests -v

Run benchmarks:

.. code-block:: shell

    python benchmarks/run.py
    python benchmarks/run.py --iterations 50000 --json results.json

A local test dashboard is also available for interactive testing:

.. code-block:: shell

    python demo/app.py  # Runs on http://localhost:5001

Use it to verify caching behavior, metrics, and concurrent load handling.

Documentation
-------------

Full documentation is available at https://async-cache.readthedocs.io/ and in the
``docs/`` directory:

- **User Guide**: When to use which feature, configuration reference
- **Remote Cache**: Distributed Redis caching, two-tier L1/L2 architecture
- **Disk Persistence**: Setup, custom backends, DiskBackend details
- **Cache Invalidation**: Decorator invalidators, key mapping, clear-all mode
- **Agent Cache**: AI-agent-aware caching, session scoping, loop detection
- **Benchmarks**: How to run, sample results, competitor comparison
- **Real-World Examples**: FastAPI, Django, aiohttp, ML inference
- **Migration Guide**: Upgrading from v2.0, migrating from other libraries
- **Performance Tuning**: maxsize, TTL, batch loader, monitoring