读CSV文件

CSV文件支持从OSS、HTTP、HDFS读取CSV类型的文件数据。本文为您介绍读CSV文件的参数配置。

使用限制

  • 通过可视化方式使用时,支持的计算资源为MaxComputeFlink。

  • 通过PyAlink脚本方式使用时,需要使用PyAlink脚本组件进行代码调用,详情请参见PyAlink脚本

前提条件

(可选)已完成OSS授权,详情请参见云产品依赖与授权:Designer

说明

文件来源选择OSS时,需要完成此操作。

组件配置

您可以通过以下任意一种方式,配置读CSV文件组件参数。

方式一:可视化方式

Designer工作流页面配置组件参数。

页签

参数名称

描述

参数设置

文件来源

支持OSSOTHERS。

文件路径

读取CSV文件的路径:

  • 文件来源选择OSS时,支持输入或选择OSS文件路径,对于1 GB以内的单文件,支持在选择控件中直接上传至指定的OSS路径。

  • 文件来源选择OTHERS时,支持输入HTTPHDFS类型的文件地址。

Schema

配置每一列对应的数据类型,格式为colname0 coltype0, colname1 coltype1,colname2 coltype2...。例如:f0 string,f1 bigint,f2 double

重要

配置的数据类型需要与CSV文件每一列的数据类型保持一致,否则该列内容会读取失败。

字段分隔符

字段的分隔符,默认为半角逗号(,)。

处理无效值的方法

解析Tensor、VectorMTable类型的数据失败时,处理无效数据的方法。它们是Alink算法框架所定义的数据类型,有固定的解析格式。处理无效值的方法取值如下:

  • ERROR:解析失败时,运行中止。

  • SKIP:解析失败时,跳过该数据。

是否忽略第一行数据

是否忽略第一行的数据,当CSV文件首行为表头信息时,需要勾选此参数。

是否容错

输入数据与Schema中定义的不一致时,例如类型不一致或列数不一致的容错方法:

  • 打开是否容错开关:当解析失败时,丢弃该行数据。

  • 关闭是否容错开关:当解析失败时,运行终止,并打印出错行。

引号字符

默认为半角双引号(")。

行分隔符

默认为换行符(\n)。

是否忽略空行

是否忽略空行。

执行调优

节点个数

节点个数,与参数单个节点内存大小配对使用,正整数。范围[1, 9999]。

单个节点内存大小,单位M

单个节点内存大小,单位MB,正整数。范围[1024, 64*1024]。

方式二:PyAlink脚本方式

使用PyAlink脚本方式,配置该组件参数。您可以使用PyAlink脚本组件进行Python代码调用,组件详情请参见PyAlink脚本

参数名称

是否必选

描述

默认值

schemaStr

格式为colname0 coltype0[, colname1 coltype1[, ...]]。例如f0 string,f1 bigint,f2 double

filePath

文件路径。

fieldDelimiter

字段分隔符。

半角逗号(,)

handleInvalidMethod

解析Tensor、VectorMTable类型的数据失败时,处理无效数据的方法。它们是Alink算法框架所定义的数据类型,有固定的解析格式。处理无效值的方法取值如下:

  • ERROR:解析失败时,运行中止。

  • SKIP:解析失败时,跳过该数据。

ERROR

ignoreFirstLine

是否忽略第一行的数据,当CSV文件首行为表头信息时,需要将ignoreFirstLine设为True。

False

lenient

输入数据的格式与Schema中定义的不一致时,例如类型不一致或列数不一致的容错方法:

  • true:当解析失败时,丢弃该行数据。

  • false:当解析失败时,抛异常。

False

quoteString

引号字符。

半角双引号(")

rowDelimiter

行分隔符。

换行符(\n)

skipBlankLine

是否忽略空行。

True

PyAlink代码方式的使用示例。

filePath = 'https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv'
schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
csvSource = CsvSourceBatchOp()\
    .setFilePath(filePath)\
    .setSchemaStr(schema)\
    .setFieldDelimiter(",")
BatchOperator.collectToDataframe(csvSource)