当前位置: 首页 > news >正文

a片做视频网站百度seo刷排名工具

a片做视频网站,百度seo刷排名工具,wordpress如何制作网页,wordpress 临时域名系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 文章目录 系列文章目录前言DataX的Writer写入流…

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel

文章目录

  • 系列文章目录
  • 前言
  • DataX的Writer写入流程
  • Writer组件如何处理各类数据源
  • writer相关源码


前言

在 DataX 中,writer 是数据同步过程中的一个核心组件,负责将数据写入到目标数据源。下面是对 DataX 中 writer 组件的源码分析:
Writer 接口定义:
DataX 的 writer 组件首先定义了一个 Writer 接口,该接口定义了 writer 需要实现的基本方法,如 init(), write(), post() 等。
不同的数据源插件需要实现这个接口,提供对应的数据写入逻辑。
Writer 插件实现:
对于每种目标数据源,DataX 都会有一个对应的 writer 插件实现。例如,对于 MySQL 数据源,会有一个 MysqlWriter 类实现 Writer 接口。
每个 writer 插件的实现中,会包含与目标数据源交互的逻辑,如建立连接、执行 SQL 语句、批量插入数据等。
Writer 配置:
在 DataX 的 JSON 配置文件中,会指定 writer 的类型和相应的配置参数。
这些配置参数会被传递给 writer 插件的 init() 方法,用于初始化 writer 实例。
数据写入逻辑:
在 write() 方法中,writer 会从上游的 reader 中获取数据,并将其写入到目标数据源。
根据不同的数据源和写入策略,writer 可能会采用批量插入、逐条插入等方式进行数据写入。
writer 还会处理写入过程中的异常和错误,确保数据的完整性和一致性。
Writer 清理和关闭:
在数据写入完成后,writer 会执行 post() 方法,进行一些清理和关闭操作。
这可能包括关闭数据库连接、释放资源等。
通过对 DataX 中 writer 组件的源码分析,我们可以了解到 writer 是如何与目标数据源进行交互的,以及它是如何处理和写入数据的。


DataX的Writer写入流程

  • 初始化和准备:
    根据配置文件中指定的目标数据源类型和参数,初始化Writer实例。
    建立与目标数据源的连接,这通常涉及到网络连接、认证授权等步骤。
    准备写入操作所需的各种资源,如缓冲区、事务等。
  • 数据接收:
    Writer从上游的Reader组件接收数据。这些数据可能是经过转换和处理的,已经符合目标数据源的要求。Writer将数据暂存到本地缓冲区或内存中,等待批量写入或逐条写入。
  • 数据格式化和处理:
    根据目标数据源的要求,Writer可能需要对接收到的数据进行格式化处理,如将数据转换为特定的文本格式、二进制格式或JSON格式等。
  • 数据写入:
    Writer将格式化处理后的数据写入目标数据源。写入操作可能涉及到网络通信、数据库操作等。根据目标数据源的特性,Writer会采用批量写入、流式写入等不同的写入方式以提高性能。对于支持事务的数据源,Writer会在每个写入操作前开启一个事务,并在写入完成后提交事务以确保数据的一致性。
  • 错误处理和重试:
    在写入过程中,Writer需要处理可能出现的各种错误和异常,如网络中断、数据格式错误等。根据配置文件中指定的错误处理策略,Writer可能会进行重试、跳过错误数据、记录错误日志等操作。
  • 写入完成和清理:
    当所有数据都成功写入目标数据源后,Writer会执行一些清理操作,如关闭数据库连接、释放资源等。Writer还会向上游的Reader或整个DataX任务发送完成信号,以通知整个任务流程已经完成。

Writer组件如何处理各类数据源

不同的数据源具有不同的写入特性和要求,因此Writer组件需要针对不同的数据源实现相应的写入逻辑。以下是一般情况下,DataX Writer组件如何处理各类数据源的大致步骤和考虑因素:

  • 数据源连接:
    Writer组件首先需要与目标数据源建立连接。这可能涉及到网络通信、认证授权、连接池管理等操作。
    根据数据源类型的不同,Writer可能会使用不同的连接协议和库,如JDBC、ODBC、API等。
  • 写入前准备:
    根据目标数据源的表结构,Writer可能需要创建表、索引或分区。
    Writer可能还需要准备写入数据的格式,如文本、二进制、JSON等。
    对于支持事务的数据源,Writer可能会开启一个事务来确保数据的一致性。
  • 数据写入:
    Writer从Reader组件接收数据,并将其写入目标数据源。
    根据数据源的特点,Writer可能会采用批量写入、逐条写入、流式写入等不同的写入方式。对于一些支持并行写入的数据源,Writer可能需要将数据分片并分配给多个线程或进程进行并发写入。
  • 错误处理:
    Writer需要处理写入过程中可能出现的异常和错误,如网络中断、数据格式错误、数据冲突等。
    根据不同的错误类型,Writer可能会采取重试、跳过、记录错误日志等不同的处理策略。
  • 写入优化:
    对于不同的数据源,Writer可能会采用不同的优化策略来提高写入性能,如使用批量插入、调整事务大小、优化网络传输等。
    Writer还可能利用目标数据源的特定功能,如批量提交、索引优化等,来进一步提高写入效率。
  • 写入后处理:
    在数据写入完成后,Writer可能会执行一些后处理操作,如提交事务、关闭连接、清理临时文件等。
    对于一些需要额外处理的数据源,Writer可能还会执行数据校验、更新统计信息等操作。
  • 扩展性和灵活性:
    DataX的Writer组件设计通常具有高度的扩展性和灵活性,以便支持新的数据源类型。通过实现统一的接口和抽象类,可以方便地添加新的Writer插件来支持新的数据源。

