This document covers the configuration parameters for a Flink online training job that uses the LinUCB (Linear Upper Confidence Bound) contextual bandit algorithm. Configure Hologres storage, the LinUCB algorithm, and your data source (Kafka or DataHub) to get the training job running.
Hologres configurations
| Parameter | Description | Example |
|---|---|---|
| endpoint | The Hologres endpoint, in the format Host:Port. | hgpostcn-cn-xxxx-cn-beijing-vpc.hologres.aliyuncs.com:80 |
| dbname | The name of the Hologres database. | db_name |
| tablename | The name of the Hologres table that stores model parameters. | rec.contextual_bandit_models |
| username | The Hologres database username. | ${holo_user_name} |
| password | The Hologres database password. | ${holo_password} |
| insertOrUpdate | When set to true, inserts overwrite rows whose primary key values already exist. | true |
| hologres.feature.table | The name of the Hologres table that stores features. | rec.contextual_bandit_features |
| hologres.arm.table | The name of the Hologres table that stores subsequent arms. | rec.cold_start_item_table |
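
Because the endpoint must follow the Host:Port format, it can help to check the value before you submit the job. The following is a minimal sketch of such a check. The helper name `split_endpoint` is hypothetical and is not part of the training job.

```python
from typing import Tuple


def split_endpoint(endpoint: str) -> Tuple[str, int]:
    """Split a Host:Port endpoint string into (host, port).

    Hypothetical helper for pre-flight validation only.
    """
    host, sep, port = endpoint.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected Host:Port, got {endpoint!r}")
    return host, int(port)


host, port = split_endpoint("hgpostcn-cn-xxxx-cn-beijing-vpc.hologres.aliyuncs.com:80")
```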
Algorithm configurations
| Parameter | Description | Example |
|---|---|---|
| log.parser.parallelism | The parallelism of worker nodes that parse logs. | 4 |
| parallelism | The parallelism of the LinUCB algorithm. | 4 |
| linucb.algo | The LinUCB variant. Valid values: disjoint, hybrid. | hybrid |
| browse.window.size | How long (in minutes) the system waits after an impression event to collect associated behavioral events (such as clicks or shares). Increase this value if users in your scenario typically delay interactions after exposure. | 8 |
| default.window.size | How long (in minutes) the system waits for non-impression behavioral events. | 5 |
| new.arm.duration.hours | The lifetime of a new arm in hours. Set this to match how frequently new items enter your catalog. For example, use 24 for content that refreshes daily, or a shorter value for fast-moving news feeds. | 24 |
| new.arm.cache.minutes | How long (in minutes) a new arm is kept in memory before the system queries Hologres again. | 1 |
| arm.id.column.name | The column name of the arm ID in the hologres.arm.table table. | videoId |
| arm.create.time.column.name | The column name of the arm creation time in the hologres.arm.table table. | createTime |
| arm.create.time.column.type | The format of the arm creation time. See Time format for valid values. | timestamp |
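
To build intuition for browse.window.size, the following sketch selects the behavioral events that fall inside the window that opens at an impression. It is an illustration under stated assumptions, not the job's actual implementation: the function name `events_within_window`, the event dictionaries, and the inclusive window bounds are all hypothetical.

```python
from datetime import datetime, timedelta


def events_within_window(impression_time, events, window_minutes=8):
    """Return events whose time lies within window_minutes after the impression.

    Assumption: both window bounds are inclusive.
    """
    deadline = impression_time + timedelta(minutes=window_minutes)
    return [e for e in events if impression_time <= e["time"] <= deadline]


impression = datetime(2021, 9, 14, 19, 18, 37)
events = [
    {"name": "svplay", "time": datetime(2021, 9, 14, 19, 20, 0)},  # inside the window
    {"name": "share", "time": datetime(2021, 9, 14, 19, 30, 0)},   # after the window closes
]
matched = events_within_window(impression, events, window_minutes=8)
```

With an 8-minute window, only the svplay event is matched to the impression. A larger window would also capture the later share event.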
Where possible, set the parallelism in the Realtime Compute for Apache Flink console to the same value as log.parser.parallelism. If the two values differ, the console value must be greater than or equal to log.parser.parallelism. In most cases, set log.parser.parallelism to a value less than or equal to the number of DataHub or Kafka partitions.
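
The parallelism rules above can be expressed as a small pre-flight check. This is a sketch only; the function `check_parallelism` and its argument names are hypothetical, and the partition rule is reported as a warning because it applies in most cases rather than all.

```python
def check_parallelism(console_parallelism, log_parser_parallelism, source_partitions):
    """Return a list of human-readable problems; an empty list means the values are consistent."""
    problems = []
    if console_parallelism < log_parser_parallelism:
        problems.append("error: console parallelism is lower than log.parser.parallelism")
    if log_parser_parallelism > source_partitions:
        problems.append("warning: log.parser.parallelism exceeds the number of source partitions")
    return problems


ok = check_parallelism(console_parallelism=4, log_parser_parallelism=4, source_partitions=8)
bad = check_parallelism(console_parallelism=2, log_parser_parallelism=4, source_partitions=3)
```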
Data source configurations
Kafka data source
The Kafka data source requires input data as standard JSON strings in the format {Key:Value, Key:Value,...}.
| Parameter | Description | Example |
|---|---|---|
| kafka.bootstrap.servers | The addresses of the Kafka brokers, as a comma-separated list of Host:Port pairs. | 172.0.XX.XX:9092,172.0.XX.XX:9092 |
| kafka.topic | The Kafka topic. | item_bhv_log |
| kafka.group.id | The Kafka consumer group ID. | realtime_rec |
| input.user.field.path | The JSON path of the user ID field. | $.userid |
| input.arm.field.path | The JSON path of the arm field. | $.value.svid |
| input.event.type.field.path | The JSON path of the event type field. | $.event_name |
| input.event.time.field.path | The JSON path of the event time field. | $.gmt |
| input.event.time.format | The format of the event time. See Time format for valid values. | "yyyy-MM-dd HH:mm:ss.SSS" |
| input.event.reward.json | The reward of the event, in JSON format. Events not listed here are filtered out. Set the impression event reward to 0. | {"exposure":0,"svplay":0.5,"svplayend":1,"share":2,"likes":1.5} |
| input.exposure.event.name | The event name that identifies an impression event. | exposure |
| input.event.equal.filter | Retain only log entries that match the specified key-value equality condition. Format: {"key": "value"}. | {"from_page":"p_smartvideodetail"} |
| input.event.in.filter | Retain only log entries where the specified key's value is in the given set. Format: {"key": [...]}. | {"refer":["p_weexpage", "p_svhome", "p_svh_tab_0"]} |
Events not configured in input.event.reward.json are filtered out and not used for training. Set the reward of the impression event to 0.
Field paths use JSON path syntax to locate values within a JSON log entry. For details, see JSON path reference.
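
The following sketch shows how the example values of input.event.reward.json, input.event.equal.filter, and input.event.in.filter might interact for a single log entry. It rests on several assumptions that the parameter descriptions do not confirm: the filter keys are top-level keys of the entry, both filters must pass, and the event type is read from a top-level `event_name` key. The function `reward_for` is hypothetical.

```python
import json

REWARDS = json.loads('{"exposure":0,"svplay":0.5,"svplayend":1,"share":2,"likes":1.5}')
EQUAL_FILTER = {"from_page": "p_smartvideodetail"}
IN_FILTER = {"refer": ["p_weexpage", "p_svhome", "p_svh_tab_0"]}


def reward_for(entry):
    """Return the reward for a log entry, or None if the entry is filtered out."""
    # Equality filter: every configured key must have exactly the configured value.
    if any(entry.get(key) != value for key, value in EQUAL_FILTER.items()):
        return None
    # Membership filter: every configured key must hold one of the allowed values.
    if any(entry.get(key) not in allowed for key, allowed in IN_FILTER.items()):
        return None
    # Events without a configured reward are dropped (None); an impression yields 0.
    return REWARDS.get(entry.get("event_name"))


share = {"event_name": "share", "from_page": "p_smartvideodetail", "refer": "p_svhome"}
comment = {"event_name": "comment", "from_page": "p_smartvideodetail", "refer": "p_svhome"}
share_reward = reward_for(share)
comment_reward = reward_for(comment)
```

Note that a reward of 0 for the impression event is different from None: the impression is kept for training, whereas an event with no configured reward is discarded.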
DataHub data source
| Parameter | Description | Example |
|---|---|---|
| datahub.endpoint | The DataHub endpoint. | http://dh-cn-beijing-int-vpc.aliyuncs.com |
| datahub.project | The name of the DataHub project. | xxx_rec |
| datahub.topic | The name of the DataHub topic. | item_bhv_log |
| datahub.sub.id | The DataHub subscription ID. | 16255766364793VNMA |
| datahub.access.id | The DataHub AccessKey ID. | |
| datahub.access.key | The DataHub AccessKey secret. | |
| datahub.start.in.ms | The timestamp (in milliseconds) from which log consumption starts. Defaults to the time when the job starts. | |
| input.user.field.path | The value path of the user ID field. | userid |
| input.arm.field.path | The value path of the arm field. | value.svid |
| input.event.type.field.path | The value path of the event type field. | event_name |
| input.event.time.field.path | The value path of the event time field. | gmt |
| input.event.time.format | The format of the event time. See Time format for valid values. | "yyyy-MM-dd HH:mm:ss.SSS" |
| input.event.reward.json | The reward of the event, in JSON format. Events not listed here are filtered out. Set the impression event reward to 0. | {"exposure":0,"svplay":0.5,"svplayend":1,"share":2,"likes":1.5} |
| input.exposure.event.name | The event name that identifies an impression event. | exposure |
| input.event.equal.filter | Retain only log entries that match the specified key-value equality condition. | {"from_page":"p_smartvideodetail"} |
| input.event.in.filter | Retain only log entries where the specified key's value is in the given set. | {"refer":["p_weexpage", "p_svhome", "p_svh_tab_0"]} |
Events not configured in input.event.reward.json are filtered out and not used for training. Set the reward of the impression event to 0.
DataHub field paths use value path syntax. For details, see Value path reference.
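
If you want consumption to start from a specific moment rather than from job start, datahub.start.in.ms needs a millisecond timestamp. A minimal sketch of computing one follows. The chosen start time is an arbitrary example, and treating it as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(moment):
    """Convert a timezone-aware datetime to integer milliseconds since the Unix epoch."""
    return (moment - EPOCH) // timedelta(milliseconds=1)


# Example start time, assumed to be UTC.
start_in_ms = to_epoch_ms(datetime(2021, 9, 14, 19, 18, 37, tzinfo=timezone.utc))
```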
JSON path reference
JSON path locates values within a JSON structure. All paths start with $ (root node). For the full specification, see LanguageManual UDF.
| Character | Meaning |
|---|---|
| $ | Root node |
| . or [''] | Child node. Use [''] when the key contains a . |
| [number] | Array subscript, starting at 0 |
| * | Wildcard for an array. Returns the entire array |
The following example shows how JSON path expressions map to values in a log entry:
{
  "store": {
    "fruit": [
      {"weight": 8, "type": "apple"},
      {"weight": 9, "type": "pear"}
    ],
    "bicycle": {"price": 19.95, "color": "red"}
  },
  "email": "a***@example.net",
  "owner": "a**"
}

| JSON path | Value |
|---|---|
| $.owner | a** |
| $.store.bicycle.price | 19.95 |
| $.store.fruit[0] | {"weight":8,"type":"apple"} |
| $.store.fruit[0].type | apple |
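
To check a field path against a sample log entry before deploying, a small resolver can help. The sketch below handles only the root node, child nodes (both `.key` and `['key']`), and numeric array subscripts; it does not implement the wildcard. It is an illustrative stand-in, not the parser that the training job uses, and the name `resolve_json_path` is hypothetical.

```python
import json
import re

# One path step: .key, ['key'], or [number]
_STEP = re.compile(r"\.([^.\[\]]+)|\['([^']*)'\]|\[(\d+)\]")


def resolve_json_path(document, path):
    """Resolve a simple JSON path (no wildcard support) against parsed JSON data."""
    if not path.startswith("$"):
        raise ValueError("a JSON path must start with $")
    node = document
    pos = 1
    while pos < len(path):
        match = _STEP.match(path, pos)
        if match is None:
            raise ValueError(f"unsupported path syntax at offset {pos}: {path!r}")
        dotted_key, quoted_key, index = match.groups()
        if index is not None:
            node = node[int(index)]
        else:
            node = node[dotted_key if dotted_key is not None else quoted_key]
        pos = match.end()
    return node


entry = json.loads(
    '{"store": {"fruit": [{"weight": 8, "type": "apple"}, {"weight": 9, "type": "pear"}],'
    ' "bicycle": {"price": 19.95, "color": "red"}}, "owner": "a**"}'
)
fruit_type = resolve_json_path(entry, "$.store.fruit[0].type")
```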
Value path reference
DataHub value paths locate values within a DataHub record. The path syntax differs from JSON path: no leading $, and brackets indicate array subscripts.
| Syntax | Description |
|---|---|
| column_name | Single-value field |
| column_name.path | Nested field within a JSON-formatted column |
| . | Child node separator |
| [sub1][sub2]... | Array subscripts |
The following example shows value path expressions for the value column of a DataHub record:
{
  "svd_tab": "recommend",
  "publish_uid": 964655287,
  "totaltime": 8219,
  "exp_id": "ER4_L6#EG8****",
  "refer": ["weexpage", "html5"],
  "alogr_name": {
    "name": "U2IRecallV10",
    "type": "linucb"
  },
  "svid": "5480249",
  "n_svid": "5505132",
  "playtime": 572,
  "n_alogr_name": "SwingU2IRecallV10",
  "request_id": "03cabe99-e46c-43bf-b689-5c6b9ab083fa"
}

| Value path | Value |
|---|---|
| value.svid | 5480249 |
| value.alogr_name.name | U2IRecallV10 |
| value.refer[1] | html5 |
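
The value path rules can be sketched in the same way. The example below assumes that a DataHub record is available as a dictionary of column names to values and that a JSON-formatted column arrives as a JSON string; both are assumptions for illustration, and `resolve_value_path` is a hypothetical helper rather than the job's own parser.

```python
import json
import re

# One dot-separated segment: a name followed by zero or more [number] subscripts.
_SEGMENT = re.compile(r"([^.\[\]]+)((?:\[\d+\])*)")


def resolve_value_path(record, path):
    """Resolve a value path such as value.refer[1] against a record dictionary."""
    parts = path.split(".")
    node = record
    for i, part in enumerate(parts):
        match = _SEGMENT.fullmatch(part)
        if match is None:
            raise ValueError(f"unsupported segment {part!r} in {path!r}")
        name, subscripts = match.groups()
        node = node[name]
        # Assumption: a JSON-formatted column is stored as a string and is
        # parsed only when the path descends into it.
        descends = bool(subscripts) or i < len(parts) - 1
        if i == 0 and descends and isinstance(node, str):
            node = json.loads(node)
        for index in re.findall(r"\[(\d+)\]", subscripts):
            node = node[int(index)]
    return node


record = {
    "userid": "u1",
    "value": json.dumps({
        "svid": "5480249",
        "refer": ["weexpage", "html5"],
        "alogr_name": {"name": "U2IRecallV10", "type": "linucb"},
    }),
}
second_refer = resolve_value_path(record, "value.refer[1]")
```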
Time format
Use one of the following values for any parameter that specifies a time format:
| Value | Description |
|---|---|
| timestamp | Integer timestamp in seconds |
| timestamp.in.millisecond | Integer timestamp in milliseconds |
| java SimpleDateFormat | A Java SimpleDateFormat pattern string, such as yyyy-MM-dd HH:mm:ss.SSS. Example of a matching time: 2021-09-14 19:18:37.971 |
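
The three time formats can be normalized to a single unit when you prepare or inspect test data. The following sketch converts each to epoch milliseconds. It supports only the one SimpleDateFormat pattern shown in the table, through a hand-written mapping to Python's strptime syntax, and it assumes UTC for formatted strings because the time zone is not specified here. The function name `to_epoch_millis` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Hand-written mapping for the single pattern used in this document.
_JAVA_TO_STRPTIME = {"yyyy-MM-dd HH:mm:ss.SSS": "%Y-%m-%d %H:%M:%S.%f"}


def to_epoch_millis(raw, time_format):
    """Convert a raw time value to epoch milliseconds according to time_format."""
    if time_format == "timestamp":
        return int(raw) * 1000  # seconds to milliseconds
    if time_format == "timestamp.in.millisecond":
        return int(raw)
    pattern = _JAVA_TO_STRPTIME[time_format]  # KeyError for unsupported patterns
    parsed = datetime.strptime(raw, pattern).replace(tzinfo=timezone.utc)  # assumes UTC
    return (parsed - EPOCH) // timedelta(milliseconds=1)


from_seconds = to_epoch_millis(1631647117, "timestamp")
from_string = to_epoch_millis("2021-09-14 19:18:37.971", "yyyy-MM-dd HH:mm:ss.SSS")
```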