Store e-commerce order data in Tablestore, run real-time and batch aggregations with E-MapReduce (EMR) Spark, and display the results on DataV dashboards.
Solution overview
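| Category | Description |
| --- | --- |
| Use cases | Real-time and offline aggregation of e-commerce order data, with the results displayed on dashboards. |
| Technical approach | Tablestore stores the raw orders and the aggregation results. EMR Spark runs real-time stream processing (Spark Structured Streaming) and offline batch processing (Spark SQL). DataV visualizes the results. |
| Products involved | Tablestore, E-MapReduce (EMR), DataV |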
Solution design
The data flows through the following stages:
1. A client writes raw order data to the Tablestore data table OrderSource in real time. In this guide, a Java SDK program writes simulated orders.
2. The EMR Spark engine runs stream and batch computations on the order data and writes the aggregated results back to Tablestore data tables.
   - Real-time stream processing: Spark Structured Streaming calculates the order count and total amount for each 30-second time window.
   - Offline batch processing: Spark SQL calculates the order count and total amount across all orders and for each user.
3. DataV connects to the Tablestore result tables and displays the real-time and offline results on dashboards.
The following Tablestore data tables are used in this solution.

| Table | Primary key / attribute columns | Description |
| --- | --- | --- |
| OrderSource | PK: UserId (String), OrderId (String). Attribute columns: price (Double), timestamp (Long) | Raw order data table that stores all order records. Each order contains a user ID, order ID, order amount, and timestamp. |
| OrderStreamSink | PK: begin (String), end (String). Attribute columns: count (Long), totalPrice (Double) | Real-time stream processing result table. Spark Structured Streaming aggregates order data in 30-second windows and writes one row per window containing the order count and total amount. |
| OrderBatchSink | PK: UserId (String). Attribute columns: count (Long), totalPrice (Double) | Per-user batch processing result table. Offline batch processing writes the cumulative order count and total spending for each user. |
| OrderTotalSink | PK: count (Long). Attribute columns: totalPrice (Double) | Global batch processing result table. Offline batch processing writes the global order count and total amount. |
Prerequisites
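Before you begin, make sure that the following prerequisites are met:
- A Tablestore instance is available. If you have not activated Tablestore, activate the service and create an instance. For more information, see Activate service and create instance. If you already have a usable instance, skip this step.
- An EMR on ECS cluster is created. For more information, see Create a cluster. Note the following configuration requirements: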
| Parameter | Description |
| --- | --- |
| Business Scenario | Select Data Lake. |
| Product Version | To use real-time stream processing (the streaming-sql CREATE STREAM/CREATE SCAN syntax), select EMR-5.17.4 or earlier. Note: The SPARK-EXTENSION component of EMR-5.21.0 has compatibility issues with SPARK3 (3.5.3). If you only need offline batch processing, select the latest product version. |
| Optional Services | Select SPARK3 (selected by default). |
- The VPC of the EMR cluster is bound to the Tablestore instance. For more information, see Bind a VPC to an instance.
  Important: After you bind the VPC, use the VPC address of the bound VPC to access Tablestore.
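The product-version rule above is easy to misapply when version strings are compared as text: lexicographically, "EMR-5.9.0" sorts after "EMR-5.17.4". The following minimal Java sketch compares versions numerically and encodes the rule exactly as this guide states it (streaming syntax requires EMR-5.17.4 or earlier). The class and method names are hypothetical, the sketch assumes version strings of the form EMR-x.y.z, and it does not query EMR.

```java
public class EmrVersionCheck {
    // Parses "EMR-5.17.4" into {5, 17, 4}. Assumes the EMR-x.y.z format.
    static int[] parse(String version) {
        String[] parts = version.replaceFirst("^EMR-", "").split("\\.");
        int[] nums = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            nums[i] = Integer.parseInt(parts[i]);
        }
        return nums;
    }

    // Compares two versions component by component; missing components count as 0.
    static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? x[i] : 0;
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // Rule as stated in this guide: CREATE STREAM / CREATE SCAN needs EMR-5.17.4 or earlier.
    static boolean supportsStreamingSql(String version) {
        return compare(version, "EMR-5.17.4") <= 0;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"EMR-5.9.0", "EMR-5.17.4", "EMR-5.21.0"}) {
            System.out.println(v + " supports streaming syntax: " + supportsStreamingSql(v));
        }
    }
}
```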
Implementation
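The following steps use the Tablestore Java SDK to create Tablestore resources and the streaming-sql client of EMR 5.17.4 (Spark 3.4.2) to run Spark SQL operations. The Java examples read the endpoint, AccessKey pair, instance name, and region from the OTS_ENDPOINT, OTS_AK_ENV, OTS_SK_ENV, OTS_INSTANCE, and OTS_REGION environment variables.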
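Both Java programs pass the values returned by System.getenv directly to the Tablestore clients, so an unset variable surfaces only later as a client or request error. A minimal sketch of a fail-fast check that reports every missing variable at once is shown below. The class name OtsEnvCheck and its load method are hypothetical helpers, not part of the Tablestore SDK; the sketch uses only the Java standard library (Java 11 or later).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OtsEnvCheck {
    // Environment variables read by CreateTablesAndTunnel and InsertTestOrders.
    static final List<String> REQUIRED = List.of(
            "OTS_ENDPOINT", "OTS_AK_ENV", "OTS_SK_ENV", "OTS_INSTANCE", "OTS_REGION");

    // Returns all required settings, or throws one error that lists every missing variable.
    static Map<String, String> load(Map<String, String> env) {
        Map<String, String> values = new LinkedHashMap<>();
        StringBuilder missing = new StringBuilder();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.isBlank()) {
                if (missing.length() > 0) {
                    missing.append(", ");
                }
                missing.append(name);
            } else {
                values.put(name, value);
            }
        }
        if (missing.length() > 0) {
            throw new IllegalStateException("Missing environment variables: " + missing);
        }
        return values;
    }

    public static void main(String[] args) {
        try {
            Map<String, String> settings = load(System.getenv());
            // Print only non-secret settings.
            System.out.println("Endpoint: " + settings.get("OTS_ENDPOINT"));
            System.out.println("Instance: " + settings.get("OTS_INSTANCE"));
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

To use it, call OtsEnvCheck.load(System.getenv()) at the top of main and read the settings from the returned map instead of calling System.getenv for each variable.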
Step 1: Create data tables and a tunnel
Use the Java SDK to create four data tables (OrderSource, OrderStreamSink, OrderBatchSink, and OrderTotalSink), and create an incremental tunnel for the OrderSource table for real-time stream processing.
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.core.auth.*;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import java.util.*;

public class CreateTablesAndTunnel {
    public static void main(String[] args) {
        // Read connection settings from environment variables.
        String endpoint = System.getenv("OTS_ENDPOINT");
        String accessKeyId = System.getenv("OTS_AK_ENV");
        String accessKeySecret = System.getenv("OTS_SK_ENV");
        String instanceName = System.getenv("OTS_INSTANCE");
        String region = System.getenv("OTS_REGION");

        // Initialize a client that signs requests with V4 signatures.
        DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
        V4Credentials v4Credentials = V4Credentials.createByServiceCredentials(credentials, region);
        CredentialsProvider provider = new DefaultCredentialProvider(v4Credentials);
        SyncClient client = new SyncClient(endpoint, provider, instanceName, null, new ResourceManager(null, null));

        // 1. Create the OrderSource table (PK: UserId + OrderId).
        createTable(client, "OrderSource", Arrays.asList(
                new PrimaryKeySchema("UserId", PrimaryKeyType.STRING),
                new PrimaryKeySchema("OrderId", PrimaryKeyType.STRING)));
        // 2. Create the OrderStreamSink table (PK: begin + end).
        createTable(client, "OrderStreamSink", Arrays.asList(
                new PrimaryKeySchema("begin", PrimaryKeyType.STRING),
                new PrimaryKeySchema("end", PrimaryKeyType.STRING)));
        // 3. Create the OrderBatchSink table (PK: UserId).
        createTable(client, "OrderBatchSink", Collections.singletonList(
                new PrimaryKeySchema("UserId", PrimaryKeyType.STRING)));
        // 4. Create the OrderTotalSink table (PK: count). INTEGER is a 64-bit integer, matching count (Long).
        createTable(client, "OrderTotalSink", Collections.singletonList(
                new PrimaryKeySchema("count", PrimaryKeyType.INTEGER)));

        // 5. Create a stream (incremental) tunnel for OrderSource. Required for real-time streaming.
        TunnelClient tunnelClient = new TunnelClient(endpoint, accessKeyId, accessKeySecret, instanceName);
        CreateTunnelRequest tunnelRequest = new CreateTunnelRequest(
                "OrderSource", "OrderSourceTunnel", TunnelType.Stream);
        CreateTunnelResponse tunnelResponse = tunnelClient.createTunnel(tunnelRequest);
        System.out.println("Tunnel ID: " + tunnelResponse.getTunnelId());
        System.out.println("Use this tunnel ID in Spark SQL: tunnel.id=\"" + tunnelResponse.getTunnelId() + "\"");

        tunnelClient.shutdown();
        client.shutdown();
    }

    private static void createTable(SyncClient client, String tableName, List<PrimaryKeySchema> pkSchema) {
        TableMeta meta = new TableMeta(tableName);
        for (PrimaryKeySchema pk : pkSchema) {
            meta.addPrimaryKeyColumn(pk);
        }
        // timeToLive = -1 (data never expires), maxVersions = 1.
        TableOptions options = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(meta, options);
        try {
            client.createTable(request);
            System.out.println("Table " + tableName + " created.");
        } catch (TableStoreException e) {
            // Treat an existing table as success so that the program can be rerun.
            if ("OTSObjectAlreadyExist".equals(e.getErrorCode())) {
                System.out.println("Table " + tableName + " already exists.");
            } else {
                throw e;
            }
        }
    }
}
After the code runs, the console prints the tunnel ID. Record this ID: you need it for the tunnel.id parameter when you create the Spark external table order_source.
Step 2: Write test data
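Use the BatchWriteRow operation to write simulated e-commerce order data to the OrderSource table. The following example generates 500 orders for 5 users and writes them in batches of up to 200 rows. Each order has a random amount between 10.00 and 1,000.00, and the order timestamps are spaced one second apart over the 500 seconds before the program starts.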
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.core.ResourceManager;
import com.alicloud.openservices.tablestore.core.auth.*;
import com.alicloud.openservices.tablestore.model.*;
import java.util.*;

public class InsertTestOrders {
    public static void main(String[] args) {
        // Read connection settings from environment variables.
        String endpoint = System.getenv("OTS_ENDPOINT");
        String accessKeyId = System.getenv("OTS_AK_ENV");
        String accessKeySecret = System.getenv("OTS_SK_ENV");
        String instanceName = System.getenv("OTS_INSTANCE");
        String region = System.getenv("OTS_REGION");

        // Initialize a client that signs requests with V4 signatures.
        DefaultCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
        V4Credentials v4Credentials = V4Credentials.createByServiceCredentials(credentials, region);
        CredentialsProvider provider = new DefaultCredentialProvider(v4Credentials);
        SyncClient client = new SyncClient(endpoint, provider, instanceName, null, new ResourceManager(null, null));

        // Fixed seed so that every run generates the same users and prices.
        Random random = new Random(42);
        String[] users = {"user_001", "user_002", "user_003", "user_004", "user_005"};
        long baseTime = System.currentTimeMillis();
        int totalRows = 0;

        // Insert 500 orders in batches of 200.
        List<RowPutChange> batch = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            String userId = users[random.nextInt(users.length)];
            String orderId = String.format("order_%06d", i + 1);
            // Random amount in [10, 1000], rounded to two decimal places.
            double price = Math.round((10.0 + random.nextDouble() * 990.0) * 100.0) / 100.0;
            // One order per second, ending one second before baseTime.
            long timestamp = baseTime - (500 - i) * 1000L;

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("UserId", PrimaryKeyValue.fromString(userId))
                    .addPrimaryKeyColumn("OrderId", PrimaryKeyValue.fromString(orderId))
                    .build();
            RowPutChange row = new RowPutChange("OrderSource", pk);
            row.addColumn("price", ColumnValue.fromDouble(price));
            row.addColumn("timestamp", ColumnValue.fromLong(timestamp));
            batch.add(row);

            if (batch.size() >= 200) {
                writeBatch(client, batch);
                totalRows += batch.size();
                batch.clear();
            }
        }
        // Flush the remaining rows.
        if (!batch.isEmpty()) {
            writeBatch(client, batch);
            totalRows += batch.size();
        }
        System.out.println("Inserted " + totalRows + " orders into OrderSource.");
        client.shutdown();
    }

    private static void writeBatch(SyncClient client, List<RowPutChange> rows) {
        BatchWriteRowRequest request = new BatchWriteRowRequest();
        for (RowPutChange row : rows) {
            request.addRowChange(row);
        }
        BatchWriteRowResponse response = client.batchWriteRow(request);
        // BatchWriteRow can succeed partially, so check every row result.
        if (!response.isAllSucceed()) {
            throw new RuntimeException(response.getFailedRows().size() + " rows failed to write.");
        }
    }
}
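The example flushes the pending rows every 200 orders because a single BatchWriteRow request accepts at most 200 rows. If your rows are already in a list, a generic chunking helper such as the one below produces the same batches. This is a stdlib-only sketch; BatchChunker and chunk are hypothetical names, and the placeholder Integer list stands in for a list of RowPutChange objects.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    // A single BatchWriteRow request accepts at most 200 rows.
    static final int MAX_ROWS_PER_BATCH = 200;

    // Splits rows into consecutive sublists of at most batchSize elements, preserving order.
    static <T> List<List<T>> chunk(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            int to = Math.min(from + batchSize, rows.size());
            // Copy so that each batch is independent of the source list.
            batches.add(new ArrayList<>(rows.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> orders = new ArrayList<>();
        for (int i = 1; i <= 500; i++) {
            orders.add(i);
        }
        // 500 rows split into batches of 200, 200, and 100.
        for (List<Integer> batch : chunk(orders, MAX_ROWS_PER_BATCH)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

Each returned batch would then be passed to a method like writeBatch in the example above.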
Step 3: Start the Spark SQL client
Log on to the master node of the EMR cluster and run the following command to start the streaming-sql client.
EMR includes the streaming-sql client and the Tablestore connector (the emr-datasources_shaded JAR), so you do not need to download them manually. Load the Hadoop commons-net dependency at startup. The following example uses EMR 5.17.4 (Spark 3.4.2).
COMMONS_NET=$(find /opt/apps/HADOOP-COMMON -name "commons-net*.jar" | head -1)
streaming-sql --master yarn --deploy-mode client \
  --driver-class-path "$COMMONS_NET" \
  --jars "$COMMONS_NET"
After the client starts, an interactive SQL command line appears. Run the SQL statements in the following steps in this client.
Step 4: Create Spark external tables
Run CREATE TABLE ... USING tablestore to create Spark external tables that map to the Tablestore data tables. The Tablestore connector routes read and write operations on the external tables to the corresponding data tables.
Create an external table order_source for the raw order data. If you only need offline batch processing, leave tunnel.id blank.
DROP TABLE IF EXISTS order_source;
CREATE TABLE order_source
USING tablestore
OPTIONS(
endpoint="http://<instance-name>.<region>.vpc.ots.aliyuncs.com",
access.key.id="<AccessKey ID>",
access.key.secret="<AccessKey Secret>",
instance.name="<instance-name>",
table.name="OrderSource",
tunnel.id="<tunnel-id>",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"},
"OrderId": {"col": "OrderId", "type": "string"},
"price": {"col": "price", "type": "double"},
"timestamp": {"col": "timestamp", "type": "long"}}}'
);
| Parameter | Description |
| --- | --- |
| endpoint | The VPC address of the Tablestore instance. |
| access.key.id | The AccessKey ID of your Alibaba Cloud account. |
| access.key.secret | The AccessKey secret of your Alibaba Cloud account. |
| instance.name | The name of the Tablestore instance. |
| table.name | The name of the Tablestore data table. |
| tunnel.id | The ID of the Tablestore incremental tunnel. Required for real-time stream processing. Not required for offline batch processing. |
| catalog | The column schema definition in JSON format. Specifies each column name, the Tablestore column it maps to, and its data type. |
Create external tables for the three result tables in the same way.
-- Create external table for OrderStreamSink.
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://<instance-name>.<region>.vpc.ots.aliyuncs.com",
access.key.id="<AccessKey ID>",
access.key.secret="<AccessKey Secret>",
instance.name="<instance-name>",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},
"end": {"col": "end", "type": "string"},
"count": {"col": "count", "type": "long"},
"totalPrice": {"col": "totalPrice", "type": "double"}}}'
);
-- Create external table for OrderBatchSink.
DROP TABLE IF EXISTS order_batch_sink;
CREATE TABLE order_batch_sink
USING tablestore
OPTIONS(
endpoint="http://<instance-name>.<region>.vpc.ots.aliyuncs.com",
access.key.id="<AccessKey ID>",
access.key.secret="<AccessKey Secret>",
instance.name="<instance-name>",
table.name="OrderBatchSink",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"},
"count": {"col": "count", "type": "long"},
"totalPrice": {"col": "totalPrice", "type": "double"}}}'
);
-- Create external table for OrderTotalSink.
DROP TABLE IF EXISTS order_total_sink;
CREATE TABLE order_total_sink
USING tablestore
OPTIONS(
endpoint="http://<instance-name>.<region>.vpc.ots.aliyuncs.com",
access.key.id="<AccessKey ID>",
access.key.secret="<AccessKey Secret>",
instance.name="<instance-name>",
table.name="OrderTotalSink",
catalog='{"columns": {"count": {"col": "count", "type": "long"},
"totalPrice": {"col": "totalPrice", "type": "double"}}}'
);
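The four CREATE TABLE statements differ only in the table names, the optional tunnel.id, and the catalog columns, and the catalog JSON maps each Spark column to a Tablestore column of the same name. If you manage many tables, you can generate the statements instead of editing them by hand. The following Java sketch is a hypothetical helper (TablestoreDdl is not part of EMR or the Tablestore SDK); it assumes the VPC endpoint pattern used above and deliberately emits the AccessKey placeholders rather than real credentials.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class TablestoreDdl {
    // Builds the catalog JSON; each Spark column maps to a Tablestore column of the same name.
    static String catalog(Map<String, String> columnTypes) {
        StringJoiner cols = new StringJoiner(", ", "{\"columns\": {", "}}");
        for (Map.Entry<String, String> e : columnTypes.entrySet()) {
            cols.add("\"" + e.getKey() + "\": {\"col\": \"" + e.getKey()
                    + "\", \"type\": \"" + e.getValue() + "\"}");
        }
        return cols.toString();
    }

    // Builds a CREATE TABLE ... USING tablestore statement; pass null tunnelId for batch-only tables.
    static String createTable(String sparkTable, String otsTable, String instance, String region,
                              String tunnelId, Map<String, String> columnTypes) {
        StringBuilder sql = new StringBuilder();
        sql.append("CREATE TABLE ").append(sparkTable).append("\nUSING tablestore\nOPTIONS(\n");
        sql.append("endpoint=\"http://").append(instance).append('.').append(region)
                .append(".vpc.ots.aliyuncs.com\",\n");
        // Placeholders only: supply real credentials in the client session, not in source code.
        sql.append("access.key.id=\"<AccessKey ID>\",\n");
        sql.append("access.key.secret=\"<AccessKey Secret>\",\n");
        sql.append("instance.name=\"").append(instance).append("\",\n");
        sql.append("table.name=\"").append(otsTable).append("\",\n");
        if (tunnelId != null && !tunnelId.isEmpty()) {
            sql.append("tunnel.id=\"").append(tunnelId).append("\",\n");
        }
        sql.append("catalog='").append(catalog(columnTypes)).append("'\n);");
        return sql.toString();
    }

    public static void main(String[] args) {
        // Columns in the same order as the OrderBatchSink external table.
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("UserId", "string");
        cols.put("count", "long");
        cols.put("totalPrice", "double");
        System.out.println(createTable("order_batch_sink", "OrderBatchSink",
                "<instance-name>", "<region>", null, cols));
    }
}
```

A LinkedHashMap keeps the catalog columns in insertion order, so the generated JSON matches the column order of the hand-written statements.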
Step 5: Run offline batch processing
Run Spark SQL to aggregate all raw order data and write the results to the corresponding sink tables.
Run the following SQL statement to write per-user aggregation results to the order_batch_sink table.
INSERT INTO order_batch_sink
SELECT UserId, count(*) AS count, sum(price) AS totalPrice
FROM order_source
GROUP BY UserId;
Run the following SQL statement to write global aggregation results to the order_total_sink table.
INSERT INTO order_total_sink
SELECT count(*) AS count, sum(price) AS totalPrice
FROM order_source;
Run the following SQL statements to verify the aggregation results.
-- Verify the total number of orders.
SELECT count(*) AS total_orders FROM order_source;
-- Verify per-user aggregation results.
SELECT * FROM order_batch_sink;
-- Verify global aggregation results.
SELECT * FROM order_total_sink;
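Because InsertTestOrders seeds Random with 42, its per-user results are reproducible without reading Tablestore. The following hypothetical sketch replays the same generator (same seed, user list, draw order, and price rounding) and prints the values that order_batch_sink and order_total_sink should contain. It assumes that OrderSource holds exactly the 500 rows written in Step 2 and nothing else. Spark may add the doubles in a different order, so compare totals after rounding to two decimal places.

```java
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class ExpectedBatchTotals {
    // Per-user accumulator matching the OrderBatchSink attribute columns.
    static final class Totals {
        long count;
        double totalPrice;
    }

    // Replays the Step 2 generator without writing to Tablestore.
    static Map<String, Totals> replay(long seed, int orders) {
        Random random = new Random(seed);
        String[] users = {"user_001", "user_002", "user_003", "user_004", "user_005"};
        Map<String, Totals> perUser = new TreeMap<>();
        for (int i = 0; i < orders; i++) {
            // Draw in the same order as InsertTestOrders: user first, then price.
            String userId = users[random.nextInt(users.length)];
            double price = Math.round((10.0 + random.nextDouble() * 990.0) * 100.0) / 100.0;
            Totals t = perUser.computeIfAbsent(userId, k -> new Totals());
            t.count++;
            t.totalPrice += price;
        }
        return perUser;
    }

    public static void main(String[] args) {
        Map<String, Totals> perUser = replay(42L, 500);
        long globalCount = 0;
        double globalTotal = 0.0;
        for (Map.Entry<String, Totals> e : perUser.entrySet()) {
            System.out.printf("%s count=%d totalPrice=%.2f%n",
                    e.getKey(), e.getValue().count, e.getValue().totalPrice);
            globalCount += e.getValue().count;
            globalTotal += e.getValue().totalPrice;
        }
        System.out.printf("global count=%d totalPrice=%.2f%n", globalCount, globalTotal);
    }
}
```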
Step 6: Run real-time stream processing
Use Spark Structured Streaming to calculate the order count and total amount for each time window, and write the aggregated results to the order_stream_sink table.
Create a stream view on the order_source table.
CREATE SCAN order_source_stream_view ON order_source USING STREAM
OPTIONS("maxoffsetsperchannel"="10000");
Create a stream processing job that aggregates order data in 30-second windows.
CREATE STREAM job1
OPTIONS(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin,
CAST(window.end AS String) AS end,
count(*) AS count,
CAST(sum(price) AS Double) AS totalPrice
FROM order_source_stream_view
GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");
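After the stream processing job starts, it runs continuously. When new order data arrives in the OrderSource table, Spark reads it through the Tablestore incremental tunnel, aggregates it in 30-second windows, and writes the order count (count) and total amount (totalPrice) for each window to the OrderStreamSink table. Because the job uses the update output mode, a window's row is rewritten whenever its aggregate changes; begin and end form the primary key, so each window still occupies a single row.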
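The following hypothetical Java sketch shows the window arithmetic the job relies on, outside Spark: each order is assigned to the 30-second tumbling window that contains its timestamp, and that window's count and total are updated in place, similar to how update mode rewrites the row keyed by (begin, end). It assumes epoch-aligned windows with no start offset (the window() call in the job passes none), ignores late data and watermarks, and keys windows by epoch milliseconds, whereas the real job stores begin and end as timestamp strings.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    static final long WINDOW_MS = 30_000L;

    // Per-window accumulator matching the OrderStreamSink attribute columns.
    static final class WindowAgg {
        long count;
        double totalPrice;
    }

    // Start of the 30-second window containing timestampMs, aligned to the Unix epoch.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    // Folds one order into its window; repeated calls update the same key, like an upsert.
    static void add(Map<Long, WindowAgg> windows, long timestampMs, double price) {
        WindowAgg agg = windows.computeIfAbsent(windowStart(timestampMs), k -> new WindowAgg());
        agg.count++;
        agg.totalPrice += price;
    }

    public static void main(String[] args) {
        Map<Long, WindowAgg> windows = new TreeMap<>();
        add(windows, 1_000L, 10.0);   // window [0, 30000)
        add(windows, 29_999L, 20.0);  // same window
        add(windows, 30_000L, 5.5);   // window [30000, 60000)
        for (Map.Entry<Long, WindowAgg> e : windows.entrySet()) {
            long begin = e.getKey();
            System.out.println("[" + begin + ", " + (begin + WINDOW_MS) + ") count="
                    + e.getValue().count + " totalPrice=" + e.getValue().totalPrice);
        }
    }
}
```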
Step 7: (Optional) Visualize results with DataV
To display aggregated results on dashboards, connect DataV to the Tablestore data source.
Activate the DataV service. For more information, see Activate DataV-Board Service.
Add Tablestore as a data source. For more information, see Add a Tablestore data source.
The following table lists the recommended visualization components for each result table.

| Result table | Data | Recommended component |
| --- | --- | --- |
| OrderTotalSink | Global order count (count) and total amount (totalPrice) | Number flipping or KPI card |
| OrderStreamSink | Real-time order count and amount trend for each 30-second window | Real-time line chart or dynamic bar chart |
| OrderBatchSink | Per-user cumulative order count and spending ranking | Leaderboard or horizontal bar chart |
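For the OrderBatchSink leaderboard, the ranking is the per-user rows ordered by totalPrice, highest first. The hypothetical Java sketch below shows that ordering with a deterministic tie-break on UserId, for example to sanity-check what the dashboard displays. The sample rows are made up for illustration, and how the DataV component itself sorts is configured in DataV and is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SpendingLeaderboard {
    // One OrderBatchSink row: UserId plus its aggregated count and totalPrice.
    static final class UserTotal {
        final String userId;
        final long count;
        final double totalPrice;

        UserTotal(String userId, long count, double totalPrice) {
            this.userId = userId;
            this.count = count;
            this.totalPrice = totalPrice;
        }
    }

    // Returns up to n users ordered by totalPrice descending; ties are ordered by userId ascending.
    static List<UserTotal> topN(List<UserTotal> rows, int n) {
        List<UserTotal> sorted = new ArrayList<>(rows);
        sorted.sort((a, b) -> {
            int byPrice = Double.compare(b.totalPrice, a.totalPrice);
            return byPrice != 0 ? byPrice : a.userId.compareTo(b.userId);
        });
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        // Illustrative rows only; real values come from order_batch_sink.
        List<UserTotal> rows = Arrays.asList(
                new UserTotal("user_001", 3, 120.0),
                new UserTotal("user_002", 5, 300.5),
                new UserTotal("user_003", 1, 120.0));
        int rank = 1;
        for (UserTotal u : topN(rows, 3)) {
            System.out.printf("%d. %s %.2f (%d orders)%n", rank++, u.userId, u.totalPrice, u.count);
        }
    }
}
```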
Clean up resources
If you no longer need the resources created in this solution, clean them up to avoid unnecessary costs.
If you no longer need the EMR cluster, release it in the EMR console. For more information, see Release a cluster.
In the Tablestore console, delete the incremental tunnel (OrderSourceTunnel) from the OrderSource table, and then delete the four data tables: OrderSource, OrderStreamSink, OrderBatchSink, and OrderTotalSink.
If you no longer need the DataV service, unsubscribe from the service.