An Airflow operator to call the main function from the dbt-core Python package

Overview

airflow-dbt-python

Motivation

Airflow running in a managed environment

Although dbt is meant to be installed and used as a CLI, we may not have control of the environment where Airflow is running, which can rule out using dbt as a CLI.

This is exactly what happens when using Amazon's Managed Workflows for Apache Airflow (MWAA): although a list of Python requirements can be passed, the dbt CLI cannot be found in the worker's PATH.

There is a workaround which involves using Airflow's BashOperator and running Python from the command line:

from airflow.operators.bash import BashOperator

BASH_COMMAND = "python -c 'from dbt.main import main; main()' run"
operator = BashOperator(
    task_id="dbt_run",
    bash_command=BASH_COMMAND,
)

But this gets messy once we start appending all the arguments a dbt run command (or any other subcommand) can take.

As you may expect, airflow-dbt-python abstracts the complexity of handling CLI arguments by defining an operator for each dbt subcommand, with each operator exposing an attribute for each possible CLI argument.
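
To see why the BashOperator workaround becomes hard to maintain, consider what it takes to assemble the command string by hand. The sketch below is only an illustration: build_dbt_command is a hypothetical helper, not part of airflow-dbt-python or dbt, and it covers just three of the many arguments a dbt subcommand accepts.

```python
import shlex
from typing import List, Optional


def build_dbt_command(
    subcommand: str,
    models: Optional[List[str]] = None,
    full_refresh: bool = False,
    fail_fast: bool = False,
) -> str:
    """Assemble the shell command for the BashOperator workaround (hypothetical helper)."""
    args = [subcommand]
    if models:
        args += ["--models", *models]
    if full_refresh:
        args.append("--full-refresh")
    if fail_fast:
        args.append("--fail-fast")
    # Quote each argument so that spaces or shell metacharacters survive.
    quoted = " ".join(shlex.quote(arg) for arg in args)
    return f"python -c 'from dbt.main import main; main()' {quoted}"


command = build_dbt_command("run", models=["my_model"], full_refresh=True)
print(command)
```

Every additional flag needs another branch like the ones above, which is the bookkeeping the per-subcommand operators are meant to take over.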

An alternative to airflow-dbt that works without the dbt CLI

The existing airflow-dbt package, by default, would not work if the dbt CLI is not in PATH, which means it would not be usable in MWAA. There is a workaround via the dbt_bin argument, which can be set to "python -c 'from dbt.main import main; main()' run", in similar fashion as the BashOperator example. Yet this approach is not without its limitations:

  • airflow-dbt works by wrapping the dbt CLI, which makes our code dependent on the environment in which it runs.
  • airflow-dbt does not support the full range of arguments a command can take. For example, DbtRunOperator does not have an attribute for fail_fast.
  • airflow-dbt does not return anything after the execution, so no information is available for downstream tasks to pull via XCom. And even if it tried to, since it works by wrapping the CLI, it could only attempt to parse the lines printed by dbt to STDOUT. On the other hand, airflow-dbt-python will try to return the information of a dbt result class, as defined in dbt.contracts.results, which opens up possibilities for downstream tasks to condition their execution on the result of a dbt command.
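
As a sketch of how a downstream task might condition its execution on a dbt result, the function below inspects a dictionary shaped like dbt's run_results.json artifact (a "results" list whose entries carry "unique_id" and "status"). The exact structure that ends up in XCom depends on the dbt and airflow-dbt-python versions in use, so the field names, the set of failure statuses, and the failed_nodes helper should all be read as assumptions.

```python
from typing import Any, Dict, List

# Statuses treated as failures here; this set is an assumption for illustration.
FAILURE_STATUSES = {"error", "fail"}


def failed_nodes(run_results: Dict[str, Any]) -> List[str]:
    """Return the unique_id of every node whose status signals a failure."""
    return [
        result["unique_id"]
        for result in run_results.get("results", [])
        if result.get("status") in FAILURE_STATUSES
    ]


# Example payload, shaped like what a downstream task might pull from XCom.
example = {
    "results": [
        {"unique_id": "model.my_project.orders", "status": "success"},
        {"unique_id": "model.my_project.customers", "status": "error"},
    ]
}
print(failed_nodes(example))
```

In a DAG, the callable of a PythonOperator or ShortCircuitOperator could pull the upstream return value with ti.xcom_pull(task_ids="dbt_run") and pass it to a function like this one.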

Avoid installing unnecessary dbt plugins

Finally, airflow-dbt-python does not depend on dbt but on dbt-core. The connectors: dbt-redshift, dbt-postgres, dbt-snowflake, and dbt-bigquery are available as installation extras instead of being bundled up by default, which happens when you attempt to install dbt via python -m pip install dbt.

This allows you to easily control what is installed in your environment. One particular example of when this is extremely useful is in the case of the dbt-snowflake connector, which depends on cryptography. This dependency requires the Rust toolchain to run, and this is not supported in a few distributions (like the one MWAA runs on). Even if that's not the case, airflow-dbt-python results in a lighter installation due to only depending on dbt-core.

Usage

Currently, the following dbt commands are supported:

  • clean
  • compile
  • debug
  • deps
  • ls
  • parse
  • run
  • run-operation
  • seed
  • snapshot
  • source (Not well tested)
  • test

Examples

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_dbt_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
) as dag:
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        selector="pre-run-tests",
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        select=["/path/to/first.csv", "/path/to/second.csv"],
        full_refresh=True,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        models=["/path/to/models"],
        full_refresh=True,
        fail_fast=True,
    )

    dbt_test >> dbt_seed >> dbt_run

Requirements

airflow-dbt-python is tested in Python 3.7, 3.8, and 3.9, although it could also support older versions.

On the Airflow side, we unit test with versions 1.10.12 and upwards, including the latest version 2 release. Regardless, more testing is planned to ensure compatibility with version 2 of Airflow.

Finally, airflow-dbt-python requires at least dbt version 0.19. Unit tests have been verified to pass with version 0.20 after minor changes that should not have major effects anywhere else. Regardless, support for version 0.20 of dbt should be considered experimental.

Installing

From PyPI:

pip install airflow-dbt-python

Any dbt connectors you require may be installed by specifying extras:

pip install airflow-dbt-python[snowflake,postgres]

From this repo:

Clone the repo:

git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python

With poetry:

poetry install

Install any extras you need, and only those you need:

poetry install -E postgres -E redshift

Testing

Tests are written using pytest and located in test/. They can be run locally with poetry:

poetry run pytest -vv

License

This project is licensed under the MIT license. See LICENSE.

Comments
  • dbt not writing log to file

    It seems that the dbt.log file is not being written inside the temp directory generated by the run.

    Is there any extra config that needs to be done?

    Thanks!

    opened by gvillafanetapia 11
  • Loading dbt from S3 zip file doesn't work

    Hello, testing version airflow-dbt-python==0.11.0 on MWAA 2.0.2, it works flawlessly with:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/s3-prefix/dbt/",
        profiles_dir="s3://s3-bucket/s3-prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    but if I try using a dbt.zip file:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/prefix/dbt/dbt.zip",
        profiles_dir="s3://s3-bucket/prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    where dbt.zip file structure is:

    d-----  24/01/2022 19:05        analyses
    d-----  24/01/2022 19:05        data
    d-----  27/01/2022 14:59        logs
    d-----  24/01/2022 19:05        macros
    d-----  01/02/2022 18:42        models
    d-----  27/01/2022 14:48        packages
    d-----  24/01/2022 19:05        snapshots
    d-----  27/01/2022 15:01        target
    d-----  01/02/2022 18:54        tests
    -a----  24/01/2022 19:05    29  .gitignore
    -a----  27/01/2022 15:19  1339  dbt_project.yml
    -a----  27/01/2022 14:50    88  packages.yml
    -a----  24/01/2022 19:05   571  README.md

    I can see these logs:

    [2022-02-07 09:57:04,793] {{dbt.py:263}} INFO - Fetching profiles.yml from S3: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,805] {{s3.py:35}} INFO - Downloading dbt profiles file from: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,806] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:04,849] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,849] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,063] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt_profile/profiles.yml') file to: /tmp/airflowtmpmtcgxrxd/profiles.yml
    [2022-02-07 09:57:05,093] {{dbt.py:271}} INFO - Fetching dbt project from S3: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{s3.py:85}} INFO - Downloading dbt project files from: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:05,137] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,137] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,222] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt/dbt.zip') file to: /tmp/airflowtmpmtcgxrxd/dbt_project.zip
    [2022-02-07 09:57:05,411] {{dbt.py:152}} INFO - Running dbt configuration:
    TestTaskConfig(cls=<class 'dbt.task.test.TestTask'>, project_dir='/tmp/airflowtmpmtcgxrxd/', profiles_dir='/tmp/airflowtmpmtcgxrxd/', profile='dev_profile', target='dev', compiled_target=None, fail_fast=None, single_threaded=None, threads=None, use_experimental_parser=None, vars='{}', warn_error=None, log_format=None, log_cache_events=False, record_timing_info=None, debug=None, defer=None, partial_parse=None, use_colors=None, static_parser=None, version_check=None, send_anonymous_usage_stats=None, write_json=None, exclude=None, select=None, selector_name=None, state=None, models=None, generic=None, indirect_selection=None, singular=None, store_failures=None, which='test')
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05.632049 [info ] [MainThread]: Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{log.py:235}} WARNING - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:06,226] {{functions.py:248}} INFO - 09:57:06.226743 [info ] [MainThread]: Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{log.py:235}} WARNING - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{functions.py:248}} INFO - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06.228139 [info ] [MainThread]:
    [2022-02-07 09:57:06,228] {{log.py:235}} WARNING - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:250}} WARNING - 09:57:06.228662 [warn ] [MainThread]: [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - 09:57:06 [
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - WARNING
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - ]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{functions.py:250}} WARNING - 09:57:06 [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,282] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=dbt_python_tests, task_id=dbt_run_daily, execution_date=20220207T095658, start_date=20220207T095704, end_date=20220207T095706
    [2022-02-07 09:57:06,310] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check
    [2022-02-07 09:57:09,764] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:09,764] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to success. Terminating instance.

    But tests (there are 6) and macros (more than 500) are not being retrieved. I tried to perform several tests but got no luck. Did I do something wrong?

    question 
    opened by yarimarchetti 10
  • dbt seed operator only runs successfully on first run after reboot of mwaa

    I'm using this very helpful package to run dbt code on mwaa and have encountered a weird bug where the dbt seed operator only works on the first run after the mwaa environment has been created or updated. Subsequent runs fail with a .csv not found error. Notably, dbt seed appears to be looking in the wrong temp directory on the second run.

    An example of the failure: [Errno 2] No such file or directory: '/tmp/airflowtmpeijb2yn7/data/METRICS_SEED.csv' The temp directory, in this case airflowtmpeijb2yn7, does not match the directory that the "fetch from s3" step has downloaded all the dbt files into. Looking at that log, I can see that the .csv file I wanted was downloaded, along with all the other dbt files into a different subdirectory of tmp.

    • All dbt related files live in s3 buckets
    • dbt deps is run in ci so that it isn't called every time in airflow

    I'm not sure if this is even an issue with this package or a bug with mwaa or dbt, but I thought I'd raise it here first.

    bug 
    opened by samLozier 10
  • dbt_deps question

    @tomasfarias I'm playing around with trying to use the dbt_deps operator to install packages during mwaa run. So far I've been able to get the deps operator to run correctly, but the downstream task, dbt_run will fail due to not being able to find the packages that were just installed? Do I need to hand off the dbt_packages folder via xcom somehow?

    I'm happy to add some documentation for the next person, I'm just hoping to get pointed in the right direction.

    Thanks again for making this library!

    enhancement 
    opened by samLozier 8
  • Not all default arguments are being passed to the DbtBuildOperator

    Hey!

    I noticed that the on_failure_callback from the default arguments is not being passed to the DbtBuildOperator, and neither is the sla argument.

    I can see from the "Instance Details" these arguments are being passed to my other tasks but not to the airflow_dbt_python operators I bring in. Will try and come up with a PR for the fix, but if you have any guidance on where to start that would be great!

    Thanks

    question 
    opened by marcusintrohive 5
  • fix(s3): Split bucket name and key before uploading

    S3 files were uploaded to incorrect keys when running Airflow 2. This was caused by differences between Airflow 1.10 and Airflow 2: the latter assumes that when both bucket_name and key are passed, the key is to be taken as it is, whereas the former seemed to work even when the key contained the full URL.

    Now, we do the parsing and splitting ourselves. We would just set bucket_name to None, however with Airflow 2.0 the upload function checks for the presence of the bucket_name argument, to decide whether to parse the url, not if it's set to None (I think this may be a bug).
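
The parsing and splitting described above can be sketched with the standard library. This is an illustration only, not the code from the fix: split_s3_url is a hypothetical name, and the bucket and key in the example are placeholders.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split an s3:// URL into (bucket_name, key)."""
    parts = urlsplit(url)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError(f"Not a valid S3 URL: {url!r}")
    # The path starts with a slash that is not part of the object key.
    return parts.netloc, parts.path.lstrip("/")


bucket_name, key = split_s3_url("s3://my-bucket/dbt/project/dbt.zip")
print(bucket_name, key)
```

Passing the two parts separately avoids handing a full URL to an upload call that expects a bare key.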

    opened by tomasfarias 4
  • Push to S3: malformed URL

    Hi there,

    I'm testing out airflow-dbt-python==0.12.0 on AWS MWAA. It's working brilliantly with push_dbt_project=False, but fails when trying to push things back to S3 using push_dbt_project=True. I definitely have an IAM policy associated with MWAA that gives the correct PutObject permission for the bucket.

    Looking at my log output, I think there's something wrong with the arguments being passed into load_file_handle_replace_error.

    The log message written by the operator shows the correct target URL:

    [2022-02-21, 03:24:01 UTC] {{dbt.py:258}} INFO - Pushing dbt project back to S3: s3://mwaa-test-bucket/dbt/project/dbt.zip
    

    But when the boto3 S3UploadFailedError occurs later, the error output shows a malformed URL:

    boto3.exceptions.S3UploadFailedError: Failed to upload /tmp/airflowtmptr4_9_un/dbt_project.zip to mwaa-test-bucket/s3://mwaa-test-bucket/dbt/project/dbt.zip: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    

    I had a quick look at the S3 hook code, and I think it may be to do with this bit:

    https://github.com/tomasfarias/airflow-dbt-python/blob/54729271fa215442149bc21e8b301f6317a33157/airflow_dbt_python/hooks/s3.py#L174-L181

    As I understand it, what's happening here is that the full S3 URL is being passed in as the object key, where I think it should just be the object key itself (assuming that the underlying Airflow S3 hook code should be able to construct the final S3 URL using the supplied bucket name and key).

    This probably explains why the logs have a malformed URL - it looks like it's concatenating the bucket name (mwaa-test-bucket/) and key (s3://mwaa-test-bucket/dbt/project/dbt.zip).

    bug 
    opened by sparkysean 4
  • Make apache-airflow not required

    Since we wish to support all MWAA supported versions of Airflow, this requires supporting both 1.10.12 and 2.X releases. However, there are several conflicting dependencies between dbt-core and apache-airflow. None of the conflicts should break us, but they do make dependency resolution impossible, which means airflow-dbt-python cannot be installed.

    For this reason we have removed the dependency to apache-airflow. Assuming that as an Airflow operator we are always installing airflow-dbt-python in an environment that already has Airflow installed, this shouldn't cause any issues in production deployments.

    Moreover, this makes testing multiple Airflow versions easier as we can pre-install apache-airflow.

    opened by tomasfarias 4
  • Error downloading dbt project files from s3

    When passing an S3 URL to project_dir parameter, DbtS3Hook tries to download the root folder (prefix) as the temp dir throwing an exception.

    With project_dir="s3://MYBUCKET.com/dbt"

    ...
    [2021-12-14 20:15:22,510] {{dbt.py:259}} INFO - Fetching dbt project from S3: s3://MYBUCKET.com/dbt/
    [2021-12-14 20:15:22,511] {{s3.py:82}} INFO - Downloading dbt project files from: s3://MYBUCKET.com/dbt/
    ...
    [2021-12-14 20:15:24,950] {{s3.py:58}} INFO - Saving s3.Object(bucket_name='MYBUCKET.com', key='dbt/') file to: /tmp/airflowtmpimiwzxx7
    [2021-12-14 20:15:24,952] {{taskinstance.py:1482}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 147, in execute
        with self.dbt_directory() as dbt_dir:  # type: str
      File "/usr/lib64/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 234, in dbt_directory
        self.prepare_directory(tmp_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 262, in prepare_directory
        tmp_dir,
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 112, in get_dbt_project
        bucket_name, s3_object_keys, local_project_dir, key_prefix
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 131, in download_many_s3_keys
        self.download_one_s3_object(local_project_file, s3_object)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 60, in download_one_s3_object
        with open(target, "wb+") as f:
    IsADirectoryError: [Errno 21] Is a directory: '/tmp/airflowtmpimiwzxx7'
    

    I wasn't able to figure out why airflow_dbt_python/hooks/s3.py, line 98:

    s3_object_keys = self.list_keys(bucket_name=bucket_name, prefix=f"{key_prefix}")
    

    Returns the prefix folder itself (dbt/) as one of the keys.

    Then airflow_dbt_python/hooks/s3.py, line 55, fails to open the file as it is actually a folder:

    with open(target, "wb+") as f:
        s3_object.download_fileobj(f)
    

    By adding the following check after line 112, I was able to work around the error.

    if s3_object_key == prefix:
        continue
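
The workaround can be sketched as a filter over the listed keys. One likely reason the prefix shows up in the listing is that a zero-byte placeholder object with a key ending in "/" exists in the bucket (the S3 console creates one when a folder is created there). That explanation and the downloadable_keys helper are assumptions, not the library's actual code.

```python
from typing import Iterable, List


def downloadable_keys(keys: Iterable[str], prefix: str) -> List[str]:
    """Drop the prefix itself and any folder placeholder keys ending in '/'."""
    return [key for key in keys if key != prefix and not key.endswith("/")]


listed = ["dbt/", "dbt/dbt_project.yml", "dbt/models/", "dbt/models/my_model.sql"]
print(downloadable_keys(listed, prefix="dbt/"))
```

Filtering on the trailing slash also skips placeholders for nested folders, which a comparison against the prefix alone would miss.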
    
    bug 
    opened by tpcarneiro 4
  • MWAA and Environment Variables in profiles.yml

    Hello! I found your package tonight and am currently trying to get this set up. After finding workarounds for the below by essentially using the /tmp directory on MWAA, I believe I am on my last issue before successful deploy.

    modules-path
    target_path
    log-path
    

    I am trying to get secrets read in through my profiles.yml file. I have tried entries like the below but to no avail.

    host: "{{ env_var('AIRFLOW__DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW_DB_ENDPOINT') }}"
    host: "{{ env_var('DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW__DB_ENDPOINT', 'not_set') }}"
    

    The last option let me bypass the error temporarily before it failed when building the model. I know this may not be entirely specific to your package, but noticed that you have helped people out before with issues trying to get MWAA running.

    https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html - I was using this somewhat but have not yet actually added anything under custom options. As you can see above, I tried adding AIRFLOW_ and AIRFLOW__ to the beginning but no luck.

    Thought I would reach out and see if you have had luck with something similar. We just switched over to using AWS's secret manager in MWAA and it's been working like a charm in our Python tools, and it still uses Variable.get() (from Airflow's library).

    The typical error I get is:

    Env var required but not provided: 'DB_ENDPOINT'

    but have also gotten this:

    Env var required but not provided: 'DB_ENDPOINT'. This can happen when calling a macro that does not exist. Check for typos and/or install package dependencies with "dbt deps".

    Any help would be appreciated if you have the time!

    opened by lukedappleton 4
  • chore: Update flake8 pre-commit URL to point to GitHub repository

    flake8 has been migrated to GitHub, and the GitLab repository has been deleted.

    For existing contributors, pre-commit still works because a virtualenv with the current configured flake8 version is cached. However, cleaning up the cache and trying to run pre-commit run is enough to reproduce the issue.

    bug 
    opened by adamantike 3
  • Debug logs are default - cannot be turned off

    Running on MWAA 2.2.2 I upgraded to 0.15.2 and airflow-redshift==1.2.1. Prior to the upgrade, logs appeared like this:

    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | Concurrency: 4 threads (target='prod')
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 |
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 1 of 15 START table model ***.................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 2 of 15 START table model ****.......................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 3 of 15 START table model ***................. [RUN]
    

    After the upgrade,

    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.678976 [debug] [MainThread]: Partial parsing not enabled
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.691536 [debug] [MainThread]: Parsing macros/generate_schema_name.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.692644 [debug] [MainThread]: Parsing macros/adapters.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.710620 [debug] [MainThread]: Parsing macros/catalog.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718226 [debug] [MainThread]: Parsing macros/relations.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718708 [debug] [MainThread]: Parsing macros/adapters/apply_grants.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719046 [debug] [MainThread]: Parsing macros/utils/datediff.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719490 [debug] [MainThread]: Parsing macros/utils/listagg.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.721693 [debug] [MainThread]: Parsing macros/utils/split_part.sql
    

    Not sure if this is due to dbt or airflow-dbt-python. Since the filename is functions.py (previously logging_mixin.py), I assume that this is due to the dbt upgrade. But when I run dbt from the command line, I do not see these debug logs, so I wanted to check.

    I've tried

    config:
        debug: False
    

    in profiles.yml but this doesn't seem to help

    opened by cvlendistry 1
  • connection as a target does not work in MWAA

    Running the example code produces the following on AWS MWAA:

    session = settings.Session()  # type: ignore
    existing = session.query(Connection).filter_by(conn_id="my_db_connection").first()
    
    if existing is None:
        # For illustration purposes, and to keep the example self-contained, we create
        # a Connection using Airflow's ORM. However, any method of loading connections would
        # work, like Airflow's UI, Airflow's CLI, or in deployment scripts.
    
        my_conn = Connection(
            conn_id="my_db_connection",
            conn_type="redshift",
            description="redshift connection",
            host="abc.ch0n7gct0zxb.us-west-2.redshift.amazonaws.com",
            login="dbt_process_user",
            port=5439,
            schema="stage",
            password= get_secret("dev/dbt_process_user")['password'],  # pragma: allowlist secret
            # Other dbt parameters can be added as extras
            extra=json.dumps(dict(threads=4, sslmode="require")),
        )
        session.add(my_conn)
        session.commit()
    with DAG(
        dag_id="dbt_tomasfarias", catchup=False, default_args=default_args, tags=["dbt", "loan tape"], schedule_interval="0 11 * * *"
    ) as dag:
        dbt_run = DbtRunOperator(
            task_id="dbt_run",
            target="my_db_connection",
            #dbt_bin="/usr/local/airflow/.local/bin/dbt",
            profiles_dir=None,
            project_dir="/usr/local/airflow/dags/dbt/etl/",
      
        )
    
    [2022-11-02, 16:11:29 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
        self._execute_task_with_callbacks(context)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
        result = self._execute_task(context, self.task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
        result = execute_callable(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 140, in execute
        config = self.get_dbt_config()
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 185, in get_dbt_config
        return factory.create_config(**config_kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 374, in create_config
        initialize_config_values(config)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/main.py", line 170, in initialize_config_values
        cfg = read_user_config(parsed.profiles_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 74, in read_user_config
        profile = read_profile(directory)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 50, in read_profile
        path = os.path.join(profiles_dir, 'profiles.yml')
      File "/usr/lib64/python3.7/posixpath.py", line 80, in join
        a = os.fspath(a)
    TypeError: expected str, bytes or os.PathLike object, not NoneType
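
The traceback ends in os.path.join(profiles_dir, 'profiles.yml'), which raises a TypeError because profiles_dir is None. A minimal sketch of a guard follows, assuming that falling back to dbt's conventional ~/.dbt directory is acceptable. resolve_profiles_dir is a hypothetical helper and is not how airflow-dbt-python handles this case.

```python
import os
from typing import Optional


def resolve_profiles_dir(profiles_dir: Optional[str]) -> str:
    """Return a usable profiles directory, falling back to ~/.dbt when unset."""
    if profiles_dir is None:
        return os.path.join(os.path.expanduser("~"), ".dbt")
    return profiles_dir


try:
    # Reproduces the failure at the bottom of the traceback.
    os.path.join(None, "profiles.yml")
except TypeError as error:
    print(f"TypeError: {error}")

print(os.path.join(resolve_profiles_dir(None), "profiles.yml"))
```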
    
    awaiting response 
    opened by liongitusr 1
  • feature: Support pulling dbt artifacts from XCOM

    Currently, airflow-dbt-python supports pushing artifacts to XCOM. However, we do not offer pulling artifacts from XCOM. This would be particularly useful for stateful runs, like when using the source_status:fresher+ selector method that came with dbt 1.1.

    enhancement 
    opened by tomasfarias 0
  • Implement Apache Airflow provider's hook interface

    Not quite an interface as it's not defined in BaseHook, but most Apache Airflow providers define proper connection types for each of their hooks. This may be a chance to refactor the hooks module into individual hook classes for each dbt supported target type. Furthermore, this can be the chance to clean up the docstring documentation as it's missing a lot of information.

    Perhaps we can apply to be integrated into Apache Airflow as an official provider? Currently, a dbt Cloud provider package exists, but nothing for "vanilla" dbt.

    documentation enhancement 
    opened by tomasfarias 0
  • Duplicate log lines

    The dbt loggers are producing multiple lines of output that are essentially the same, but one of them is the debug output.

    This is caused by both the file logger and the stdout logger being fired off. We tried to clear the handlers from one of them in operators.dbt.BaseDbtOperator.override_dbt_logging but this is not working.

    We should dig into this issue further to avoid verbose logging outputs.

    bug 
    opened by tomasfarias 0
Releases(v0.15.2)
Owner
Tomás Farías Santana
I like writing code and sometimes I'm paid for it. Other times I try to blog about code.