发送普通消息(三种方式)

SOFAStack 消息队列提供三种方式来发送普通消息:同步发送、异步发送和单向(Oneway)发送。本文介绍了每种发送方式的原理、使用场景、示例代码,以及三种发送方式的对比。

三种发送方式的对比

三者的特点和主要区别如下:

发送方式

发送 TPS

发送结果反馈

可靠性

适用场景

同步发送

不丢失

此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

异步发送

不丢失

异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

单向发送

最快

可能丢失

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

同步发送

原理

同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。

同步发送

示例代码

  • SOFABOOT

    import com.alibaba.fastjson.JSON;
    import com.alipay.sofa.sofamq.api.Messaging;
    import com.alipay.sofa.sofamq.api.MessageProducer;
    import com.alipay.sofa.sofamq.api.Producer;
    
    // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到
    @Messaging
    public class SomeClass {
         @MessageProducer(group = "GID_XXX", topic = "TP_XXX", payload = OrderPojo.class)
         public Producer<OrderPojo> producer;
    
         public void someMethod() {
             OrderPojo orderPojo = getPojo();
             producer.send(producer.messageBuilder().withTags("TAGA").withValue(orderPojo).build());
         }
    }
  • 非 SOFABOOT

    import java.util.Properties;
    
    import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.Producer;
    import io.openmessaging.api.SendResult;
    
    public class SomeClass {
    
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXX");
            properties.setProperty(PropertyKeyConst.CELL, "GZ00B");
            Producer producer = OMS.builder().driver("sofamq").endpoint("antvip://sofamqnamesrv-pool.gz00b.alipay.net")
                .schemaRegistryUrl("antvip://openmeta-pool.gz00b.alipay.net")
                .build().createProducer(properties);
            producer.start();
    
            OrderPojo orderPojo = getPojo();
            SendResult sendResult = producer.send(producer.messageBuilder().withTopic("TP_XXX").withTags("TAGA")
                                                  .withProperty("propkey", "propvalue").withValue(orderPojo).build());
            System.out.println(sendResult);
        }
    }

异步发送

原理

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

异步

示例代码

  • SOFABOOT

    import com.alipay.sofa.sofamq.api.MessageProducer;
    import com.alipay.sofa.sofamq.api.Messaging;
    import com.alipay.sofa.sofamq.api.Producer;
    
    import io.openmessaging.api.OnExceptionContext;
    import io.openmessaging.api.SendCallback;
    import io.openmessaging.api.SendResult;
    
    // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到
    @Messaging
    public class SomeClass {
        @MessageProducer(group = "GID_XXX", topic = "TP_XXX", payload = OrderPojo.class)
        public Producer<OrderPojo> producer;
    
        public void someMethod() {
            OrderPojo orderPojo = getPojo();
            Message message = producer.messageBuilder().withTags("TRADE").withValue(orderPojo).build();
            producer.sendAsync(message,
                new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("send success: " + sendResult);
                    }
    
                    @Override
                    public void onException(OnExceptionContext context) {
                        System.out.println("send exception: " + context);
                    }
                });
        }
    }
  • 非 SOFABOOT

    import java.util.Properties;
    
    import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.OnExceptionContext;
    import io.openmessaging.api.Producer;
    import io.openmessaging.api.SendCallback;
    import io.openmessaging.api.SendResult;
    
    public class SomeClass {
    
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXX");
            properties.setProperty(PropertyKeyConst.CELL, "GZ00B");
            Producer producer = OMS.builder().driver("sofamq").endpoint("antvip://sofamqnamesrv-pool.gz00b.alipay.net")
                .schemaRegistryUrl("antvip://openmeta-pool.gz00b.alipay.net")
                .build().createProducer(properties);
            producer.start();
    
            OrderPojo orderPojo = getPojo();
            Message message = producer.messageBuilder().withTopic("TP_XXX").withTags("TRADE").withValue(orderPojo).build();
            producer.sendAsync(message, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("send success: " + sendResult);
                    }
    
                    @Override
                    public void onException(OnExceptionContext context) {
                        System.out.println("send fail: " + context);
                    }
                });
    
        }
    }

单向(Oneway)发送

原理

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

oneway

示例代码

  • SOFABOOT

    import com.alipay.sofa.sofamq.api.MessageProducer;
    import com.alipay.sofa.sofamq.api.Messaging;
    import com.alipay.sofa.sofamq.api.Producer;
    
    // 请使用 xml 或注解将该类配置为 Bean,只有 @Messaging 扫描不到
    @Messaging
    public class SomeClass {
        @MessageProducer(group = "GID_XXX", topic = "TP_XXX", payload = OrderPojo.class)
        public Producer<OrderPojo> producer;
    
        public void someMethod() {
            OrderPojo orderPojo = getPojo();
            Message message = producer.messageBuilder().withTags("TRADE").withValue(orderPojo).build();
            producer.sendOneway(message);
        }
    }
  • 非 SOFABOOT

    import java.util.Properties;
    
    import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.Producer;
    
    public class SomeClass {
    
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXX");
            properties.setProperty(PropertyKeyConst.CELL, "GZ00B");
            Producer producer = OMS.builder().driver("sofamq").endpoint("antvip://sofamqnamesrv-pool.gz00b.alipay.net")
                .schemaRegistryUrl("antvip://openmeta-pool.gz00b.alipay.net")
                .build().createProducer(properties);
            producer.start();
    
            OrderPojo orderPojo = getPojo();
            Message message = producer.messageBuilder().withTopic("TP_XXX").withTags("TRADE").withValue(orderPojo).build();
            producer.sendOneway(message);
        }
    }
阿里云首页 金融分布式架构 SOFAStack 相关技术圈