Integrate with Spark


This topic describes how to read data from and write data to Fluss using the Fluss Spark Connector in Alibaba Cloud EMR Serverless Spark.

Prerequisites

  • You have activated ApsaraMQ for Fluss and obtained the bootstrap.servers address and built-in account credentials from the overview page of your target cluster in the Fluss console.

  • You have created an Alibaba Cloud EMR Serverless Spark workspace. For more information, see Create a workspace.

  • You have established a network connection between the VPC where your Alibaba Cloud EMR Serverless Spark workspace resides and the VPC where your Fluss cluster is located. For more information, see Connect an EMR Serverless Spark workspace to another VPC.

Limitations

  • Alibaba Cloud EMR Serverless Spark engine versions esr-4.9.0 and later include a built-in Fluss Spark Connector that supports reading from and writing to all types of Fluss tables.

  • Connector versions that you download from Maven support reading from and writing to Fluss log tables only.

  • Engine versions earlier than esr-4.9.0 do not include the built-in connector. To use Fluss with these versions, you must reference a specific connector JAR. For more information, see Use a specific version of the connector.

Register a Fluss catalog

With engine version esr-4.9.0 and later, Alibaba Cloud EMR Serverless Spark includes a built-in Fluss Spark Connector. To use the connector, register the Fluss catalog in the Spark configuration section when you create a batch job or session.

  1. Log in to the EMR Serverless Spark console and navigate to your target workspace.

  2. Create a batch job or session.

    • Batch job: In the left-side navigation pane, click Data Development and create a SQL batch job.

    • SQL session/notebook session: In the left-side navigation pane, click Session Management, and then click Create SQL session or Create notebook session.

  3. Configure the following parameters.

    • Engine version: Select esr-4.9.0 or later.

    • Network connection: Select the existing network connection to the VPC of your Fluss cluster.

    • Resource queue: Select the appropriate queue.

  4. In the Advanced Settings > Spark configuration section, add the following configurations to register the Fluss catalog.

    spark.sql.catalog.fluss_catalog org.apache.fluss.spark.SparkCatalog
    spark.sql.catalog.fluss_catalog.bootstrap.servers <bootstrap_servers>
    spark.sql.extensions org.apache.fluss.spark.FlussSparkSessionExtensions
    spark.sql.catalog.fluss_catalog.client.security.protocol SASL
    spark.sql.catalog.fluss_catalog.client.security.sasl.mechanism PLAIN
    spark.sql.catalog.fluss_catalog.client.security.sasl.username <username>
    spark.sql.catalog.fluss_catalog.client.security.sasl.password <password>    

    The following table describes the configuration parameters.

    | Parameter | Description | Required | Notes |
    | --- | --- | --- | --- |
    | spark.sql.catalog.fluss_catalog | The catalog implementation class. | Yes | Set the value to org.apache.fluss.spark.SparkCatalog. fluss_catalog is a custom catalog name that you reference in subsequent SQL statements. If you choose a different name, replace fluss_catalog in every parameter that starts with spark.sql.catalog.fluss_catalog. |
    | spark.sql.catalog.fluss_catalog.bootstrap.servers | The bootstrap server address used to connect to the Fluss cluster. | Yes | The bootstrap.servers address shown on the overview page of your Fluss cluster. |
    | spark.sql.extensions | The Fluss Spark SQL extension class. | Yes | Set the value to org.apache.fluss.spark.FlussSparkSessionExtensions. |
    | spark.sql.catalog.fluss_catalog.client.security.protocol | The security protocol that the client uses to authenticate with the cluster. | Yes | Set the value to SASL. |
    | spark.sql.catalog.fluss_catalog.client.security.sasl.mechanism | The SASL authentication mechanism. | Yes | Set the value to PLAIN. |
    | spark.sql.catalog.fluss_catalog.client.security.sasl.username | The built-in username. | Yes | The built-in username of your Fluss cluster. |
    | spark.sql.catalog.fluss_catalog.client.security.sasl.password | The built-in password. | Yes | The built-in password of your Fluss cluster. |

  5. Click Create or Run to complete the catalog registration.
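To check that the catalog settings took effect before you run any Fluss statements, you can query the session configuration with the Spark SQL SET command, which prints the current value of a single key when you pass only the key. This is a minimal sketch that assumes the catalog is named fluss_catalog as in the configuration above.

```sql
-- Print the implementation class registered for fluss_catalog.
-- The value should be org.apache.fluss.spark.SparkCatalog.
SET spark.sql.catalog.fluss_catalog;
-- Print the bootstrap address that the catalog connects to.
SET spark.sql.catalog.fluss_catalog.bootstrap.servers;
```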

Manage databases and tables

After registering the catalog, run the following statements in a Spark SQL script within a batch job, SQL session, or notebook session to manage Fluss databases and tables.

-- View all databases in the catalog
SHOW DATABASES IN fluss_catalog;
-- Create a database
CREATE DATABASE IF NOT EXISTS fluss_catalog.testdb;
-- View all tables in the database
SHOW TABLES IN fluss_catalog.testdb;
-- Create a table
CREATE TABLE IF NOT EXISTS fluss_catalog.testdb.test (id INT, name STRING);
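After the database exists, you can make it the current catalog and database for the session so that later statements can use unqualified table names, and inspect the table schema with standard Spark SQL. This is a sketch that reuses the testdb and test names from the example above; it assumes the Fluss catalog supports these generic Spark SQL commands, which this topic does not state explicitly.

```sql
-- Make fluss_catalog.testdb the current catalog and database for this session.
USE fluss_catalog.testdb;
-- Unqualified names now resolve against fluss_catalog.testdb.
SHOW TABLES;
-- Inspect the column names and types of the example table.
DESCRIBE TABLE test;
```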

Write data

Use the INSERT INTO statement to write data to a Fluss table.

INSERT INTO fluss_catalog.testdb.test VALUES(1, 'Bob');        
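INSERT INTO accepts any query as its source, so you can write several rows in one statement or append rows produced by a SELECT. This is a minimal sketch in standard Spark SQL syntax; the staging_users view name and the sample rows are hypothetical.

```sql
-- Write multiple rows in a single statement.
INSERT INTO fluss_catalog.testdb.test VALUES (2, 'Alice'), (3, 'Carol');

-- Stage rows in a session-local temporary view (hypothetical name: staging_users).
CREATE OR REPLACE TEMPORARY VIEW staging_users AS
SELECT * FROM VALUES (4, 'Dave'), (5, 'Eve') AS t(id, name);

-- Append the staged rows; the selected columns match the table's column order (id, name).
INSERT INTO fluss_catalog.testdb.test
SELECT id, name FROM staging_users;
```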

Read data

Batch read

By default, reading from a Fluss table is a batch operation that retrieves all data from the table.

SELECT * FROM fluss_catalog.testdb.test;          
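Because a batch read is an ordinary Spark SQL query, you can add projections, filters, and aggregations as usual. This is a sketch against the example table; whether the connector pushes filters down to Fluss is not covered in this topic.

```sql
-- Read only the rows and columns you need, sorted by id.
SELECT id, name FROM fluss_catalog.testdb.test WHERE id > 1 ORDER BY id;
-- Count the rows in the table.
SELECT COUNT(*) AS row_count FROM fluss_catalog.testdb.test;
```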

Accelerated read

Accelerated read scans only the snapshot data of a primary key table and skips the log merge step, so changes that exist only in the log are not returned. This mode suits latency-sensitive queries that can tolerate results that are not fully up to date.

SET spark.sql.fluss.read.optimized=true;
SELECT * FROM fluss_catalog.testdb.test;         
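spark.sql.fluss.read.optimized is set with the SET command, so it applies to the current Spark session rather than to the table. The following sketch checks the current value and then switches back to the default read path for a query that needs the merged view of a primary key table; it assumes that setting the option to false restores the default behavior, which this topic implies but does not state.

```sql
-- Show the current value of the option in this session.
SET spark.sql.fluss.read.optimized;
-- Assumed: false turns accelerated read off so that snapshot and log data are merged again.
SET spark.sql.fluss.read.optimized=false;
SELECT * FROM fluss_catalog.testdb.test;
```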

Union read

After you enable union read for a Fluss table, queries automatically merge real-time Fluss data with historical data archived to a data lake (Paimon), eliminating the need to manually union the data at the SQL layer.

  • Enable union read for the table.

    ALTER TABLE fluss_catalog.testdb.test SET TBLPROPERTIES ('table.datalake.enabled' = 'true');

  • Query the table directly to get the combined results.

    SELECT * FROM fluss_catalog.testdb.test;
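To confirm that the property is set before you rely on union read, you can list it with the Spark SQL SHOW TBLPROPERTIES command. This is a sketch that assumes the Fluss catalog exposes table.datalake.enabled through Spark's table properties, which this topic does not confirm.

```sql
-- Show a single table property; after the ALTER TABLE statement above, the value should be true.
SHOW TBLPROPERTIES fluss_catalog.testdb.test ('table.datalake.enabled');
```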

Use a specific version of the connector

If the built-in connector does not meet your requirements, or if you are using an engine version earlier than esr-4.9.0, you can manually reference a specific version of the Fluss Spark Connector.

  1. Download the Fluss Spark Connector JAR.

    curl -fLO "https://repo1.maven.org/maven2/org/apache/fluss/fluss-spark-3.5_2.12/0.9.1-incubating/fluss-spark-3.5_2.12-0.9.1-incubating.jar"

  2. Upload the JAR to Object Storage Service (OSS).

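  3. When you create a batch job or session, add the following two properties to the Spark configuration section in addition to the catalog settings described in Register a Fluss catalog. spark.emr.serverless.excludedModules excludes the built-in Fluss module, and spark.emr.serverless.user.defined.jars loads the connector JAR from the OSS path to which you uploaded it.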

    spark.emr.serverless.excludedModules fluss
    spark.emr.serverless.user.defined.jars oss://<bucket>/path/fluss-spark-3.5_2.12-0.9.1-incubating.jar