Reusable containers
By default, each task execution in Flyte and Union runs in a fresh container instance that is created just for that execution and then discarded. With reusable containers, the same container can be reused across multiple executions and tasks. This approach reduces startup overhead and improves resource efficiency.
The reusable container feature is only available when running your Flyte code on a Union backend.
How It Works
With reusable containers, the system maintains a pool of persistent containers that can handle multiple task executions.
When you configure a TaskEnvironment with a ReusePolicy, the system does the following:
- Creates a pool of persistent containers.
- Routes task executions to available container instances.
- Manages container lifecycle with configurable timeouts.
- Supports concurrent task execution within containers (for async tasks).
- Preserves the Python execution environment across task executions, allowing you to maintain state through global variables.
Basic Usage
The reusable containers feature currently requires a dedicated runtime library (unionai-reuse) to be installed in the task image used by the reusable task.
You can add this library to your task image using the flyte.Image.with_pip_packages method, as shown below.
This library only needs to be added to the task image.
It does not need to be installed in your local development environment.
Enable container reuse by adding a ReusePolicy to your TaskEnvironment:
# /// script
# requires-python = "==3.13"
# dependencies = [
# "flyte>=2.0.0b52",
# ]
# main = "main"
# params = "n=500"
# ///
import flyte
from datetime import timedelta

# Currently required to enable reusable containers
reusable_image = flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.10")

env = flyte.TaskEnvironment(
    name="reusable-env",
    resources=flyte.Resources(memory="1Gi", cpu="500m"),
    reusable=flyte.ReusePolicy(
        replicas=2,  # Create 2 container instances
        concurrency=1,  # Process 1 task per container at a time
        scaledown_ttl=timedelta(minutes=10),  # Individual containers shut down after 10 minutes of inactivity
        idle_ttl=timedelta(hours=1),  # Entire environment shuts down after 1 hour of no tasks
    ),
    image=reusable_image,  # Use the container image augmented with the unionai-reuse library.
)


@env.task
async def compute_task(x: int) -> int:
    return x * x


@env.task
async def main() -> list[int]:
    # These tasks will reuse containers from the pool
    results = []
    for i in range(10):
        result = await compute_task(i)
        results.append(result)
    return results


if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
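Note that the loop in main awaits each compute_task call before starting the next one, so at most one call is in flight at any moment. The following sketch uses plain asyncio, with no Flyte involved, to contrast that pattern with a fan-out via asyncio.gather. The Tracker class and the sleep-based stand-in for remote execution are assumptions made for illustration only:

```python
import asyncio


class Tracker:
    """Records how many simulated task calls are in flight at once."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def square(self, x: int) -> int:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # Stand-in for remote execution latency
        self.in_flight -= 1
        return x * x


async def run_sequential(n: int) -> tuple[list[int], int]:
    # Mirrors the for-loop pattern: await each call before issuing the next.
    t = Tracker()
    results = [await t.square(i) for i in range(n)]
    return results, t.peak


async def run_fan_out(n: int) -> tuple[list[int], int]:
    # Issues all calls first, then waits for them together.
    t = Tracker()
    results = await asyncio.gather(*(t.square(i) for i in range(n)))
    return list(results), t.peak


seq_results, seq_peak = asyncio.run(run_sequential(10))
fan_results, fan_peak = asyncio.run(run_fan_out(10))
print(seq_peak, fan_peak)  # 1 10
```

Both variants return the same results. Only the fan-out gives a pool with more than one replica, or with concurrency above 1, several calls to work on at the same time.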
ReusePolicy parameters
For complete parameter documentation, including accepted types, defaults, capacity math, and lifecycle behavior, see the ReusePolicy API reference.
Understanding parameter relationships
The four ReusePolicy parameters work together to control different aspects of container management:
from datetime import timedelta

import flyte

reuse_policy = flyte.ReusePolicy(
    replicas=4,  # Infrastructure: How many containers?
    concurrency=3,  # Throughput: How many tasks per container?
    scaledown_ttl=timedelta(minutes=10),  # Individual: When do idle containers shut down?
    idle_ttl=timedelta(hours=1),  # Environment: When does the whole pool shut down?
)
# Total capacity: 4 × 3 = 12 concurrent tasks
# Individual containers shut down after 10 minutes of inactivity
# Entire environment shuts down after 1 hour of no tasks
Key relationships
- Total throughput = replicas × concurrency
- Resource usage = replicas × TaskEnvironment.resources
- Cost efficiency: higher concurrency reduces container overhead, while more replicas provide better isolation
- Lifecycle management: scaledown_ttl manages individual containers, idle_ttl manages the environment
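The first two relationships are simple arithmetic, sketched below as a small helper. PoolPlan is a hypothetical class written for this illustration and is not part of the Flyte API. It assumes every replica is running and requests the same CPU and memory:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolPlan:
    """Hypothetical helper for estimating the footprint of a reusable pool."""

    replicas: int
    concurrency: int
    cpu_per_replica: float  # CPU cores requested per container
    memory_gib_per_replica: float  # Memory requested per container, in GiB

    @property
    def max_concurrent_tasks(self) -> int:
        # Total throughput = replicas x concurrency
        return self.replicas * self.concurrency

    @property
    def total_cpu(self) -> float:
        # Resource usage = replicas x per-container CPU request
        return self.replicas * self.cpu_per_replica

    @property
    def total_memory_gib(self) -> float:
        # Resource usage = replicas x per-container memory request
        return self.replicas * self.memory_gib_per_replica


plan = PoolPlan(replicas=4, concurrency=3, cpu_per_replica=0.5, memory_gib_per_replica=1.0)
print(plan.max_concurrent_tasks, plan.total_cpu, plan.total_memory_gib)  # 12 2.0 4.0
```

Raising concurrency increases the task capacity without changing the resource totals. Raising replicas increases both.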
Simple example
Here is a simple but complete example of reuse with concurrency.
First, import the needed modules and set up logging:
import asyncio
import logging
import flyte
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Next, define the reusable task environment:
env = flyte.TaskEnvironment(
    name="reuse_concurrency",
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    reusable=flyte.ReusePolicy(
        replicas=2,
        idle_ttl=60,
        concurrency=100,
        scaledown_ttl=60,
    ),
    image=flyte.Image.from_debian_base().with_pip_packages("unionai-reuse>=0.1.10"),
)
Now define the main task (the driver task of the workflow) and the noop task, which is executed many times in the same containers:
@env.task
async def noop(x: int) -> int:
    logger.debug(f"Task noop: {x}")
    return x


@env.task
async def main(n: int = 50) -> int:
    coros = [noop(i) for i in range(n)]
    results = await asyncio.gather(*coros)
    return sum(results)
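To build intuition for how replicas × concurrency bounds the fan-out in main, here is a local model in plain asyncio. It is only a sketch: a single semaphore stands in for the pool's total capacity, the simulate function is hypothetical, and the real scheduler's routing between containers is not modeled.

```python
import asyncio


async def simulate(n: int, replicas: int, concurrency: int) -> tuple[int, int]:
    """Return (sum of results, peak number of calls in flight)."""
    slots = asyncio.Semaphore(replicas * concurrency)  # Total pool capacity
    in_flight = 0
    peak = 0

    async def noop(x: int) -> int:
        nonlocal in_flight, peak
        async with slots:  # Wait for a free slot in the pool
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # Stand-in for task work
            in_flight -= 1
        return x

    results = await asyncio.gather(*(noop(i) for i in range(n)))
    return sum(results), peak


small = asyncio.run(simulate(n=50, replicas=2, concurrency=100))
large = asyncio.run(simulate(n=500, replicas=2, concurrency=100))
print(small, large)  # (1225, 50) (124750, 200)
```

With n=50, every call fits into the 200 available slots at once. With n=500, the model never runs more than 200 calls at a time and the remaining calls wait for a free slot.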
Run python reuse_concurrency.py to see it in action: