dbt Flow

prefect_dbt_flow.flow

Functions to create a prefect flow for a dbt project.

dbt_flow(project, profile=None, dag_options=None, flow_kwargs=None)

Create a Prefect flow for executing a dbt project.

Parameters:

Name | Type | Description | Default
project | DbtProject | A class that represents a dbt project configuration. | required
profile | Optional[DbtProfile] | A class that represents a dbt profile configuration. | None
dag_options | Optional[DbtDagOptions] | A class that holds dbt DAG configuration options. | None
flow_kwargs | Optional[dict] | A dict of Prefect @flow arguments. | None
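
The source of this function shows `flow_kwargs` being merged over a default of `{"name": project.name}`, so a caller-supplied `name` replaces the project name and every other key is passed through to `@flow`. Below is a minimal sketch of that merge. `build_flow_kwargs` is a hypothetical helper written for illustration (it is not part of the library), and it takes a plain string instead of a `DbtProject`.

```python
from typing import Any, Dict, Optional


def build_flow_kwargs(
    project_name: str, flow_kwargs: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the kwargs merge done inside dbt_flow."""
    return {
        "name": project_name,   # default: the flow is named after the project
        **(flow_kwargs or {}),  # later keys win, so "name" can be overridden
    }


# No flow_kwargs: only the default name is set.
defaults = build_flow_kwargs("sample_project")

# With flow_kwargs: "name" is overridden and "retries" is passed through.
overridden = build_flow_kwargs("sample_project", {"name": "nightly", "retries": 2})
```

Because the user dict is unpacked last, there is no way to accidentally lose a caller's setting to the default.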

Returns:

Name | Type | Description
dbt_flow | Flow | A Prefect Flow.

Source code in prefect_dbt_flow/flow.py
def dbt_flow(
    project: DbtProject,
    profile: Optional[DbtProfile] = None,
    dag_options: Optional[DbtDagOptions] = None,
    flow_kwargs: Optional[dict] = None,
) -> Flow:
    """
    Create a Prefect flow for executing a dbt project.

    Args:
        project: A class that represents a dbt project configuration.
        profile: A class that represents a dbt profile configuration.
        dag_options: A class that holds dbt DAG configuration options.
        flow_kwargs: A dict of Prefect @flow arguments.

    Returns:
        dbt_flow: A Prefect Flow.
    """
    all_flow_kwargs = {
        "name": project.name,
        **(flow_kwargs or {}),
    }

    dbt_graph = graph.parse_dbt_project(project, profile, dag_options)

    @flow(**all_flow_kwargs)
    def dbt_flow():
        """
        Generate the dbt task DAG and run it within the Prefect flow.
        """
        tasks.generate_tasks_dag(
            project,
            profile,
            dag_options,
            dbt_graph,
            dag_options.run_test_after_model if dag_options else False,
        )

    return dbt_flow
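
The function above follows a factory pattern: the dbt project is parsed once when the flow is built, and the inner function closes over the resulting graph so that each run reuses it. The sketch below imitates that structure with standard-library stand-ins only. `fake_parse`, `fake_flow`, and `make_flow` are hypothetical names, not Prefect or prefect_dbt_flow APIs, and the inner function returns a dict purely so the behavior can be inspected (the real inner function returns nothing).

```python
from typing import Any, Callable, Dict, List, Optional

parse_calls: List[str] = []  # records each time the stand-in parser runs


def fake_parse(project_name: str) -> List[str]:
    """Stand-in for graph.parse_dbt_project; returns a made-up node list."""
    parse_calls.append(project_name)
    return [f"{project_name}.model_a", f"{project_name}.model_b"]


def fake_flow(**kwargs: Any) -> Callable:
    """Stand-in for Prefect's @flow: stores the kwargs on the function."""
    def decorator(fn: Callable) -> Callable:
        fn.flow_kwargs = kwargs
        return fn
    return decorator


def make_flow(
    project_name: str, dag_options: Optional[Dict[str, bool]] = None
) -> Callable:
    graph = fake_parse(project_name)  # parsed once, when the flow is built

    @fake_flow(name=project_name)
    def run() -> Dict[str, Any]:
        # Same fallback as the source: without dag_options, tests are not
        # run after each model.
        run_tests = dag_options["run_test_after_model"] if dag_options else False
        return {"nodes": graph, "run_test_after_model": run_tests}

    return run


my_flow = make_flow("sample_project")
first = my_flow()
second = my_flow()  # reuses the graph captured at build time
```

One consequence of this design is that changes to the dbt project made after the flow object is created are not picked up until the factory is called again.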