Skip to content

dbt Tasks

prefect_dbt_flow.dbt.tasks

Code for generating the Prefect DAG; includes the dbt run and test functions.

generate_tasks_dag(project, profile, dag_options, dbt_graph, run_test_after_model=False)

Generate a Prefect DAG for running and testing dbt models.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `project` | `DbtProject` | Object representing the dbt project configuration. | *required* |
| `profile` | `Optional[DbtProfile]` | Object representing the dbt profile configuration. | *required* |
| `dag_options` | `Optional[DbtDagOptions]` | Object holding additional dbt DAG configuration. | *required* |
| `dbt_graph` | `List[DbtNode]` | A list of dbt nodes (e.g. models) to include in the DAG. | *required* |
| `run_test_after_model` | `bool` | If True, run tests after running each model. | `False` |

Returns:

| Type | Description |
|------|-------------|
| `None` | None |

Source code in `prefect_dbt_flow/dbt/tasks.py` (lines 212–264):

def generate_tasks_dag(
    project: DbtProject,
    profile: Optional[DbtProfile],
    dag_options: Optional[DbtDagOptions],
    dbt_graph: List[DbtNode],
    run_test_after_model: bool = False,
) -> None:
    """
    Generate a Prefect DAG for running and testing dbt models.

    One Prefect task is built per node in ``dbt_graph`` (selected by the
    node's ``resource_type`` through ``RESOURCE_TYPE_TO_TASK``). Each task is
    submitted with ``wait_for`` set to the futures of the nodes listed in the
    node's ``depends_on``, so Prefect orders execution by dbt dependencies.

    Args:
        project: Object representing the dbt project configuration.
        profile: Object representing the dbt profile configuration, or None.
        dag_options: Object holding additional dbt DAG configuration, or None.
        dbt_graph: The dbt nodes (e.g. models) to include in the DAG.
        run_test_after_model: If True, every node with ``has_tests`` also gets
            a dbt test task submitted right after its run task. Downstream
            nodes then wait for that test task instead of the run task.

    Returns:
        None
    """

    # TODO: refactor this
    # Build (but do not yet submit) one task per node, keyed by unique_id.
    all_tasks = {
        dbt_node.unique_id: RESOURCE_TYPE_TO_TASK[dbt_node.resource_type](
            project=project,
            profile=profile,
            dag_options=dag_options,
            dbt_node=dbt_node,
        )
        for dbt_node in dbt_graph
    }

    # unique_id -> the future that downstream nodes must wait on: the test
    # future when a test task was added for that node, otherwise the run future.
    submitted_tasks: Dict[str, PrefectFuture] = {}
    # NOTE(review): assumes _get_next_node returns a not-yet-submitted node
    # whose depends_on entries are all already in submitted_tasks, and a falsy
    # value once nothing is left to schedule -- confirm. Any node it never
    # returns is never submitted, and a dependency missing from submitted_tasks
    # would raise KeyError below.
    while node := _get_next_node(dbt_graph, list(submitted_tasks)):
        run_task = all_tasks[node.unique_id]
        task_dependencies = [
            submitted_tasks[node_unique_id] for node_unique_id in node.depends_on
        ]

        run_task_future = run_task.submit(wait_for=task_dependencies)
        downstream_future = run_task_future

        if run_test_after_model and node.has_tests:
            test_task = _task_dbt_test(
                project=project,
                profile=profile,
                dag_options=dag_options,
                dbt_node=node,
            )
            # wait_for is documented as an iterable of futures; wrap the single
            # run future, consistent with the run-task submission above.
            downstream_future = test_task.submit(wait_for=[run_task_future])

        submitted_tasks[node.unique_id] = downstream_future