兼容Kafka

DataHub已经完全兼容Kafka协议,您可以使用原生Kafka客户端对DataHub进行读写操作。

相关介绍

Kafka映射DataHub介绍

Topic类型

Kafka的Topic扩容方式和DataHub的topic扩容方式不同,为了适配Kafka的topic扩容方式,DataHub创建topic时需要将扩容方式选为扩展模式。扩展模式的topic,不再支持分裂/合并操作,而是添加shard的方式,暂不支持减少shard。

Topic命名

Kafka的Topic映射之后为DataHub的project+topic,project和topic以 “.”分割,例如:test_project.test_topic对应到DataHub中Project为test_project,Topic为test_topic,如果含有多个“.”,会以首个“.”分割Project和Topic,多余的“.”和”-“会被替换为“_“。

Partition

DataHub的每个处于Active状态shard对应Kafka的1个Partition,如果当前Active状态shard为5个,那么就可以视为Kafka有5个Partition,写入数据时,可以指定Partition范围为[0,4],如果不指定,则会由kafka客户端选择Partition。

Tuple Topic

Kafka的数据写入Tuple Topic时,Topic Schema必须为2列或1列,类型必须为STRING,其他情况会写入失败。如果为1列,则只写入value,key的数据将被丢弃,如果为2列,则第1列和第2列分别对应key和value。Tuple Topic写入二进制数据会存在乱码问题,二进制数据建议写入Blob Topic

Blob Topic

Kafka的数据写入Blob Topic时,会把Kafka数据的value写入Blob中,如果Kafka数据的key不为NULL,则会写入DataHub的Attribute,其中key为”__kafka_key__“,value为Kafka数据的key。

Header

Kafka的Header对应DataHub的Attribute,但是如果Kafka的Header的value为NULL,则会忽略掉对应的header。建议不要使用”__kafka_key__“作为Header的key

Consumer Group

DataHub的消费组就是订阅id,只能同时订阅单个topic,而kafka的group可以同时订阅多个topic,为了更好的兼容kafka的订阅方式,DataHub又提供了group的功能,用户可以在project下创建group并绑定想要订阅的topic,就可以使用该group订阅这个project下的多个topic。DataHub的group本质上就是服务端内部封装了DataHub的多个订阅,如果group绑定了topic,用户可以在topic页面的订阅列表页面,看到由group自动创建的订阅,删除该订阅会导致group无法订阅该topic,并且之前的消费点位都会消失。

目前单个group限制最多可以订阅50个topic,如果需要订阅更多,请联系我们。

Kafka配置参数

C=Consumer, P=Producer, S=Streams

参数

C/P/S

可选配置

是否必须

描述

bootstrap.servers

*

参考Kafka域名列表

security.protocol

*

SASL_SSL

为了保证数据传输的安全性,Kafka写入DataHub默认使用SSL加密传输

sasl.mechanism

*

PLAIN

AK认证方式,仅支持PLAIN

compression.type

P

LZ4

是否开启压缩传输,目前仅支持LZ4

group.id

C

project.topic:subId

或者

project.group

使用project.topic:subId时必须和订阅的topic保持一致,否则无法读取数据,推荐使用project.group

partition.assignment.strategy

C

org.apache.kafka.clients.consumer.RangeAssignor

Kafka默认为RangeAssignor,并且DataHub目前只支持RangeAssignor,请不要修改此配置

session.timeout.ms

C/S

[60000, 180000]

kafka默认为10000, 但是因为DataHub限制最小为60000,所以这里默认会变为60000

heartbeat.interval.ms

C/S

建议session.timeout.ms的 2/3

Kafka默认为3000,但是因为session.timeout.ms会被默认修改为60000,所以这里建议显示设置为40000,否则heartbeat请求会过于频繁

application.id

S

project.topic:subId

或者

project.group

使用project.topic:subId时必须和订阅的topic保持一致,否则无法读取数据,推荐使用project.group

以上是使用Kafka客户端写入DataHub需要重点关注的参数,对于等客户端相关的参数,行为没有变化,例如:retries,batch.size;对于服务端相关参数不会对服务端行为有改变,例如:无论acks的值为多少,DataHub默认数据完全写入成功之后才会返回。

Kafka域名列表

地区

Region

外网Endpoint

经典网络ECS Endpoint

VPC ECS Endpoint

华东1(杭州)

cn-hangzhou

dh-cn-hangzhou.aliyuncs.com:9092

dh-cn-hangzhou.aliyun-inc.com:9093

dh-cn-hangzhou-int-vpc.aliyuncs.com:9094

华东2(上海)

cn-shanghai

