Lindorm流引擎完全兼容开源Kafka API,您可以通过Kafka API编写程序写入Lindorm流引擎数据,也可以通过开源的三方工具采集并写入Lindorm流引擎数据,例如FluentD、Debezium等。本文介绍通过开源Kafka客户端连接Lindorm流引擎并写入Lindorm流引擎数据的代码示例。
前提条件
- 已安装Java环境,要求安装JDK 1.7及以上版本。
- 已将客户端IP地址添加至Lindorm实例的白名单中,具体操作请参见设置白名单。
- 已获取Lindorm流引擎的Lindorm Stream Kafka地址,具体操作请参见查看连接地址。
说明 Lindorm流引擎的Lindorm Stream Kafka地址为专有网络地址,需确保应用程序部署的环境和Lindorm实例使用相同的专有网络ID。
操作步骤
- 下载开源Kafka客户端。在pom.xml中配置Maven依赖,具体内容如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
- 连接Lindorm流引擎并写入Lindorm流引擎数据。完整的代码示例如下:
说明
- 写入的数据格式支持JSON、Avro和CSV。
- 代码中Lindorm Stream Kafka地址为专有网络地址,获取方法请参见查看连接地址。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.codehaus.jettison.json.JSONObject;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaToLindormStreamDemo {
public static void main(String[] args) {
Properties props = new Properties();
//设置Lindorm Stream Kafka地址,这个Lindorm Stream Kafka地址为专有网络地址,需确保应用程序部署的环境和Lindorm实例使用相同的专有网络ID。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Lindorm Stream Kafka地址");
//指定数据流表的物理数据存储在某个Topic上
String topic = "log_topic";
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
try {
JSONObject json = new JSONObject();
//写入流引擎数据
json.put("timestamp", System.currentTimeMillis());
json.put("loglevel", "ERROR");
json.put("thread", "[ReportFinishedTask7-thread-4]");
json.put("class", "engine.ImporterTaskManager(318)");
json.put("detail", "Remove tasks fail: job name=e35318e5-52ea-48ab-ad2a-0144ffc6955e , task name=prepare_e35318e5-52ea-48ab-ad2a-0144ffc6955e , runningTasks=0");
Future<RecordMetadata> future = producer.send(
new ProducerRecord<String, String>(topic, json.getString("thread") + json.getLong("timestamp"),
json.toString()));
producer.flush();
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
System.out.println("Produce exception " + t.getMessage());
t.printStackTrace();
}
} catch (Exception e) {
System.out.println("Produce exception " + e.getMessage());
e.printStackTrace();
}
}
}