文档

通过开源Kafka客户端写入Lindorm消息引擎数据

更新时间:

Lindorm消息引擎完全兼容开源Kafka API,您可以通过Kafka API编写程序向Lindorm消息引擎写入数据,也可以通过开源的三方工具采集并写入Lindorm消息引擎数据,例如FluentD、Debezium等。本文介绍通过开源Kafka客户端连接Lindorm消息引擎并写入Lindorm消息引擎数据的代码示例。

前提条件

  • 已安装Java环境,要求安装JDK 1.8及以上版本。

  • 已将客户端IP地址添加至Lindorm实例的白名单中,具体操作请参见设置白名单

  • 已获取Lindorm消息引擎Kafka地址,具体操作请参见访问实例

注意事项

  • Lindorm消息引擎仅支持通过专有网络访问。

  • 如果应用部署在ECS实例,通过专有网络访问Lindorm实例前,需要确保Lindorm实例和ECS实例满足以下条件,以保证网络的连通性。

    • 所在地域相同,并建议所在可用区相同(以减少网络延时)。

    • ECS实例与Lindorm实例属于同一专有网络。

操作步骤

  1. 下载开源Kafka客户端。在pom.xml中配置Maven依赖,具体内容如下:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
    <!-- jettison库为代码样例中的JSON依赖,实际使用中可视具体情况决定是否依赖jettison库 -->
    <dependency>
        <groupId>org.codehaus.jettison</groupId>
        <artifactId>jettison</artifactId>
        <version>1.5.4</version>
    </dependency>
  2. 连接Lindorm消息引擎并写入Lindorm消息引擎数据。完整的代码示例如下:

    说明
    • 写入的数据格式支持JSON、Avro和CSV。或任意自定义格式数据。

    • 代码中Lindorm消息引擎Kafka地址为专有网络地址,获取方法请参见访问实例

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.codehaus.jettison.json.JSONObject;
    
    import java.util.Properties;
    import java.util.concurrent.Future;
    
    public class KafkaToLindormMessageDemo {
      public static void main(String[] args) {
        Properties props = new Properties();
    
        //设置Lindorm消息引擎Kafka地址,这个Lindorm消息引擎Kafka地址为专有网络地址,需确保应用程序部署的环境和Lindorm实例使用相同的专有网络ID。
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://<控制台获取的Lindorm消息引擎Kafka地址>");
        //指定数据流表的物理数据存储在某个消息引擎Topic上
        String topic = "log_topic";
    
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
          JSONObject json = new JSONObject();
          //写入消息引擎数据
          json.put("timestamp", System.currentTimeMillis());
          json.put("loglevel", "ERROR");
          json.put("thread", "[ReportFinishedTask7-thread-4]");
          json.put("class", "engine.ImporterTaskManager(318)");
          json.put("detail", "Remove tasks fail: job name=e35318e5-52ea-48ab-ad2a-0144ffc6955e , " +
            "task name=prepare_e35318e5-52ea-48ab-ad2a-0144ffc6955e , runningTasks=0");
          Future<RecordMetadata> future = producer.send(new ProducerRecord<>(
            topic, json.getString("thread") + json.getLong("timestamp"), json.toString()));
          producer.flush();
          try {
            RecordMetadata recordMetadata = future.get();
            System.out.println("Produce ok:" + recordMetadata.toString());
          } catch (Throwable t) {
            System.out.println("Produce exception " + t.getMessage());
            t.printStackTrace();
          }
        } catch (Exception e) {
          System.out.println("Produce exception " + e.getMessage());
          e.printStackTrace();
        }
      }
    }
  • 本页导读 (1)