异步调用

本文介绍HSF如何进行异步调用。

前提条件

同步调用

HSF的IO操作是异步操作,客户端同步调用的本质是执行future.get(timeout)操作,等待服务端的结果返回,这里的timeout就是客户端生效的超时时间 (默认3000ms)。同步调用时序图如下所示:SAE服务HSF应用开发异步调用

对于客户端来说,并不是所有的HSF服务都是需要同步等待服务端返回结果的,对于这些服务,HSF提供异步调用的形式,让客户端不必同步阻塞在HSF操作上。 异步调用在发起调用时,HSF服务的调用结果都是返回值的默认值,例如返回类型是int,则会返回0,返回类型是Object,则会返回null。而真正的结果,是在HSFResponseFuture或者回调函数(Callback)中获得的。

Future异步调用

HSF发起调用后,用户可以在上下文中获取跟返回结果关联的HSFFuture对象,获取对象后调用HSFFuture.getResponse (timeout) 获取服务端的返回结果。Future异步调用的时序图如下所示:

SAE应用HSF应用开发之Future异步调用

  • API形式配置HSF服务

    HSF提供了方法级别的异步调用配置,格式为name:${methodName};type:future,由于只用方法名字来标识方法,所以并不区分重载的方法。同名的方法都会被设置为同样的调用方式。

    HSFApiConsumerBean hsfApiConsumerBean = new HSFApiConsumerBean();
    hsfApiConsumerBean.setInterfaceName("com.alibaba.middleware.hsf.guide.api.service.OrderService");
    hsfApiConsumerBean.setVersion("1.0.0");
    hsfApiConsumerBean.setGroup("HSF");
    // [设置] 异步future调用
    List<String> asyncallMethods = new ArrayList<String>();
    // 格式:name:{methodName};type:future
    asyncallMethods.add("name:queryOrder;type:future");
    hsfApiConsumerBean.setAsyncallMethods(asyncallMethods);
    
    hsfApiConsumerBean.init(true);
    
    // [代理] 获取HSF代理
    OrderService orderService = (OrderService) hsfApiConsumerBean.getObject();
    // ---------------------- 调用 -----------------------//
    // [调用] 发起HSF异步调用, 返回null
    OrderModel orderModel = orderService.queryOrder(1L);
    // 及时在当前调用上下文中,获取future对象;因为该对象是放在ThreadLocal中,同一线程中后续调用会覆盖future对象,所以要及时取出
    HSFFuture hsfFuture = HSFResponseFuture.getFuture();
    
    // do something else
    
    // 这里才真正地获取结果,如果调用还未完成,将阻塞等待结果,5000ms是等待结果的最大时间
    try {
      System.out.println(hsfFuture.getResponse(5000));
    } catch (InterruptedException e){
      e.printStackTrace();
    }

    HSF默认的超时配置是3000ms,如果过了超时时间,业务对象未返回,这时调用HSFFuture.getResponse会抛出超时异常;HSFFuture.getResponse(timeout),如果这里的timeout时间内,业务结果没有返回,也没有超时,可以调用多次执行getResponse去获取结果。

  • Spring配置HSF服务

    Spring框架在应用中广泛使用,如果不想以API的形式配置HSF服务,您可以使用Spring XML的形式进行配置,上述例子中API配置等同于如下XML配置。

    <bean id="orderService" class="com.taobao.hsf.app.spring.util.HSFSpringConsumerBean">
        <property name="interfaceName" value="com.alibaba.middleware.hsf.guide.api.service.OrderService"/>
        <property name="version" value="1.0.0"/>
        <property name="group" value="HSF"/>
        <!--[设置] 订阅服务的接口 -->
        <property name="asyncallMethods">
            <list>
                <value>name:queryOrder;type:future</value>
            </list>
         </property>
    </bean>
  • 注解配置HSF服务

    SpringBoot广泛使用的今天,使用注解装配SpringBean也成为一种选择,HSF也支持使用注解进行配置,用来订阅服务。

    在项目中增加依赖starter。

    <dependency>
        <groupId>com.alibaba.boot</groupId>
        <artifactId>pandora-hsf-spring-boot-starter</artifactId>
    </dependency>

    通常一个HSF Consumer会在多个地方使用,但并不需要在每次使用的地方都用 @HSFConsumer来标记。只需要写一个统一个Config类,然后在其它需要使用的地方,直接 @Autowired注入即可。上述例子中的API配置等同于如下注解配置。

    @Configuration
    public class HsfConfig {
        @HSFConsumer(serviceVersion = "1.0.0", serviceGroup = "HSF", futureMethods = "sayHelloInFuture")
        OrderService orderService;
    
    }

    在使用时直接注入即可。

    @Autowired
    OrderService orderService;