dh-cn-shanghai.aliyuncs.com:9092

dh-cn-shanghai.aliyun-inc.com:9093

dh-cn-shanghai-int-vpc.aliyuncs.com:9094

华北2(北京)

cn-beijing

dh-cn-beijing.aliyuncs.com:9092

dh-cn-beijing.aliyun-inc.com:9093

dh-cn-beijing-int-vpc.aliyuncs.com:9094

华南1(深圳)

cn-shenzhen

dh-cn-shenzhen.aliyuncs.com:9092

dh-cn-shenzhen.aliyun-inc.com:9093

dh-cn-shenzhen-int-vpc.aliyuncs.com:9094

华北3(张家口)

cn-zhangjiakou

dh-cn-zhangjiakou.aliyuncs.com:9092

dh-cn-zhangjiakou.aliyun-inc.com:9093

dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094

亚太东南1(新加坡)

ap-southeast-1

dh-ap-southeast-1.aliyuncs.com:9092

dh-ap-southeast-1.aliyun-inc.com:9093

dh-ap-southeast-1-int-vpc.aliyuncs.com:9094

亚太东南3(吉隆坡)

ap-southeast-3

dh-ap-southeast-3.aliyuncs.com:9092

dh-ap-southeast-3.aliyun-inc.com:9093

dh-ap-southeast-3-int-vpc.aliyuncs.com:9094

亚太南部1(孟买)

ap-south-1

dh-ap-south-1.aliyuncs.com:9092

dh-ap-south-1.aliyun-inc.com:9093

dh-ap-south-1-int-vpc.aliyuncs.com:9094

欧洲中部1(法兰克福)

eu-central-1

dh-eu-central-1.aliyuncs.com:9092

dh-eu-central-1.aliyun-inc.com:9093

dh-eu-central-1-int-vpc.aliyuncs.com:9094

上海金融云

cn-shanghai-finance-1

dh-cn-shanghai-finance-1.aliyuncs.com:9092

dh-cn-shanghai-finance-1.aliyun-inc.com:9093

dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094

中国香港

cn-hongkong

dh-cn-hongkong.aliyuncs.com:9092

dh-cn-hongkong.aliyun-inc.com:9093

dh-cn-hongkong-int-vpc.aliyuncs.com:9094

示例

创建Topic示例

页面创建

kafka

代码创建

注意:目前无法通过kafka的API创建topic,只能通过datahub的SDK创建,创建时需要指定ExpandMode为ONLY_EXTEND。

Maven依赖版本需为2.19.0或更高版本

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.19.0-public</version>
</dependency>
public class CreateTopic {
    public static void main(String[] args) {
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig("https://dh-cn-hangzhou.aliyuncs.com",
                                new AliyunAccount("accessId", "accessKey")))
                .build();

        int shardCount = 1;
        int lifeCycle = 7;

