Use the Java SDK to manage consumer groups

更新时间:
复制 MD 格式

Consumer groups handle load balancing and failovers for log data consumption, so you can focus on business logic. This topic provides sample code for creating, modifying, querying, and deleting consumer groups and their checkpoints.

Prerequisites

Precautions

In this example, the public Simple Log Service endpoint for the China (Hangzhou) region is used. Endpoint: https://cn-hangzhou.log.aliyuncs.com.

If you want to access Simple Log Service from other Alibaba Cloud services that reside in the same region as your project, you can use the internal Simple Log Service endpoint, which is https://cn-hangzhou-intranet.log.aliyuncs.com.

For more information about the supported regions and endpoints of Simple Log Service, see Endpoints.

Create a consumer group

The following code creates a consumer group named ali-test-consumergroup.

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;

/**
 * Sample program that creates a consumer group named {@code ali-test-consumergroup}
 * in a Logstore of Simple Log Service.
 *
 * <p>The AccessKey pair is read from the {@code ALIBABA_CLOUD_ACCESS_KEY_ID} and
 * {@code ALIBABA_CLOUD_ACCESS_KEY_SECRET} environment variables.
 */
public class CreateConsumerGroup {
    /**
     * Creates the consumer group and prints the outcome to standard output.
     *
     * @param args command-line arguments; not used
     * @throws LogException if the request fails; the error details are printed
     *                      before the exception is rethrown
     */
    public static void main(String[] args) throws LogException {
        // This example obtains the AccessKey ID and AccessKey secret from environment variables.
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Enter the project name.
        String projectName = "ali-test-project";
        // Enter the Logstore name.
        String logstoreName = "ali-test-logstore";
        // Set the endpoint for Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace it with the actual endpoint.
        String host = "https://cn-hangzhou.log.aliyuncs.com";

        // Create a Simple Log Service client.
        Client client = new Client(host, accessId, accessKey);

        try {
            // Set the consumer group name. It matches the name used by the
            // modify and delete examples and by the documented expected output.
            String consumerGroupName = "ali-test-consumergroup";
            System.out.println("ready to create consumergroup");

            // NOTE(review): 300 is presumably the timeout in seconds and true the
            // in-order consumption flag - confirm against the ConsumerGroup constructor.
            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);

            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);

            System.out.println(String.format("create consumergroup %s success", consumerGroupName));

        } catch (LogException e) {
            // Print the failure details, then rethrow so that the failure still
            // propagates out of main.
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
        }
    }
}

Expected result:

ready to create consumergroup
create consumergroup ali-test-consumergroup success

Modify a consumer group

The following code changes the timeout period of the consumer group named ali-test-consumergroup to 350 seconds.

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;

/**
 * Sample program that updates the consumer group {@code ali-test-consumergroup}
 * in a Logstore of Simple Log Service.
 */
public class UpdateConsumerGroup {
    /**
     * Sends the update request and prints the outcome to standard output.
     *
     * @param args command-line arguments; not used
     * @throws LogException if the request fails; the error details are printed
     *                      before the exception is rethrown
     */
    public static void main(String[] args) throws LogException {
        // Endpoint of Simple Log Service for the China (Hangzhou) region. Replace it with the actual endpoint.
        final String endpoint = "https://cn-hangzhou.log.aliyuncs.com";
        // Project, Logstore, and consumer group that the request targets.
        final String project = "ali-test-project";
        final String logstore = "ali-test-logstore";
        final String groupName = "ali-test-consumergroup";

        // The AccessKey ID and AccessKey secret come from environment variables.
        final String keyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        final String keySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        // Simple Log Service client used for the request.
        final Client slsClient = new Client(endpoint, keyId, keySecret);

        try {
            System.out.println("ready to update consumergroup");

            // Set the timeout period of the consumer group to 350 seconds.
            // NOTE(review): the false argument is presumably the in-order flag - confirm against the SDK.
            slsClient.UpdateConsumerGroup(project, logstore, groupName, false, 350);

            final String done = String.format("update consumergroup %s success", groupName);
            System.out.println(done);
        } catch (LogException failure) {
            report(failure);
            throw failure;
        }
    }

    /** Prints the exception, its error code, and its error message to standard output. */
    private static void report(LogException failure) {
        System.out.println("LogException e :" + failure);
        System.out.println("error code :" + failure.GetErrorCode());
        System.out.println("error message :" + failure.GetErrorMessage());
    }
}

Expected result:

ready to update consumergroup
update consumergroup ali-test-consumergroup success

Query all consumer groups

The following code queries all consumer groups in a Logstore.

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;

/**
 * Sample program that lists the consumer groups of a Logstore in Simple Log Service
 * and prints the name of each one.
 */
