FluxQueue class
This page documents the main Python entry point for FluxQueue.
FluxQueue is the object you create in your app to enqueue work into Redis for the worker to process.
You can import the FluxQueue class from fluxqueue:
fluxqueue.FluxQueue
High-level client for enqueueing Python callables as background tasks.
It uses the Rust-backed core to push tasks into Redis and is intended to be the main entry point used from your application code.
In most cases you create a single instance per application or service and
reuse it. The redis_url parameter controls which Redis instance is used.
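A minimal sketch of creating the shared instance (the Redis URL below is a placeholder, and passing `redis_url` as a keyword argument is an assumption about the constructor):

```python
from fluxqueue import FluxQueue

# One FluxQueue instance per application/service, reused everywhere.
# "redis://localhost:6379/0" is an illustrative placeholder.
fluxqueue = FluxQueue(redis_url="redis://localhost:6379/0")
```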
Source code in python/fluxqueue/client.py
task
Mark a function as a FluxQueue task.
Applying this decorator marks a function as a task function. Calling the decorated function enqueues the task; the worker then executes the function body.
Parameters
name:
Optional explicit task name. If not set, a name is derived from the
function name.
queue:
Name of the queue to push tasks to. Defaults to "default".
max_retries:
Maximum number of retries the worker will attempt for this task
before treating it as dead.
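The parameters above can be combined on the decorator; a sketch with illustrative names and values:

```python
# Explicit task name, a non-default queue, and a retry limit.
# "send_welcome_email" and "emails" are illustrative, not required names.
@fluxqueue.task(name="send_welcome_email", queue="emails", max_retries=5)
async def send_welcome_email(email: str):
    ...
```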
Example
```python
@fluxqueue.task()
async def send_email_task(name: str, username: str, email: str):
    async with get_email_client() as client:
        await send_email(
            email_client=client,
            to_email=email,
            subject="Welcome to FluxQueue",
            config=email_config,
        )
```
Enqueueing the task
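Calling the decorated function enqueues it for the worker rather than running the body inline; a sketch reusing `send_email_task` from above (argument values are illustrative):

```python
# Pushes the task onto the "default" queue; the worker runs the body later.
await send_email_task(name="Ada", username="ada", email="ada@example.com")
```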
Source code in python/fluxqueue/client.py
task_with_context
Mark a function as a FluxQueue task with context.
This decorator works like the task decorator but adds support for the Context class.
The function must accept a context as its first argument, with Context (or a subclass of Context) as the type hint.
When decorated, the context argument is injected automatically by the worker and is no longer part of the function's public signature; callers of the function do not provide it.
Parameters
name:
Optional explicit task name. If not set, a name is derived from the
function name.
queue:
Name of the queue to push tasks to. Defaults to "default".
max_retries:
Maximum number of retries the worker will attempt for this task
before treating it as dead.
Example: Subclassing Context for Database Resource Pooling
This example demonstrates how to use thread_storage to persist a
SQLAlchemy engine across the lifetime of a worker thread. This pattern
prevents the overhead of recreating database connection pools for
every individual task execution.
```python
from contextlib import asynccontextmanager

from fluxqueue import Context, FluxQueue
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

fluxqueue = FluxQueue()


class DbContext(Context):
    def __init__(self) -> None:
        super().__init__()

    def _get_local_session(self) -> async_sessionmaker[AsyncSession]:
        # Create the engine and session factory once per worker thread,
        # then reuse them for every task that runs on that thread.
        if not self.thread_storage.get("session"):
            engine = create_async_engine(SQLALCHEMY_DATABASE_URL)
            self.thread_storage["session"] = async_sessionmaker(
                bind=engine, expire_on_commit=False
            )
        return self.thread_storage["session"]

    @asynccontextmanager
    async def session_context(self):
        local_session = self._get_local_session()
        async with local_session() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise


@fluxqueue.task_with_context()
async def create_user_task(ctx: DbContext, email: str, username: str):
    async with ctx.session_context() as db_session:
        user = User(email=email, username=username)
        db_session.add(user)


# Enqueue the task; ctx is injected by the worker, so it is not passed here.
await create_user_task(email, username)
```
Source code in python/fluxqueue/client.py