An Airflow operator to call the main function from the dbt-core Python package

Overview

airflow-dbt-python

Motivation

Airflow running in a managed environment

Although dbt is meant to be installed and used as a CLI, we may not have control of the environment where Airflow is running, which rules out using dbt as a CLI.

This is exactly what happens when using Amazon's Managed Workflows for Apache Airflow or MWAA: although a list of Python requirements can be passed, the CLI cannot be found in the worker's PATH.

There is a workaround which involves using Airflow's BashOperator and running Python from the command line:

from airflow.operators.bash import BashOperator

BASH_COMMAND = "python -c 'from dbt.main import main; main()' run"
operator = BashOperator(
    task_id="dbt_run",
    bash_command=BASH_COMMAND,
)

But it can get sloppy when appending all potential arguments a dbt run command (or other subcommand) can take.

As you may expect, airflow-dbt-python abstracts the complexity of handling CLI arguments by defining an operator for each dbt subcommand, with each operator having an attribute for each possible CLI argument.
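
To make the idea concrete, here is a minimal sketch of how attributes could map onto CLI-style arguments. The `DbtRunArgs` class and its `to_cli_args` method are hypothetical names used only for illustration; they are not part of airflow-dbt-python, which may handle this differently internally.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class DbtRunArgs:
    """Hypothetical container: one attribute per `dbt run` argument."""

    models: List[str] = field(default_factory=list)
    full_refresh: bool = False
    fail_fast: bool = False
    target: Optional[str] = None

    def to_cli_args(self) -> List[str]:
        """Build the argument list that a shell command would otherwise carry."""
        args = ["run"]
        if self.models:
            args += ["--models", *self.models]
        if self.full_refresh:
            args.append("--full-refresh")
        if self.fail_fast:
            args.append("--fail-fast")
        if self.target is not None:
            args += ["--target", self.target]
        return args


cli_args = DbtRunArgs(models=["my_model"], full_refresh=True).to_cli_args()
print(cli_args)
```

Keeping each argument as a typed attribute avoids assembling one long command string by hand, which is the part of the BashOperator workaround that tends to become error-prone.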

An alternative to airflow-dbt that works without the dbt CLI

The existing airflow-dbt package, by default, would not work if the dbt CLI is not in PATH, which means it would not be usable in MWAA. There is a workaround via the dbt_bin argument, which can be set to "python -c 'from dbt.main import main; main()' run", in a similar fashion to the BashOperator example. Yet this approach is not without its limitations:

  • airflow-dbt works by wrapping the dbt CLI, which makes our code dependent on the environment in which it runs.
  • airflow-dbt does not support the full range of arguments a command can take. For example, DbtRunOperator does not have an attribute for fail_fast.
  • airflow-dbt does not return anything after the execution, which means no information is available for downstream tasks to pull via XCom. And even if it tried to, since it works by wrapping the CLI, it could only attempt to parse the lines printed by dbt to STDOUT. On the other hand, airflow-dbt-python will try to return the information of a dbt result class, as defined in dbt.contracts.results, which opens up possibilities for downstream tasks to condition their execution on the result of a dbt command.
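
As an illustration of the last point, a downstream task could branch on the returned results. The sketch below assumes the value pulled from XCom is a dictionary shaped like dbt's run_results.json artifact (a `results` list whose entries carry a `status`); the exact shape returned by an operator may differ between versions. The function `choose_next_task` and both task ids are hypothetical.

```python
from typing import Any, Dict

# Statuses treated as failures in this sketch; adjust to the dbt version in use.
FAILED_STATUSES = {"error", "fail"}


def choose_next_task(run_results: Dict[str, Any]) -> str:
    """Return the id of the task to run next based on dbt node statuses."""
    statuses = [result.get("status") for result in run_results.get("results", [])]
    if any(status in FAILED_STATUSES for status in statuses):
        return "notify_failure"  # hypothetical task id
    return "publish_report"  # hypothetical task id


example = {
    "results": [
        {"unique_id": "model.my_project.orders", "status": "success"},
        {"unique_id": "model.my_project.customers", "status": "error"},
    ]
}
print(choose_next_task(example))
```

A function like this could serve as the callable of a branching task that first pulls the upstream operator's return value from XCom.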

Avoid installing unnecessary dbt plugins

Finally, airflow-dbt-python does not depend on dbt but on dbt-core. The connectors: dbt-redshift, dbt-postgres, dbt-snowflake, and dbt-bigquery are available as installation extras instead of being bundled up by default, which happens when you attempt to install dbt via python -m pip install dbt.

This allows you to easily control what is installed in your environment. One particular example of when this is extremely useful is in the case of the dbt-snowflake connector, which depends on cryptography. This dependency requires the Rust toolchain to run, and this is not supported in a few distributions (like the one MWAA runs on). Even if that's not the case, airflow-dbt-python results in a lighter installation due to only depending on dbt-core.

Usage

Currently, the following dbt commands are supported:

  • clean
  • compile
  • debug
  • deps
  • ls
  • parse
  • run
  • run-operation
  • seed
  • snapshot
  • source (Not well tested)
  • test

Examples

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_dbt_python.operators.dbt import (
    DbtRunOperator,
    DbtSeedOperator,
    DbtTestOperator,
)

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='example_dbt_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example', 'example2'],
) as dag:
    dbt_test = DbtTestOperator(
        task_id="dbt_test",
        selector="pre-run-tests",
    )

    dbt_seed = DbtSeedOperator(
        task_id="dbt_seed",
        select=["/path/to/first.csv", "/path/to/second.csv"],
        full_refresh=True,
    )

    dbt_run = DbtRunOperator(
        task_id="dbt_run",
        models=["/path/to/models"],
        full_refresh=True,
        fail_fast=True,
    )

    dbt_test >> dbt_seed >> dbt_run

Requirements

airflow-dbt-python is tested in Python 3.7, 3.8, and 3.9, although it could also support older versions.

On the Airflow side, we unit test with versions 1.10.12 and upwards, including the latest version 2 release. Regardless, more testing is planned to ensure compatibility with version 2 of Airflow.

Finally, airflow-dbt-python requires at least dbt version 0.19. Unit tests have been verified to pass with version 0.20 after minor changes that should not have major effects anywhere else. Regardless, support for version 0.20 of dbt should be considered experimental.

Installing

From PyPI:

pip install airflow-dbt-python

Any dbt connectors you require may be installed by specifying extras:

pip install airflow-dbt-python[snowflake,postgres]

From this repo:

Clone the repo:

git clone https://github.com/tomasfarias/airflow-dbt-python.git
cd airflow-dbt-python

With poetry:

poetry install

Install any extras you need, and only those you need:

poetry install -E postgres -E redshift

Testing

Tests are written using pytest, are located in test/, and can be run locally with poetry:

poetry run pytest -vv

License

This project is licensed under the MIT license. See LICENSE.

Comments
  • dbt not writing log to file

    It seems that the dbt.log file is not being written inside the temp directory generated by the run.

    Is there any extra config that needs to be done?

    Thanks!

    opened by gvillafanetapia 11
  • Loading dbt from S3 zip file doesn't work

    Hello, testing version airflow-dbt-python==0.11.0 on MWAA 2.0.2, it works flawlessly with:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/s3-prefix/dbt/",
        profiles_dir="s3://s3-bucket/s3-prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    but if I try using a dbt.zip file:

    dbt_test = DbtTestOperator(
        task_id="dbt_run_daily",
        project_dir="s3://s3-bucket/prefix/dbt/dbt.zip",
        profiles_dir="s3://s3-bucket/prefix/dbt_profile",
        target="dev",
        profile="dev_profile",
        do_xcom_push_artifacts=["run_results.json"],
    )

    where dbt.zip file structure is:

    d-----  24/01/2022 19:05        analyses
    d-----  24/01/2022 19:05        data
    d-----  27/01/2022 14:59        logs
    d-----  24/01/2022 19:05        macros
    d-----  01/02/2022 18:42        models
    d-----  27/01/2022 14:48        packages
    d-----  24/01/2022 19:05        snapshots
    d-----  27/01/2022 15:01        target
    d-----  01/02/2022 18:54        tests
    -a----  24/01/2022 19:05    29  .gitignore
    -a----  27/01/2022 15:19  1339  dbt_project.yml
    -a----  27/01/2022 14:50    88  packages.yml
    -a----  24/01/2022 19:05   571  README.md

    I can see these logs:

    [2022-02-07 09:57:04,793] {{dbt.py:263}} INFO - Fetching profiles.yml from S3: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,805] {{s3.py:35}} INFO - Downloading dbt profiles file from: s3://s3-bucket/s3-prefix/dbt_profile
    [2022-02-07 09:57:04,806] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:04,840] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,840] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:04,849] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:04,849] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,063] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt_profile/profiles.yml') file to: /tmp/airflowtmpmtcgxrxd/profiles.yml
    [2022-02-07 09:57:05,093] {{dbt.py:271}} INFO - Fetching dbt project from S3: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{s3.py:85}} INFO - Downloading dbt project files from: s3://s3-bucket/s3-prefix/dbt/dbt.zip
    [2022-02-07 09:57:05,094] {{base_aws.py:368}} INFO - Airflow Connection: aws_conn_id=aws_default
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:179}} INFO - No credentials retrieved from Connection
    [2022-02-07 09:57:05,127] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,127] {{base_aws.py:87}} INFO - Creating session with aws_access_key_id=None region_name=None
    [2022-02-07 09:57:05,137] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:05,137] {{base_aws.py:157}} INFO - role_arn is None
    [2022-02-07 09:57:05,222] {{s3.py:53}} INFO - Saving s3.Object(bucket_name='s3-bucket', key='s3-prefix/dbt/dbt.zip') file to: /tmp/airflowtmpmtcgxrxd/dbt_project.zip
    [2022-02-07 09:57:05,411] {{dbt.py:152}} INFO - Running dbt configuration:
    TestTaskConfig(cls=<class 'dbt.task.test.TestTask'>, project_dir='/tmp/airflowtmpmtcgxrxd/', profiles_dir='/tmp/airflowtmpmtcgxrxd/', profile='dev_profile', target='dev', compiled_target=None, fail_fast=None, single_threaded=None, threads=None, use_experimental_parser=None, vars='{}', warn_error=None, log_format=None, log_cache_events=False, record_timing_info=None, debug=None, defer=None, partial_parse=None, use_colors=None, static_parser=None, version_check=None, send_anonymous_usage_stats=None, write_json=None, exclude=None, select=None, selector_name=None, state=None, models=None, generic=None, indirect_selection=None, singular=None, store_failures=None, which='test')
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05.632049 [info ] [MainThread]: Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{log.py:235}} WARNING - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:05,632] {{functions.py:248}} INFO - 09:57:05 Partial parse save file not found. Starting full parse.
    [2022-02-07 09:57:06,226] {{functions.py:248}} INFO - 09:57:06.226743 [info ] [MainThread]: Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{log.py:235}} WARNING - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,227] {{functions.py:248}} INFO - 09:57:06 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 191 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06.228139 [info ] [MainThread]:
    [2022-02-07 09:57:06,228] {{log.py:235}} WARNING - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:248}} INFO - 09:57:06
    [2022-02-07 09:57:06,228] {{functions.py:250}} WARNING - 09:57:06.228662 [warn ] [MainThread]: [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - 09:57:06 [
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - WARNING
    [2022-02-07 09:57:06,229] {{log.py:235}} WARNING - ]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,229] {{functions.py:250}} WARNING - 09:57:06 [WARNING]: Nothing to do. Try checking your model configs and model specification args
    [2022-02-07 09:57:06,282] {{taskinstance.py:1192}} INFO - Marking task as SUCCESS. dag_id=dbt_python_tests, task_id=dbt_run_daily, execution_date=20220207T095658, start_date=20220207T095704, end_date=20220207T095706
    [2022-02-07 09:57:06,310] {{taskinstance.py:1246}} INFO - 1 downstream tasks scheduled from follow-on schedule check
    [2022-02-07 09:57:09,764] {{logging_mixin.py:104}} INFO - [2022-02-07 09:57:09,764] {{local_task_job.py:188}} WARNING - State of this instance has been externally set to success. Terminating instance.

    But tests (there are 6) and macros (more than 500) are not being retrieved. I tried to perform several tests but got no luck. Did I do something wrong?

    question 
    opened by yarimarchetti 10
  • dbt seed operator only runs successfully on first run after reboot of mwaa

    I'm using this very helpful package to run dbt code on mwaa and have encountered a weird bug where the dbt seed operator only works on the first run after the mwaa environment has been created or updated. Subsequent runs fail with a .csv not found error. Notably, dbt seed appears to be looking in the wrong temp directory on the second run.

    An example of the failure: [Errno 2] No such file or directory: '/tmp/airflowtmpeijb2yn7/data/METRICS_SEED.csv' The temp directory, in this case airflowtmpeijb2yn7, does not match the directory that the "fetch from s3" step has downloaded all the dbt files into. Looking at that log, I can see that the .csv file I wanted was downloaded, along with all the other dbt files into a different subdirectory of tmp.

    • All dbt related files live in s3 buckets
    • dbt deps is run in ci so that it isn't called every time in airflow

    I'm not sure if this is even an issue with this package or a bug with mwaa or dbt, but I thought I'd raise it here first.

    bug 
    opened by samLozier 10
  • dbt_deps question

    @tomasfarias I'm playing around with trying to use the dbt_deps operator to install packages during mwaa run. So far I've been able to get the deps operator to run correctly, but the downstream task, dbt_run will fail due to not being able to find the packages that were just installed? Do I need to hand off the dbt_packages folder via xcom somehow?

    I'm happy to add some documentation for the next person, I'm just hoping to get pointed in the right direction.

    Thanks again for making this library!

    enhancement 
    opened by samLozier 8
  • Not all default arguments are being passed to the DbtBuildOperator

    Hey!

    I noticed that the on_failure_callback from the default arguments is not being passed to the DbtBuildOperator, and neither is the sla argument.

    I can see from the "Instance Details" these arguments are being passed to my other tasks but not to the airflow_dbt_python operators I bring in. Will try and come up with a PR for the fix, but if you have any guidance on where to start that would be great!

    Thanks

    question 
    opened by marcusintrohive 5
  • fix(s3): Split bucket name and key before uploading

    S3 files were uploaded to incorrect keys when running Airflow 2. This was caused by differences between Airflow 1.10 and Airflow 2: the latter assumes that when both bucket_name and key are passed, the key is to be taken as it is, whereas the former seemed to work even when the key contained the full URL.

    Now, we do the parsing and splitting ourselves. We would just set bucket_name to None, however with Airflow 2.0 the upload function checks for the presence of the bucket_name argument, to decide whether to parse the url, not if it's set to None (I think this may be a bug).
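
A minimal sketch of that splitting step, using only the standard library. `split_s3_url` is a hypothetical helper written for illustration and is not necessarily the function used in the hook; the bucket and key in the example are made up.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split an s3://bucket/key URL into (bucket_name, key)."""
    parsed = urlsplit(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not a valid S3 URL: {url!r}")
    # The path component starts with "/", which is not part of the key.
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, key = split_s3_url("s3://my-bucket/dbt/project/dbt.zip")
print(bucket_name, key)
```

Passing the bucket name and the bare key separately avoids relying on how a given Airflow version interprets a key that still contains the full URL.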

    opened by tomasfarias 4
  • Push to S3: malformed URL

    Hi there,

    I'm testing out airflow-dbt-python==0.12.0 on AWS MWAA. It's working brilliantly with push_dbt_project=False, but fails when trying to push things back to S3 using push_dbt_project=True. I definitely have an IAM policy associated with MWAA that gives the correct PutObject permission for the bucket.

    Looking at my log output, I think there's something wrong with the arguments being passed into load_file_handle_replace_error.

    The log message written by the operator shows the correct target URL:

    [2022-02-21, 03:24:01 UTC] {{dbt.py:258}} INFO - Pushing dbt project back to S3: s3://mwaa-test-bucket/dbt/project/dbt.zip
    

    But when the boto3 S3UploadFailedError occurs later, the error output shows a malformed URL:

    boto3.exceptions.S3UploadFailedError: Failed to upload /tmp/airflowtmptr4_9_un/dbt_project.zip to mwaa-test-bucket/s3://mwaa-test-bucket/dbt/project/dbt.zip: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
    

    I had a quick look at the S3 hook code, and I think it may be to do with this bit:

    https://github.com/tomasfarias/airflow-dbt-python/blob/54729271fa215442149bc21e8b301f6317a33157/airflow_dbt_python/hooks/s3.py#L174-L181

    As I understand it, what's happening here is that the full S3 URL is being passed in as the object key, where I think it should just be the object key itself (assuming that the underlying Airflow S3 hook code should be able to construct the final S3 URL using the supplied bucket name and key).

    This probably explains why the logs have a malformed URL - it looks like it's concatenating the bucket name (mwaa-test-bucket/) and key (s3://mwaa-test-bucket/dbt/project/dbt.zip).

    bug 
    opened by sparkysean 4
  • Make apache-airflow not required

    Since we wish to support all MWAA supported versions of Airflow, this requires supporting both 1.10.12 and 2.X releases. However, there are several conflicting dependencies between dbt-core and apache-airflow. None of the conflicts should break us, but they do make dependency resolution impossible, which means airflow-dbt-python cannot be installed.

    For this reason we have removed the dependency to apache-airflow. Assuming that as an Airflow operator we are always installing airflow-dbt-python in an environment that already has Airflow installed, this shouldn't cause any issues in production deployments.

    Moreover, this makes testing multiple Airflow versions easier as we can pre-install apache-airflow.

    opened by tomasfarias 4
  • Error downloading dbt project files from s3

    When passing an S3 URL to project_dir parameter, DbtS3Hook tries to download the root folder (prefix) as the temp dir throwing an exception.

    With project_dir="s3://MYBUCKET.com/dbt"

    ...
    [2021-12-14 20:15:22,510] {{dbt.py:259}} INFO - Fetching dbt project from S3: s3://MYBUCKET.com/dbt/
    [2021-12-14 20:15:22,511] {{s3.py:82}} INFO - Downloading dbt project files from: s3://MYBUCKET.com/dbt/
    ...
    [2021-12-14 20:15:24,950] {{s3.py:58}} INFO - Saving s3.Object(bucket_name='MYBUCKET.com', key='dbt/') file to: /tmp/airflowtmpimiwzxx7
    [2021-12-14 20:15:24,952] {{taskinstance.py:1482}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 147, in execute
        with self.dbt_directory() as dbt_dir:  # type: str
      File "/usr/lib64/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 234, in dbt_directory
        self.prepare_directory(tmp_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 262, in prepare_directory
        tmp_dir,
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 112, in get_dbt_project
        bucket_name, s3_object_keys, local_project_dir, key_prefix
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 131, in download_many_s3_keys
        self.download_one_s3_object(local_project_file, s3_object)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/s3.py", line 60, in download_one_s3_object
        with open(target, "wb+") as f:
    IsADirectoryError: [Errno 21] Is a directory: '/tmp/airflowtmpimiwzxx7'
    

    I wasn't able to figure out why airflow_dbt_python/hooks/s3.py, line 98:

    s3_object_keys = self.list_keys(bucket_name=bucket_name, prefix=f"{key_prefix}")
    

    Returns the prefix folder itself (dbt/) as one of the keys.

    Then airflow_dbt_python/hooks/s3.py, line 55, fails to open the file as it is actually a folder:

    with open(target, "wb+") as f:
        s3_object.download_fileobj(f)
    

    By adding the following check after line 112, I was able to work around the error.

    if s3_object_key == prefix:
        continue
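
A standalone sketch of the same idea: filter out keys that represent the prefix "folder" itself before downloading. `filter_object_keys` is a hypothetical helper, not code from the hook, and treating every key that ends in "/" as a folder placeholder is an assumption that goes slightly beyond the check above.

```python
from typing import Iterable, List


def filter_object_keys(keys: Iterable[str], prefix: str) -> List[str]:
    """Drop the prefix itself and any folder placeholder keys."""
    return [key for key in keys if key != prefix and not key.endswith("/")]


keys = ["dbt/", "dbt/dbt_project.yml", "dbt/models/", "dbt/models/my_model.sql"]
print(filter_object_keys(keys, prefix="dbt/"))
```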
    
    bug 
    opened by tpcarneiro 4
  • MWAA and Environment Variables in profiles.yml

    Hello! I found your package tonight and am currently trying to get this set up. After finding workarounds for the below by essentially using the /tmp directory on MWAA, I believe I am on my last issue before successful deploy.

    modules-path
    target_path
    log-path
    

    I am trying to get secrets read in through my profiles.yml file. I have tried entries like the below but to no avail.

    host: "{{ env_var('AIRFLOW__DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW_DB_ENDPOINT') }}"
    host: "{{ env_var('DB_ENDPOINT') }}"
    host: "{{ env_var('AIRFLOW__DB_ENDPOINT', 'not_set') }}"
    

    The last option let me bypass the error temporarily before it failed when building the model. I know this may not be entirely specific to your package, but noticed that you have helped people out before with issues trying to get MWAA running.

    https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html - I was using this somewhat but have not yet actually added anything under custom options. As you can see above, I tried adding AIRFLOW_ and AIRFLOW__ to the beginning but no luck.

    Thought I would reach out and see if you have had luck with something similar. We just switched over to using AWS's secret manager in MWAA and it's been working like a charm in our Python tools, and it still uses Variable.get() (from Airflow's library).

    The typical error I get is:

    Env var required but not provided: 'DB_ENDPOINT'

    but have also gotten this:

    Env var required but not provided: 'DB_ENDPOINT'. This can happen when calling a macro that does not exist. Check for typos and/or install package dependencies with "dbt deps".

    Any help would be appreciated if you have the time!

    opened by lukedappleton 4
  • chore: Update flake8 pre-commit URL to point to GitHub repository

    flake8 has been migrated to GitHub, and the GitLab repository has been deleted.

    For existing contributors, pre-commit still works because a virtualenv with the current configured flake8 version is cached. However, cleaning up the cache and trying to run pre-commit run is enough to reproduce the issue.

    bug 
    opened by adamantike 3
  • Debug logs are default - cannot be turned off

    Running on MWAA 2.2.2 I upgraded to 0.15.2 and airflow-redshift==1.2.1. Prior to the upgrade, logs appeared like this:

    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | Concurrency: 4 threads (target='prod')
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 |
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 1 of 15 START table model ***.................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 2 of 15 START table model ****.......................... [RUN]
    [2022-12-01, 11:00:09 UTC] {{logging_mixin.py:109}} INFO - 11:00:09 | 3 of 15 START table model ***................. [RUN]
    

    After the upgrade,

    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.678976 [debug] [MainThread]: Partial parsing not enabled
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.691536 [debug] [MainThread]: Parsing macros/generate_schema_name.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.692644 [debug] [MainThread]: Parsing macros/adapters.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.710620 [debug] [MainThread]: Parsing macros/catalog.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718226 [debug] [MainThread]: Parsing macros/relations.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.718708 [debug] [MainThread]: Parsing macros/adapters/apply_grants.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719046 [debug] [MainThread]: Parsing macros/utils/datediff.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.719490 [debug] [MainThread]: Parsing macros/utils/listagg.sql
    [2022-12-02, 05:48:58 UTC] {{functions.py:230}} DEBUG - 05:48:58.721693 [debug] [MainThread]: Parsing macros/utils/split_part.sql
    

    Not sure if this is due to dbt or airflow-dbt-python. Since the filename is functions.py (previously logging_mixin.py), I assume that this is due to the dbt upgrade. But when I run dbt from the command line, I do not see these debug logs, so I wanted to check.

    I've tried

    config:
        debug: False
    

    in profiles.yml but this doesn't seem to help

    opened by cvlendistry 1
  • connection as a target does not work in MWAA

    Running the example code produces the following on AWS MWAA:

    session = settings.Session()  # type: ignore
    existing = session.query(Connection).filter_by(conn_id="my_db_connection").first()
    
    if existing is None:
        # For illustration purposes, and to keep the example self-contained, we create
        # a Connection using Airflow's ORM. However, any method of loading connections would
        # work, like Airflow's UI, Airflow's CLI, or in deployment scripts.
    
        my_conn = Connection(
            conn_id="my_db_connection",
            conn_type="redshift",
            description="redshift connection",
            host="abc.ch0n7gct0zxb.us-west-2.redshift.amazonaws.com",
            login="dbt_process_user",
            port=5439,
            schema="stage",
            password= get_secret("dev/dbt_process_user")['password'],  # pragma: allowlist secret
            # Other dbt parameters can be added as extras
            extra=json.dumps(dict(threads=4, sslmode="require")),
        )
        session.add(my_conn)
        session.commit()
    with DAG(
        dag_id="dbt_tomasfarias", catchup=False, default_args=default_args, tags=["dbt", "loan tape"], schedule_interval="0 11 * * *"
    ) as dag:
        dbt_run = DbtRunOperator(
            task_id="dbt_run",
            target="my_db_connection",
            #dbt_bin="/usr/local/airflow/.local/bin/dbt",
            profiles_dir=None,
            project_dir="/usr/local/airflow/dags/dbt/etl/",
      
        )
    
    [2022-11-02, 16:11:29 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
        self._execute_task_with_callbacks(context)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
        result = self._execute_task(context, self.task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
        result = execute_callable(context=context)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 140, in execute
        config = self.get_dbt_config()
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/operators/dbt.py", line 185, in get_dbt_config
        return factory.create_config(**config_kwargs)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_dbt_python/hooks/dbt.py", line 374, in create_config
        initialize_config_values(config)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/main.py", line 170, in initialize_config_values
        cfg = read_user_config(parsed.profiles_dir)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 74, in read_user_config
        profile = read_profile(directory)
      File "/usr/local/airflow/.local/lib/python3.7/site-packages/dbt/config/profile.py", line 50, in read_profile
        path = os.path.join(profiles_dir, 'profiles.yml')
      File "/usr/lib64/python3.7/posixpath.py", line 80, in join
        a = os.fspath(a)
    TypeError: expected str, bytes or os.PathLike object, not NoneType
    
    awaiting response 
    opened by liongitusr 1
  • feature: Support pulling dbt artifacts from XCOM

    Currently, airflow-dbt-python supports pushing artifacts to XCOM. However, we do not offer pulling artifacts from XCOM. This would be particularly useful for stateful runs, like when using the source_status:fresher+ selector method that came with dbt 1.1.

    enhancement 
    opened by tomasfarias 0
  • Implement Apache Airflow provider's hook interface

    Not quite an interface as it's not defined in BaseHook, but most Apache Airflow providers define proper connection types for each of their hooks. This may be a chance to refactor the hooks module into individual hook classes for each dbt supported target type. Furthermore, this can be the chance to clean up the docstring documentation as it's missing a lot of information.

    Perhaps we can apply to be integrated into Apache Airflow as an official provider? Currently, a dbt Cloud provider package exists, but nothing for "vanilla" dbt.

    documentation enhancement 
    opened by tomasfarias 0
  • Duplicate log lines

    The dbt loggers are producing multiple lines of output that are essentially the same, but one of them is the debug output.

    This is caused by both the file logger and the stdout logger being fired off. We tried to clear the handlers from one of them in operators.dbt.BaseDbtOperator.override_dbt_logging but this is not working.

    We should dig into this issue further to avoid verbose logging outputs.

    bug 
    opened by tomasfarias 0
Releases(v0.15.2)
Owner
Tomás Farías Santana
I like writing code and sometimes I'm paid for it. Other times I try to blog about code.

0 Feb 23, 2022
Google Fit Sensor Component

Google Fit Sensor Component

Ivan Vojtko 21 Dec 20, 2022
A very terrible python-based programming language that uses folders instead of text files

PYFolders by Lewis L. Foster PYFolders is a very terrible python-based programming language that uses folders instead of regular text files. In this r

Lewis L. Foster 5 Jan 08, 2022
To effectively detect the faulty wafers

wafer_fault_detection Aim of the project: In electronics, a wafer (also called a slice or substrate) is a thin slice of semiconductor, such as crystal

Arun Singh Babal 1 Nov 06, 2021
This is a Fava extension to display a grouped portfolio view in Fava for a set of Beancount accounts.

Fava Portfolio Summary This is a Fava extension to display a grouped portfolio view in Fava for a set of Beancount accounts. It can also calculate MWR

18 Dec 26, 2022
New multi tool im making adding features currently

Emera Multi Tool New multi tool im making adding features currently Current List of Planned Features - Linkvertise Bypasser - Discord Auto Bump - Gith

Lamp 3 Dec 03, 2021
A simple flashcard app built as a final project for a databases class.

CS2300 Final Project - Flashcard app 'FlashStudy' Tech stack Backend Python (Language) Django (Web framework) SQLite (Database) Frontend HTML/CSS/Java

Christopher Spencer 2 Feb 03, 2022
A deployer and package manager for OceanBase open-source software.

OceanBase Deploy OceanBase Deploy (简称 OBD)是 OceanBase 开源软件的安装部署工具。OBD 同时也是包管理器,可以用来管理 OceanBase 所有的开源软件。本文介绍如何安装 OBD、使用 OBD 和 OBD 的命令。 安装 OBD 您可以使用以下方

OceanBase 59 Dec 27, 2022
The Ultimate Widevine Content Ripper (KEY Extract + Download + Decrypt) is REBORN

NARROWVINE-REBORN ** UPDATE 21.12.01 ** As expected Google patched its ChromeCDM Whitebox exploit by Satsuoni with a force-update on the ChromeCDM. Th

Vank0n 104 Dec 07, 2022
用于导出墨墨背单词的词库,并生成适用于 List 背单词,不背单词,欧陆词典等的自定义词库

maimemo-export 用于导出墨墨背单词的词库,并生成适用于 List 背单词,欧陆词典,不背单词等的自定义词库。 仓库内已经导出墨墨背单词所有自带词库(暂不包括云词库),多达 900 种词库,可以在仓库中选择需要的词库下载(下载单个文件的方法),也可以去 蓝奏云(密码:666) 下载打包好

ourongxing 293 Dec 29, 2022