文档

高速数据导入API

更新时间:

高速数据导入API服务端为gRPC服务器,使用gRPC协议(protobuf)来定义客户端接口及其消息交换格式,您可以提交请求列出云原生数据仓库 AnalyticDB PostgreSQL 版数据库中的表,或向特定的表写入数据。

使用限制

  • 云原生数据仓库 AnalyticDB PostgreSQL 版6.0实例需为v6.6.0及以上版本。云原生数据仓库 AnalyticDB PostgreSQL 版7.0实例需为v7.0.3及以上版本。AnalyticDB PostgreSQL版Serverless模式实例暂不支持。

  • 高速数据导入API目前仅支持INSERT、MERGE(UPSERT)、UPDATE三种语法,暂不支持DELETE与READ。

  • 使用MERGE(UPSERT)或UPDATE时,需要云原生数据仓库 AnalyticDB PostgreSQL 版表有主键索引。

  • 请勿在多个并发连接中使用高速数据导入API更新相同主键的数据,会触发全局死锁错误,造成部分数据更新失败。

开启实时数据服务

  1. 登录云原生数据仓库AnalyticDB PostgreSQL版控制台

  2. 在控制台左上角,选择实例所在地域。

  3. 找到目标实例,单击实例ID。

  4. 在控制台左侧导航栏单击实时数据消费,单击左上角开启实时数据服务

    image

  5. 在弹出的对话框中填写名称服务描述并单击确定。开通完成后,可在控制台看到服务状态连接信息

    说明

    服务规格当前不可选,默认为16CU。

使用高速数据导入API

在开通实时数据服务功能后,系统会提供一个用于接收数据的gRPC Server,您可以登录云原生数据仓库AnalyticDB PostgreSQL版控制台,在实时数据消费页面查看Server端的连接信息。

高速数据导入API使用protobuf,数据结构(消息)和支持的操作(服务)通过.proto文件定义,.proto文件为一个普通的文本文件,有关此数据序列化框架的详细信息,请参见protocol buffer语言指南

高速数据导入API的.proto文件定义了客户端可调用的方法,可以从云原生数据仓库 AnalyticDB PostgreSQL 版数据库端获取元数据信息,并向其写入数据。

说明

由于高速数据导入API使用了gRPC协议,即Protocol Buffer格式类型进行数据导入。由于云原生数据仓库 AnalyticDB PostgreSQL 版支持比Protobuf更多的数据类型,因此在导入数据时需要做数据映射。数据映射关系,请参见附录:数据类型映射

将高速导入API服务的定义复制并粘贴到一个名为adbpg.proto的文件中,使用protoc命令编译对应语言客户端即可。具体使用方式,请参见protocol buffer使用指南

syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

package api;

option java_multiple_files = true;

// Connect service Request message
message ConnectRequest {
  string Host = 1;      // Host address of ADBPG master; must be accessible from gpss server system
  int32 Port = 2;       // ADBPG master port
  string Username = 3;  // User or role name that gpss uses to access ADBPG
  string Password = 4;  // User password
  string DB = 5;        // Database name
  bool UseSSL = 6;      // Use SSL or not; ignored, use the gpss config file to config SSL
}

// Connect service Response message
message Session {
  string ID = 1;  // Id of client connection to gpss
}

// Operation mode
enum Operation {
  Insert = 0;  // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
  Merge = 1;   // Insert and Update
  Update = 2;  // Update the value of "UpdateColumns" if "MatchColumns" match
  Read = 3;    // Not supported
}