public class ListConsumerGroup {
    /**
     * Queries the consumer groups and prints their names to standard output.
     *
     * @param args command-line arguments; not used
     * @throws LogException if the request fails; the error details are printed
     *                      before the exception is rethrown
     */
    public static void main(String[] args) throws LogException {
        // Endpoint of Simple Log Service for the China (Hangzhou) region. Replace it with the actual endpoint.
        final String endpoint = "https://cn-hangzhou.log.aliyuncs.com";
        // Project and Logstore whose consumer groups are listed.
        final String project = "ali-test-project";
        final String logstore = "ali-test-logstore";

        // The AccessKey ID and AccessKey secret come from environment variables.
        final String keyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        final String keySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        // Simple Log Service client used for the request.
        final Client slsClient = new Client(endpoint, keyId, keySecret);

        try {
            System.out.println("ready to list consumergroup");

            // Fetch the consumer groups of the Logstore, then print one line per group.
            final ListConsumerGroupResponse listing = slsClient.ListConsumerGroup(project, logstore);
            printGroupNames(listing);

            final String done = String.format("list consumergroup from %s success", project);
            System.out.println(done);
        } catch (LogException failure) {
            report(failure);
            throw failure;
        }
    }

    /** Prints the name of every consumer group contained in the response. */
    private static void printGroupNames(ListConsumerGroupResponse listing) {
        for (ConsumerGroup group : listing.GetConsumerGroups()) {
            final String name = group.getConsumerGroupName();
            System.out.println("ConsumerName is : " + name);
        }
    }

    /** Prints the exception, its error code, and its error message to standard output. */
    private static void report(LogException failure) {
        System.out.println("LogException e :" + failure);
        System.out.println("error code :" + failure.GetErrorCode());
        System.out.println("error message :" + failure.GetErrorMessage());
    }
}

Expected result:

ready to list consumergroup
ConsumerName is : ali-test-consumergroup2
ConsumerName is : ali-test-consumergroup
list consumergroup from ali-test-project success

Delete a consumer group

The following code deletes the consumer group named ali-test-consumergroup.

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;

/**
 * Sample program that deletes the consumer group {@code ali-test-consumergroup}
 * from a Logstore of Simple Log Service.
 */
public class DeleteConsumerGroup {
    /**
     * Sends the delete request and prints the outcome to standard output.
     *
     * @param args command-line arguments; not used
     * @throws LogException if the request fails; the error details are printed
     *                      before the exception is rethrown
     */
    public static void main(String[] args) throws LogException {
        // Endpoint of Simple Log Service for the China (Hangzhou) region. Replace it with the actual endpoint.
        final String endpoint = "https://cn-hangzhou.log.aliyuncs.com";
        // Project, Logstore, and consumer group that the request targets.
        final String project = "ali-test-project";
        final String logstore = "ali-test-logstore";
        final String groupName = "ali-test-consumergroup";

        // The AccessKey ID and AccessKey secret come from environment variables.
        final String keyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        final String keySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        // Simple Log Service client used for the request.
        final Client slsClient = new Client(endpoint, keyId, keySecret);

        try {
            System.out.println("ready to delete consumergroup");

            // Remove the consumer group from the Logstore.
            slsClient.DeleteConsumerGroup(project, logstore, groupName);

            final String done = String.format("delete consumergroup %s success", groupName);
            System.out.println(done);
        } catch (LogException failure) {
            report(failure);
            throw failure;
        }
    }

    /** Prints the exception, its error code, and its error message to standard output. */
    private static void report(LogException failure) {
        System.out.println("LogException e :" + failure);
        System.out.println("error code :" + failure.GetErrorCode());
        System.out.println("error message :" + failure.GetErrorMessage());
    }
}

Expected result:

ready to delete consumergroup
delete consumergroup ali-test-consumergroup success

Get a consumer group checkpoint

The following code obtains the checkpoints of Shard 0 and Shard 1 for the consumer group named consumerGroupX.

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.GetCheckPointResponse;

/**
 * Sample program that reads the checkpoints of Shard 0 and Shard 1 for the
 * consumer group {@code consumerGroupX} and prints them.
 */