        try {
            datahubClient.createTopic("test_project", "test_topic", shardCount, lifeCycle, RecordType.BLOB, "comment", ExpandMode.ONLY_EXTEND);
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

创建group示例

页面创建

创建group

创建完成后仍旧可以修改绑定的topic列表,所以这里可以先任意选择。

创建 group

创建完成后,可以在topic的订阅列表页面看到group自动创建了订阅。

创建group

代码创建

Maven依赖版本需为2.21.6-public或更高版本

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.21.6-public</version>
</dependency>
public class CreateGroup {
    public static void main(String[] args) {
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
                .setDatahubConfig(
                        new DatahubConfig("https://dh-cn-hangzhou.aliyuncs.com",
                                new AliyunAccount("accessId", "accessKey")))
                .build();

        List<String> topicList = new ArrayList<>();
        topicList.add("test_project.topic1");
        topicList.add("test_project.topic2");
        topicList.add("test_project.topic3");

        try {
            // 创建kafka group
            datahubClient.createKafkaGroup("test_project", "test_topic", "test comment");

            // 将需要订阅的topic绑定到group上
            datahubClient.updateTopicsForKafkaGroup("test_project", "test_topic", topicList, UpdateKafkaGroupMode.ADD);
        } catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

Producer示例:

生成kafka_client_producer_jaas.conf文件

创建文件kafka_client_producer_jaas.conf,保存到任意路径,文件内容如下。

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};

Maven依赖

Kafka-client版本至少大于等于0.10.0.0,推荐2.4.0

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

示例代码

public class ProducerExample {
    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("compression.type", "lz4");

        String KafkaTopicName = "test_project.test_topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        try {
            List<Header> headers = new ArrayList<>();
            RecordHeader header1 = new RecordHeader("key1", "value1".getBytes());
            RecordHeader header2 = new RecordHeader("key2", "value2".getBytes());
            headers.add(header1);
            headers.add(header2);

            ProducerRecord<String, String> record = new ProducerRecord<>(KafkaTopicName, 0, "key", "Hello DataHub!", headers);

            // sync send
            producer.send(record).get();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

运行结果

运行成功之后,可以再DataHub抽样一下,确认是否正常DataHub。

image

Consumer示例

生成kafka_client_producer_jaas.conf文件和Maven依赖参考Producer示例。

新加入的consumer需要十几秒左右分配shard,分配完成后即可消费。

示例代码

使用kafka group示例(推荐)

package com.aliyun.datahub.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class ConsumerExample2 {

    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // group.id填project.group
        properties.put("group.id", "test_project.test_kafka_group");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        List<String> topicList = new ArrayList<>();
        topicList.add("test_project.test_topic1");
        topicList.add("test_project.test_topic2");
        topicList.add("test_project.test_topic3");
        // 使用kafka group可以同时订阅多个topic
        kafkaConsumer.subscribe(topicList);

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    }
}

使用project.topic.subid示例

package com.aliyun.datahub.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {

    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // group.id填project.topic.subId
        properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("ssl.endpoint.identification.algorithm", "");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // 使用project.topic.subId的方式只能订阅单个topic
        kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    }
}

运行结果

运行成功之后,便可以在终端看到读取到的数据。

ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)

注意:这里同一个请求返回的数据的LogAppendTime是相同的,是该请求返回所有的数据的写入DataHub时间的最大值

Streams示例

Maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.4.0</version>
</dependency>

代码示例

这里读取test_project下input的数据,将key和value的字符串转为小写重新写入output。

public class StreamExample {

    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(final String[] args) {
        final String input = "test_project.input";
        final String output = "test_project.output";
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("application.id", "test_project.input:1611293595417QH0WL");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("auto.offset.reset", "earliest");

        final StreamsBuilder builder = new StreamsBuilder();
        TestMapper testMapper = new TestMapper();
        builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
                .map(testMapper)
                .to(output, Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {

        @Override
        public KeyValue<String, String> apply(String s, String s2) {
            return new KeyValue<>(StringUtils.lowerCase(s), StringUtils.lowerCase(s2));
        }
    }
}

运行结果

启动Streams任务之后,分配shard大概需要1分钟左右,1分钟之后就可以在控制台看到当前的task数量,task数量和输入topic的shard数量保持一致,示例输入topic为3个shard。

currently assigned active tasks: [0_0, 0_1, 0_2]
    currently assigned standby tasks: []
    revoked active tasks: []
    revoked standby tasks: []

shard分配成功之后,可以向input中写入一组测试数据 (AAAA,BBBB),(CCCC,DDDD),(EEEE,FFFF),之后再output抽样一下,查看数据是否正确写入。

image

注意事项

  • 目前不支持事务、幂等

  • 目前Kafka客户端无法自动创建DataHub Topic,写入之前需要保证已创建Topic

  • Consumer目前最多只可以订阅一个topic

  • Consumer读取的数据时间戳均为LogAppendTime,表示DataHub的落盘时间,单个请求返回的所有数据时间戳相同,为所有数据时间戳的最大值,所以如果读取的时间戳可能会大于实际的落盘时间

  • Streams输入topic目前仅支持一个,输出可以多个topic

  • Streams目前只支持无状态的任务。

  • 支持Kafka版本为0.10.0 -> 2.4.0

常见问题

Q: 写入数据时连接断开

Selector - [Producer clientId=producer-1] Connection with dh-cn-shenzhen.aliyuncs.com disconnected
java.io.EOFException
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
    ...

A: Kafka meta请求和写数据请求不是一个连接,第一次meta请求会请求建立一个连接,然后写数据时会重新和meta中的返回的broker重新建立一个连接,并且之后所有的请求都是在第二个连接上发送,因此第一个连接就会闲置,服务端会主动关闭闲置超过一定时间的连接,因此如果这个错误并没有影像数据的正常写入,直接忽略即可。

Q: 启动kafka客户端失败

Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 100.67.134.161 found

A:添加配置properties.put("ssl.endpoint.identification.algorithm", "");

Q:Consumer消费过程中出现DisconnectException

[INFO][Consumer clientId=client-id, groupId=consumer-project.topic:subid] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectException

A:Kafka的客户端需要与服务端保持TCP长连接,一般情况是因为网络抖动造成的,客户端有重试逻辑,因此不会对客户端的消费造成影响。

阿里云首页 数据总线 DataHub 相关技术圈