本文介绍通过OpenTelemetry Rust SDK将Rust应用的Trace数据接入到日志服务的操作步骤。

前提条件

  • 已创建Trace实例。更多信息,请参见创建Trace实例
  • 已安装Rust 1.46及以上版本的开发环境。

操作步骤

  1. 添加依赖项。
    [package]
    name = "test"
    version = "0.1.0"
    authors = [""]
    edition = "2018"
    
    # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
    
    [dependencies]
    futures = "0.3"
    lazy_static = "1.4"
    opentelemetry = { version = "0.12.0", features = ["tokio-support", "metrics", "serialize"] }
    opentelemetry-otlp = { version = "0.5.0", features = ["tonic", "metrics", "tls", "tls-roots"] }
    serde_json = "1.0"
    tokio = { version = "1.0", features = ["full"] }
    tonic="0.4.0"
    url = "2.2.0"
  2. 运行代码。

    如下代码中的变量需根据实际情况替换。关于变量的详细说明,请参见表 1

    use futures::stream::Stream;
    use futures::StreamExt;
    use opentelemetry::sdk::metrics::{selectors, PushController};
    use opentelemetry::trace::TraceError;
    use opentelemetry::{
        baggage::BaggageExt,
        metrics::{self, ObserverResult},
        trace::{TraceContextExt, Tracer},
        Context, Key, KeyValue,
    };
    use opentelemetry::{global, sdk::trace as sdktrace};
    use opentelemetry_otlp::ExporterConfig;
    use std::error::Error;
    use std::time::Duration;
    use tonic::{
      metadata::MetadataMap,
      transport::ClientTlsConfig,
    };
    use url::Url;
    use opentelemetry::sdk::Resource;
    use opentelemetry::sdk::export::trace::stdout;
    use opentelemetry::sdk::{
        propagation::TraceContextPropagator,
        trace::{self, Sampler},
    };
    use opentelemetry::sdk::export::trace::stdout::Uninstall;
    
    
    static ENDPOINT : &str = "https://${endpoint}";
    static PROJECT : &str = "${project}";
    static INSTANCE_ID : &str = "${instance}";
    static AK_ID : &str = "${access-key-id}";
    static AK_SECRET : &str = "${access-key-secret}";
    static SERVICE_VERSION : &str = "${version}";
    static SERVICE_NAME : &str = "${service}";
    static HOST_NAME : &str = "${host}";
    
    static SLS_PROJECT_HEADER : &str = "x-sls-otel-project";
    static SLS_INSTANCE_ID_HEADER : &str = "x-sls-otel-instance-id";
    static SLS_AK_ID_HEADER : &str = "x-sls-otel-ak-id";
    static SLS_AK_SECRET_HEADER : &str = "x-sls-otel-ak-secret";
    static SLS_SERVICE_VERSION : &str = "service.version";
    static SLS_SERVICE_NAME : &str = "service.name";
    static SLS_HOST_NAME : &str = "host.name";
    
    fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_otlp::Uninstall), TraceError> {
        let mut metadata_map = MetadataMap::with_capacity(4);
        metadata_map.insert(SLS_PROJECT_HEADER, PROJECT.parse().unwrap());
        metadata_map.insert(SLS_INSTANCE_ID_HEADER, INSTANCE_ID.parse().unwrap());
        metadata_map.insert(SLS_AK_ID_HEADER, AK_ID.parse().unwrap());
        metadata_map.insert(SLS_AK_SECRET_HEADER, AK_SECRET.parse().unwrap());
        let endpoint = ENDPOINT;
        let endpoint = Url::parse(&endpoint).expect("endpoint is not a valid url");
        let resource = vec![KeyValue::new(SLS_SERVICE_VERSION, SERVICE_VERSION),
                            KeyValue::new(SLS_HOST_NAME, HOST_NAME), 
                            KeyValue::new(SLS_SERVICE_NAME, SERVICE_NAME)];
        opentelemetry_otlp::new_pipeline()
            .with_endpoint(endpoint.as_str())
            .with_tls_config(
                ClientTlsConfig::new().domain_name(endpoint.host_str().expect("the specified endpoint should have a valid host")))
            .with_metadata(metadata_map)
            .with_trace_config(
                opentelemetry::sdk::trace::config()
                    .with_resource(Resource::new(resource)),
            )
            .install()
    }
    
    // Skip first immediate tick from tokio, not needed for async_std.
    fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Instant> {
        opentelemetry::util::tokio_interval_stream(duration).skip(1)
    }
    
    fn init_meter() -> metrics::Result<PushController> {
    
        let mut metadata_map = MetadataMap::with_capacity(4);
        metadata_map.insert(SLS_PROJECT_HEADER, PROJECT.parse().unwrap());
        metadata_map.insert(SLS_INSTANCE_ID_HEADER, INSTANCE_ID.parse().unwrap());
        metadata_map.insert(SLS_AK_ID_HEADER, AK_ID.parse().unwrap());
        metadata_map.insert(SLS_AK_SECRET_HEADER, AK_SECRET.parse().unwrap());
        let endpoint = ENDPOINT;
        let endpoint = Url::parse(&endpoint).expect("endpoint is not a valid url");
        let export_config = ExporterConfig {
            endpoint: endpoint.as_str().to_string(),
            tls_config : Some(ClientTlsConfig::new().domain_name(
                    endpoint
                        .host_str()
                        .expect("the specified endpoint should have a valid host"))),
            metadata : Some(metadata_map),
            ..ExporterConfig::default()
        };
        let resource = vec![KeyValue::new(SLS_SERVICE_VERSION, SERVICE_VERSION),
                            KeyValue::new(SLS_HOST_NAME, HOST_NAME), 
                            KeyValue::new(SLS_SERVICE_NAME, SERVICE_NAME)];
        opentelemetry_otlp::new_metrics_pipeline(tokio::spawn, delayed_interval)
            .with_export_config(export_config)
            .with_resource(resource)
            .with_aggregator_selector(selectors::simple::Selector::Exact)
            .build()
    }
    
    fn init_tracer_stdout() -> (opentelemetry::sdk::trace::Tracer, Uninstall) {
        global::set_text_map_propagator(TraceContextPropagator::new());
        // Install stdout exporter pipeline to be able to retrieve the collected spans.
        // For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. In a production
        // application, use `Sampler::ParentBased` or `Sampler::TraceIdRatioBased` with a desired ratio.
        stdout::new_pipeline()
            .with_trace_config(trace::config().with_default_sampler(Sampler::AlwaysOn))
            .install()
    }
    
    const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
    const BAR_KEY: Key = Key::from_static_str("ex.com/bar");
    const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons");
    const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another");
    
    lazy_static::lazy_static! {
        static ref COMMON_LABELS: [KeyValue; 4] = [
            LEMONS_KEY.i64(10),
            KeyValue::new("A", "1"),
            KeyValue::new("B", "2"),
            KeyValue::new("C", "3"),
        ];
    }
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        let _tracer : (opentelemetry::sdk::trace::Tracer, Uninstall);
        let _guard : Result<(sdktrace::Tracer, opentelemetry_otlp::Uninstall), TraceError>;
        let _started : metrics::Result<PushController>;
        if ENDPOINT == "stdout"{
            println!("init_tracer_stdout");
            _tracer = init_tracer_stdout();
        }else {
            let _guard = init_tracer()?;
            let _started = init_meter()?;
        }
    
        let tracer = global::tracer("ex.com/basic");
        let meter = global::meter("ex.com/basic");
    
        let one_metric_callback = |res: ObserverResult<f64>| res.observe(1.0, COMMON_LABELS.as_ref());
        let _ = meter
            .f64_value_observer("ex.com.one", one_metric_callback)
            .with_description("A ValueObserver set to 1.0")
            .init();
    
        let value_recorder_two = meter.f64_value_recorder("ex.com.two").init();
    
        let another_recorder = meter.f64_value_recorder("ex.com.two").init();
        another_recorder.record(5.5, COMMON_LABELS.as_ref());
    
        let _baggage =
            Context::current_with_baggage(vec![FOO_KEY.string("foo1"), BAR_KEY.string("bar1")])
                .attach();
    
        let value_recorder = value_recorder_two.bind(COMMON_LABELS.as_ref());
    
        tracer.in_span("operation", |cx| {
            let span = cx.span();
            span.add_event(
                "Nice operation!".to_string(),
                vec![Key::new("bogons").i64(100)],
            );
            span.set_attribute(ANOTHER_KEY.string("yes"));
    
            meter.record_batch_with_context(
                // Note: call-site variables added as context Entries:
                &Context::current_with_baggage(vec![ANOTHER_KEY.string("xyz")]),
                COMMON_LABELS.as_ref(),
                vec![value_recorder_two.measurement(2.0)],
            );
    
            tracer.in_span("Sub operation...", |cx| {
                let span = cx.span();
                span.set_attribute(LEMONS_KEY.string("five"));
    
                span.add_event("Sub span event".to_string(), vec![]);
    
                value_recorder.record(1.3);
            });
        });
    
        // wait for 1 minutes so that we could see metrics being pushed via OTLP every 10 seconds.
        tokio::time::sleep(Duration::from_secs(60)).await;
    
    
        Ok(())
    }
    表 1. 变量说明
    变量 说明 示例
    ${service} 服务名。根据您的实际场景取值即可。 payment
    ${version} 服务版本号。建议按照va.b.c格式定义。 v0.1.2
    ${host} 主机名。 localhost
    ${endpoint} 接入地址,格式为${project}.${region-endpoint}:Port,其中:
    • ${project}:日志服务Project名称。
    • ${region-endpoint}:Project访问域名,支持公网和阿里云内网(经典网络、VPC)。更多信息,请参见服务入口
    • Port:网络端口,固定为10010。
    test-project.cn-hangzhou.log.aliyuncs.com:10010
    ${project} 日志服务Project名称。 test-project
    ${instance} Trace服务实例名称。 test-traces
    ${access-key-id} 阿里云账号AccessKey ID。

    建议您使用只具备日志服务Project写入权限的RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。授予RAM用户向指定Project写入数据权限的具体操作,请参见授权。如何获取AccessKey的具体操作,请参见访问密钥

    ${access-key-secret} 阿里云账号AccessKey Secret。

    建议您使用只具备日志服务Project写入权限的RAM用户的AccessKey。

后续步骤