Variables and functions

Inside a Python script, you get access to some useful variables and functions.

context Variable

context is an object with information about the current script context.

context.current_model

This property holds information about the model associated with the running script. For the example in the meta syntax section, we would get the following:

context.current_model.name
# str
#= historical_ozone_levels

context.current_model.status
# NodeStatus, enum: 'success' | 'error' | 'skipped'

context.current_model.columns
# Dict[str, ColumnInfo(name: str, tags: List[str], meta: Dict)]

context.current_model.tests
# List[CurrentTest(name: str, modelname: str, column: str, status: str)]

context.current_model.meta
# meta information in the schema.yml
#= {'owner': '@me'}

The context.current_model object also exposes test information for the current model. If the previous dbt command was either test or build, the context.current_model.tests property is populated with a list of tests:

context.current_model.tests
#= [CurrentTest(name='not_null', modelname='historical_ozone_levels', column='ds', status='Pass')]
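
A script can use this list to react to failing tests. The following is a minimal sketch, not fal's own code: CurrentTest is a local stand-in dataclass that mirrors the fields listed above so the snippet runs outside fal, failing_tests is a hypothetical helper, and the 'Fail' status string is an assumption (only 'Pass' appears in the example above).

```python
from dataclasses import dataclass
from typing import List


@dataclass
class CurrentTest:
    # Local stand-in mirroring the fields shown above; not imported from fal.
    name: str
    modelname: str
    column: str
    status: str


def failing_tests(tests: List[CurrentTest]) -> List[CurrentTest]:
    """Return every test whose status is anything other than 'Pass'."""
    return [t for t in tests if t.status != "Pass"]


# Sample data; inside a fal script use context.current_model.tests instead.
sample_tests = [
    CurrentTest("not_null", "historical_ozone_levels", "ds", "Pass"),
    CurrentTest("unique", "historical_ozone_levels", "ds", "Fail"),  # 'Fail' is assumed
]

failed = failing_tests(sample_tests)
for t in failed:
    print(f"{t.modelname}.{t.column}: {t.name} -> {t.status}")
```

Comparing against 'Pass' rather than a specific failure value keeps the helper conservative: any status it does not recognize is reported.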

Another relevant property of the current_model is adapter_response. It contains information that was received from the dbt SQL adapter after computing the model:

context.current_model.adapter_response
#= CurrentAdapterResponse(message='SELECT 10', code='SELECT', rows_affected=10)
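
One possible use is to flag models that produced no rows. This is a sketch under assumptions: CurrentAdapterResponse below is a local stand-in with the three fields shown above, is_empty_result is a hypothetical helper, and allowing rows_affected to be None is an assumption about adapters that do not report a count.

```python
from typing import NamedTuple, Optional


class CurrentAdapterResponse(NamedTuple):
    # Local stand-in with the fields shown above; not imported from fal.
    message: str
    code: str
    rows_affected: Optional[int]  # Optional is an assumption, see the lead-in


def is_empty_result(response: CurrentAdapterResponse) -> bool:
    """True only when the adapter explicitly reported zero affected rows."""
    # A missing count (None) is treated as unknown, not as empty.
    return response.rows_affected == 0


# Sample data; inside a fal script use context.current_model.adapter_response.
response = CurrentAdapterResponse(message="SELECT 10", code="SELECT", rows_affected=10)
if is_empty_result(response):
    print("model produced no rows")
else:
    print(f"{response.code}: {response.rows_affected} rows")
```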

Read functions

The familiar dbt functions ref and source are available in fal scripts. They read models and sources and return them as pandas DataFrames.

ref function

The ref function is used exactly like in dbt. You can reference a model in your project:

# returned as `pandas.DataFrame`
df = ref('model_name')

Or a model from a package (package name first, model name second):

df = ref('dbt_artifacts', 'dim_dbt__exposures')

You can use the context variable to get the data of the associated model:

df = ref(context.current_model.name)

source function

The source function is used exactly like in dbt. You reference a source in your project by source name and table name:

# returned as `pandas.DataFrame`
df = source('source_name', 'table_name')

execute_sql function

You can execute arbitrary SQL from within your Python scripts and get the results as pandas DataFrames:

my_df = execute_sql('SELECT * FROM {{ ref("my_model") }}')

As you can see, the query strings support Jinja.

Note that using ref inside execute_sql does not create a node in the dbt DAG. So in the case of Python models, you still need to specify dependencies in a comment at the top of the file. For more details, see here.
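
Because the query is an ordinary Python string, it can be assembled before it is passed to execute_sql. The sketch below is only an illustration: build_preview_query is a hypothetical helper, and the LIMIT clause assumes your warehouse supports it. It uses concatenation instead of an f-string, because inside an f-string `{{` collapses to a single `{` and the Jinja delimiters would be lost.

```python
def build_preview_query(model_name: str, limit: int) -> str:
    """Build a Jinja-templated query that previews the first rows of a model."""
    # Plain concatenation keeps the literal '{{' and '}}' that Jinja needs.
    return 'SELECT * FROM {{ ref("' + model_name + '") }} LIMIT ' + str(int(limit))


query = build_preview_query("my_model", 10)
print(query)

# Inside a fal script, where execute_sql is available:
# preview_df = execute_sql(query)
```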

list_models function

You can access model information for all models in the dbt project:

my_models = list_models()

my_models[0].status
# <NodeStatus.Success: 'success'>

my_models[0].name
# 'zendesk_ticket_data'

list_models returns a list of DbtModel objects that contain model and related test information.
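
A common use of this list is to pick out models by status. The sketch below runs outside fal, so it relies on stand-ins: NodeStatus and DbtModel are local classes that expose only the name and status attributes shown above, the Error and Skipped member names are assumed by analogy with NodeStatus.Success, and names_with_status is a hypothetical helper.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class NodeStatus(Enum):
    # Stand-in enum; only Success is confirmed by the output shown above.
    Success = "success"
    Error = "error"
    Skipped = "skipped"


@dataclass
class DbtModel:
    # Minimal stand-in exposing only name and status.
    name: str
    status: NodeStatus


def names_with_status(models: List[DbtModel], status: NodeStatus) -> List[str]:
    """Return the names of all models whose status equals `status`."""
    return [m.name for m in models if m.status == status]


# Sample data; inside a fal script use list_models() instead.
sample_models = [
    DbtModel("zendesk_ticket_data", NodeStatus.Success),
    DbtModel("agent_wait_time", NodeStatus.Error),
]

errored = names_with_status(sample_models, NodeStatus.Error)
print(errored)
```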

list_sources function

You can access source information for all sources in the dbt project:

my_sources = list_sources()

my_sources[0].name
# 'zendesk_ticket_data'

my_sources[0].tests
# []

list_sources returns a list of DbtSource objects that contain source and related test information.

Write functions

It is also possible to send data back to your data warehouse. This makes it easy to get the data, process it, and upload it back into dbt territory.

write_to_source function

You first have to define the source in your schema. This operation appends to the existing source by default and should only be used to target tables, not views.

# Upload a `pandas.DataFrame` back to the data warehouse
write_to_source(df, 'source_name', 'table_name2')

write_to_source also accepts an optional dtype argument, which lets you specify the data types of columns. It works the same way as the dtype argument of the pandas DataFrame.to_sql function.

from sqlalchemy.types import Integer
# Upload but specifically create the `value` column with type `integer`
# Can be useful if data has `None` values
write_to_source(df, 'source', 'table', dtype={'value': Integer()})

write_to_model function

This operation overwrites the existing relation by default and should only be used to target tables, not views.

For example, if the script is attached to the zendesk_ticket_metrics model,

models:
  - name: zendesk_ticket_metrics
    meta:
      fal:
        scripts:
          after:
            - from_zendesk_ticket_data.py

write_to_model will write to the zendesk_ticket_metrics table:

df = ref('stg_zendesk_ticket_data')
df = add_zendesk_metrics_info(df)

# Upload a `pandas.DataFrame` back to the data warehouse
write_to_model(df) # writes to attached model: zendesk_ticket_metrics

NOTE: When used with the fal flow run or fal run commands, write_to_model does not accept a model name. It only operates on the associated model.

But when importing fal as a Python module, you have to specify the model to write to:

from fal import FalDbt
faldbt = FalDbt(profiles_dir="~/.dbt", project_dir="../my_project")

faldbt.list_models()
# [
# DbtModel(name='zendesk_ticket_data' ...),
# DbtModel(name='agent_wait_time' ...)
# ]

df = faldbt.ref('stg_zendesk_ticket_data')
df = add_zendesk_metrics_info(df)

faldbt.write_to_model(df, 'zendesk_ticket_metrics') # specify the model

Specifying column types

The functions write_to_source and write_to_model also accept an optional dtype argument, which lets you specify the data types of columns. It works the same way as the dtype argument of the pandas DataFrame.to_sql function.

from sqlalchemy.types import Integer

# Upload but specifically create the `my_col` column with type `integer`
# Can be especially useful if data has `None` values
write_to_source(df, 'source', 'table', dtype={'my_col': Integer()})

Modes of writing

These functions accept two modes of writing: append and overwrite.

The mode is passed with the optional mode argument. The default is append for write_to_source and overwrite for write_to_model.

# Overwrite the table with the dataframe data, deleting old data
write_to_source(df, 'source_name', 'table_name', mode='overwrite')
write_to_model(df, 'model_name', mode='overwrite') # default mode

# Append more data to the existing table (create it if it does not exist)
write_to_source(df2, 'source_name', 'table_name', mode='append') # default mode
write_to_model(df2, 'model_name', mode='append')

The append mode

  1. creates the table if it does not exist yet
  2. inserts data into the table
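
The two append steps can be sketched with the standard-library sqlite3 module. This is an illustration of the pattern, not fal's implementation: the fixed (id, value) schema, the in-memory SQLite database, and the append_rows helper are all assumptions made to keep the example self-contained.

```python
import sqlite3


def append_rows(conn, table, rows):
    """Append (id, value) rows to `table`, creating the table first if needed."""
    # Step 1: create the table if it does not exist yet.
    conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" (id INTEGER, value TEXT)')
    # Step 2: insert the data.
    conn.executemany(f'INSERT INTO "{table}" VALUES (?, ?)', rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
append_rows(conn, "table_name", [(1, "a"), (2, "b")])
append_rows(conn, "table_name", [(3, "c")])  # second call adds to the existing rows
row_count = conn.execute('SELECT COUNT(*) FROM "table_name"').fetchone()[0]
print(row_count)
```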

The overwrite mode

  1. creates a temporary table
  2. inserts data into the temporary table
  3. drops the old table if it exists
  4. renames the temporary table to the final table name
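
The four overwrite steps can be sketched the same way with sqlite3. Again this is an illustration and not fal's implementation: the `__tmp` suffix, the fixed (id, value) schema, and the overwrite_rows helper are hypothetical.

```python
import sqlite3


def overwrite_rows(conn, table, rows):
    """Replace the contents of `table` with (id, value) rows via a staging table."""
    tmp = f"{table}__tmp"  # hypothetical naming scheme for the staging table
    conn.execute(f'DROP TABLE IF EXISTS "{tmp}"')  # clear leftovers from a failed run
    # Step 1: create the staging table.
    conn.execute(f'CREATE TABLE "{tmp}" (id INTEGER, value TEXT)')
    # Step 2: insert the data into it.
    conn.executemany(f'INSERT INTO "{tmp}" VALUES (?, ?)', rows)
    # Step 3: drop the old table if it exists.
    conn.execute(f'DROP TABLE IF EXISTS "{table}"')
    # Step 4: rename the staging table to the final name.
    conn.execute(f'ALTER TABLE "{tmp}" RENAME TO "{table}"')
    conn.commit()


conn = sqlite3.connect(":memory:")
overwrite_rows(conn, "table_name", [(1, "old"), (2, "old")])
overwrite_rows(conn, "table_name", [(3, "new")])  # previous rows are discarded
rows = conn.execute('SELECT id, value FROM "table_name"').fetchall()
print(rows)
```

Loading into a separate table first means the old table is only dropped once the new data has been fully written, which is presumably why the steps are ordered this way.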

meta syntax

models:
  - name: historical_ozone_levels
    ...
    meta:
      owner: "@me"
      fal:
        scripts:
          - send_slack_message.py
          - another_python_script.py # will be run sequentially

Use the fal and scripts keys underneath the meta config to tell the fal CLI where to look for the Python scripts. You can pass a list of scripts, as shown above, to run one or more scripts as a post-hook operation after a dbt run.