环球观察:大数据NiFi(十八):离线同步MySQL数据到HDFS
离线同步MySQL数据到HDFS
案例:使用NiFi将MySQL中数据导入到HDFS中。
(资料图片仅供参考)
以上案例用到的处理器有“QueryDatabaseTable”、“ConvertAvroToJSON”、“SplitJson”、“PutHDFS”四个处理器。
一、配置“QueryDatabaseTable”处理器
该处理器主要使用提供的SQL语句或者生成SQL语句来查询MySQL中的数据,查询结果转换成Avro格式。该处理器只能运行在主节点上。
关于“QueryDatabaseTable”处理器的“Properties”配置的说明如下:
配置项 | 默认值 | 允许值 | 描述 |
---|---|---|---|
Database Connection Pooling Service(数据库连接池服务) | 用于获得与数据库的连接的Controller Service。 | ||
Database Type(数据库类型) | Generic | 选择数据库类型。Generic 通用类型OracleOracle 12+MS SQL 2012+MS SQL 2008MySQLPostgreSQL | |
Table Name(表名) | 查询数据库的表名,当使用“Custom Query”时,此为查询结果的别名,并作为FlowFile中的属性。 | ||
Columns to Return(返回的列) | 查询返回的列,多个列使用逗号分隔。如果列中有特殊名称需要加引号,则所有列都需要加引号处理。 | ||
Additional WHERE clause(where条件) | 在构建SQL查询时添加到WHERE条件中的自定义子句。 | ||
Custom Query(自定义SQL查询) | 自定义的SQL语句。该查询被构建成子查询,设置后不会从其他属性构建SQL查询。自定义SQL不支持Order by查询。 | ||
Maximum-value Columns(最大值列) | 指定增量查询获取最大值的列,多列使用逗号分开。指定后,这个处理器只能检索到添加/更新的行。不能设置无法比较大小的列,例如:boolean/bit。如果不指定,则参照表中所有的列来查询全量数据,这会对性能产生影响。 | ||
Max Wait Time(最大超时时间) | 0 seconds | SQL查询最大时长,默认为0没有限制,设置小于0的时间默认为0。 | |
Fetch Size(拉取数据量) | 0 | 每次从查询结果中拉取的数据量。 | |
Max Rows Per Flow File(每个FlowFile行数) | 0 | 在一个FlowFile文件中的数据行数。通过这个参数可以将很大的结果集分到多个FlowFile中。默认设置为0,所有结果存入一个FlowFile。 | |
Output Batch Size(数据输出批次量) | 0 | 输出的FlowFile批次数据大小,当设置为0代表所有数据输出到下游关系。如果数据量很大,则有可能下游很久没有收到数据,如果设置了,则每次达到该数据量就释放数据,传输到下游。 | |
Maximum Number of Fragments(最大片段数) | 0 | 设置返回的最大数据片段数,设置0默认将所有数据片段返回,如果表非常大,设置后可以防止OOM错误。 | |
Normalize Table/Column Names(标准表/列名) | false | truefalse | 是否将列名中不兼容avro的字符修改为兼容avro的字符。例如,冒号和句点将被更改为下划线,以构建有效的Avro记录。 |
Transaction Isolation Level | 设置事务隔离级别。 | ||
Use Avro Logical Types(使用Avro逻辑类型) | false | truefalse | 是否对DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro逻辑类型。 |
Default Decimal Precision(Decimal数据类型位数) | 10 | 当 DECIMAL/NUMBER 数据类型转换成Avro类型数据时,指定的数据位数。 | |
Default Decimal Scale(Decimal 数据类型小数位数) | 0 | 当 DECIMAL/NUMBER 数据类型转换成Avro类型数据时,指定的小数点后的位数。 |
Table Name(表名)查询数据库的表名,当使用“Custom Query”时,此为查询结果的别名,并作为FlowFile中的属性。 Columns to Return (返回的列) 查询返回的列,多个列使用逗号分隔。如果列中有特殊名称需要加引号,则所有列都需要加引号处理。 Additional WHERE clause (where条件) 在构建SQL查询时添加到WHERE条件中的自定义子句。 Custom Query (自定义SQL查询) 自定义的SQL语句。该查询被构建成子查询,设置后不会从其他属性构建SQL查询。自定义SQL不支持Order by查询。 Maximum-value Columns (最大值列) 指定增量查询获取最大值的列,多列使用逗号分开。指定后,这个处理器只能检索到添加/更新的行。不能设置无法比较大小的列,例如:boolean/bit。如果不指定,则参照表中所有的列来查询全量数据,这会对性能产生影响。 Max Wait Time(最大超时时间)0 seconds SQL查询最大时长,默认为0没有限制,设置小于0的时间默认为0。 Fetch Size(拉取数据量)0 每次从查询结果中拉取的数据量。 Max Rows Per Flow File(每个FlowFile行数)0 在一个FlowFile文件中的数据行数。通过这个参数可以将很大的结果集分到多个FlowFile中。默认设置为0,所有结果存入一个FlowFile。 Output Batch Size(数据输出批次量)0 输出的FlowFile批次数据大小,当设置为0代表所有数据输出到下游关系。如果数据量很大,则有可能下游很久没有收到数据,如果设置了,则每次达到该数据量就释放数据,传输到下游。 Maximum Number of Fragments(最大片段数)0 设置返回的最大数据片段数,设置0默认将所有数据片段返回,如果表非常大,设置后可以防止OOM错误。 Normalize Table/Column Names(标准表/列名)false true false 是否将列名中不兼容avro的字符修改为兼容avro的字符。例如,冒号和句点将被更改为下划线,以构建有效的Avro记录。 Transaction Isolation Level 设置事务隔离级别。 Use Avro Logical Types(使用Avro逻辑类型)false true false 是否对DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro逻辑类型。 Default Decimal Precision(Decimal数据类型位数)10 当 DECIMAL/NUMBER 数据类型转换成Avro类型数据时,指定的数据位数。 Default Decimal Scale(Decimal 数据类型小数位数)0 当 DECIMAL/NUMBER 数据类型转换成Avro类型数据时,指定的小数点后的位数。
配置步骤如下:
1、新建“QueryDatabaseTable”处理器
2、配置“SCHEDULING”调度时间
这里调度时间配置为99999s,读取数据库,这里读取一次即可,默认0会不间断读取数据库会对服务器造成非常大压力。执行仅支持“Primary”主节点运行。
3、配置“PROPERTIES”
配置“Database Connection Pooling Service”选择创建,在弹出页面中可以按照默认选择直接点击“Create”。
点击“->”继续配置MySQL连接:
在弹出的页面中填入:
连接MysqlURL:jdbc:mysql://192.168.179.5:3306/mynifi?characterEncoding=UTF-8&useSSL=false
MySQL驱动类:com.mysql.jdbc.DriverMySQL jar包路径:需要提前在NiFI集群各个节点上创建对应目录并上传jar包。连接mysql的用户名和密码。通过以上配置好连接mysql如下:
配置其他属性如下:
二、配置“ConvertAvroToJSON”处理器
此处理器是将二进制Avro记录转换为JSON对象,提供了一个从Avro字段到JSON字段的直接映射,这样得到的JSON将具有与Avro文档相同的层次结构。输出的JSON编码为UTF-8编码,如果传入的FlowFile包含多个Avro记录,则转换后的FlowFile是一个含有所有Avro记录的JSON数组或一个JSON对象序列(每个Json对象单独成行)。如果传入的FlowFile不包含任何记录,则输出一个空JSON对象。
关于“ConvertAvroToJSON”处理器的“Properties”配置的说明如下:
配置项 | 默认值 | 允许值 | 描述 |
---|---|---|---|
JSON container options(Json选择) | array | nonearray | 如何解析Json对象,none:解析Json将每个Json对象写入新行。array:解析到的json存入JsonArray一个对象 |
Wrap Single Record(数据库类型) | false | truefalse | 指定解析到的空记录或者单条记录是否按照“JSON container options”配置包装对象。 |
Avro schema(表名) | 如果Avro数据没有Schema信息,需要配置。 |
配置步骤如下:
1、创建“ConvertAvroToJSON”处理器
2、配置“PROPERTIES”
3、连接“QueryDatabaseTable”处理器和“CovertAvroToJSON”处理器
连接好两个处理器后,可以配置“Connection”为负载均衡方式传递数据:
三、配置“SplitJson”处理器
该处理器使用JsonPath表达式指定需要的Json数组元素,将Json数组中的多个Json对象切分出来,形成多个FlowFile。每个生成的FlowFile都由指定数组中的一个元素组成,并传输到关系"split",原始文件传输到关系"original"。如果没有找到指定的JsonPath,或者没有对数组元素求值,则将原始文件路由到"failure",不会生成任何文件。
关于“SplitJson”处理器的“Properties”配置的说明如下:
配置项 | 默认值 | 允许值 | 描述 |
---|---|---|---|
JsonPath Expression(Json表达式) | 一个JsonPath表达式,它指定用以分割的数组元素。 | ||
Null Value Representation(Null值表示) | empty string | empty stringthe string "null" | 指定结果为空值时的表示形式。 |
配置步骤如下:
1、创建“SplitJson”处理器
2、配置“PROPERTIES”
3、连接“ConvertAvroToJSON”处理器和“SplitJson”处理器
连接后,连接关系选择“success”:
同时配置“ConverAvroToJSON”处理失败的数据自动终止:
四、配置“PutHDFS”处理器
该处理器是将FlowFile数据写入到HDFS分布式文件系统中。关于“PutHDFS”处理器的“Properties”主要配置的说明如下:
配置项 | 默认值 | 允许值 | 描述 |
---|---|---|---|
Hadoop Configuration Resources(Hadoop配置) | nonearray | HDFS配置文件,一个文件或者由逗号分隔的多个文件。不配置将在ClassPath中寻找‘core-site.xml’或者‘hdfs-site.xml’文件。 | |
Directory(目录) | 需要写入文件的HDFS父目录。如果目录不存在,将创建该目录。 | ||
Conflict Resolution Strategy(冲突解决) | fail | replaceignorefailappend | 指示当输出目录中已经存在同名文件时如何处理。 |
配置步骤如下:
1、创建“PutHDFS”处理器
2、配置“PROPERTIES”
注意:以上需要在各个NiFi集群节点上创建“/root/test”目录,并且在该目录下上传hdfs-site.xml和core-site.xml文件。
3、连接“SplitJson”处理器和“PutHDFS”处理器
同时设置“SplitJson”处理器中“failure”和“original”数据关系自动终止。
设置“PutHDFS”处理器“success”和“failure”数据关系自动终止:
配置好的连接关系如下:
五、运行测试
1、在MySQL创建库“mynifi”,并且创建表“test1”,向表中插入10条数据
mysql> create database mynifi;Query OK, 1 row affected (0.02 sec)mysql> use mynifi;Database changedmysql> create table test1(id int,name varchar(255),age int );Query OK, 0 rows affected (0.07 sec)mysql> insert into test1 values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tt",22)
2、首先启动“QueryDatabaseTable”处理器观察队列数据
3、单独启动“ConvertAvroToJson”处理器观察队列数据
4、单独启动“SplitJson”处理器观察队列数据
5、单独启动“PutHDFS”处理器观察HDFS对应目录数据
查看数据:
注意:
如果在“QueryDatabaseTable”处理器中设置增属性“Maximum-value Columns”为id,那么每次查询都是大于id的增量数据。如果想要存入HDFS文件为多行而不是一行,可以将“CovertAvroToJson”处理器属性“JSON container options”设置为none,直接解析Avro文件得到一个个json数据,然后直接连接“PutHDFS”处理器即可。标签: PostgreSQL JSON 云数据库 Server
-
15
2023-02传iPhone15Pro将拥有更窄的边框 2023年秋季发布
一位通常准确的消息人士再次强调了他们之前关于iPhone15 Pro将拥有更窄的边框的报告,尽管没有透露任何新的细节。预计iPhone 15 Pro将于 -
14
2023-02中正评测曝光RTX4060跑分 性能比3060强太多
近日,数码博主中正评测曝光了RTX4060的跑分成绩。据悉,这次测试采用的是搭载i9-13900HX处理器的雷神ZERO。硬件方面,RTX 4060拥有3072个 -
06
2023-02惠普新款IPSBlack面板显示器上架 配备雷电4接口
惠普新款IPS Black面板显示器Z32k G3已上架,价格为6999元。新款显示器最大的亮点就是配备雷电4接口,拥有40Gbps的数据传输速率,通过单U -
03
2023-02售价15999元!苹果全新MacBookPro上市开售
2月3日消息,今日,苹果全新MacBook Pro正式上市开售,搭载M2 Pro芯片的14英寸版本起售价15999元,M2 Pro芯片16英寸版本起售价为19999 -
02
2023-02三星GalaxyS23系列正式发布 搭载高通第二代骁龙8系
今日凌晨2点,三星Galaxy S23系列正式发布。和上一代不同,Galaxy S23全系搭载高通第二代骁龙8移动平台,没有Exynos版本。不仅如此,Gala -
15
2022-11vivoX90系列正式发布 蔡司一英寸T+自研芯片
在当下的手机市场环境中,为了寻求突围行业瓶颈,厂商们都把发力点之一放到了提升创新性方面。包括对手机形态、具体关键技术等的改善。尤其 -
14
2022-11骁龙8Gen2性能再提升20% 单核跑出了1524分
今年的高通骁龙峰会提前到了11月15日在美国夏威夷举行,无疑,此次的绝对主角将是骁龙8 Gen2。已经见识过骁龙8 Gen2样机的爆料人Yogesh -
14
2022-11魅蓝lifeme140W氮化镓充电器发布 体积缩小了20%以上
魅蓝lifeme推出了一款新品140W GaN快速充电器,售价369元。这款140W氮化镓充电器外观设计简约,采用纯白配色,折叠式插脚设计方案,体积较 -
14
2022-11iPhone14系列Pro版销量喜人 组装工厂目前产能大幅降低
iPhone 14系列自从发布以来,标准版就明显预冷,但Pro版热度一直居高不下,销量喜人。此前苹果发布的第三季度财报还显示,iPhone销量明显 -
14
2022-11全球首款12英寸高分辨率可拉伸显示屏发布 具有全彩色RGB功能
LG显示 (LG Display) 发布了全球首款12英寸高分辨率可拉伸显示屏 (Stretchable Display)。据了解,这款显示屏具有橡皮筋般的柔韧性, -
24
2022-05俄谈判代表团团长:从未拒绝与乌克兰谈判
新华社明斯克5月22日电(记者鲁金博)俄罗斯谈判代表团团长、总统助理梅金斯基22日在接受白俄罗斯媒体视频采访时 -
24
2022-05加强核心技术攻关 着力解决“好用”问题
“担大任,要有大能力。特别是要加快培育农机原创技术策源地,围绕解决‘卡脖子’问题,深入开展重点产品核心技术
卷出一块好曲屏 真我10系列新品发布会举行
英国猴痘病例数预计将大幅上升
上海:视情适当延长毕业生在校生身份时间
国家电网确定新型电力系统科技攻关十大重点项目
比亚迪发布CTB电池车身一体化技术
商务部:坚定致力于实现全面、高水平的亚太自贸区
中办国办印发《意见》 推进实施国家文化数字化战略
初夏看市场:“菜篮子”产品生产供应充足 蔬菜在田面积达9877.2万亩
上海浦东重点生产企业复工复产超1100家
泰国羽毛球公开赛落幕 中国队收获一冠两亚
-
1
配置拉满的电竞神机 雷神ZERO2023大黄蜂发布
-
2
真我10Pro系列发布 首发量产2160Hz超高频调光技术
-
3
阿富汗塔利班组建正规军
-
4
萨赫勒地区反恐形势面临新变数
-
5
北约北扩加剧欧洲安全风险
-
6
贵州毕节七星关区百所学校创办百个“红军班”
-
7
湖北省孝感军分区组织军地联合应急救援研究性演练
-
8
青藏高原等区域将新设一批国家公园
-
9
河北省承德军分区退役军人担纲教练主力
-
10
军校“大思政”:画出教育同心圆