读CSV文件支持从OSS、HTTP、HDFS读取CSV类型的文件数据。本文为您介绍读CSV文件的参数配置。

使用限制

  • 通过可视化方式使用时,支持的计算资源为MaxCompute或Flink。
  • 通过PyAlink脚本方式使用时,需要使用PyAlink脚本组件进行代码调用,详情请参见PyAlink脚本

前提条件

(可选)已完成OSS授权,详情请参见云产品依赖与授权:Designer
说明文件来源选择OSS时,需要完成此操作。

组件配置

您可以通过以下任意一种方式,配置读CSV文件组件参数。

方式一:可视化方式

在PAI-Designer工作流页面配置组件参数。
页签 参数名称 描述
参数设置 文件来源 支持OSS和OTHERS。
文件路径 读取CSV文件的路径:
  • 文件来源选择OSS时,支持输入或选择OSS文件路径,对于1 GB以内的单文件,支持在选择控件中直接上传至指定的OSS路径。
  • 文件来源选择OTHERS时,支持输入HTTP或HDFS类型的文件地址。
Schema 配置每一列对应的数据类型,格式为colname0 coltype0[, colname1 coltype1[, ...]]。例如:f0 string,f1 bigint,f2 double
重要 配置的数据类型需要与CSV文件每一列的数据类型保持一致,否则该列内容会读取失败。
字段分隔符 字段的分隔符,默认为半角逗号(,)。
处理无效值的方法 解析Tensor、Vector和MTable类型的数据失败时,处理无效数据的方法。它们是Alink算法框架所定义的数据类型,有固定的解析格式。处理无效值的方法取值如下:
  • ERROR:解析失败时,运行中止。
  • SKIP:解析失败时,跳过该数据。
是否忽略第一行数据 是否忽略第一行的数据,当CSV文件首行为表头信息时,需要勾选此参数。
是否容错 输入数据式与Schema中定义的不一致时,例如类型不一致或列数不一致的容错方法:
  • 打开是否容错开关:当解析失败时,丢弃该行数据。
  • 关闭是否容错开关:当解析失败时,运行终止,并打印出错行。
引号字符 默认为半角双引号(")。
行分隔符 默认为换行符(\n)。
是否忽略空行 是否忽略空行。
执行调优 节点个数 节点个数,与参数单个节点内存大小配对使用,正整数。范围[1, 9999]。
单个节点内存大小,单位M 单个节点内存大小,单位MB,正整数。范围[1024, 64*1024]。

方式二:PyAlink脚本方式

使用PyAlink脚本方式,配置该组件参数。您可以使用PyAlink脚本组件进行Python代码调用,组件详情请参见PyAlink脚本
参数名称 是否必选 描述 默认值
schemaStr 格式为colname0 coltype0[, colname1 coltype1[, ...]]。例如f0 string,f1 bigint,f2 double
filePath 文件路径。
fieldDelimiter 字段分隔符。 半角逗号(,)
handleInvalidMethod 解析Tensor、Vector和MTable类型的数据失败时,处理无效数据的方法。它们是Alink算法框架所定义的数据类型,有固定的解析格式。处理无效值的方法取值如下:
  • ERROR:解析失败时,运行中止。
  • SKIP:解析失败时,跳过该数据。
ERROR
ignoreFirstLine 是否忽略第一行的数据,当CSV文件首行为表头信息时,需要将ignoreFirstLine设为True。 False
lenient 输入数据的格式与Schema中定义的不一致时,例如类型不一致或列数不一致的容错方法:
  • true:当解析失败时,丢弃该行数据。
  • false:当解析失败时,抛异常。
False
quoteString 引号字符。 半角双引号(")
rowDelimiter 行分隔符。 换行符(\n)
skipBlankLine 是否忽略空行。 True
PyAlink代码方式的使用示例。
filePath = 'https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv'
schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
csvSource = CsvSourceBatchOp()\
    .setFilePath(filePath)\
    .setSchemaStr(schema)\
    .setFieldDelimiter(",")
BatchOperator.collectToDataframe(csvSource)