# Variables and functions

Inside a Python script, you get access to some useful variables and functions.
## `context` variable

`context` is an object with information about the current script context.
### `context.current_model`

This property holds information about the model associated with the running script. For the `meta` syntax example below, we would get the following:

```python
context.current_model.name
# str
#= historical_ozone_levels

context.current_model.status
# NodeStatus, enum: 'success' | 'error' | 'skipped'

context.current_model.columns
# Dict[str, ColumnInfo(name: str, tags: List[str], meta: Dict)]

context.current_model.tests
# List[CurrentTest(name: str, modelname: str, column: str, status: str)]

context.current_model.meta
# meta information in the schema.yml
#= {'owner': '@me'}
```
The `context.current_model` object also has access to test information related to the current model. If the previous dbt command was either `test` or `build`, the `context.current_model.tests` property is populated with a list of tests:

```python
context.current_model.tests
#= [CurrentTest(name='not_null', modelname='historical_ozone_levels', column='ds', status='Pass')]
```
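As a sketch of how this can be used, the helper below (a hypothetical function, not part of fal's API) checks whether every test attached to the current model passed, relying on the `status` field being the string `'Pass'` as in the output above:

```python
def all_tests_passed(tests):
    """Return True when every CurrentTest in `tests` has status 'Pass'.

    Hypothetical helper; inside a fal script you would call it as
    `all_tests_passed(context.current_model.tests)`.
    """
    return all(t.status == "Pass" for t in tests)
```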
Another relevant property of `current_model` is `adapter_response`. It contains information received from the dbt SQL adapter after computing the model:

```python
context.current_model.adapter_response
#= CurrentAdapterResponse(message='SELECT 10', code='SELECT', rows_affected=10)
```
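For example, an `after` script could react to the run result. The sketch below wraps the check in a function so it is self-contained; `run_summary` and its message format are illustrative, not part of fal's API, and it assumes `NodeStatus` compares equal to its plain string value (as dbt's string-based enum does). Inside a fal script you would pass the injected `context`:

```python
def run_summary(context):
    """Build a one-line summary of the current model's run (illustrative).

    Assumes NodeStatus compares equal to its string value.
    """
    model = context.current_model
    if model.status == "error":
        return f"ALERT: model {model.name} failed"
    rows = model.adapter_response.rows_affected
    return f"model {model.name}: {model.status} ({rows} rows affected)"
```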
## Read functions

The familiar dbt functions `ref` and `source` are available in fal scripts to read models and sources as a pandas DataFrame.
### `ref` function

The `ref` function is used exactly like in dbt. You can reference a model in your project:

```python
# returned as `pandas.DataFrame`
df = ref('model_name')
```

Or a package model (package first, model second):

```python
df = ref('dbt_artifacts', 'dim_dbt__exposures')
```

You can use the `context` variable to get the data of the associated model:

```python
df = ref(context.current_model.name)
```
### `source` function

The `source` function is used exactly like in dbt. You reference a source in your project:

```python
# returned as `pandas.DataFrame`
df = source('source_name', 'table_name')
```
### `execute_sql` function

You can execute arbitrary SQL from within your Python scripts and get the results as a pandas DataFrame:

```python
my_df = execute_sql('SELECT * FROM {{ ref("my_model") }}')
```

As you can see, query strings support Jinja.
### `list_models` function

You can access model information for all models in the dbt project:

```python
my_models = list_models()

my_models[0].status
# <NodeStatus.Success: 'success'>

my_models[0].name
# 'zendesk_ticket_data'
```

`list_models` returns a list of `DbtModel` objects that contain model and related test information.
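For instance, you could collect the models whose last run did not succeed. The helper below is hypothetical (not part of fal's API) and assumes `NodeStatus` compares equal to its plain string value, as dbt's string-based enum does; inside a fal script you would pass it `list_models()`:

```python
def models_not_successful(models):
    """Return names of DbtModel objects whose status is not 'success' (sketch)."""
    return [m.name for m in models if m.status != "success"]
```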
### `list_sources` function

You can access source information for all sources in the dbt project:

```python
my_sources = list_sources()

my_sources[0].name
# 'zendesk_ticket_data'

my_sources[0].tests
# []
```

`list_sources` returns a list of `DbtSource` objects that contain source and related test information.
## Write functions

It is also possible to send data back to your data warehouse. This makes it easy to get the data, process it, and upload it back into dbt territory.
### `write_to_source` function

You first have to define the source in your schema. This operation appends to the existing source by default and should only be used targeting tables, not views.

```python
# Upload a `pandas.DataFrame` back to the data warehouse
write_to_source(df, 'source_name', 'table_name2')
```

`write_to_source` also accepts an optional `dtype` argument, which lets you specify the datatypes of columns. It works the same way as the `dtype` argument of the `DataFrame.to_sql` function.

```python
from sqlalchemy.types import Integer

# Upload but explicitly create the `value` column with type `integer`
# Can be useful if the data has `None` values
write_to_source(df, 'source', 'table', dtype={'value': Integer()})
```
### `write_to_model` function

This operation overwrites the existing relation by default and should only be used targeting tables, not views.

For example, if the script is attached to the `zendesk_ticket_metrics` model,

```yaml
models:
  - name: zendesk_ticket_metrics
    meta:
      fal:
        scripts:
          after:
            - from_zendesk_ticket_data.py
```

`write_to_model` will write to the `zendesk_ticket_metrics` table:

```python
df = ref('stg_zendesk_ticket_data')
df = add_zendesk_metrics_info(df)

# Upload a `pandas.DataFrame` back to the data warehouse
write_to_model(df)  # writes to the attached model: zendesk_ticket_metrics
```
NOTE: When used with the `fal flow run` or `fal run` commands, `write_to_model` does not accept a model name; it only operates on the associated model. But when importing `fal` as a Python module, you have to specify the model to write to:

```python
from fal import FalDbt

faldbt = FalDbt(profiles_dir="~/.dbt", project_dir="../my_project")

faldbt.list_models()
# [
#    DbtModel(name='zendesk_ticket_data' ...),
#    DbtModel(name='agent_wait_time' ...)
# ]

df = faldbt.ref('stg_zendesk_ticket_data')
df = add_zendesk_metrics_info(df)
faldbt.write_to_model(df, 'zendesk_ticket_metrics')  # specify the model
```
### Specifying column types

The `write_to_source` and `write_to_model` functions also accept an optional `dtype` argument, which lets you specify the datatypes of columns. It works the same way as the `dtype` argument of the `DataFrame.to_sql` function.

```python
from sqlalchemy.types import Integer

# Upload but explicitly create the `my_col` column with type `integer`
# Can be especially useful if the data has `None` values
write_to_source(df, 'source', 'table', dtype={'my_col': Integer()})
```
### Modes of writing

These functions accept two modes of writing: `append` and `overwrite`. The mode is passed with the optional `mode` argument; the default is `append` for `write_to_source` and `overwrite` for `write_to_model`.

```python
# Overwrite the table with the DataFrame data, deleting old data
write_to_source(df, 'source_name', 'table_name', mode='overwrite')
write_to_model(df, 'model_name', mode='overwrite')  # default mode

# Append more data to the existing table (create it if it does not exist)
write_to_source(df2, 'source_name', 'table_name', mode='append')  # default mode
write_to_model(df2, 'model_name', mode='append')
```

The `append` mode:

- creates the table if it does not exist yet
- inserts the data into the table

The `overwrite` mode:

- creates a temporary table
- inserts the data into the temporary table
- drops the old table if it exists
- renames the temporary table to the final table name
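The overwrite steps above can be sketched with plain SQL. The snippet below demonstrates the pattern against an in-memory SQLite database; it is illustrative only (fal performs the equivalent through your warehouse's dbt adapter, and the single-column schema is an assumption for the demo):

```python
import sqlite3

def overwrite_table(conn, table, rows):
    """Demo of the overwrite strategy: temp table -> drop old -> rename."""
    tmp = f"{table}__tmp"
    cur = conn.cursor()
    # 1. create a temporary table
    cur.execute(f"CREATE TABLE {tmp} (value INTEGER)")
    # 2. insert the new data into it
    cur.executemany(f"INSERT INTO {tmp} (value) VALUES (?)", [(r,) for r in rows])
    # 3. drop the old table if it exists
    cur.execute(f"DROP TABLE IF EXISTS {table}")
    # 4. rename the temporary table to the final name
    cur.execute(f"ALTER TABLE {tmp} RENAME TO {table}")
    conn.commit()
```

Because the new data lands in a separate table first, the old table stays readable until the final drop-and-rename step.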
## `meta` syntax

```yaml
models:
  - name: historical_ozone_levels
    ...
    meta:
      owner: "@me"
      fal:
        scripts:
          - send_slack_message.py
          - another_python_script.py # will be run sequentially
```

Use the `fal` and `scripts` keys underneath the `meta` config to let the fal CLI know where to look for the Python scripts. You can pass a list of scripts as shown above to run one or more scripts as a post-hook operation after a dbt run.