// Required parameters of the Insert operation
message InsertOption {
  repeated string InsertColumns = 1;    // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
  bool TruncateTable = 2;               // Truncate table before loading?
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Required parameters of the Update operation
message UpdateOption {
  repeated string MatchColumns = 1;     // Names of the target table columns to compare when determining to update or not
  repeated string UpdateColumns = 2;    // Names of the target table columns to update if MatchColumns match
  string Condition = 3;                 // Optional additional match condition; SQL syntax and used after the 'WHERE' clause
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Required parameters of the Merge operation
// Merge operation creates a session-level temp table in StagingSchema
message MergeOption {
  repeated string InsertColumns = 1;
  repeated string MatchColumns = 2;
  repeated string UpdateColumns = 3;
  string Condition = 4;
  int64 ErrorLimitCount = 5;
  int32 ErrorLimitPercentage = 6;
}

message ReadOption {
  string MagicCode = 1;
}

// Open service Request message
message OpenRequest {
  Session Session = 1;      // Session ID returned by Connect
  string SchemaName = 2;    // Name of the ADBPG Database schema
  string TableName = 3;     // Name of the ADBPG Database table
  string PreSQL = 4;        // SQL to execute before gpss loads the data
  string PostSQL = 5;       // SQL to execute after gpss loads the data
  int32 Timeout = 6;        // Time to wait before aborting the operation (seconds); not supported
  string Encoding = 7;      // Encoding of text data; not supported
  string StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table

  oneof Option {            // Identify the type of write operation to perform
    InsertOption InsertOption = 100;
    UpdateOption UpdateOption = 101;
    MergeOption MergeOption = 102;
  }
}

message DBValue {
  oneof DBType {
    int32 Int32Value = 1;
    int64 Int64Value = 2;
    float Float32Value = 5;
    double Float64Value = 6;
    string StringValue = 7;  // Includes types whose values are presented as string but are not a real string type in ADBPG; for example: macaddr, time with time zone, box, etc.
    bytes BytesValue = 8;
    google.protobuf.Timestamp TimeStampValue = 10;  // Time without timezone
    google.protobuf.NullValue NullValue = 11;
  }
}

message Row {
  repeated DBValue Columns = 1;
}

message RowData {
  bytes Data = 1;     // A single protobuf-encoded Row
}

// Write service Request message
message WriteRequest {
  Session Session = 1;
  repeated RowData Rows = 2;     // The data to load into the target table
}

message ReadRequest {
  string MagicCode = 1;
}

message ReadResponse {
  string Data = 1;
}

// Close service Response message
message TransferStats {          // Status of the data load operation
  int64 SuccessCount = 1;        // Number of rows successfully loaded
  int64 ErrorCount = 2;          // Number of error lines if Errorlimit is not reached
  repeated string ErrorRows = 3; // Number of rows with incorrectly-formatted data; not supported
}

// Close service Request message
message CloseRequest {
  Session session = 1;
  int32 MaxErrorRows = 2;        // -1: returns all, 0: nothing, >0: max rows
  bool Abort = 3;
}

// ListSchema service request message
message ListSchemaRequest {
  Session Session = 1;
}

message Schema {
  string Name = 1;
  string Owner = 2;
}

// ListSchema service response message
message Schemas {
  repeated Schema Schemas = 1;
}

// ListTable service request message
message ListTableRequest {
  Session Session = 1;
  string Schema = 2;    // 'public' is the default if no Schema is provided
}

// DescribeTable service request message
message DescribeTableRequest {
  Session Session = 1;
  string SchemaName = 2;
  string TableName = 3;
}

enum RelationType {
  Table = 0;
  View = 1;
  Index = 2;
  Sequence = 3;
  Special = 4;
  Other = 255;
}

message TableInfo {
  string Name = 1;
  RelationType Type = 2;
}

// ListTable service response message
message Tables {
  repeated TableInfo Tables = 1;
}

// DescribeTable service response message
message Columns {
  repeated ColumnInfo Columns = 1;
}

message ColumnInfo {
  string Name = 1;            // Column name
  string DatabaseType = 2;    // ADBPG data type

  bool HasLength = 3;         // Contains length information?
  int64 Length = 4;           // Length if HasLength is true

  bool HasPrecisionScale = 5; // Contains precision or scale information?
  int64 Precision = 6;
  int64 Scale = 7;

  bool HasNullable = 8;       // Contains Nullable constraint?
  bool Nullable = 9;
}

service Gpss {
  // Establish a connection to ADBPG Database; returns a Session object
  rpc Connect(ConnectRequest) returns (Session) {}

  // Disconnect, freeing all resources allocated for a session
  rpc Disconnect(Session) returns (google.protobuf.Empty) {}

  // Prepare and open a table for write
  rpc Open(OpenRequest) returns(google.protobuf.Empty) {}

  // Write data to table
  rpc Write(WriteRequest) returns(google.protobuf.Empty) {}

  // Close a write operation
  rpc Close(CloseRequest) returns(TransferStats) {}

  // List all available schemas in a database
  rpc ListSchema(ListSchemaRequest) returns (Schemas) {}

  // List all tables and views in a schema
  rpc ListTable(ListTableRequest) returns (Tables) {}

  // Decribe table metadata(column name and column type)
  rpc DescribeTable(DescribeTableRequest) returns (Columns) {}

  // Not supported
  rpc Read(ReadRequest) returns (ReadResponse) {}
}

高速导入API使用样例

下文提供了使用Java和Go语言,利用高速数据导入API读写云原生数据仓库 AnalyticDB PostgreSQL 版数据的示例。

Java

package client;

import api.*;
import com.google.longrunning.GetOperationRequest;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.TimestampProto;
import io.grpc.*;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class GrpcClient {
    /** using for adbpgss environment start */
    final static String lissHost = "<LISS_HOST>";             //Server端的连接地址。
    final static int lissPort = "<LISS_PORT>";                //Server端口号。
    final static String schemaName = "<TARGET_SCHEMA_NAME>";  //需要写入数据表的Schema。
    final static String tableName = "<TARGET_TABLE_NAME>";    //需要写入数据的表名。

    final static String adbpgMasterHost = "<ADBPG_HOSTNAME>";
    final static int adbpgMasterPort = "<ADBPG_PORT>";
    final static String adbpgUserName = "<ADBPG_USERNAME>";
    final static String adbpgPasswd = "<ADBPG_PASSWORD>";
    final static String dbname = "<TARGET_DB>";
    /** using for adbpgss environment end */

    public static void buildRowDataAllTypes(ArrayList<RowData> rows, final int ROWNUM, int initcount, String col7) {
        for (int row = 0; row < ROWNUM; row++) {
            // create a row builder
            api.Row.Builder builder = api.Row.newBuilder();

            // create builders for each column, in order, and set values - text, int, text
            api.DBValue.Builder colbuilder1 = api.DBValue.newBuilder();         // a integer
            colbuilder1.setInt32Value(row);
            builder.addColumns(colbuilder1.build());

            api.DBValue.Builder colbuilder2 = api.DBValue.newBuilder();         // b serial
            colbuilder2.setInt32Value(row);
            builder.addColumns(colbuilder2.build());

            api.DBValue.Builder colbuilder3 = api.DBValue.newBuilder();         // c bigint
            colbuilder3.setInt64Value(1000);
            builder.addColumns(colbuilder3.build());

            api.DBValue.Builder colbuilder4 = api.DBValue.newBuilder();         // d bigserial
            colbuilder4.setInt64Value(row);
            builder.addColumns(colbuilder4.build());

            api.DBValue.Builder colbuilder5 = api.DBValue.newBuilder();         // e real
            colbuilder5.setFloat32Value(row);
            builder.addColumns(colbuilder5.build());

            api.DBValue.Builder colbuilder6 = api.DBValue.newBuilder();         // f double
            colbuilder6.setFloat64Value(row);
            builder.addColumns(colbuilder6.build());

            api.DBValue.Builder colbuilder7 = api.DBValue.newBuilder();         // g text
            colbuilder7.setStringValue(col7);
            builder.addColumns(colbuilder7.build());

            api.DBValue.Builder colbuilder8 = api.DBValue.newBuilder();         // h bytea
            colbuilder8.setBytesValue(ByteString.copyFrom((col7).getBytes()));
            builder.addColumns(colbuilder8.build());

            api.DBValue.Builder colbuilder9 = api.DBValue.newBuilder();         // i timestamp without time zone
            colbuilder9.setTimeStampValue(Timestamp.newBuilder()
                    .setSeconds(System.currentTimeMillis() /1000)
                    .setNanos((int) ((System.currentTimeMillis() %1000) *1000000))
                    .build());
            builder.addColumns(colbuilder9.build());

            api.DBValue.Builder colbuilder10 = api.DBValue.newBuilder();         // j timestamp with time zone
            colbuilder10.setTimeStampValue(Timestamp.newBuilder()
                    .setSeconds(System.currentTimeMillis() /1000)
                    .setNanos((int) ((System.currentTimeMillis() %1000) *1000000))
                    .build());
            builder.addColumns(colbuilder10.build());

            // build the row
            RowData.Builder rowbuilder = RowData.newBuilder().setData(builder.build().toByteString());

            // add the row
            rows.add(rowbuilder.build());

            // columninfo -> row -> rowdata
        }
    }

    public static void logMessage(String message) {
        SimpleDateFormat sdf = new SimpleDateFormat();
        sdf.applyPattern("yyyy-MM-dd HH:mm:ss a");
        System.out.println(sdf.format(new Date()) + " " + message);
    }

    public static void listSchema(GpssGrpc.GpssBlockingStub bStub, Session mSession) {
        ListSchemaRequest lReq = ListSchemaRequest.newBuilder()
                .setSession(mSession)
                .build();
        Schemas schemas = bStub.listSchema(lReq);
        for(Schema schema: schemas.getSchemasList()) {
            logMessage("Got schemas:" + schema.getName());
        }
    }

    public static Session buildSession(GpssGrpc.GpssBlockingStub bStub, String gpMasterHost, int gpMasterPort,
                                       String gpRoleName, String gpPasswd, String dbname) {

        /** create a connect request builder */
        logMessage("Starting create session...");
        ConnectRequest connReq = ConnectRequest.newBuilder()
                .setHost(gpMasterHost)
                .setPort(gpMasterPort)
                .setUsername(gpRoleName)
                .setPassword(gpPasswd)
                .setDB(dbname)
                .setUseSSL(false)
                .build();

        assert bStub != null;

        return bStub.connect(connReq);
    }

    public static void listTable(GpssGrpc.GpssBlockingStub bStub, Session mSession) {
        ListTableRequest ltReq = ListTableRequest.newBuilder()
                .setSession(mSession)
                .setSchema("public")
                .build();
        Tables tables = bStub.listTable(ltReq);
        logMessage("Got tables, size:" + tables.getTablesList().size());
        for(TableInfo table: tables.getTablesList()) {
            logMessage("Got table:" + table.getName() + " type:" + table.getType());
        }
    }

    public static void openForInsert(GpssGrpc.GpssBlockingStub bStub, Session mSession, String schemaName, String tableName) {
        /** open a table for write */
        Integer errLimit = 25;
        Integer errPct = 25;
        // create an insert option builder
        InsertOption iOpt = InsertOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .setTruncateTable(true)
                .addInsertColumns("a")
                .addInsertColumns("b")
                .addInsertColumns("c")
                .addInsertColumns("d")
                .addInsertColumns("e")
                .addInsertColumns("f")
                .addInsertColumns("g")
                .addInsertColumns("h")
                .addInsertColumns("i")
                .addInsertColumns("j")
                .build();

        // create an open request builder
        OpenRequest oReq = OpenRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
                //.setPreSQL("")
                //.setPostSQL("")
                //.setEncoding("")
                //.setTimeout(5)
                //.setStagingSchema("")
                .setInsertOption(iOpt)
                .build();

        // use the blocking stub to call the Open service; it returns nothing
        bStub.open(oReq);
    }

    public static void openForMerge(GpssGrpc.GpssBlockingStub bStub, Session mSession, String schemaName, String tableName) {
        /** open a table for write */
        Integer errLimit = 25;
        Integer errPct = 25;
        // create an insert option builder
        MergeOption mOpt = MergeOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .addInsertColumns("a")
                .addInsertColumns("b")
                .addInsertColumns("c")
                .addInsertColumns("d")
                .addInsertColumns("e")
                .addInsertColumns("f")
                .addInsertColumns("g")
                .addInsertColumns("h")
                .addInsertColumns("i")
                .addInsertColumns("j")
                .addMatchColumns("a")       // match columns
                .addMatchColumns("b")       // match columns
                .addUpdateColumns("g")      // update columns
                .addUpdateColumns("h")
//                .setCondition("into_table.a>5")
                .build();

        // create an open request builder
        OpenRequest oReq = OpenRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
//                .setPreSQL("")
                //.setPostSQL("")
                //.setEncoding("")
                .setTimeout(5)
                //.setStagingSchema("")
                .setMergeOption(mOpt)
                .build();

        // use the blocking stub to call the Open service; it returns nothing
        bStub.open(oReq);
    }

    public static void openForUpdate(GpssGrpc.GpssBlockingStub bStub, Session mSession, String schemaName, String tableName) {
        /** open a table for write */
        Integer errLimit = 25;
        Integer errPct = 25;
        // create an insert option builder
        UpdateOption uOpt = UpdateOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .addMatchColumns("a")       // match columns
                .addUpdateColumns("b")      // update columns
                .addUpdateColumns("c")
                .addUpdateColumns("g")
                .setCondition("into_table.a>5")
                .build();

        // create an open request builder
        OpenRequest oReq = OpenRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
//                .setPreSQL("")
                //.setPostSQL("")
                //.setEncoding("")
                .setTimeout(5)
                //.setStagingSchema("")
                .setUpdateOption(uOpt)
                .build();

        // use the blocking stub to call the Open service; it returns nothing
        bStub.open(oReq);
    }

    public static void describeTable(GpssGrpc.GpssBlockingStub bStub, Session mSession) {
        DescribeTableRequest dReq = DescribeTableRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
                .build();
        Columns columns = bStub.describeTable(dReq);
        for(ColumnInfo columnInfo: columns.getColumnsList()) {
            logMessage("Column:" + columnInfo.getName() +
                    " DatabaseType:" + columnInfo.getDatabaseType() +
                    " HasLength:" + columnInfo.getHasLength() +
                    " Length:" + columnInfo.getLength() +
                    " HasPrecisionScale:" + columnInfo.getHasPrecisionScale() +
                    " Precision:" + columnInfo.getPrecision() +
                    " Scale:" + columnInfo.getScale() +
                    " HasNullable:" + columnInfo.getHasNullable() +
                    " Nullable:" + columnInfo.getNullable()
            );
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = null;
        GpssGrpc.GpssBlockingStub bStub = null;
        Session mSession = null;
        try {
            // connect to GPSS gRPC service instance; create a channel and a blocking stub
            channel = ManagedChannelBuilder.forAddress(lissHost, lissPort).usePlaintext().build();
            bStub = GpssGrpc.newBlockingStub(channel);
            // use the blocking stub to call the Connect service
            mSession = buildSession(bStub, adbpgMasterHost, adbpgMasterPort, adbpgUserName, adbpgPasswd, dbname);
            logMessage("Got session id:" + mSession.getID());

            /** list schema */
            listSchema(bStub, mSession);

            /** list tables */
            listTable(bStub, mSession);

            /** describe public.loaninfo */
            describeTable(bStub, mSession);

            /** open a table for write */
            Integer errLimit = 25;
            Integer errPct = 25;
            // create an insert option builder、
            openForInsert(bStub, mSession, schemaName, tableName);
//            openForMerge(bStub, mSession, schemaName, tableName);
//            openForUpdate(bStub, mSession, schemaName, tableName);

            /** do the write stuff */
            ArrayList<RowData> rows = new ArrayList<>();
            buildRowData(rows, 20, 0, "upsert");

            // create a write request builder
            WriteRequest wReq = WriteRequest.newBuilder()
                    .setSession(mSession)
                    .addAllRows(rows)
                    .build();

            // use the blocking stub to call the Write service; it returns nothing
            logMessage("Starting write row data...");
            bStub.write(wReq);

//            Thread.sleep(60 * 60 * 1000);

            /** create a close request builder */
            TransferStats tStats = null;
            CloseRequest cReq = CloseRequest.newBuilder()
                    .setSession(mSession)
                    //.setMaxErrorRows(15)
                    .setAbort(true)
                    .build();

            /** use the blocking stub to call the Close service */
            tStats = bStub.close(cReq);

            /** display the result to stdout */
            logMessage("CloseRequest tStats: " + tStats.toString());
          
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            assert bStub != null;
            assert mSession != null;
            /** use the blocking stub to call the Disconnect service */
            bStub.disconnect(mSession);

            // shutdown the channel
            channel.shutdown().awaitTermination(7, TimeUnit.SECONDS);
        }
    }
}

Go

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/timestamppb"
	"liss_client/pb"
)

const (
	lisssshost        = "<liss_server_hostname>"
	lissport          = "<liss_server_port>"
	lissTargetAddress = lisssshost + ":" + lissport
	adbpghost         = "<adbpg_hostname>"
	adbpgport         = "<adbpg_port>"
	adbpgusername     = "<USERNAME>"
	adbpgpassword     = "<PASSWORD>"
	adbpgDB           = "<TARGET_DATABASE>"
	adbpgSchemaName   = "<TARGET_SCHEMA_NAME>"
  adbpgTableName		= "<TARGET_TABLE_NAME>"
)

func checkErr(err error) {
	if err != nil {
		panic(err)
	}
}

func main() {
	// Prepare the new session
	conn, err := grpc.Dial(lissTargetAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
	checkErr(err)
	defer conn.Close()
	client := pb.NewGpssClient(conn)
	session, err := client.Connect(context.Background(), &pb.ConnectRequest{
		Host:     adbpghost,
		Port:     adbpgport,
		Username: adbpgusername,
		Password: adbpgpassword,
		DB:       adbpgDB,
	})
	checkErr(err)

	fmt.Println("=====================================")
	// List all the schemas of the specified database
	schema, err := client.ListSchema(context.Background(), &pb.ListSchemaRequest{Session: session})
	for schemaSeq, schemaName := range schema.Schemas {
		fmt.Println("schemaSeq: ", schemaSeq, ", schemaName: ", schemaName.Name, ", schemaOnwer: ", schemaName.Owner)
	}

	// List all the tables of the specified schema
	fmt.Println("=====================================")
	tables, err := client.ListTable(context.Background(), &pb.ListTableRequest{Session: session, Schema: adbpgSchemaName})
	checkErr(err)
	for _, table := range tables.Tables {
		fmt.Println("TableName: ", table.Name, ", TableType: ", table.Type)
	}

	// describe a table
	fmt.Println("=====================================")
	tableName := adbpgTableName
	columns, err := client.DescribeTable(context.Background(), &pb.DescribeTableRequest{
		Session:    session,
		SchemaName: adbpgSchemaName,
		TableName:  tableName,
	})
	checkErr(err)

	for _, column := range columns.Columns {
		fmt.Println("ColumnName: ", column.Name,
			", DatabaseType: ", column.DatabaseType,
			", HasLength: ", column.HasLength,
			", Length: ", column.Length,
			", HasPrecisionScale: ", column.HasPrecisionScale,
			", Precision: ", column.Precision,
			", Scale: ", column.Scale,
			", HasNullable: ", column.GetHasNullable(),
			", Nullable: ", column.GetNullable())
	}

	// insert data
	fmt.Println("=====================================")
	openForInsert(client, session, adbpgSchemaName, tableName)
	rowsData := buildRows(10, "Insert operation")
	writeRequest := pb.WriteRequest{Session: session, Rows: rowsData}
	_, err = client.Write(context.Background(), &writeRequest)
	checkErr(err)
	transferStats, err := client.Close(context.Background(), &pb.CloseRequest{Session: session})
	checkErr(err)
	fmt.Println("TransferStats: ", transferStats.String())

	// merge data
	fmt.Println("=====================================")
	openForMerge(client, session, adbpgSchemaName, tableName)
	rowsData = buildRows(20, "Merge operation")
	writeRequest = pb.WriteRequest{Session: session, Rows: rowsData}
	_, err = client.Write(context.Background(), &writeRequest)
	checkErr(err)
	transferStats, err = client.Close(context.Background(), &pb.CloseRequest{Session: session})
	checkErr(err)
	fmt.Println("TransferStats: ", transferStats.String())

	// update data
	fmt.Println("=====================================")
	openForUpdate(client, session, adbpgSchemaName, tableName)
	rowsData = buildRows(20, "Update operation")
	writeRequest = pb.WriteRequest{Session: session, Rows: rowsData}
	_, err = client.Write(context.Background(), &writeRequest)
	checkErr(err)
	transferStats, err = client.Close(context.Background(), &pb.CloseRequest{Session: session})
	checkErr(err)
	fmt.Println("TransferStats: ", transferStats.String())

	// disconnect after all operations are done
	fmt.Println("=====================================")
	client.Disconnect(context.Background(), session)

}

func openForInsert(client pb.GpssClient, msession *pb.Session, mSchemaName string, mTableName string) {
	var errLimit int64 = 25
	var errPct int32 = 25

	insertCols := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}

	iOption := pb.InsertOption{
		ErrorLimitCount:      errLimit,
		ErrorLimitPercentage: errPct,
		TruncateTable:        true, // true: truncate data before insert operation
		InsertColumns:        insertCols,
	}

	oRequest := pb.OpenRequest{
		Session:       msession,
		Option:        &pb.OpenRequest_InsertOption{InsertOption: &iOption},
		SchemaName:    adbpgSchemaName,
		TableName:     adbpgTableName,
		StagingSchema: adbpgSchemaName,
	}

	_, err := client.Open(context.Background(), &oRequest)
	checkErr(err)
}

func openForMerge(client pb.GpssClient, msession *pb.Session, mSchemaName string, mTableName string) {
	var errLimit int64 = 25
	var errPct int32 = 25

	insertCols := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	matchCols := []string{"a", "b"}
	updateCols := []string{"g"}

	mOption := pb.MergeOption{
		ErrorLimitCount:      errLimit,
		ErrorLimitPercentage: errPct,
		InsertColumns:        insertCols,
		MatchColumns:         matchCols,
		UpdateColumns:        updateCols,
	}

	oRequest := pb.OpenRequest{
		Session:       msession,
		Option:        &pb.OpenRequest_MergeOption{MergeOption: &mOption},
		SchemaName:    adbpgSchemaName,
		TableName:     adbpgTableName,
		StagingSchema: adbpgSchemaName,
	}

	_, err := client.Open(context.Background(), &oRequest)
	checkErr(err)
}

func openForUpdate(client pb.GpssClient, msession *pb.Session, mSchemaName string, mTableName string) {
	var errLimit int64 = 25
	var errPct int32 = 25

	matchCols := []string{"a", "b"}
	updateCols := []string{"g"}

	uOption := pb.UpdateOption{
		ErrorLimitCount:      errLimit,
		ErrorLimitPercentage: errPct,
		MatchColumns:         matchCols,
		UpdateColumns:        updateCols,
		Condition:            "into_table.a>15", // into_table is the alias of mTableName
	}

	oRequest := pb.OpenRequest{
		Session:       msession,
		Option:        &pb.OpenRequest_UpdateOption{UpdateOption: &uOption},
		SchemaName:    adbpgSchemaName,
		TableName:     adbpgTableName,
		StagingSchema: adbpgSchemaName,
	}

	_, err := client.Open(context.Background(), &oRequest)
	checkErr(err)
}

func buildRows(rownum int, desc string) []*pb.RowData {
	var rowsData []*pb.RowData
	// a for loop to create 1000 rows
	for i := 0; i < rownum; i++ {
		var columns []*pb.DBValue
		columns = append(columns,
			&pb.DBValue{DBType: &pb.DBValue_Int32Value{Int32Value: int32(i)}},
			&pb.DBValue{DBType: &pb.DBValue_Int32Value{Int32Value: int32(i)}},
			&pb.DBValue{DBType: &pb.DBValue_Int64Value{Int64Value: int64(i)}},
			&pb.DBValue{DBType: &pb.DBValue_Int64Value{Int64Value: int64(i)}},
			&pb.DBValue{DBType: &pb.DBValue_Float32Value{Float32Value: float32(i)}},
			&pb.DBValue{DBType: &pb.DBValue_Float64Value{Float64Value: float64(i)}},
			&pb.DBValue{DBType: &pb.DBValue_StringValue{StringValue: desc}},
			&pb.DBValue{DBType: &pb.DBValue_BytesValue{BytesValue: []byte(desc)}},
			&pb.DBValue{DBType: &pb.DBValue_TimeStampValue{TimeStampValue: timestamppb.Now()}},
			&pb.DBValue{DBType: &pb.DBValue_TimeStampValue{TimeStampValue: timestamppb.Now()}},
		)

		row := pb.Row{Columns: columns}
		data, _ := proto.Marshal(&row)

		// covert row to []bytes
		rowData := pb.RowData{Data: data}
		rowsData = append(rowsData, &rowData)
	}

	return rowsData
}

附录:数据类型映射

gRPC数据类型

AnalyticDB PostgreSQL数据类型

Int32Value

Integer或Serial

Int64Value

Bigint或Bigserial

Float32Value

Real

Float64Value

Double

StringValue

Text (any kind of data)

BytesValue

Bytea

TimeStampValue

Time或Timestamp (without time zone)

  • 本页导读 (1)
文档反馈