Accessing Configuration Parameters Passed to Airflow Through the CLI
Solution 1:
This is probably a continuation of the answer provided by devj.
- In airflow.cfg, the following property should be set to True:
dag_run_conf_overrides_params = True
- While defining the PythonOperator, pass the argument provide_context=True. For example:
get_row_count_operator = PythonOperator(task_id='get_row_count', python_callable=do_work, dag=dag, provide_context=True)
- Define the Python callable (note the use of **kwargs):
def do_work(**kwargs):
    table_name = kwargs['dag_run'].conf.get('table_name')
    # Rest of the code
- Invoke the DAG from the command line:
airflow trigger_dag read_hive --conf '{"table_name":"my_table_name"}'
I have found this discussion to be helpful.
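Putting the steps together, a minimal sketch of the complete DAG file could look like this (the dag_id read_hive and the table_name key come from the snippets above; the start_date and the print statement are illustrative, and the imports assume Airflow 1.x, which matches the trigger_dag/provide_context style used here):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def do_work(**kwargs):
    # 'dag_run' is available in kwargs because provide_context=True
    table_name = kwargs['dag_run'].conf.get('table_name')
    print('Counting rows for table: {}'.format(table_name))

dag = DAG(
    dag_id='read_hive',
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # externally triggered only
)

get_row_count_operator = PythonOperator(
    task_id='get_row_count',
    python_callable=do_work,
    provide_context=True,
    dag=dag,
)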
Solution 2:
There are two ways to access the params passed to the airflow trigger_dag command:
- In the callable defined in the PythonOperator, you can access the params as kwargs['dag_run'].conf.get('account_list')
- Given that the field where you are using the value is a templated field, you can use {{ dag_run.conf['account_list'] }} (a sketch follows below)
The schedule_interval for the externally triggerable DAG must be set to None for the above approaches to work.
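As a sketch of the templated approach, assuming a BashOperator in the same externally triggered DAG (the task_id and the echo command are illustrative):

from airflow.operators.bash_operator import BashOperator

print_accounts = BashOperator(
    task_id='print_account_list',
    # bash_command is a templated field, so Jinja can read dag_run.conf
    bash_command="echo 'accounts: {{ dag_run.conf[\"account_list\"] }}'",
    dag=dag,
)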
Solution 3:
If you are trying to access the Airflow system-wide config (instead of a DAG run config), the following might help:
Firstly, import this:
from airflow.configuration import conf
Secondly, get the value somewhere in your code:
conf.get("core", "my_key")
Optionally, set a value with:
conf.set("core", "my_key", "my_val")
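For example, a quick sketch that reads a real key from the [core] section (my_key above is just a placeholder):

from airflow.configuration import conf

# "executor" is an actual key in the [core] section of airflow.cfg
executor = conf.get("core", "executor")
print(executor)  # e.g. SequentialExecutor or LocalExecutor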
Solution 4:
For my use case, I had to pass arguments to the Airflow workflow (or task) using the API. My workflow is as follows: a Lambda is triggered when a new file lands in the S3 bucket; the Lambda in turn triggers an Airflow DAG, passing it the bucket name and the key of the file.
Here's my solution:
import json
import urllib.parse

import boto3
import requests

# Deployment-specific settings assumed by the snippet (placeholders):
mwaa_env_name = 'my-mwaa-environment'  # name of the MWAA environment
dag_name = 'my_dag'                    # DAG to trigger
mwaa_cli_command = 'trigger_dag'       # Airflow 1.x CLI command

s3 = boto3.client('s3')
mwaa = boto3.client('mwaa')

def lambda_handler(event, context):
    # print("Received event: " + json.dumps(event, indent=2))
    # Get the object from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    # Request a short-lived CLI token for the MWAA environment
    mwaa_cli_token = mwaa.create_cli_token(
        Name=mwaa_env_name
    )
    mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
    mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname'])
    conf = {'bucket': bucket, 'key': key}
    raw_data = """{0} {1} --conf '{2}'""".format(mwaa_cli_command, dag_name, json.dumps(conf))
    # pass the key and bucket name to airflow to initiate the workflow
    requests.post(
        mwaa_webserver_hostname,
        headers={
            'Authorization': mwaa_auth_token,
            'Content-Type': 'text/plain'
        },
        data=raw_data
    )
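On the Airflow side, the triggered DAG can then read the bucket and key back out of dag_run.conf, just as in Solution 1. A minimal sketch (process_file and the surrounding task wiring are illustrative):

def process_file(**kwargs):
    run_conf = kwargs['dag_run'].conf or {}
    bucket = run_conf.get('bucket')
    key = run_conf.get('key')
    # download and process s3://<bucket>/<key> here

Note that on an Airflow 2.x MWAA environment the CLI command is dags trigger rather than trigger_dag, so mwaa_cli_command would change accordingly.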