通过S3 Java API连接并使用宽表引擎

前提条件

  • 安装S3 Java SDK。

    • 安装Java SDK 1.x Maven依赖

<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-s3</artifactId>
  <version>1.11.655</version>
</dependency>
    • 安装Java SDK 2.x Maven依赖

<!-- S3-only module of the AWS SDK for Java 2.x. The "aws-sdk-java" artifact bundles every
     AWS service client; the samples on this page only need the S3 module. -->
<dependency>
  <groupId>software.amazon.awssdk</groupId>
  <artifactId>s3</artifactId>
  <version>2.17.32</version>
</dependency>

Java SDK 1.x 示例

  • 创建连接

// Endpoint = "http://" scheme followed by the cluster's S3 connection address.
String s3Endpoint = "http://" + "ld-xxxx:9053";

// Describe the custom endpoint on its own; no signing region is supplied (null).
AwsClientBuilder.EndpointConfiguration endpointConfig =
        new AwsClientBuilder.EndpointConfiguration(s3Endpoint, null);

// Build a client that targets that endpoint with path-style addressing enabled.
AmazonS3 client = AmazonS3ClientBuilder.standard()
        .withPathStyleAccessEnabled(true)
        .withEndpointConfiguration(endpointConfig)
        .build();
  • Bucket操作

// Create a bucket.
Bucket bucket = client.createBucket(bucketName);

// Check that the bucket exists. headBucket reports a missing or inaccessible bucket by
// throwing AmazonServiceException, not through its return value.
HeadBucketResult result = client.headBucket(new HeadBucketRequest(bucketName));

// List all buckets.
List<Bucket> buckets = client.listBuckets(new ListBucketsRequest());
  • Object操作

String content = "content";
// Upload an object whose body is the given string.
client.putObject(bucketName, keyName, content);

// Read an object. S3Object keeps its underlying HTTP connection open until it is closed,
// so scope it with try-with-resources (close() may throw IOException).
try (S3Object object = client.getObject(bucketName, keyName)) {
    // Consume object.getObjectContent() inside this block.
}

// List the objects in a bucket. Each call returns a single page of results; check
// isTruncated() on the result and keep fetching to enumerate every object.
// list v1
ObjectListing objects = client.listObjects(bucketName);
// list v2
ListObjectsV2Result results = client.listObjectsV2(bucketName);

// Delete a single object.
client.deleteObject(bucketName, keyName);

// Delete objects in bulk (withKeys accepts one or more keys).
client.deleteObjects(new DeleteObjectsRequest(bucketName).withKeys(keyName));
  • Multipart Upload操作

// Source file and its total length in bytes.
File file = new File(filePath);
long contentLength = file.length();
long partSize = 5 * 1024 * 1024; // nominal part size: 5 MB

// ETags of the uploaded parts, collected in upload order.
List<PartETag> partETags = new ArrayList<>();

// Start the multipart upload; the upload ID returned here ties all parts together.
InitiateMultipartUploadResult initResponse =
        client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, keyName));
String uploadId = initResponse.getUploadId();

// Upload the file one part at a time until the whole length is covered.
int partNumber = 1;
long filePosition = 0;
while (filePosition < contentLength) {
    // The final part may be shorter than the nominal part size.
    partSize = Math.min(partSize, contentLength - filePosition);

    // Request describing this part: which slice of the file, and its 1-based part number.
    UploadPartRequest uploadRequest = new UploadPartRequest()
        .withUploadId(uploadId)
        .withBucketName(bucketName)
        .withKey(keyName)
        .withFile(file)
        .withFileOffset(filePosition)
        .withPartSize(partSize)
        .withPartNumber(partNumber);

    UploadPartResult uploadResult = client.uploadPart(uploadRequest);
    partETags.add(uploadResult.getPartETag());

    filePosition += partSize;
    partNumber++;
}

// Complete the multipart upload; the object becomes visible after this call.
CompleteMultipartUploadRequest compRequest =
        new CompleteMultipartUploadRequest(bucketName, keyName, uploadId, partETags);
