The LogHub (SLS) data source lets you read data from and write data to Simple Log Service (SLS). This topic describes how DataWorks supports data synchronization for LogHub (SLS).
Features
You can use Simple Log Service (SLS) for the following data synchronization scenarios:
- Across different regions.
- Between different Alibaba Cloud accounts.
- Within the same Alibaba Cloud account.
- Between public cloud and financial cloud accounts.
Limitations
When Data Integration performs an offline write to LogHub (SLS), data may be duplicated if a task is rerun after a failover, because writes to LogHub (SLS) are not idempotent.
Supported field types
Data Integration supports reading and writing the following LogHub (SLS) field types.
Field type | Offline read (LogHub (SLS) Reader) | Offline write (LogHub (SLS) Writer) | Real-time read |
STRING | Supported | Supported | Supported |
Details:
When writing data to LogHub (SLS) in offline mode
All supported data types are converted to the STRING type before being written to LogHub (SLS). The following table lists the data type conversions for the LogHub (SLS) writer.
Supported Data Integration internal type | Data type when writing to LogHub (SLS) |
LONG | STRING |
DOUBLE | STRING |
STRING | STRING |
DATE | STRING |
BOOLEAN | STRING |
BYTES | STRING |
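The conversion rule above can be illustrated with a minimal Python sketch. It is not the writer's actual implementation: the source only states that every type becomes STRING, so the text forms chosen here for null, DATE, BOOLEAN, and BYTES values are assumptions, and `to_sls_string` is a hypothetical helper name.

```python
import base64
from datetime import datetime


def to_sls_string(value):
    """Render one field value as text, mirroring the 'everything becomes STRING' rule."""
    if value is None:
        return ""  # assumption: nulls become empty strings
    if isinstance(value, bool):  # test bool before numbers: bool is a subclass of int
        return "true" if value else "false"  # assumption: lowercase text form
    if isinstance(value, (bytes, bytearray)):
        # assumption: BYTES are rendered as Base64 text
        return base64.b64encode(bytes(value)).decode("ascii")
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")  # assumption: DATE text format
    return str(value)  # LONG, DOUBLE, and STRING


record = {"id": 42, "ratio": 0.5, "ok": True, "name": "a"}
converted = {key: to_sls_string(val) for key, val in record.items()}
```

Because every value arrives in LogHub (SLS) as text, downstream consumers need to parse numbers and dates back out of the strings themselves.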
When reading data from LogHub (SLS) in real-time mode
The following metadata fields are included by default.
LogHub (SLS) real-time synchronization field | Data type | Description |
__time__ | STRING | SLS reserved field: __time__. The log time specified when the log data is written. This is a UNIX timestamp in seconds. |
__source__ | STRING | SLS reserved field: __source__. The source device of the log. |
__topic__ | STRING | SLS reserved field: __topic__. The topic name. |
__tag__:__receive_time__ | STRING | The time when the log arrives at the server. If you enable the feature to record public IP addresses, the server appends this field to the raw log upon receipt. This is a UNIX timestamp in seconds. |
__tag__:__client_ip__ | STRING | The public IP address of the log source device. If you enable the feature to record public IP addresses, the server appends this field to the raw log upon receipt. |
__tag__:__path__ | STRING | The path of the log file collected by Logtail. Logtail automatically appends this field to the log. |
__tag__:__hostname__ | STRING | The hostname of the machine from which Logtail collects data. Logtail automatically appends this field to the log. |
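Because the time-related metadata fields arrive as STRING values that hold UNIX timestamps in seconds, a consumer typically has to parse them. The following Python sketch shows one way to do that. The sample row and the `parse_sls_seconds` helper are hypothetical, and interpreting the value in UTC is an assumption.

```python
from datetime import datetime, timezone


def parse_sls_seconds(raw):
    """Convert a STRING field holding a UNIX timestamp in seconds to a UTC datetime."""
    return datetime.fromtimestamp(int(raw), tz=timezone.utc)


# Hypothetical row as it might arrive from real-time synchronization.
row = {
    "__time__": "1515634200",
    "__tag__:__receive_time__": "1515634205",
    "__topic__": "",
}

log_time = parse_sls_seconds(row["__time__"])
# Seconds between the log time and the time the server received the log.
delay_seconds = int(row["__tag__:__receive_time__"]) - int(row["__time__"])
```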
Create a data source
Configure the data source
Before you develop a data synchronization task, you must create a corresponding data source in DataWorks. For more information, see Manage data sources. The configuration UI provides detailed parameter descriptions as on-screen tips.
Create a cross-account data source
This example shows how to synchronize data from a LogHub service in Account A to a MaxCompute service in Account B using a task configured in Account B. Note the following points for cross-account data synchronization:
- Create a LogHub (SLS) data source using the AccessKey ID and AccessKey secret of Account A.
  This configuration allows Account B to synchronize data from all Simple Log Service (SLS) projects in Account A.
- Create a LogHub (SLS) data source using the AccessKey ID and AccessKey secret of a RAM user (for example, A1) in Account A.
  - Account A grants general permissions for Simple Log Service (SLS) to RAM user A1, such as AliyunLogFullAccess and AliyunLogReadOnlyAccess. For more information, see Create a RAM user and grant permissions.
    Note: Granting the AliyunLogFullAccess and AliyunLogReadOnlyAccess system policies allows the RAM user to query all Simple Log Service resources in the main account.
  - Account A grants a custom permission policy for Simple Log Service (SLS) to RAM user A1.
    Account A goes to the page, and clicks Create Policy.
    For more information about authorization, see Introduction to RAM and Overview.
    After granting permissions based on the following policy, Account B can synchronize data only from the project_name1 and project_name2 projects in Simple Log Service (SLS) using RAM user A1.
    {
      "Version": "1",
      "Statement": [
        {
          "Action": [
            "log:Get*",
            "log:List*",
            "log:CreateConsumerGroup",
            "log:UpdateConsumerGroup",
            "log:DeleteConsumerGroup",
            "log:ListConsumerGroup",
            "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat",
            "log:GetConsumerGroupCheckPoint"
          ],
          "Resource": [
            "acs:log:*:*:project/project_name1",
            "acs:log:*:*:project/project_name1/*",
            "acs:log:*:*:project/project_name2",
            "acs:log:*:*:project/project_name2/*"
          ],
          "Effect": "Allow"
        }
      ]
    }
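When the list of projects changes often, the custom policy can be generated instead of edited by hand. The following Python sketch rebuilds the same policy document for an arbitrary list of project names. The `build_sls_policy` helper is hypothetical, and the action list is copied from the policy shown in this section.

```python
import json

# Actions copied from the custom policy in this section.
READ_AND_CONSUME_ACTIONS = [
    "log:Get*",
    "log:List*",
    "log:CreateConsumerGroup",
    "log:UpdateConsumerGroup",
    "log:DeleteConsumerGroup",
    "log:ListConsumerGroup",
    "log:ConsumerGroupUpdateCheckPoint",
    "log:ConsumerGroupHeartBeat",
    "log:GetConsumerGroupCheckPoint",
]


def build_sls_policy(project_names):
    """Build a policy that limits access to the given SLS projects."""
    resources = []
    for name in project_names:
        # Each project needs two entries: the project itself and everything under it.
        resources.append(f"acs:log:*:*:project/{name}")
        resources.append(f"acs:log:*:*:project/{name}/*")
    return {
        "Version": "1",
        "Statement": [
            {
                "Action": list(READ_AND_CONSUME_ACTIONS),
                "Resource": resources,
                "Effect": "Allow",
            }
        ],
    }


policy = build_sls_policy(["project_name1", "project_name2"])
policy_json = json.dumps(policy, indent=2)  # text to paste into the policy editor
```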
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.
When LogHub is used as the source, you can filter data using LogHub query syntax or Structured Process Language (SPL) statements. For more information about the syntax, see Appendix 2: SPL syntax for filtering.
Configure a single-table offline synchronization task
For more information, see Use the Codeless UI and Use the Code Editor.
Note: When you configure a synchronization task in the codeless UI, ensure that the parameter format matches the format described in Appendix 1: Script examples and parameter descriptions.
For information about all parameters and a script example for the code editor, see Appendix 1: Script examples and parameter descriptions.
Configure a single-table real-time synchronization task
For the procedure, see Configure a single-table real-time synchronization task.
Configure a whole-database real-time synchronization task
For more information, see Configure a whole-database real-time synchronization task.
FAQ
Data exists in a LogHub source field but is empty after synchronization.
The fields that are read from LogHub during field mapping are not as expected.
For more Data Integration FAQs, see Data Integration FAQ.
Appendix 1: Script examples and parameter descriptions
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Use the Code Editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script example
{
"type":"job",
"version":"2.0",// Version number.
"steps":[
{
"stepType":"LogHub",// Plug-in name.
"parameter":{
"datasource":"",// Data source.
"column":[// Fields.
"col0",
"col1",
"col2",
"col3",
"col4",
"C_Category",
"C_Source",
"C_Topic", // Topic.
"C_MachineUUID", // Unique machine group identifier.
"C_HostName", // Hostname.
"C_Path", // Path.
"C_LogTime" // Event time.
],
"beginDateTime":"",// The start time for data consumption.
"batchSize":"",// The number of data entries to query from Simple Log Service at a time.
"endDateTime":"",// The end time for data consumption.
"fieldDelimiter":",",// Column delimiter.
"logstore":""// The name of the source Logstore.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The number of error records.
},
"speed":{
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect, and the data rate is not limited. If throttle is set to true, the data rate is limited.
"concurrent":1, // The number of concurrent jobs.
"mbps":"12"// The maximum data rate. 1 mbps = 1 MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader script parameters
Parameter | Description | Required | Default value |
endPoint | The endpoint of Simple Log Service. The endpoint is the URL used to access a project and its log data. The endpoint is related to the Alibaba Cloud region where the project is located and the project name. For the endpoints of each region, see Endpoints. | Yes | None |
accessId | The AccessKey ID used to access Simple Log Service. It identifies the user. | Yes | None |
accessKey | The AccessKey secret used to access Simple Log Service. It authenticates the user. | Yes | None |
project | The name of the Simple Log Service project to read from. A project is a resource management unit in Simple Log Service used to isolate and control resources. | Yes | None |
logstore | The name of the Logstore to read from. A Logstore is the unit for log data collection, storage, and query in Simple Log Service. | Yes | None |
batchSize | The number of data entries to query from Simple Log Service at a time. | No | 128 |
column | The column names in each data entry. You can configure metadata from Simple Log Service as synchronization columns. Simple Log Service supports metadata such as topics, unique machine group identifiers, hostnames, paths, and log times. Note Column names are case-sensitive. For information about how to write metadata, see Simple Log Service machine groups. | Yes | None |
beginDateTime | The start offset for data consumption. This is the time when the log data arrives at LogHub (SLS). This parameter specifies the start of the time range, which is inclusive. The time must be a string in the yyyyMMddHHmmss format, such as 20180111013000. You can use this parameter with DataWorks scheduling parameters. For example, on the Scheduling Configuration tab on the right side of the node configuration page, set Parameter to | Yes | None |
endDateTime | The end offset for data consumption. This parameter specifies the end of the time range, which is exclusive. The time must be a string in the yyyyMMddHHmmss format, such as 20180111013010. You can use this parameter with DataWorks scheduling parameters. For example, on the Scheduling Configuration tab on the right side of the node configuration page, set Parameter to endDateTime=${yyyymmdd}. Then, set Log End Time to ${endDateTime}000000. This sets the log end time to 00:00:00 on the day after the business date. For more information, see Supported formats of scheduling parameters. | Yes | None |
query | Filters data in LogHub using the LogHub query syntax or SPL statements. SPL (Structured Process Language) is the syntax used by SLS to process logs. | Yes | None |
If data is missing after it is read from LogHub, go to the LogHub console and check whether the receive_time metadata field (__tag__:__receive_time__) of the missing logs falls within the time range configured for the task.
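The time-range semantics of beginDateTime (inclusive) and endDateTime (exclusive) can be sketched in Python as follows. The `daily_window` and `in_window` helpers are hypothetical and only illustrate the yyyyMMddHHmmss format and the half-open range. Time zones are not handled here, and that simplification is an assumption.

```python
from datetime import datetime, timedelta

FMT = "%Y%m%d%H%M%S"  # yyyyMMddHHmmss


def daily_window(day):
    """Return a one-day window for a date given as yyyyMMdd."""
    start = datetime.strptime(day, "%Y%m%d")
    end = start + timedelta(days=1)
    return {"beginDateTime": start.strftime(FMT), "endDateTime": end.strftime(FMT)}


def in_window(receive_time, window):
    """Check a receive time against the window: start inclusive, end exclusive."""
    stamp = receive_time.strftime(FMT)
    # Fixed-width digit strings sort in chronological order.
    return window["beginDateTime"] <= stamp < window["endDateTime"]


window = daily_window("20180111")
```

Consecutive windows built this way do not overlap: a log received exactly at the boundary belongs to the later window only.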
Writer script example
{
"type": "job",
"version": "2.0",// Version number.
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "LogHub",// Plug-in name.
"parameter": {
"datasource": "",// Data source.
"column": [// Fields.
"col0",
"col1",
"col2",
"col3",
"col4",
"col5"
],
"topic": "",// Select a topic.
"batchSize": "1024",// The number of records in a batch submission.
"logstore": ""// The name of the destination Simple Log Service Logstore.
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// The number of error records.
},
"speed": {
"throttle":true,// If throttle is set to false, the mbps parameter does not take effect, and the data rate is not limited. If throttle is set to true, the data rate is limited.
"concurrent":3, // The number of concurrent jobs.
"mbps":"12"// The maximum data rate. 1 mbps = 1 MB/s.
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer script parameters
The LogHub (SLS) writer retrieves data from the reader through the Data Integration framework. The writer then converts the supported Data Integration data types to the STRING type. When the number of records reaches the specified batchSize, the data is pushed to LogHub (SLS) in a single batch using the Simple Log Service Java SDK.
Parameter | Description | Required | Default value |
endpoint | The endpoint of Simple Log Service. The endpoint is the URL used to access a project and its log data. The endpoint is related to the Alibaba Cloud region where the project is located and the project name. For the endpoints of each region, see Endpoints. | Yes | None |
accessKeyId | The AccessKey ID used to access Simple Log Service. | Yes | None |
accessKeySecret | The AccessKey secret used to access Simple Log Service. | Yes | None |
project | The name of the destination Simple Log Service project. | Yes | None |
logstore | The name of the destination Logstore. A Logstore is the unit for log data collection, storage, and query in Simple Log Service. | Yes | None |
topic | The topic name in the destination Simple Log Service. | No | Empty string |
batchSize | The number of data entries to synchronize to LogHub (SLS) at a time. The default value is 1,024. The maximum value is 4,096. Note The size of data synchronized to LogHub (SLS) in a single batch cannot exceed 5 MB. Adjust the number of entries to push at a time based on the size of a single data entry. | No | 1,024 |
column | The column names in each data entry. | Yes | None |
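The interaction between batchSize and the 5 MB per-batch limit can be sketched in Python as follows. This is not the writer's actual implementation: `split_batches` is a hypothetical helper, and estimating a record's size from its JSON encoding is an assumption made only for illustration.

```python
import json

MAX_BATCH_BYTES = 5 * 1024 * 1024  # 5 MB per batch, from the batchSize note


def split_batches(records, batch_size=1024, max_bytes=MAX_BATCH_BYTES):
    """Yield lists of records limited by both record count and estimated size."""
    batch, batch_bytes = [], 0
    for record in records:
        # Assumption: the JSON-encoded length approximates the size on the wire.
        size = len(json.dumps(record).encode("utf-8"))
        if batch and (len(batch) >= batch_size or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(record)
        batch_bytes += size
    if batch:
        yield batch


records = [{"col0": str(i)} for i in range(2500)]
batch_lengths = [len(b) for b in split_batches(records)]
```

In this sketch a single record larger than the size limit is still emitted in a batch of its own. If individual records are large, lowering batchSize keeps each batch under the limit.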
Appendix 2: SPL syntax for filtering
When LogHub is used as the source, you can filter data from LogHub using the LogHub query syntax or Structured Process Language (SPL) statements. The following table describes the syntax.
For more information about SPL, see SPL syntax.
Scenario | SQL statement | SPL statement |
Data filtering | | |
Field processing and filtering | Select specific fields and rename them: | |
Data cleansing (calling SQL functions) | Convert data types, parse time, and so on: | Convert data types, parse time, and so on: |
Field extraction | Regular expression extraction: JSON extraction: | |
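As a rough illustration of the scenarios in the table, the following Python sketch collects one SPL statement per scenario and places one of them in the reader's query parameter. The statements use SPL instructions such as where, project, project-rename, extend, parse-regexp, and parse-json. The field names (status, level, content, latency), the data source name, and the Logstore name are hypothetical, and whether the leading `*` is needed in the query parameter is an assumption. Check each statement against the SPL syntax reference before use.

```python
import json

# Hypothetical field names; replace them with fields that exist in your Logstore.
QUERY_EXAMPLES = {
    "data_filtering": "* | where status = '500'",
    "field_selection_and_rename": "* | project level, content | project-rename severity=level",
    "data_cleansing": "* | extend latency_ms = cast(latency as bigint)",
    "regex_extraction": r"* | parse-regexp content, '(\S+) (\S+)' as method, uri",
    "json_extraction": "* | parse-json content",
}


def reader_parameter(datasource, logstore, query):
    """Return a partial reader 'parameter' object that carries the filter in 'query'.

    Other required parameters (column, beginDateTime, endDateTime) are omitted here.
    """
    return {"datasource": datasource, "logstore": logstore, "query": query}


param = reader_parameter("my_sls_source", "my_logstore", QUERY_EXAMPLES["data_filtering"])
param_json = json.dumps(param)  # valid JSON, without the // comments used in the script examples
```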