public class GetCheckPoint {
    /**
     * Queries both checkpoints and prints them to standard output.
     *
     * @param args command-line arguments; not used
     * @throws LogException if a request fails; the error details are printed
     *                      before the exception is rethrown
     */
    public static void main(String[] args) throws LogException {
        // Endpoint of Simple Log Service for the China (Hangzhou) region. Replace it with the actual endpoint.
        final String endpoint = "https://cn-hangzhou.log.aliyuncs.com";
        // Project, Logstore, and consumer group whose checkpoints are read.
        final String project = "ali-test-project";
        final String logstore = "ali-test-logstore";
        final String groupName = "consumerGroupX";

        // The AccessKey ID and AccessKey secret come from environment variables.
        final String keyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        final String keySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        // Simple Log Service client used for the requests.
        final Client slsClient = new Client(endpoint, keyId, keySecret);

        try {
            System.out.println("ready to get consumergroup checkpoint");

            // Both requests are issued before anything is printed, so a failure on
            // either shard produces no partial checkpoint output.
            final GetCheckPointResponse shardZero = slsClient.getCheckpoint(project, logstore, groupName, 0);
            final GetCheckPointResponse shardOne = slsClient.getCheckpoint(project, logstore, groupName, 1);

            System.out.println("The checkpoint of Shard 0 is : " + shardZero.getCheckpoint());
            System.out.println("The checkpoint of Shard 1 is : " + shardOne.getCheckpoint());

            final String done = String.format("get consumergroup %s checkpoint success", groupName);
            System.out.println(done);
        } catch (LogException failure) {
            report(failure);
            throw failure;
        }
    }

    /** Prints the exception, its error code, and its error message to standard output. */
    private static void report(LogException failure) {
        System.out.println("LogException e :" + failure);
        System.out.println("error code :" + failure.GetErrorCode());
        System.out.println("error message :" + failure.GetErrorMessage());
    }
}

Expected result:

ready to get consumergroup checkpoint
The checkpoint of Shard 0 is : ConsumerGroupShardCheckPoint [shard=0, checkPoint=MTY2NzgxMDc0Nzk5MDk5MzAyMg==, updateTime=1668750821709044, consumer=consumer_1]
The checkpoint of Shard 1 is : ConsumerGroupShardCheckPoint [shard=1, checkPoint=MTY2NzgxMDc0Nzk5MTk0NTU0NQ==, updateTime=1668750828790425, consumer=consumer_1]
get consumergroup consumerGroupX checkpoint success

Update a consumer group checkpoint

The following code lists the consumer groups in a Logstore and then updates the checkpoint of Shard 0 for the consumer group named consumerGroupX.

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;

/**
 * Sample program that lists the consumer groups of a Logstore and then updates
 * the checkpoint of Shard 0 for the consumer group {@code consumerGroupX}.
 */
public class ConsumerGroupUpdateCheckpoint {
    /**
     * Prints every consumer group with its in-order setting, updates the
     * checkpoint, and prints the outcome to standard output.
     *
     * @param args command-line arguments; not used
     * @throws LogException if a request fails; the error details are printed
     *                      before the exception is rethrown
     */
    public static void main(String[] args) throws LogException {
        // Endpoint of Simple Log Service for the China (Hangzhou) region. Replace it with the actual endpoint.
        final String endpoint = "https://cn-hangzhou.log.aliyuncs.com";
        // Project, Logstore, and consumer group that the requests target.
        final String project = "ali-test-project";
        final String logstore = "ali-test-logstore";
        final String groupName = "consumerGroupX";

        // The AccessKey ID and AccessKey secret come from environment variables.
        final String keyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        final String keySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

        // Simple Log Service client used for the requests.
        final Client slsClient = new Client(endpoint, keyId, keySecret);

        try {
            System.out.println("ready to update checkpoint");

            // List the consumer groups of the Logstore and show each group's settings.
            final ListConsumerGroupResponse listing = slsClient.ListConsumerGroup(project, logstore);
            printGroups(listing);

            // Move the checkpoint of Shard 0 to the given cursor. The GetCursor
            // operation can be used to obtain the cursor for a specific time.
            slsClient.UpdateCheckPoint(project, logstore, groupName, 0, "MTY2NzgxMDc0Nzk5MTAwNjQ3Mg==");

            final String done = String.format("update checkpoint of %s success", groupName);
            System.out.println(done);
        } catch (LogException failure) {
            report(failure);
            throw failure;
        }
    }

    /** Prints the name and the in-order setting of every consumer group in the response. */
    private static void printGroups(ListConsumerGroupResponse listing) {
        for (ConsumerGroup group : listing.GetConsumerGroups()) {
            System.out.println("ConsumerName is : " + group.getConsumerGroupName());
            System.out.println("Consumer order is : " + group.isInOrder());
        }
    }

    /** Prints the exception, its error code, and its error message to standard output. */
    private static void report(LogException failure) {
        System.out.println("LogException e :" + failure);
        System.out.println("error code :" + failure.GetErrorCode());
        System.out.println("error message :" + failure.GetErrorMessage());
    }
}

Expected result:

ready to update checkpoint
ConsumerName is : consumerGroupX
Consumer order is : false
ConsumerName is : ali-test-consumergroup2
Consumer order is : true
ConsumerName is : ali-test-consumergroup
Consumer order is : false
update checkpoint of consumerGroupX success

References