Callback异步调用

客户端配置为Callback方式调用时,需要配置一个实现了HSFResponseCallback接口的listener,结果返回之后,HSF会调用HSFResponseCallback中的方法。时序图如下所示:

Callback异步调用

  • API形式配置HSF服务

    Callback的调用上下文

    用户在调用前还可以通过CallbackInvocationContext.setContext(Object obj),来设置一个关于本次调用的上下文信息,该信息存放在threadlocal中。在listener的回调函数中,可以通过CallbackInvocationContext.getContext() 来获取该对象。

    回调函数示例

    public class CallbackHandler implements HSFResponseCallback {
    
        // 业务异常时会触发
        @Override
        public void onAppException(Throwable t) {
            t.printStackTrace();
        }
    
        // 业务返回结果
        @Override
        public void onAppResponse(Object result) {
            // 取callback调用时设置的上下文
            Object context = CallbackInvocationContext.getContext();
    
            System.out.println(result.toString() + context);
        }
        //HSF异常
        @Override
        public void onHSFException(HSFException e) {
            e.printStackTrace();
        }
    }

    接口Callback方法配置

    HSFApiConsumerBean hsfApiConsumerBean = new HSFApiConsumerBean();
    hsfApiConsumerBean.setInterfaceName("com.alibaba.middleware.hsf.guide.api.service.OrderService");
    hsfApiConsumerBean.setVersion("1.0.0");
    hsfApiConsumerBean.setGroup("HSF");
    // [设置] 异步callback调用
    List<String> asyncallMethods = new ArrayList<String>();
    asyncallMethods.add("name:queryOrder;type:callback;listener:com.alibaba.middleware.hsf.CallbackHandler");
    hsfApiConsumerBean.setAsyncallMethods(asyncallMethods);
    hsfApiConsumerBean.init(true);
    // [代理] 获取HSF代理
    OrderService orderService = (OrderService) hsfApiConsumerBean.getObject();
     // 可选步骤,设置上下文。CallbackHandler中通过api可以获取到
    CallbackInvocationContext.setContext("in callback");
    // 发起调用
    orderService.queryOrder(1L); // 这里返回的其实是null
    // 清理上下文
    CallbackInvocationContext.setContext(null);
    // do something else

    在调用线程中可以设置上下文,然后在listener中获取使用。相对于Future异步调用,callback会立即知晓结果的返回。

  • Spring配置HSF服务

    <bean id="CallHelloWorld" class="com.taobao.hsf.app.spring.util.HSFSpringConsumerBean">
        <!--[设置] 订阅服务的接口 -->
        <property name="interfaceName" value="com.alibaba.middleware.hsf.guide.api.service.OrderService"/>
        <!--[设置] 服务的版本 -->
        <property name="version" value="1.0.0"/>
        <!--[设置] 服务的归组 -->
        <property name="group" value="HSF"/>
        <property name="asyncallMethods">
            <list>
                <!--future的含义为通过Future的方式去获取请求执行的结果,例如先调用下远程的接口,接着在同一线程继续做别的事情,然后再在同一线程中通过Future来获取结果 -->
                <!--name:methodName;type:future|callback-->
                <value>name:queryOrder;type:callback;listener:com.alibaba.middleware.hsf.CallbackHandler</value>
            </list>
         </property>
    </bean>
  • 注解配置HSF接口方法为callback调用

    @AsyncOn(interfaceName = OrderService.class, methodName = "queryOrder")
    public class CallbackHandler implements HSFResponseCallback {
    
        @Override
        public void onAppException(Throwable t) {
            t.printStackTrace();
        }
    
        @Override
        public void onAppResponse(Object result) {
            // 取callback调用时设置的上下文
            Object context = CallbackInvocationContext.getContext();
    
            System.out.println(result.toString() + context);
        }
    
        @Override
        public void onHSFException(HSFException e) {
            e.printStackTrace();
        }
    
    
    }
    重要

    回调函数是由单独的线程池( LinkedBlockingQueue无限队列)来调用的,不要做太费时间的操作,避免影响其他请求的onAppResponse回调。callback线程默认的corePoolSize、maxPoolSize是实例CPU数目。 下面的-D 参数可以去自定义配置。

    • CALLBACK线程池最小配置:-Dhsf.callback.min.poolsize

    • CALLBACK线程池最大配置:-Dhsf.callback.max.poolsize