client.completeMultipartUpload(compRequest);

Java SDK 2.x 示例

  • 创建连接

String s3Endpoint = "http://" + "ld-xxxx:9053"; // "http://" + the cluster's S3 connection address

// Create the client.
S3Client client = S3Client.builder()
    // URI.create avoids the checked URISyntaxException declared by new URI(String).
    .endpointOverride(URI.create(s3Endpoint))
    // The SDK 2.x builder needs a region for request signing and fails at build() when none
    // is configured in the environment. NOTE(review): confirm the value the cluster expects.
    .region(Region.US_EAST_1)
    // Path-style addressing, matching the SDK 1.x sample; otherwise the bucket name is
    // prepended to the endpoint host.
    .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
    .build();

// Close the client once it is no longer needed.
client.close();
  • Bucket操作

// Create the bucket.
CreateBucketRequest bucketRequest = CreateBucketRequest.builder()
    .bucket(bucketName)
    .build();
client.createBucket(bucketRequest);

// Check that the bucket exists: the waiter polls HeadBucket until the bucket is reported
// as existing, then the matched response (if any) is printed.
S3Waiter s3Waiter = client.waiter();
HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder()
    .bucket(bucketName)
    .build();

WaiterResponse<HeadBucketResponse> waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait);
waiterResponse.matched().response().ifPresent(System.out::println);
  • Object操作

// Write an object: the request names the destination, the body carries the payload.
PutObjectRequest putRequest = PutObjectRequest.builder()
    .key(keyName)
    .bucket(bucketName)
    .build();
RequestBody payload = RequestBody.fromString("content");
PutObjectResponse response = client.putObject(putRequest, payload);

// Read the object back as bytes held in memory.
GetObjectRequest getRequest = GetObjectRequest.builder()
    .bucket(bucketName)
    .key(keyName)
    .build();
ResponseBytes<GetObjectResponse> objectBytes = client.getObjectAsBytes(getRequest);
byte[] data = objectBytes.asByteArray();

// List the objects in the bucket.
ListObjectsRequest listRequest = ListObjectsRequest.builder()
    .bucket(bucketName)
    .build();
ListObjectsResponse res = client.listObjects(listRequest);
List<S3Object> objects = res.contents();
  • Multipart Upload操作

// Start the multipart upload and keep the upload ID that ties the parts together.
CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder()
    .key(keyName)
    .bucket(bucketName)
    .build();
CreateMultipartUploadResponse createResponse = client.createMultipartUpload(createRequest);
String uploadId = createResponse.uploadId();
System.out.println(uploadId);

// Upload part 1 and record its part number together with the returned ETag.
UploadPartRequest firstPartRequest = UploadPartRequest.builder()
    .bucket(bucketName)
    .key(keyName)
    .uploadId(uploadId)
    .partNumber(1)
    .build();
String firstETag = client.uploadPart(firstPartRequest, RequestBody.fromString("content1")).eTag();
CompletedPart part1 = CompletedPart.builder()
    .partNumber(1)
    .eTag(firstETag)
    .build();

// Upload part 2 the same way.
UploadPartRequest secondPartRequest = UploadPartRequest.builder()
    .bucket(bucketName)
    .key(keyName)
    .uploadId(uploadId)
    .partNumber(2)
    .build();
String secondETag = client.uploadPart(secondPartRequest, RequestBody.fromString("content2")).eTag();
CompletedPart part2 = CompletedPart.builder()
    .partNumber(2)
    .eTag(secondETag)
    .build();

// Complete the multipart upload; the object becomes visible after this call.
CompletedMultipartUpload completedParts = CompletedMultipartUpload.builder()
    .parts(part1, part2)
    .build();
CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
    .bucket(bucketName)
    .key(keyName)
    .uploadId(uploadId)
    .multipartUpload(completedParts)
    .build();

client.completeMultipartUpload(completeRequest);