TFX + Airflow: hands-on experience
2022-07-28 02:52:00 【Paper scum】
1、Introduction:
Airflow introduction: "Apache Airflow 2.3.0 Released on May Day!" (Data Society blog, CSDN)
Airflow official documentation: https://airflow.apache.org/docs/apache-airflow/2.2.3/installation/index.html
TFX introduction: https://www.tensorflow.org/tfx/tutorials/tfx/components_keras
2、Installation
Overall reference: "TFX (TensorFlow Extended) research" (Data Technology Group blog, CSDN)
The package versions need to correspond to each other; the Python version used here is 3.8:
pip install --upgrade pip
pip install -U tfx
pip install apache-airflow==3.6.1
pip install tfx==1.6.1
If you simply install the default latest versions, Airflow will report errors at startup because of version conflicts between the packages.
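To keep these pinned versions from clashing with other projects, one option is an isolated virtual environment. A minimal sketch (the environment name is arbitrary and python3.8 is assumed to be on PATH):
    python3.8 -m venv tfx-airflow-env
    source tfx-airflow-env/bin/activate
    # then run the pip commands listed above inside this environment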
3、Create an admin user:
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
4、Initialize the database:
airflow db init
5、Start the Airflow webserver (note: if the versions do not correspond, this step will report an error):
airflow webserver -p 8080
If the versions do correspond but Airflow still reports an error on startup, it is suggested to delete the entire ~/airflow directory and start again from step 3, as sketched below.
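A sketch of that reset (this assumes nothing else of value is stored under ~/airflow):
    # removes Airflow's home directory, including airflow.cfg and the SQLite metadata DB
    rm -rf ~/airflow
    # then repeat steps 3-6: recreate the user, re-initialize the database,
    # and start the webserver and scheduler again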
6、Start the scheduler (so scheduled tasks run):
airflow scheduler
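The webserver and scheduler each occupy a terminal in the foreground. As a sketch, both can also be daemonized with the CLI's -D flag (keep the foreground form while debugging so errors stay visible):
    airflow webserver -p 8080 -D
    airflow scheduler -D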
7、Airflow test script: place it in ~/airflow/dags/; the file name can be anything.
from datetime import datetime
from airflow.operators.bash import BashOperator
from airflow.models import DAG

# Create an example DAG
dag = DAG(
    dag_id='xyz',
    start_date=datetime.now(),
    schedule_interval='0 0 * * *')

# Define three BashOperator tasks
t1 = BashOperator(
    task_id='first',
    bash_command='echo "1"',
    dag=dag)
t2 = BashOperator(
    task_id='second',
    bash_command='echo "2"',
    dag=dag)
t3 = BashOperator(
    task_id='third',
    bash_command='echo "3"',
    dag=dag)

# Wire up the task flow: t1 -> t2 -> t3
t1 >> t2 >> t3
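After the scheduler picks up the file, the DAG can be checked and triggered from the command line (a quick sanity check; the DAG id 'xyz' comes from the script above):
    # list registered DAGs and confirm 'xyz' appears
    airflow dags list
    # run a single task once, outside the scheduler, to verify the BashOperator works
    airflow tasks test xyz first 2022-01-01
    # trigger a full run of the DAG
    airflow dags trigger xyz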
8、TFX model pipeline task:
Code (this script also goes under ~/airflow/dags/ so that Airflow can discover the resulting DAG):
import datetime
import os
from tfx.components import CsvExampleGen
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.components.base import executor_spec
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner
from tfx.orchestration.airflow.airflow_dag_runner import AirflowPipelineConfig
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.components import StatisticsGen
from tfx.components import SchemaGen
from tfx.components import ExampleValidator
from tfx.components import Transform
from tfx.components import Trainer
from tfx.proto import trainer_pb2
import tensorflow_model_analysis as tfma
from tfx.components import Evaluator
from tfx.dsl.components.common import resolver
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.components import Pusher
from tfx.proto import pusher_pb2
pipeline_name = 'keras'
airflow_root = os.path.join(os.environ['HOME'], 'airflow')
data_root = os.path.join(airflow_root, 'data', 'keras')
module_file = os.path.join(airflow_root, 'code', 'udf.py')
serving_model_dir = os.path.join(airflow_root, 'serving_model', pipeline_name)
tfx_root = os.path.join(airflow_root, 'tfx')
pipeline_root = os.path.join(tfx_root, 'pipelines', pipeline_name)
metadata_path = os.path.join(tfx_root, 'metadata', pipeline_name, 'metadata.db')
example_gen = CsvExampleGen(input_base=data_root)
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
infer_schema = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False
)
validate_stats = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=infer_schema.outputs['schema']
)
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=infer_schema.outputs['schema'],
    module_file=module_file
)
trainer = Trainer(
    module_file=module_file,
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=infer_schema.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000)
)
model_resolver = resolver.Resolver(
    strategy_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing)
).with_id('latest_blessed_model_resolver')
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='tips')],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='BinaryAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.6}),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -1e-10})))
        ])
    ]
)
model_analyzer = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config
)
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=model_analyzer.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir))
)
pip = pipeline.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=[
        example_gen,
        statistics_gen,
        infer_schema,
        validate_stats,
        transform,
        trainer,
        model_resolver,
        model_analyzer,
        pusher
    ],
    enable_cache=True,
    metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path),
    beam_pipeline_args=['--direct_running_mode=multi_processing', '--direct_num_workers=0']
)
airflow_config = {
    'schedule_interval': None,
    'start_date': datetime.datetime(2022, 1, 1)
}
DAG = AirflowDagRunner(AirflowPipelineConfig(airflow_config)).run(pip)
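Both Transform and Trainer load their user code from module_file (udf.py above), which the original post does not show. Below is a minimal hypothetical sketch of what such a file could look like for a simple binary-classification CSV, assuming all features are dense numeric scalars and the label column is named 'tips'; adapt the names and model to your actual data:
    # udf.py -- hypothetical module_file; a sketch, not the original author's code
    import tensorflow as tf
    import tensorflow_transform as tft
    from tfx.components.trainer.fn_args_utils import FnArgs
    from tfx_bsl.public import tfxio

    LABEL_KEY = 'tips'  # assumed label column; change to match your CSV

    def preprocessing_fn(inputs):
        # Called by Transform: z-score every feature, pass the label through unchanged
        outputs = {}
        for key, value in inputs.items():
            if key == LABEL_KEY:
                outputs[key] = value
            else:
                outputs[key] = tft.scale_to_z_score(value)
        return outputs

    def _input_fn(file_pattern, data_accessor, tf_transform_output, batch_size=64):
        # Read the transformed examples produced by Transform as a tf.data.Dataset
        return data_accessor.tf_dataset_factory(
            file_pattern,
            tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key=LABEL_KEY),
            tf_transform_output.transformed_metadata.schema)

    def run_fn(fn_args: FnArgs):
        # Called by Trainer (GenericExecutor): build, train, and export a Keras model
        tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
        train_ds = _input_fn(fn_args.train_files, fn_args.data_accessor, tf_transform_output)
        eval_ds = _input_fn(fn_args.eval_files, fn_args.data_accessor, tf_transform_output)

        # One scalar input per transformed feature, excluding the label
        feature_spec = tf_transform_output.transformed_feature_spec().copy()
        feature_spec.pop(LABEL_KEY)
        inputs = {key: tf.keras.layers.Input(shape=(1,), name=key)
                  for key in feature_spec}
        x = tf.keras.layers.concatenate(list(inputs.values()))
        x = tf.keras.layers.Dense(16, activation='relu')(x)
        output = tf.keras.layers.Dense(1, activation='sigmoid')(x)
        model = tf.keras.Model(inputs=inputs, outputs=output)
        model.compile(optimizer='adam',
                      loss='binary_crossentropy',
                      metrics=[tf.keras.metrics.BinaryAccuracy()])

        model.fit(train_ds,
                  steps_per_epoch=fn_args.train_steps,
                  validation_data=eval_ds,
                  validation_steps=fn_args.eval_steps)
        # Export to the directory that Pusher will later copy from
        model.save(fn_args.serving_model_dir, save_format='tf')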
Descriptions of the relevant components:
ExampleGen: the data input source. The dataset used above, https://github.com/LahiruTjay/Machine-Learning-With-Python/blob/master/datasets/diabetes.csv, is placed in ~/airflow/data/keras/.
StatisticsGen: generates feature statistics over the training and serving data for use by other pipeline components. StatisticsGen uses Beam to scale to large datasets.
SchemaGen: infers the schema (an instance of schema.proto), which can specify attributes such as the data type of each feature, whether a feature must be present in all examples, and the allowed value range.
ExampleValidator: identifies anomalous data in the dataset.
Transform: performs feature engineering on the tf.Examples emitted by ExampleGen (using the schema created by SchemaGen) and emits a SavedModel (the transform graph). At execution time, that SavedModel accepts the tf.Examples emitted by ExampleGen and returns the transformed feature data. The feature-engineering code lives in the module_file (see the udf.py sketch above).
Trainer: trains the TensorFlow model; with the GenericExecutor it calls the run_fn defined in the module_file.
Tuner: tunes the model's hyperparameters (not used in this pipeline).
Evaluator: performs deep analysis of the training results to help you understand how the model performs on subsets of the data. Evaluator can also help you validate the exported model.
InfraValidator: serves as an early-warning layer before the model is put into production (not used in this pipeline).
Pusher: pushes the validated model to the deployment target.
9、The result is as follows:
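If the model is blessed, Pusher copies it under serving_model_dir (~/airflow/serving_model/keras). A quick way to confirm the run succeeded end to end (saved_model_cli ships with TensorFlow; the version subdirectory name is generated by Pusher, so list the directory first):
    ls ~/airflow/serving_model/keras/
    # inspect the exported SavedModel's signatures (replace <version> with the directory listed above)
    saved_model_cli show --dir ~/airflow/serving_model/keras/<version> --all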
