Variables and functions

Inside a Python script, you get access to some useful variables and functions.

context Variable

context is an object with information about the current script context.

context.current_model

This property holds information about the model associated with the running script. For the meta syntax example below, we would get the following:

context.current_model.name
# str
#= historical_ozone_levels

context.current_model.status
# NodeStatus, enum: 'success' | 'error' | 'skipped'

context.current_model.columns
# Dict[str, ColumnInfo(name: str, tags: List[str], meta: Dict)]

context.current_model.tests
# List[CurrentTest(name: str, modelname: str, column: str, status: str)]

context.current_model.meta
# meta information in the schema.yml
#= {'owner': '@me'}

The context.current_model object also has access to test information related to the current model. If the previous dbt command was either test or build, the context.current_model.tests property is populated with a list of tests:

context.current_model.tests
#= [CurrentTest(name='not_null', modelname='historical_ozone_levels', column='ds', status='Pass')]
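
Since each CurrentTest carries a status, a script can react to failing tests. A minimal sketch, assuming the 'Pass' status string shown above (status strings may vary by adapter):

import sys

# Abort the script if any test attached to the current model did not pass
failing = [t.name for t in context.current_model.tests if t.status != 'Pass']
if failing:
    sys.exit(f"Tests failed for {context.current_model.name}: {failing}")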

Another relevant property of current_model is adapter_response. It contains information that was received from the dbt SQL adapter after computing the model:

context.current_model.adapter_response
#= CurrentAdapterResponse(message='SELECT 10', code='SELECT', rows_affected=10)
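
For example, an after-script can log what the warehouse reported for the model. A minimal sketch using only the fields shown above:

# Log the adapter response for the model this script is attached to
response = context.current_model.adapter_response
print(f"{context.current_model.name}: {response.code} ({response.rows_affected} rows affected)")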

Read functions

The familiar dbt functions ref and source are available in fal scripts to read models and sources as pandas DataFrames.

ref function

The ref function is used exactly like in dbt. You reference a model in your project:

# returned as `pandas.DataFrame`
df = ref('model_name')

Or a package model (package first, model second):

df = ref('dbt_artifacts', 'dim_dbt__exposures')

You can use the context variable to get the data of the associated model:

df = ref(context.current_model.name)

source function

The source function is used exactly like in dbt. You reference a source in your project:

# returned as `pandas.DataFrame`
df = source('source_name', 'table_name')

execute_sql function

You can execute arbitrary SQL from within your Python scripts and get results as pandas DataFrames:

my_df = execute_sql('SELECT * FROM {{ ref("my_model") }}')

As you can see, query strings support Jinja.
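
The Jinja is rendered before the query runs, so ref resolves to the model's actual relation. A small sketch of using the returned DataFrame (the alias n is illustrative, and some adapters may change its casing):

# Count the rows of a model and use the result in Python
counts = execute_sql('SELECT COUNT(*) AS n FROM {{ ref("my_model") }}')
print(counts['n'][0])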

Note that the use of ref inside execute_sql does not create a node in the dbt DAG. So in the case of Python models, you still need to specify dependencies in a comment at the top of the file. For more details, see here.

list_models function

You can access model information for all models in the dbt project:

my_models = list_models()

my_models[0].status
# <NodeStatus.Success: 'success'>

my_models[0].name
# 'zendesk_ticket_data'

list_models returns a list of DbtModel objects that contain model and related test information.
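
This makes it easy to scan the whole project in one pass. A minimal sketch, assuming the NodeStatus values compare equal to their string representations as shown above:

# Collect the names of models whose most recent run did not succeed
not_successful = [m.name for m in list_models() if m.status != 'success']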

list_sources function

You can access source information for all sources in the dbt project:

my_sources = list_sources()

my_sources[0].name
# 'zendesk_ticket_data'

my_sources[0].tests
# []

list_sources returns a list of DbtSource objects that contain source and related test information.
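
This can be used, for example, to audit test coverage across sources. A minimal sketch using only the fields shown above:

# List the sources that have no tests attached
untested = [s.name for s in list_sources() if not s.tests]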

Write functions

It is also possible to send data back to your data warehouse. This makes it easy to get the data, process it, and upload it back into dbt territory.

write_to_source function

You first have to define the source in your schema. This operation appends to the existing source by default and should only be used targeting tables, not views.

# Upload a `pandas.DataFrame` back to the data warehouse
write_to_source(df, 'source_name', 'table_name2')
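
For reference, such a source could be declared in your schema.yml along these lines (names are illustrative):

sources:
  - name: source_name
    tables:
      - name: table_name2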

write_to_source also accepts an optional dtype argument, which lets you specify datatypes of columns. It works the same way as the dtype argument of the DataFrame.to_sql function.

from sqlalchemy.types import Integer
# Upload but specifically create the `value` column with type `integer`
# Can be useful if data has `None` values
write_to_source(df, 'source', 'table', dtype={'value': Integer()})

write_to_model function

This operation overwrites the existing relation by default and should only be used targeting tables, not views.

For example, if the script is attached to the zendesk_ticket_metrics model,

models:
  - name: zendesk_ticket_metrics
    meta:
      fal:
        scripts:
          after:
            - from_zendesk_ticket_data.py

write_to_model will write to the zendesk_ticket_metrics table:

df = ref('stg_zendesk_ticket_data')
df = add_zendesk_metrics_info(df)

# Upload a `pandas.DataFrame` back to the data warehouse
write_to_model(df) # writes to attached model: zendesk_ticket_metrics

NOTE: When used with the fal flow run or fal run commands, write_to_model does not accept a model name; it only operates on the associated model.

But when importing fal as a Python module, you have to specify the model to write to:

from fal import FalDbt
faldbt = FalDbt(profiles_dir="~/.dbt", project_dir="../my_project")

faldbt.list_models()
# [
# DbtModel(name='zendesk_ticket_data' ...),
# DbtModel(name='agent_wait_time' ...)
# ]

df = faldbt.ref('stg_zendesk_ticket_data')
df = add_zendesk_metrics_info(df)

faldbt.write_to_model(df, 'zendesk_ticket_metrics') # specify the model

Specifying column types

The functions write_to_source and write_to_model also accept an optional dtype argument, which lets you specify datatypes of columns. It works the same way as the dtype argument of the DataFrame.to_sql function.

from sqlalchemy.types import Integer

# Upload but specifically create the `my_col` column with type `integer`
# Can be especially useful if data has `None` values
write_to_source(df, 'source', 'table', dtype={'my_col': Integer()})

Modes of writing

These functions accept two modes of writing: append and overwrite.

The mode is selected with the optional mode argument: append is the default for write_to_source, while write_to_model defaults to overwrite.

# Overwrite the table with the dataframe data, deleting old data
write_to_source(df, 'source_name', 'table_name', mode='overwrite')
write_to_model(df, 'model_name', mode='overwrite') # default mode

# Append more data to the existing table (create it if it does not exist)
write_to_source(df2, 'source_name', 'table_name', mode='append') # default mode
write_to_model(df2, 'model_name', mode='append')

The append mode

  1. creates the table if it does not exist yet
  2. inserts data into the table

The overwrite mode

  1. creates a temporary table
  2. inserts data into the temporary table
  3. drops the old table if it exists
  4. renames the temporary table to the final table name

meta syntax

models:
  - name: historical_ozone_levels
    ...
    meta:
      owner: "@me"
      fal:
        scripts:
          - send_slack_message.py
          - another_python_script.py # will be run sequentially

Use the fal and scripts keys underneath the meta config to let fal CLI know where to look for the Python scripts. You can pass a list of scripts as shown above to run one or more scripts as a post-hook operation after a dbt run.