总之,DataX的Writer组件通过针对不同数据源实现特定的写入逻辑和优化策略,能够高效地处理各类数据源,并确保数据的正确性和一致性。同时,其扩展性和灵活性的设计也使得DataX能够轻松应对不断变化的数据处理需求。

writer相关源码


/*** 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。* * * */
public abstract class Writer extends BaseObject {/*** 每个Writer插件必须实现Job内部类*/public abstract static class Job extends AbstractJobPlugin {/*** 切分任务。<br>* * @param mandatoryNumber*            为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!* * */public abstract List<Configuration> split(int mandatoryNumber);}/*** 每个Writer插件必须实现Task内部类*/public abstract static class Task extends AbstractTaskPlugin {public abstract void startWrite(RecordReceiver lineReceiver);public boolean supportFailOver(){return false;}}
}
public class MysqlWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterJob;@Overridepublic void preCheck(){this.init();this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);}@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterJob.init(this.originalConfig);}// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)@Overridepublic void prepare() {//实跑先不支持 权限 检验//this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);this.commonRdbmsWriterJob.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);}// 一般来说,是需要推迟到 task 中进行post 的执行(单表情况例外)@Overridepublic void post() {this.commonRdbmsWriterJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterJob.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterTask;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterTask.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);}//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterTask.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);}@Overridepublic boolean supportFailOver(){String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);return "replace".equalsIgnoreCase(writeMode);}}}

public class RdbmsWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;static {//加载插件下面配置的驱动类DBUtil.loadDriverClass("writer", "rdbms");}public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterMaster;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();// warn:not like mysql, only support insert mode, don't useString writeMode = this.originalConfig.getString(Key.WRITE_MODE);if (null != writeMode) {throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("写入模式(writeMode)配置有误. 因为不支持配置参数项 writeMode: %s, 仅使用insert sql 插入数据. 请检查您的配置并作出修改.",writeMode));}this.commonRdbmsWriterMaster = new SubCommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterMaster.init(this.originalConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterMaster.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterMaster.split(this.originalConfig,mandatoryNumber);}@Overridepublic void post() {this.commonRdbmsWriterMaster.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterMaster.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterSlave;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterSlave = new SubCommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterSlave.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);}public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite(recordReceiver,this.writerSliceConfig, super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterSlave.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);}}}
http://www.zhongyajixie.com/news/5395.html

相关文章:

  • wordpress网站模板怎么用培训优化
  • 正规游戏代理平台关键词seo排名怎么做的
  • 石家庄市网站制作价格网络公司主要做哪些
  • 枞阳做网站跨境电商网站开发
  • 成人用品网站怎么做网站链接提交
  • 赣州网站建设方案考研最靠谱的培训机构
  • 《营销型网站建设实战》成都业务网络推广平台
  • 公司做网站提供资料pc网站优化排名
  • iis怎么建设网站离我最近的广告公司
  • 库存网站建设定制信息发布
  • 江西求做网站企业网址怎么注册
  • 初学ssm做的网站南宁求介绍seo软件
  • 北京市网站建设 维护推荐seo公司发展前景
  • 成都网站建设优秀公司seo接单平台
  • 中英文网站怎么做的sem代运营托管公司
  • 网站怎么做关键词搜索排面链接交换公司
  • 布吉网站开发黑马培训是正规学校吗
  • 网站关键词描述字数客户推广渠道有哪些
  • 黔西南州做网站聊石家庄seo
  • 成都cms建站推广品牌的方法
  • seo做网站赚钱吗关键词推广软件排名
  • 龙游手机网站制作优化大师平台
  • wordpress注册会员插件网站快速优化排名app
  • 淘宝客网站源码和模版有什么区别对网络营销的认识800字
  • 优化服务seo咨询常德
  • 网站上传图片尺寸免费推广引流app
  • 广州专业网站建设有哪些百度热线电话
  • 外贸平台免费网站网络推广主要内容
  • 代做毕业设计比较靠谱的网站百度权重网站排名
  • 360建筑招聘网官网青岛网站seo推广