TFX + Airflow: hands-on notes
2022-07-28 01:49:00 【会发paper的学渣】
1. Introduction:
Airflow overview: "Apache Airflow 2.3.0 released on May Day!" (数据社's blog, CSDN)
Airflow official documentation: https://airflow.apache.org/docs/apache-airflow/2.2.3/installation/index.html
TFX introduction: https://www.tensorflow.org/tfx/tutorials/tfx/components_keras
2. Installation
General reference: "TFX (TensorFlow Extended) research notes" (数据技术组's blog, CSDN)
The version combination that worked:
Python version: 3.8
pip install --upgrade pip
pip install -U tfx
pip install apache-airflow==2.2.3
pip install tfx==1.6.1
If you let pip install the latest default versions instead, Airflow reports errors on startup because of version conflicts.
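Version conflicts are easiest to spot by asking pip's metadata what actually got installed. The script below is a minimal sketch of that check. The pin table is an assumption: it uses tfx 1.6.1 from the list above and Airflow 2.2.3, the version whose documentation is linked in section 1. Edit `EXPECTED_PINS` if you use a different combination. `importlib.metadata` is in the standard library from Python 3.8 onward.

```python
"""Check that the pinned TFX / Airflow versions are what pip actually installed."""
from importlib import metadata
from typing import Dict, List, Optional

# Assumed pin table: distribution name -> expected version string.
EXPECTED_PINS = {
    "tfx": "1.6.1",
    "apache-airflow": "2.2.3",
}


def installed_versions(names) -> Dict[str, Optional[str]]:
    """Return the installed version of each distribution, or None if missing."""
    result = {}
    for name in names:
        try:
            result[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            result[name] = None
    return result


def find_mismatches(expected: Dict[str, str],
                    installed: Dict[str, Optional[str]]) -> List[str]:
    """Describe every package whose installed version differs from its pin."""
    problems = []
    for name, want in expected.items():
        got = installed.get(name)
        if got is None:
            problems.append(f"{name}: not installed (expected {want})")
        elif got != want:
            problems.append(f"{name}: {got} installed, expected {want}")
    return problems


if __name__ == "__main__":
    report = find_mismatches(EXPECTED_PINS, installed_versions(EXPECTED_PINS))
    for line in report or ["all pins match"]:
        print(line)
```

Run it in the same virtualenv that runs Airflow. An empty report means the pins match.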
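3. Initialize the database (this creates the tables, including the user table, so it must run first):
airflow db init
4. Create an admin user:
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin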
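5. Start the Airflow webserver (if the versions don't match, it fails at this step):
airflow webserver -p 8080
If the versions do match and Airflow still fails to start, delete the entire ~/airflow directory and start over from step 3.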
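Instead of deleting ~/airflow outright, you can move it aside. This keeps the old airflow.cfg and logs for comparison. The sketch below assumes Airflow's documented behaviour of reading its home directory from the `AIRFLOW_HOME` environment variable, with a default of ~/airflow. The helper names and the `.bak-<timestamp>` naming are hypothetical choices.

```python
import os
import shutil
import time
from pathlib import Path


def resolve_airflow_home() -> Path:
    """Airflow reads AIRFLOW_HOME; when it is unset, the default is ~/airflow."""
    return Path(os.environ.get("AIRFLOW_HOME", "~/airflow")).expanduser()


def move_aside(home: Path) -> Path:
    """Rename `home` to `<home>.bak-<unix time>` and return the new path.

    Raises FileNotFoundError if `home` does not exist, so nothing else is touched.
    """
    if not home.exists():
        raise FileNotFoundError(home)
    backup = home.with_name(f"{home.name}.bak-{int(time.time())}")
    shutil.move(str(home), str(backup))
    return backup
```

Usage: `move_aside(resolve_airflow_home())`, then repeat steps 3 and 4. The backup still contains dags/, and for the TFX pipeline below also data/ and code/. Copy them back into the fresh directory afterwards.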
6. Start the scheduler (in a second terminal, since the webserver keeps running in the foreground):
airflow scheduler
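The scheduler does not start a cron-scheduled DAG at start_date itself. In Airflow 2.x a run is triggered only after the data interval it covers has ended. The Airflow FAQ warns against `start_date=datetime.now()` for exactly this reason: "now" moves on each time the DAG file is parsed, so the first interval may never close. The sketch below models only the daily `'0 0 * * *'` case used by the test DAG. It assumes naive datetimes and catchup enabled, and it is a simplification rather than Airflow's timetable code.

```python
from datetime import datetime, timedelta
from typing import Tuple


def first_daily_run(start_date: datetime) -> Tuple[datetime, datetime]:
    """Return (interval_start, triggered_at) of the first run of a '0 0 * * *' DAG.

    Simplified model (assumption): the first data interval starts at the first
    midnight at or after start_date, and the run is triggered once that
    one-day interval has ended.
    """
    midnight = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
    interval_start = midnight if midnight == start_date else midnight + timedelta(days=1)
    return interval_start, interval_start + timedelta(days=1)


# A fixed start_date gives a fixed first trigger time ...
print(first_daily_run(datetime(2022, 7, 1)))
# ... while datetime.now() is re-evaluated every time the DAG file is parsed,
# so the computed trigger time keeps sliding into the future.
print(first_daily_run(datetime.now()))
```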
7. Airflow test script: save it under ~/airflow/dags/ (any file name works):
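from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator
# Create the DAG instance. start_date must be a fixed date (any past date works):
# a value computed with datetime.now() changes on every parse of this file.
dag = DAG(
    dag_id='xyz',
    start_date=datetime(2022, 7, 1),
    schedule_interval='0 0 * * *',  # cron: every day at midnight
    catchup=False)  # don't backfill every missed day since start_date
# Create three tasks
t1 = BashOperator(
    task_id='first',
    bash_command='echo "1"',
    dag=dag)
t2 = BashOperator(
    task_id='second',
    bash_command='echo "2"',
    dag=dag)
t3 = BashOperator(
    task_id='third',
    bash_command='echo "3"',
    dag=dag)
# Define the task flow: first -> second -> third
t1 >> t2 >> t3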
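`t1 >> t2 >> t3` works because Airflow operators overload `>>` to record a downstream dependency and return the right-hand task. The chain therefore evaluates as `(t1 >> t2) >> t3`. The toy class below is a hypothetical stand-in, not Airflow's implementation. It mimics that behaviour and derives a valid execution order with Kahn's topological sort.

```python
from collections import deque
from typing import Dict, List


class ToyTask:
    """Hypothetical stand-in for an operator: only tracks downstream task ids."""

    def __init__(self, task_id: str, registry: Dict[str, "ToyTask"]):
        self.task_id = task_id
        self.downstream: List[str] = []
        registry[task_id] = self

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # Record the edge self -> other and return `other`,
        # so a >> b >> c becomes a -> b, then b -> c.
        self.downstream.append(other.task_id)
        return other


def execution_order(registry: Dict[str, ToyTask]) -> List[str]:
    """Kahn's topological sort; raises ValueError if the graph has a cycle."""
    indegree = {tid: 0 for tid in registry}
    for task in registry.values():
        for child in task.downstream:
            indegree[child] += 1
    ready = deque(tid for tid, deg in indegree.items() if deg == 0)
    order = []
    while ready:
        tid = ready.popleft()
        order.append(tid)
        for child in registry[tid].downstream:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(registry):
        raise ValueError("dependency cycle detected")
    return order


tasks: Dict[str, ToyTask] = {}
t1, t2, t3 = (ToyTask(name, tasks) for name in ("first", "second", "third"))
t1 >> t2 >> t3
print(execution_order(tasks))  # ['first', 'second', 'third']
```

The cycle check reflects a real constraint: an Airflow DAG must be acyclic.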
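8. Running the TFX model pipeline:
Code (like the test script, it must be placed in ~/airflow/dags/ for Airflow to discover it):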
import datetime
import os
from tfx.components import CsvExampleGen
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.components.base import executor_spec
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner
from tfx.orchestration.airflow.airflow_dag_runner import AirflowPipelineConfig
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.components import StatisticsGen
from tfx.components import SchemaGen
from tfx.components import ExampleValidator
from tfx.components import Transform
from tfx.components import Trainer
from tfx.proto import trainer_pb2
import tensorflow_model_analysis as tfma
from tfx.components import Evaluator
from tfx.dsl.components.common import resolver
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.components import Pusher
from tfx.proto import pusher_pb2
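pipeline_name = 'keras'
# Everything lives under ~/airflow:
#   data/keras/     input CSV files read by CsvExampleGen
#   code/udf.py     user module (preprocessing_fn for Transform, run_fn for Trainer)
#   serving_model/  destination of Pusher
#   tfx/            pipeline artifacts and the ML Metadata SQLite database
airflow_root = os.path.join(os.environ['HOME'], 'airflow')
data_root = os.path.join(airflow_root, 'data', 'keras')
module_file = os.path.join(airflow_root, 'code', 'udf.py')
serving_model_dir = os.path.join(airflow_root, 'serving_model', pipeline_name)
tfx_root = os.path.join(airflow_root, 'tfx')
pipeline_root = os.path.join(tfx_root, 'pipelines', pipeline_name)
metadata_path = os.path.join(tfx_root, 'metadata', pipeline_name, 'metadata.db')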
# Read the CSV files in data_root and emit tf.Examples
example_gen = CsvExampleGen(input_base=data_root)
# Compute feature statistics
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
# Infer a schema from the statistics
infer_schema = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False
)
# Check the statistics against the schema for anomalies
validate_stats = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=infer_schema.outputs['schema']
)
# Feature engineering with preprocessing_fn from module_file
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=infer_schema.outputs['schema'],
    module_file=module_file
)
# Train on the transformed examples; GenericExecutor calls run_fn from module_file
trainer = Trainer(
    module_file=module_file,
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=infer_schema.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000)
)
# Find the latest blessed model; the Evaluator uses it as the baseline
model_resolver = resolver.Resolver(
    strategy_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing)
).with_id('latest_blessed_model_resolver')
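# Bless the model only if BinaryAccuracy >= 0.6 and it is not worse than the baseline
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='Outcome')],  # label column of diabetes.csv
    slicing_specs=[tfma.SlicingSpec()],  # no slicing: metrics over the whole eval set
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='BinaryAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.6}),
                    # absolute -1e-10: the new model may tie the baseline but not be worse
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -1e-10})))
        ])
    ]
)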
# Evaluate the new model against the baseline and emit a blessing
model_analyzer = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config
)
# Copy the model to serving_model_dir only if it was blessed
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=model_analyzer.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir))
)
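# Assemble the pipeline; TFX derives the execution order from the channels
# connecting the components, not from the order of this list
tfx_pipeline = pipeline.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=[
        example_gen,
        statistics_gen,
        infer_schema,
        validate_stats,
        transform,
        trainer,
        model_resolver,
        model_analyzer,
        pusher
    ],
    enable_cache=True,  # reuse outputs when a component reruns with identical inputs
    metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path),
    # Beam DirectRunner in multi-process mode; 0 workers = auto-detect from the CPU count
    beam_pipeline_args=['--direct_running_mode=multi_processing', '--direct_num_workers=0']
)
# schedule_interval=None: the DAG runs only when triggered manually, e.g. from the web UI
airflow_config = {
    'schedule_interval': None,
    'start_date': datetime.datetime(2022, 1, 1)
}
# Airflow only picks up DAG objects bound to module-level names
DAG = AirflowDagRunner(AirflowPipelineConfig(airflow_config)).run(tfx_pipeline)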
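CsvExampleGen ingests the files under `input_base` (`data_root` here), and the Evaluator looks up the label named in `eval_config` in those raw examples. A missing file or a wrong label name therefore only surfaces after several components have run. The preflight below is a hypothetical helper that catches both cases before you trigger the DAG. It assumes the CSV files sit directly in `data_root` and have a header row. Pass whatever `label_key` your `eval_config` uses.

```python
import csv
from pathlib import Path
from typing import List


def check_csv_input(data_root: str, label_key: str) -> List[str]:
    """Return the problems found under data_root (an empty list means it looks OK).

    Hypothetical preflight helper: it only checks that .csv files exist and
    that each header row contains `label_key`.
    """
    root = Path(data_root).expanduser()
    if not root.is_dir():
        return [f"{root} is not a directory"]
    csv_files = sorted(root.glob("*.csv"))
    if not csv_files:
        return [f"no .csv files in {root}"]
    problems = []
    for path in csv_files:
        with path.open(newline="") as f:
            header = next(csv.reader(f), [])
        if label_key not in header:
            problems.append(f"{path.name}: column {label_key!r} missing, header is {header}")
    return problems
```

For example: `check_csv_input("~/airflow/data/keras", "Outcome")`, assuming `Outcome` is the label column of your copy of diabetes.csv.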
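Notes on the components:
ExampleGen: the data input. The data source used above is https://github.com/LahiruTjay/Machine-Learning-With-Python/blob/master/datasets/diabetes.csv; put it into ~/airflow/data/keras/
StatisticsGen: computes feature statistics over training and serving data for use by other pipeline components. StatisticsGen uses Beam to scale to large datasets.
SchemaGen: produces the schema, an instance of schema.proto. The schema can specify each feature's data type, whether the feature must be present in every example, the allowed value ranges, and other properties.
ExampleValidator: identifies anomalies in the dataset.
Transform: performs feature engineering on the tf.Examples emitted by ExampleGen, using the schema created by SchemaGen, and emits a SavedModel. At execution time, that SavedModel takes tf.Examples emitted by ExampleGen and outputs the transformed feature data.
Trainer: trains a TensorFlow model.
Tuner: tunes the model's hyperparameters (not used in the pipeline above).
Evaluator: performs deep analysis of the training results to help you understand how the model performs on subsets of the data. Evaluator can also help validate exported models.
InfraValidator: serves as an early-warning layer before a model goes into production (not used in the pipeline above).
Pusher: pushes a validated model to a deployment target.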
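The thresholds in `eval_config` decide whether the Evaluator blesses a model, and Pusher only ships blessed models. The function below is a simplified model of that gate for the single BinaryAccuracy metric configured above. On the first run the resolver finds no blessed baseline, so only the value threshold applies. How exact equality at the boundaries is handled is an assumption here, not TFMA's code.

```python
from typing import Optional


def is_blessed(candidate_acc: float,
               baseline_acc: Optional[float],
               lower_bound: float = 0.6,
               min_change: float = -1e-10) -> bool:
    """Simplified model of the BinaryAccuracy thresholds in eval_config.

    - value threshold: candidate accuracy must be at least `lower_bound`;
    - change threshold (HIGHER_IS_BETTER, absolute): candidate minus baseline
      must be at least `min_change`, so a tie with the baseline still passes;
    - with no baseline (first run), only the value threshold applies.
    """
    if candidate_acc < lower_bound:
        return False
    if baseline_acc is None:
        return True
    return candidate_acc - baseline_acc >= min_change
```

The tiny negative `absolute` value is what lets a retrained model that merely matches the baseline still be blessed.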
9. Results:
[Figure: screenshot of the run results]
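After a successful run, the blessed model sits under `serving_model_dir` (~/airflow/serving_model/keras). TensorFlow Serving expects one integer-named version subdirectory per model version and by default serves the highest one. The sketch below assumes Pusher follows that layout, writing each push into a new integer-named subdirectory. It returns the newest version directory. The function name is hypothetical.

```python
from pathlib import Path
from typing import Optional


def latest_pushed_version(serving_model_dir: str) -> Optional[Path]:
    """Return the subdirectory with the largest integer name, or None.

    Assumes the layout <serving_model_dir>/<integer version>/...;
    non-numeric entries are ignored.
    """
    base = Path(serving_model_dir).expanduser()
    if not base.is_dir():
        return None
    versions = [p for p in base.iterdir() if p.is_dir() and p.name.isdigit()]
    return max(versions, key=lambda p: int(p.name), default=None)
```

Comparing with `int(p.name)` rather than the string keeps "10" ahead of "9".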