Is .from_source() necessary for a Flow Factory design pattern? #20285
Replies: 8 comments
-
Hey @davidbernat! We have some of these guards in place because we save the entrypoint to the flow (which comprises the file path and the name of the decorated function in the file) so that Prefect can load the flow during remote execution. Overall, I don't think this flow factory pattern is going to play well with Prefect as it currently exists, because we assume there is static code somewhere that Prefect can load your flow from for execution. If you share more info on why you're using this pattern, we can work together to find something that will work.
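For context on the entrypoint format mentioned above: the saved entrypoint is a file path plus the decorated function's name. A minimal sketch of how such a reference decomposes; parse_entrypoint is a hypothetical helper for illustration, not a Prefect API, and the "flows/etl.py:my_flow" string is an invented example:

```python
def parse_entrypoint(entrypoint: str) -> tuple[str, str]:
    """Split a '<path/to/file.py>:<function_name>' entrypoint string.

    This mirrors the idea described above: the stored reference names a file
    and a function in that file, which is why a function that only exists in
    client memory cannot be re-loaded during remote execution.
    """
    path, name = entrypoint.rsplit(":", 1)
    return path, name
```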
-
@desertaxle The documentation across the website could use a lot of improvement, and the various LLMs across the Internet (c.f. Gemini, Prefect MCP, etc.) tend to provide surprisingly poor results, including mixing Prefect 1, Prefect 2, and Prefect 3 documentation and referencing methods and constraints that do not exist. You should be made aware of that; the problem is at least as bad as when I first ducked into Prefect in May 2025, and you surely want to make improvements with the appropriate teams internally. It matters, because the learning curve is severe for reasons that have nothing to do with the use-case design patterns Prefect is providing as its business case, and consequently the code-generation capability of the state-of-the-art LLMs is far, far inferior to what they manage for other out-of-the-box packages, in the few weeks of kicking tires that I tried. Prefect has an immense value proposition, and I am excited to begin deploying it internally in my house. But I was quite surprised each time (including May 2025), enough so that I discussed this (ad nauseam) with other people in my cohorts and collaborations. It is almost certainly a simple, few-month fix for Prefect with a dedicated part-time team aware of the new requirements.

Now, to discuss my substantial ignorances. In answer to your question about my question, I found some progress that supported my earlier presumption, and found new, somewhat unrelated errors that result in a new formulation of a blocker bug, which I would like to discuss. I also found the practice of robustly creating workflows and workers in the Python SDK to be surprisingly difficult and error prone (including the above); I still needed to rely on subprocesses, and almost assuredly have holes in the management of possible redundancies. So I will add my:

How to Factory Flow?

```python
pool_name = ProcessingPipeline.ensure_prefect_worker_pool_created()
kwargs = dict(cron=cron, name="deploy", work_pool_name=pool_name,
              parameters=parameters, entrypoint_type=EntrypointType.MODULE_PATH)
```

This correctly bypasses the earlier error, and appears to correctly identify the appropriate runtime files in the foreign directory outside the runtime cwd. The correct module is loaded, but something inside the Flow instance itself applies the wrong logic. I will explain after the error, which in my production code is:

```
AttributeError: module 'cloudnode.base.iaas.extensions.pipelines' has no attribute 'flow_modify_cloudnode_data'
prefect.exceptions.MissingFlowError: Flow function with name 'flow_modify_cloudnode_data' not found in 'cloudnode.base.iaas.extensions.pipelines'.
```

This module error is, actually, correct: flow_modify_cloudnode_data does not exist in that module. To put this in terms of the toy code above, the errors would be:

```
AttributeError: module 'foreign.FlowFactory' has no attribute 'example_task'
prefect.exceptions.MissingFlowError: Flow function with name 'example_task' not found in 'foreign.FlowFactory'.
```

As you can see, this is correct. Module foreign.FlowFactory has example_task bound inside the function FlowFactory. In the sense that you are describing, this is entirely correct behavior by Prefect. When the Flow is deployed, the function example_task exists only in the memory of the client process creating the deployment. Over on the server, even with the same code repository accessible, the function example_task itself does not exist, because FlowFactory would need to be called to create it. It does not feel like an impossible situation, and it may even mean that FlowFactory() itself should be the Flow (with access to the original input arguments greeting and n), which is no less robust than the original approach, although a Flow creating a Task at runtime is almost certainly not the intended design pattern. What would you do?

How to Python SDK ensure deployment?

```python
@staticmethod
async def ensure_prefect_worker_pool_created():
    """Helper function for making sure the specific WorkerPool assigned to ProcessingPipeline exists."""
    # Prefect 3 requires the async client for full worker management functionality;
    # the sync client lacks read_workers_for_work_pool() and proper worker status.
    async with get_client() as client:
        try:
            await client.read_work_pool(WORKER_POOL_NAME)
            logger.info(f"WorkPool={WORKER_POOL_NAME} exists")
        except Exception:
            work_pool = WorkPoolCreate(name=WORKER_POOL_NAME, type="process",
                                       description="CloudNode default WorkPool for ProcessingPipeline Flows")
            await client.create_work_pool(work_pool)
            logger.info(f"Created WorkPool={WORKER_POOL_NAME}")

        # This should only ever return 0 or 1 workers, named WORKER_POOL_NAME, in the work pool named WORKER_POOL_NAME.
        workers = await client.read_workers_for_work_pool(WORKER_POOL_NAME)
        workers = [w for w in workers if w.name == WORKER_POOL_NAME]
        if len(workers) > 1:
            raise RuntimeError(f"Multiple workers found with name={WORKER_POOL_NAME}")
        if len(workers) == 1:
            worker = workers[0]
            import datetime
            heartbeat_cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=60)
            if worker.last_heartbeat_time and worker.last_heartbeat_time > heartbeat_cutoff:
                logger.info(f"Worker {WORKER_POOL_NAME} is running and active")
            else:
                logger.warning(f"Worker {WORKER_POOL_NAME} exists but is not actively polling; restarting")
                # Worker has to be recreated as a ProcessWorker; still do not know whether this creates duplicates.
                worker = ProcessWorker(name=WORKER_POOL_NAME, work_pool_name=WORKER_POOL_NAME, limit=5)
                # create_task(worker.start())
                prefect_start_worker_in_background(worker)
                logger.info(f"Restarted worker {WORKER_POOL_NAME}")
        else:
            worker = ProcessWorker(name=WORKER_POOL_NAME, work_pool_name=WORKER_POOL_NAME, limit=5)
            prefect_start_worker_in_background(worker)
            # create_task(worker.start())
            logger.info(f"Started worker {WORKER_POOL_NAME}")
    return WORKER_POOL_NAME


# Prefect 3 starts workers as blocking calls, so they need their own threading
# to push them to the background; feels weird to me.
def prefect_start_worker_in_background(process_worker):
    import subprocess
    proc = subprocess.Popen([
        "prefect", "worker", "start",
        "--pool", WORKER_POOL_NAME,
        "--name", WORKER_POOL_NAME,
        "--limit", "5",
    ])
    # thread = threading.Thread(target=process_worker.start, daemon=True)
    # thread.start()
```

Very excited. Thanks.
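One way out of the closure problem described above, sketched in plain Python: lift the factory arguments (greeting, n) into parameters of a single top-level function, so the code Prefect has to load is static and importable, and the closure is only created at run time inside the flow rather than at deployment time. The decorators are noted in comments so the sketch runs without a Prefect installation; example_flow and its body are hypothetical stand-ins for the toy example:

```python
# In the factory pattern, example_task exists only inside FlowFactory's closure
# at deployment time. Here the former factory arguments become ordinary flow
# parameters, so the function exists at module scope and an entrypoint
# ("this_module.py:example_flow") can reference it.
def example_flow(greeting: str, n: int) -> list[str]:  # would carry @flow in real code
    def example_task(i: int) -> str:  # would carry @task in real code
        return f"{greeting} {i}"
    return [example_task(i) for i in range(n)]
```

The deployment would then pass greeting and n as flow parameters rather than baking them into a closure when the deployment is created.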
-
Hi @davidbernat, your usage of Prefect is quite unique and does not align with how most people work with the product. Based on what you've described, I think the simplest thing is to use the serve method. Here's a simple example:

```python
from prefect import flow

@flow
def my_flow(name: str):
    print(f"hello {name}")

if __name__ == "__main__":
    my_flow.serve("example-deployment")
```

https://docs.prefect.io/v3/api-ref/python/prefect-flows#serve-2

Run the script. This registers the Flow as a deployment, then starts a built-in worker to make the flow available to you. It will run until you press Control + C to stop the process.

You can serve multiple Flows at once: https://docs.prefect.io/v3/how-to-guides/deployment_infra/run-flows-in-local-processes#serve-multiple-flows-at-once

Something like systemd could be used to daemonize the script and ensure only one instance is running at any time.
-
Hey @bdalpe, thank you for the compliment 😉. The example presented above (regarding local compute and its simplicity) is a template model for a larger scale factor that we already operate, so we are quite confident that a solution to our design pattern exists here. (And we thank you for your suggestion. It has problems of its own in monitoring exactly what happens when simply running concurrent background .serve threads, and the solution would not scale.) No doubt, though, that I do not have a matching "Prefectic.ly" mindset for how the creators of the suite use their software in the design factors presented here. I think we should focus on the specific form factor that I requested (for reasons that are more technically sophisticated than my example discussed), but I do want to know how you would approach the problem (since this form factor is so different from yours). In particular because the problem is the Flow.deploy() issue, in which a function has been constructed in memory and as such does not exist on the server. For the most part, the issue of foreign directories outside of working directories is solved by my last change. So, acknowledging that this in-memory function is a fundamental design challenge (the function needs to be replicated on the server, and can be), I am curious how you would solve this problem "Prefectic.ly" from scratch. But again, I do not want to deviate from my original question. The specific case this was designed for is this:
-
@davidbernat I'm happy to continue this discussion here. Polling the database using a Deployment on a schedule is a good approach. You can pass each row to a task:

```python
from prefect import flow, tags

@flow
def dispatcher(batch_size: int = 200) -> None:
    routes = load_routes()
    all_futures = []
    for r in routes:
        # dynamic tags for observability + concurrency control
        with tags(f"table:{r.table}", f"handler:{r.handler_ref}"):
            items = claim_items(r.table, batch_size)
            for item in items:
                fut = process_one.submit(item, table=r.table, handler_ref=r.handler_ref)
                all_futures.append(fut)
    # fan-in
    for fut in all_futures:
        fut.result()
```

Regarding dynamic imports, you can accomplish this through the importlib module. Something like this:

```python
import importlib
from typing import Any, Callable

from prefect import task, get_run_logger

# ----- dynamic handler execution -----
# example handler_ref: myapp.handlers.orders:process_order
def resolve_handler(handler_ref: str) -> Callable[[dict[str, Any]], Any]:
    module_path, func_name = handler_ref.split(":", 1)
    mod = importlib.import_module(module_path)
    fn = getattr(mod, func_name)
    if not callable(fn):
        raise TypeError(f"{handler_ref} is not callable")
    return fn

@task
def process_one(item: dict[str, Any], table: str, handler_ref: str) -> None:
    logger = get_run_logger()
    item_id = item.get("id")
    fn = resolve_handler(handler_ref)
    logger.info("Processing %s id=%s with %s", table, item_id, handler_ref)
    try:
        fn(item)  # your business logic
        mark_done(table, item_id)
    except Exception as exc:
        mark_failed(table, item_id, reason=str(exc))
        raise
```
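The resolver above can be exercised without Prefect or any database; here "math:sqrt" is a stand-in for a real handler_ref, and this is a self-contained copy of the resolver for illustration:

```python
import importlib
from typing import Any, Callable

def resolve_handler(handler_ref: str) -> Callable[..., Any]:
    """Resolve 'module.path:function_name' to the callable it names."""
    module_path, func_name = handler_ref.split(":", 1)
    mod = importlib.import_module(module_path)
    fn = getattr(mod, func_name)
    if not callable(fn):
        raise TypeError(f"{handler_ref} is not callable")
    return fn

# "math:sqrt" stands in for a business-logic ref like myapp.handlers.orders:process_order
handler = resolve_handler("math:sqrt")
result = handler(9.0)  # → 3.0
```

Because the ref is resolved inside the task at run time, only the string needs to survive serialization; the callable itself is re-imported from static code on the worker.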
-
@bdalpe I agree with your solution in broad strokes, but I am trying very specifically not to implement that pattern. The same is true for how we bundle and compile our functions as API endpoints for larger system deployment. I will have to return to this in several days or weeks, when I want to think about the architecture. In the meantime, we keep getting bugs in the Python SDK that are minor but, in this case, concrete. This is called from the async client immediately after the WorkPool is created. It is intended to start one Worker if and only if a running Worker does not already exist. Then the code continues, and our app picks up waiting flows and begins to deploy them to the WorkPool. Everything works correctly using subprocess. But both threading and the Prefect-recommended create_task fail silently. And since the worker is async by design, I am not completely sure why simply starting the async coroutine directly does not also work. It is probably a subtle syntax mistake on my part that keeps these other three options from working. Pretty cool stuff, remote deployment.

```python
# happening inside the async client context manager
worker = ProcessWorker(name=WORKER_POOL_NAME, work_pool_name=WORKER_POOL_NAME, limit=5)
prefect_start_worker_in_background(worker)
# worker.start()               # Fails unknown: async, so work should happen in the background, as we want
# create_task(worker.start())  # Fails unknown: work should happen in the background, as we want

# included as a function elsewhere; the above code snippet references it
import threading

def prefect_start_worker_in_background(process_worker):
    import subprocess
    proc = subprocess.Popen([
        "prefect", "worker", "start",
        "--pool", WORKER_POOL_NAME,
        "--name", WORKER_POOL_NAME,
        "--limit", "5",
    ])
    # FAILS unknown: why does this not eventually demonstrate its started worker?
    # thread = threading.Thread(target=process_worker.start, daemon=True)
    # thread.start()
```

Hope it makes sense to your team why your founders decided to refer to your company as the toothpick in the sandwich.
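For what it's worth, the silent failures above are consistent with how coroutines behave: calling an async function merely creates a coroutine object, and nothing runs it unless it is awaited or scheduled on a running event loop. asyncio.create_task additionally requires a loop already running in the current thread, and threading.Thread(target=process_worker.start) just creates an un-awaited coroutine inside the new thread. A generic sketch of one way to run a coroutine in the background without a subprocess, assuming the coroutine is safe to drive from its own event loop:

```python
import asyncio
import threading

def start_coroutine_in_background(coro_factory) -> threading.Thread:
    """Run an async callable on a fresh event loop in a daemon thread.

    coro_factory is called inside the thread, so the coroutine object is
    created and awaited on the same loop -- the step the commented-out
    threading and create_task attempts were missing.
    """
    thread = threading.Thread(target=lambda: asyncio.run(coro_factory()), daemon=True)
    thread.start()
    return thread
```

Passing process_worker.start (the async callable itself, not the result of calling it) to this helper would be the analogous fix, though whether ProcessWorker tolerates running on a secondary event loop is untested here.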
-
You're not just creating a coroutine in that FlowFactory; you're creating a "closure": https://realpython.com/python-closure/ A typical flow is a function and some parameters, but a closure includes an encapsulated namespace that was created at runtime. Until I googled it, I had no idea a closure could even be serialized: https://www.py4u.org/blog/python-serialize-lexical-closures/ If you really want this pattern, I would suggest serializing the closure and passing it to the remote server as a function argument, where it could be deserialized and executed by a receiving flow that knows how to do that. This could obviously be a code-injection pathway, so think about security if it's not on an isolated network.
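A sketch of the serialize-the-closure suggestion, using the third-party cloudpickle library (pip install cloudpickle), which unlike the stdlib pickle can serialize a function together with its captured namespace; flow_factory, its arguments, and the payload flow are hypothetical stand-ins for the thread's toy example:

```python
import cloudpickle  # third-party: pip install cloudpickle

def flow_factory(greeting: str, n: int):
    def example_task() -> list[str]:  # closure capturing greeting and n
        return [f"{greeting} {i}" for i in range(n)]
    return example_task

# client side: serialize the closure to bytes that can travel as a flow parameter
payload = cloudpickle.dumps(flow_factory("hello", 2))

# server side: a receiving flow would deserialize the bytes and call the result
restored = cloudpickle.loads(payload)
```

As noted above, deserializing and executing arbitrary bytes is a code-injection pathway, so restrict who can submit such payloads.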
-
Bug summary
First-time user, excited enthusiast. My use case requires me to create a Flow Wrapper Factory design pattern in a shared directory outside the application cwd repository. The cwd directory imports the Flow Wrapper Factory machinery and creates a Flow object for the local application. This has all the correct characteristics, except that its decorated function exists in a foreign directory outside the cwd. Upon deployment this triggers ValueError: '/foreign/factory.py' is not in the subpath of '/cwd', which is correct.
All searches point me to needing to construct the Flow object dynamically with .from_source(), but this is not the correct design pattern for this use case. Mine is a simpler important problem, it seems.
Code looks a little something like this in the foreign directory outside the cwd:
And here inside the app directory which uses the foreign code as a common infrastructure with other apps.
This feels like something that should be inherited automatically from the filepath knowledge of FlowFactory, somewhere in the guts of Prefect that is doing some version of inspecting where the code is. (I.e., Flow.deploy() knows where the totality of the code is located; so much so that at first I assumed this was a Prefect-chosen permissions limitation to block malicious code pulled from anywhere on the machine from executing, or a derivative of a PYTHONPATH problem disconnecting inside the Prefect Flow.) It would clearly be conceivable to do with the design pattern above for clarity reasons, and the Prefect Flow clearly knows what code should be executed. It would clearly be the incorrect design pattern to create a short Python script for each variation of the FlowFactory() instantiation simply so that I could have a file path to load with Flow.from_source(). It may be that I do not understand what that set of signatures is supposed to look like. Thanks a metric ton. This is clearly a huge utility and I am very eager to get this system up and running tomorrow morning. Warm regards.
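The code snippets are not reproduced here, but from the errors quoted in the discussion above (module 'foreign.FlowFactory', example_task, arguments greeting and n), the pattern presumably looks something like this hypothetical reconstruction; the Prefect decorators are noted in comments so the sketch runs without Prefect:

```python
# foreign/factory.py (hypothetical reconstruction; lives outside the application cwd)
def FlowFactory(greeting: str, n: int):
    # would be @task in the real code
    def example_task(i: int) -> str:
        return f"{greeting} {i}"

    # would be @flow in the real code; note this function object exists only
    # inside FlowFactory's closure, which is why deployment cannot find a
    # static file path + function name to record as the entrypoint
    def example_flow() -> list[str]:
        return [example_task(i) for i in range(n)]

    return example_flow

# app side (inside the cwd): build the flow object, then attempt to deploy it
flow_obj = FlowFactory("hello", 3)
```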
Version info
Additional context
No response