This topic describes how to read data from and write data to Fluss using the Fluss Spark Connector in Alibaba Cloud EMR Serverless Spark.
Prerequisites
You have activated ApsaraMQ for Fluss and obtained the bootstrap.servers address and the built-in account credentials from the overview page of your target cluster in the Fluss console.
You have created an Alibaba Cloud EMR Serverless Spark workspace. For more information, see Create a workspace.
You have established a network connection between the VPC where your Alibaba Cloud EMR Serverless Spark workspace resides and the VPC where your Fluss cluster is located. For more information, see Connect an EMR Serverless Spark workspace to another VPC.
Limitations
Alibaba Cloud EMR Serverless Spark engine versions esr-4.9.0 and later include a built-in Fluss Spark Connector that supports reading from and writing to all types of Fluss tables.
Specific connector versions downloaded from Maven can only read from and write to Fluss log tables.
Engine versions earlier than esr-4.9.0 do not include the built-in connector. You must reference a specific JAR package. For more information, see Use a specific version of the connector.
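The version rule above can be checked programmatically before you choose between the built-in connector and a custom JAR. The following is a minimal sketch, assuming the engine version label contains a substring of the form esr-<major>.<minor>.<patch>; the function names are hypothetical and are not part of any EMR API.

```python
import re

# First engine version that bundles the Fluss Spark Connector,
# as stated in the limitations above.
BUILTIN_SINCE = (4, 9, 0)


def parse_esr_version(label: str) -> tuple:
    """Extract (major, minor, patch) from a label such as 'esr-4.9.0'."""
    match = re.search(r"esr-(\d+)\.(\d+)\.(\d+)", label)
    if match is None:
        raise ValueError(f"unrecognized engine version: {label!r}")
    return tuple(int(part) for part in match.groups())


def has_builtin_fluss_connector(label: str) -> bool:
    """Return True when the engine version is esr-4.9.0 or later."""
    return parse_esr_version(label) >= BUILTIN_SINCE
```

Comparing integer tuples rather than raw strings avoids ordering a two-digit minor version such as 4.10.0 before 4.9.0.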
Register a Fluss catalog
With engine version esr-4.9.0 and later, Alibaba Cloud EMR Serverless Spark includes a built-in Fluss Spark Connector. To use the connector, register the Fluss catalog in the Spark configuration section when you create a batch job or session.
Log in to the EMR Serverless Spark console and navigate to your target workspace.
Create a batch job or session.
Batch job: In the left-side navigation pane, click Data Development and create a SQL batch job.
SQL session/notebook session: In the left-side navigation pane, click Session Management, and then click Create SQL session or Create notebook session.
Configure the following parameters.
Engine version: Select esr-4.9.0 or later.
Network connection: Select the existing network connection to the VPC of your Fluss cluster.
Resource queue: Select the appropriate queue.
In the Advanced Settings > Spark configuration section, add the following configurations to register the Fluss catalog.
spark.sql.catalog.fluss_catalog org.apache.fluss.spark.SparkCatalog
spark.sql.catalog.fluss_catalog.bootstrap.servers <bootstrap_servers>
spark.sql.extensions org.apache.fluss.spark.FlussSparkSessionExtensions
spark.sql.catalog.fluss_catalog.client.security.protocol SASL
spark.sql.catalog.fluss_catalog.client.security.sasl.mechanism PLAIN
spark.sql.catalog.fluss_catalog.client.security.sasl.username <username>
spark.sql.catalog.fluss_catalog.client.security.sasl.password <password>
The following table describes the configuration parameters.
| Parameter | Description | Required | Notes |
| --- | --- | --- | --- |
| spark.sql.catalog.fluss_catalog | The catalog implementation class. | Yes | Set the value to org.apache.fluss.spark.SparkCatalog. Here, fluss_catalog is a custom name for your catalog that you can reference in subsequent SQL statements. |
| spark.sql.catalog.fluss_catalog.bootstrap.servers | The bootstrap server address for connecting to the Fluss cluster. | Yes | Your Fluss cluster's service endpoint. |
| spark.sql.extensions | The Fluss Spark SQL extension class. | Yes | Set the value to org.apache.fluss.spark.FlussSparkSessionExtensions. |
| spark.sql.catalog.fluss_catalog.client.security.protocol | The authentication framework. | Yes | Set the value to SASL. |
| spark.sql.catalog.fluss_catalog.client.security.sasl.mechanism | The authentication mechanism. | Yes | Set the value to PLAIN. |
| spark.sql.catalog.fluss_catalog.client.security.sasl.username | The built-in username. | Yes | Your Fluss instance's built-in username. |
| spark.sql.catalog.fluss_catalog.client.security.sasl.password | The built-in password. | Yes | Your Fluss instance's built-in password. |
Click Create or Run to complete the catalog registration.
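The catalog properties described above are long and easy to mistype, so it can help to generate them from the three values that vary per cluster. The following is a minimal sketch, not an official tool: the function names are hypothetical, and the output is only the key-value text that you paste into the Spark configuration section.

```python
def fluss_catalog_conf(bootstrap_servers, username, password,
                       catalog="fluss_catalog"):
    """Build the Spark properties that register a Fluss catalog.

    The keys and fixed values mirror the parameter table above; only the
    endpoint, credentials, and catalog name are supplied by the caller.
    """
    for name, value in (("bootstrap_servers", bootstrap_servers),
                        ("username", username),
                        ("password", password)):
        if not value:
            raise ValueError(f"{name} must not be empty")
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.fluss.spark.SparkCatalog",
        f"{prefix}.bootstrap.servers": bootstrap_servers,
        "spark.sql.extensions":
            "org.apache.fluss.spark.FlussSparkSessionExtensions",
        f"{prefix}.client.security.protocol": "SASL",
        f"{prefix}.client.security.sasl.mechanism": "PLAIN",
        f"{prefix}.client.security.sasl.username": username,
        f"{prefix}.client.security.sasl.password": password,
    }


def render_conf(conf):
    """Render properties as 'key value' lines, one property per line."""
    return "\n".join(f"{key} {value}" for key, value in conf.items())


if __name__ == "__main__":
    # Placeholders only; substitute the values from your Fluss console.
    print(render_conf(fluss_catalog_conf(
        "<bootstrap_servers>", "<username>", "<password>")))
```

If you pass a different catalog name, use that name instead of fluss_catalog in your SQL statements.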
Manage databases and tables
After registering the catalog, run the following statements in a Spark SQL script within a batch job, SQL session, or notebook session to manage Fluss databases and tables.
-- View all databases in the catalog
SHOW DATABASES IN fluss_catalog;
-- Create a database
CREATE DATABASE IF NOT EXISTS fluss_catalog.testdb;
-- View all tables in the database
SHOW TABLES IN fluss_catalog.testdb;
-- Create a table
CREATE TABLE IF NOT EXISTS fluss_catalog.testdb.test (id INT, name STRING);
Write data
Use the INSERT INTO statement to write data to a Fluss table.
INSERT INTO fluss_catalog.testdb.test VALUES(1, 'Bob');
Read data
Batch read
By default, reading from a Fluss table is a batch operation that retrieves all data from the table.
SELECT * FROM fluss_catalog.testdb.test;
Accelerated read
SET spark.sql.fluss.read.optimized=true;
SELECT * FROM fluss_catalog.testdb.test;
Union read
After you enable union read for a Fluss table, queries automatically merge real-time Fluss data with historical data archived to a data lake (Paimon), eliminating the need to manually union the data at the SQL layer.
Enable union read for the table.
ALTER TABLE fluss_catalog.testdb.test SET TBLPROPERTIES ('table.datalake.enabled' = 'true');
Query the table directly to get the combined results.
SELECT * FROM fluss_catalog.testdb.test;
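In a notebook session, the same statements can be issued from Python rather than from a SQL script. The following is a minimal sketch, assuming a SparkSession is available as spark and the Fluss catalog is already registered under the name fluss_catalog. The helper write_then_read is hypothetical; it relies only on the standard PySpark calls SparkSession.sql and DataFrame.collect.

```python
# Statements taken from the sections above, without trailing semicolons
# because each string is passed to spark.sql() as a single statement.
STATEMENTS = [
    "CREATE DATABASE IF NOT EXISTS fluss_catalog.testdb",
    "CREATE TABLE IF NOT EXISTS fluss_catalog.testdb.test "
    "(id INT, name STRING)",
    "INSERT INTO fluss_catalog.testdb.test VALUES (1, 'Bob')",
]
QUERY = "SELECT * FROM fluss_catalog.testdb.test"


def write_then_read(spark, statements=STATEMENTS, query=QUERY):
    """Run each statement in order, then return the rows of the query.

    `spark` is expected to be a SparkSession (or any object exposing a
    compatible sql() method), which is an assumption of this sketch.
    """
    for statement in statements:
        spark.sql(statement)
    return spark.sql(query).collect()
```

In a notebook cell, calling write_then_read(spark) would create the database and table if needed, insert one row, and return the result of a batch read as a list of rows.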
Use a specific version of the connector
If the built-in connector does not meet your requirements, or if you are using an engine version earlier than esr-4.9.0, you can manually reference a specific version of the Fluss Spark Connector.
Download the Fluss Spark Connector JAR.
curl -fLO "https://repo1.maven.org/maven2/org/apache/fluss/fluss-spark-3.5_2.12/0.9.1-incubating/fluss-spark-3.5_2.12-0.9.1-incubating.jar"
Upload the JAR to Object Storage Service (OSS).
When you create a batch job or session, add the following two properties to the Spark configuration section in addition to the catalog settings. The first excludes the built-in Fluss module, and the second loads your custom JAR from OSS.
spark.emr.serverless.excludedModules fluss
spark.emr.serverless.user.defined.jars oss://<bucket>/path/fluss-spark-3.5_2.12-0.9.1-incubating.jar
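The download URL and the JAR path in the properties above share the same artifact name and version, so deriving both from one set of coordinates keeps them consistent when you switch versions. The following is a minimal sketch with hypothetical function names, assuming the standard Maven Central layout of group path, artifact, version, and file name, and assuming the JAR is uploaded under its original file name.

```python
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def jar_name(artifact, version):
    """File name that Maven Central uses for the main JAR of an artifact."""
    return f"{artifact}-{version}.jar"


def maven_url(group, artifact, version):
    """Download URL following the standard Maven repository layout."""
    group_path = group.replace(".", "/")
    return (f"{MAVEN_CENTRAL}/{group_path}/{artifact}/{version}/"
            f"{jar_name(artifact, version)}")


def custom_connector_conf(oss_dir, artifact, version):
    """Properties that swap the built-in connector for an uploaded JAR.

    `oss_dir` is the OSS directory that holds the JAR, for example
    'oss://<bucket>/path'.
    """
    jar_path = f"{oss_dir.rstrip('/')}/{jar_name(artifact, version)}"
    return {
        "spark.emr.serverless.excludedModules": "fluss",
        "spark.emr.serverless.user.defined.jars": jar_path,
    }


if __name__ == "__main__":
    coords = ("org.apache.fluss", "fluss-spark-3.5_2.12", "0.9.1-incubating")
    print(maven_url(*coords))
    for key, value in custom_connector_conf(
            "oss://<bucket>/path", coords[1], coords[2]).items():
        print(key, value)
```

With the coordinates shown, the sketch reproduces the curl URL and the two properties listed above.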