In a previous article, we explored how to manage background tasks in FastAPI using ARQ and Redis. While that setup is great for offloading work, real-world applications must handle failure. Network connections drop, external APIs return errors, and services temporarily go offline. A robust system doesn't just run tasks; it anticipates and recovers from these failures.
This guide dives into ARQ's powerful retry mechanisms. We'll cover how to automatically handle interruptions, implement custom retry logic with backoff delays, and build resilient, production-grade task queues, a key component of any production-ready FastAPI application.
One of ARQ's most critical features for reliability is its default behavior during shutdowns or cancellations. If an ARQ worker is stopped while a job is running (for example, during a deployment or by pressing Ctrl+C), it doesn't just discard the job. Instead, it gracefully cancels the task and immediately requeues it to be run again later.
When the worker is restarted, it picks up the cancelled job from the queue and runs it from the beginning.
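You can reproduce this with a minimal worker like the sketch below. This is an illustrative setup, not code from the logs: the task name the_task mirrors the output that follows, and the sleep is just a stand-in for real work.

# worker.py -- a minimal sketch to reproduce the requeue-on-shutdown behavior
import asyncio

async def the_task(ctx):
    # Interrupt the worker (Ctrl+C) while this sleep is running to see
    # the job get cancelled and immediately requeued
    await asyncio.sleep(30)

class WorkerSettings:
    functions = [the_task]

Enqueue the job (for example, await pool.enqueue_job('the_task') on a pool created with arq.create_pool), start the worker with arq worker.WorkerSettings, and stop it with Ctrl+C while the job is running.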
You can see this in the worker's log output:
➤ arq worker.WorkerSettings
12:42:38: Starting worker...
12:42:38:  10.23s → job_id:the_task() delayed=10.23s
12:42:40: shutdown on SIGINT ◆ 0 jobs complete ◆ 0 failed ◆ 1 ongoing to cancel
12:42:40:   1.16s ↻ job_id:the_task cancelled, will be run again
When the worker starts again:
➤ arq worker.WorkerSettings
12:42:50: Starting worker...
12:42:50:  21.78s → job_id:the_task() try=2 delayed=21.78s
12:42:55:   5.00s ← job_id:the_task ●
Notice the try=2 in the log. ARQ automatically tracks retry attempts. This default behavior ensures that transient interruptions don't cause jobs to be lost, providing a strong foundation for a reliable system.
This automatic requeueing behavior means that ARQ guarantees at-least-once delivery. A job will run at least one time, but it might run more than once if it's interrupted.
Because of this, your tasks must be idempotent. An idempotent operation is one that can be applied multiple times without changing the result beyond the initial application. For example, setting a user's status to active is idempotent, but adding a $10 charge to their account is not.
To make non-idempotent operations safe, you can:

- Attach a unique idempotency key to each operation and skip any key that has already been processed (sketched below).
- Check the current state before acting, for example confirming that a charge does not already exist before applying it.
- Enforce uniqueness at the database level so duplicate writes are rejected rather than repeated.
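Here is a rough sketch of the idempotency-key approach, assuming processed keys are recorded in a Redis set; the function name, set name, and arguments are illustrative, and a database unique constraint is even more robust than this check-then-write pattern:

async def charge_account(ctx, user_id: int, amount: int, idempotency_key: str):
    redis = ctx['redis']  # ARQ exposes its Redis connection in the job context
    # Skip the charge if this key was already processed on a previous attempt
    if await redis.sismember('processed_charges', idempotency_key):
        return 'already charged'
    # ... apply the charge to the user's account here ...
    await redis.sadd('processed_charges', idempotency_key)
    return 'charged'

With this guard in place, a requeued job that already charged the account on its first attempt becomes a harmless no-op on the second.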
For failures that happen within your task logic—like a failed API call—you can manually trigger a retry by raising the arq.worker.Retry exception. This tells the worker to stop the current execution and requeue the job.
You can also specify a defer period to tell the worker how long to wait before trying again. This is essential for implementing backoff strategies to avoid overwhelming a struggling service.
Let's create a task that downloads content from a URL using the popular httpx library. If the API returns a non-200 status code, we'll retry the job with an increasing delay. This is known as a linear backoff.
# in tasks.py
from httpx import AsyncClient
from arq import Retry


async def download_content(ctx, url: str):
    """
    Downloads content from a URL, with retries on failure.
    """
    session: AsyncClient = ctx['session']
    job_try = ctx.get("job_try", 1)
    max_tries = ctx.get("max_tries", 5)

    print(f"Attempt {job_try}/{max_tries}: Downloading {url}...")
    try:
        response = await session.get(url)
        response.raise_for_status()  # Raises an exception for 4xx/5xx responses
    except Exception as e:
        print(f"Download failed: {e}")
        # If we have retries left, raise Retry with a delay
        if job_try < max_tries:
            # Delays will be 5s, 10s, 15s, 20s
            defer_by = job_try * 5
            print(f"Retrying in {defer_by} seconds...")
            raise Retry(defer=defer_by) from e
        else:
            # If no retries left, let the exception bubble up to fail the job
            print("Max retries reached. Job will fail.")
            raise

    print("Download successful.")
    return len(response.text)


# --- Worker Configuration ---
# in worker.py
async def startup(ctx):
    """
    Creates an httpx.AsyncClient instance for the worker to use.
    """
    ctx['session'] = AsyncClient()


async def shutdown(ctx):
    """
    Closes the httpx.AsyncClient instance.
    """
    await ctx['session'].aclose()


class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown
    max_tries = 5  # Set the default max_tries for all jobs
In this example:
- We use the ctx dictionary, which ARQ provides to every job, to get the current job_try number.
- If the download fails, we check whether any retries remain (job_try < max_tries). For robust monitoring, this is where you would also implement structured logging to record the failure.
- If retries remain, we raise Retry, passing a defer value that increases with each attempt.
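To exercise the task, enqueue it from your application code. Here is a minimal sketch, assuming a local Redis with default settings; the URL is just an example:

import asyncio
from arq import create_pool
from arq.connections import RedisSettings

async def main():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job('download_content', 'https://example.com')
    # Wait for the worker to finish; raises if the job ultimately fails
    print(await job.result())

asyncio.run(main())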
Sometimes, you may have a function that you want to use both as an ARQ task and as a regular async function elsewhere in your code. If you raise Retry outside of an ARQ worker context, it will cause an unhandled exception. To solve this, you can check for the existence of the ctx dictionary to determine whether the function is running as an ARQ job.
import requests
from arq import Retry


async def call_external_service(ctx: dict | None, payload: dict):
    try:
        # ... logic to call the service ...
        response = requests.post("https://api.example.com/data", json=payload)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.ConnectionError as e:
        # This logic runs ONLY if the function is executed by an ARQ worker
        if ctx:
            job_try = ctx.get("job_try", 1)
            max_tries = ctx.get("max_tries", 5)
            if job_try < max_tries:
                # Retry with a fixed 20-second delay
                raise Retry(defer=20)
        # Re-raise if no retries are left, or if called outside an ARQ worker
        raise Exception(
            "Connection Error: The service is currently unavailable. "
            "Please try again later."
        ) from e
    # rest of the function logic..
This pattern makes your functions more versatile. When run by ARQ, the function leverages the retry system. When called directly, it fails fast by raising a standard exception, allowing the calling code to handle the error immediately.
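As a quick illustration of both paths (the wrapper function and payload are hypothetical):

# Direct call: pass None for ctx, so a failure raises immediately
async def sync_with_service(payload: dict):
    try:
        return await call_external_service(None, payload)
    except Exception as exc:
        # No requeueing happens here; handle or surface the error now
        print(f"Service call failed: {exc}")
        raise

# ARQ path: the worker supplies ctx, enabling the Retry logic above
# await redis.enqueue_job('call_external_service', {'key': 'value'})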
Building a reliable system requires planning for failure. ARQ provides simple yet powerful tools to make your background tasks resilient to transient errors and interruptions. By combining automatic requeueing with manual Retry logic and backoff strategies, you can ensure that your important jobs eventually get done, even when things go wrong. This approach is fundamental to creating production-ready applications with FastAPI.
About the Author
David Muraya is a Solutions Architect specializing in Python, FastAPI, and Cloud Infrastructure. He is passionate about building scalable, production-ready applications and sharing his knowledge with the developer community. You can connect with him on LinkedIn.