gotosocial/vendor/github.com/minio/minio-go/v7/api-remove.go
Dominik Süß 9d0df426da
[feature] S3 support (#674)
* feat: vendor minio client

* feat: introduce storage package with s3 support

* feat: serve s3 files directly

this saves a lot of bandwidth as the files are fetched from the object
store directly

* fix: use explicit local storage in tests

* feat: integrate s3 storage with the main server

* fix: add s3 config to cli tests

* docs: explicitly set values in example config

also adds license header to the storage package

* fix: use better http status code on s3 redirect

HTTP 302 Found is the best fit, as it signifies that the resource
requested was found but not under its presumed URL

307/TemporaryRedirect would mean that this resource is usually located
here, not in this case

303/SeeOther indicates that the redirection does not link to the
requested resource but to another page

* refactor: use context in storage driver interface
2022-07-03 12:08:30 +02:00

545 lines
16 KiB
Go

/*
* MinIO Go Library for Amazon S3 Compatible Cloud Storage
* Copyright 2015-2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"bytes"
"context"
"encoding/xml"
"io"
"net/http"
"net/url"
"time"
"github.com/minio/minio-go/v7/pkg/s3utils"
)
//revive:disable
// BucketOptions is an alias for RemoveBucketOptions.
//
// Deprecated: BucketOptions will be renamed to RemoveBucketOptions in future versions.
type BucketOptions = RemoveBucketOptions
//revive:enable
// RemoveBucketOptions holds the special headers used to purge buckets; it is
// only useful when the endpoint is MinIO.
type RemoveBucketOptions struct {
// ForceDelete, when true, makes RemoveBucketWithOptions send the MinIO
// force-delete header (set to "true") with the bucket DELETE request.
ForceDelete bool
}
// RemoveBucketWithOptions deletes the bucket named bucketName.
//
// When opts.ForceDelete is true the MinIO force-delete header is sent with
// the request, so that all objects (including all object versions and delete
// markers) in the bucket are deleted forcibly.
//
// It returns the bucket-name validation error, the error reported by the
// request itself, or the error derived from any response whose status is not
// 204 No Content. After a successful delete the cached bucket location is
// dropped.
func (c *Client) RemoveBucketWithOptions(ctx context.Context, bucketName string, opts RemoveBucketOptions) error {
	// Reject malformed bucket names before issuing any request.
	if nameErr := s3utils.CheckValidBucketName(bucketName); nameErr != nil {
		return nameErr
	}

	// The force-delete header is only attached when explicitly requested.
	customHeader := http.Header{}
	if opts.ForceDelete {
		customHeader.Set(minIOForceDelete, "true")
	}

	// Issue the DELETE against the bucket itself (no object name).
	reqMeta := requestMetadata{
		bucketName:       bucketName,
		contentSHA256Hex: emptySHA256Hex,
		customHeader:     customHeader,
	}
	resp, err := c.executeMethod(ctx, http.MethodDelete, reqMeta)
	defer closeResponse(resp)

	switch {
	case err != nil:
		return err
	case resp != nil && resp.StatusCode != http.StatusNoContent:
		return httpRespToErrorResponse(resp, bucketName, "")
	}

	// The bucket is gone, so its cached location is stale.
	c.bucketLocCache.Delete(bucketName)
	return nil
}
// RemoveBucket deletes the bucket named bucketName.
//
// All objects (including all object versions and delete markers) in the
// bucket must be deleted before this request can succeed.
//
// It returns the bucket-name validation error, the error reported by the
// request itself, or the error derived from any response whose status is not
// 204 No Content. After a successful delete the cached bucket location is
// dropped.
func (c *Client) RemoveBucket(ctx context.Context, bucketName string) error {
	// Reject malformed bucket names before issuing any request.
	if nameErr := s3utils.CheckValidBucketName(bucketName); nameErr != nil {
		return nameErr
	}

	// Issue the DELETE against the bucket itself (no object name).
	reqMeta := requestMetadata{
		bucketName:       bucketName,
		contentSHA256Hex: emptySHA256Hex,
	}
	resp, err := c.executeMethod(ctx, http.MethodDelete, reqMeta)
	defer closeResponse(resp)

	switch {
	case err != nil:
		return err
	case resp != nil && resp.StatusCode != http.StatusNoContent:
		return httpRespToErrorResponse(resp, bucketName, "")
	}

	// The bucket is gone, so its cached location is stale.
	c.bucketLocCache.Delete(bucketName)
	return nil
}
// AdvancedRemoveOptions is intended for internal use by replication.
// removeObject translates each field into a request header.
type AdvancedRemoveOptions struct {
// ReplicationDeleteMarker, when true, sets the MinIO replication
// delete-marker header to "true".
ReplicationDeleteMarker bool
// ReplicationStatus, when non-empty, is sent as the bucket replication
// status header.
ReplicationStatus ReplicationStatus
// ReplicationMTime, when non-zero, is sent as the source modification time
// header, formatted as RFC 3339 with nanoseconds.
ReplicationMTime time.Time
// ReplicationRequest, when true, adds the MinIO replication-request header
// (with an empty value).
ReplicationRequest bool
}
// RemoveObjectOptions represents options specified by user for RemoveObject call
type RemoveObjectOptions struct {
// ForceDelete, when true, sends the MinIO force-delete header set to "true".
ForceDelete bool
// GovernanceBypass, when true, sends the bypass-governance-retention header
// set to "true".
GovernanceBypass bool
// VersionID, when non-empty, is sent as the "versionId" query parameter so
// that a specific object version is targeted.
VersionID string
// Internal carries the replication-only options; see AdvancedRemoveOptions.
Internal AdvancedRemoveOptions
}
// RemoveObject removes the object objectName from the bucket bucketName.
//
// The bucket name is validated first, then the object name; the first
// validation failure is returned without issuing a request. Otherwise the
// error (if any) recorded in the result of the DELETE request is returned.
func (c *Client) RemoveObject(ctx context.Context, bucketName, objectName string, opts RemoveObjectOptions) error {
	// Validate both names, bucket first, stopping at the first failure.
	err := s3utils.CheckValidBucketName(bucketName)
	if err == nil {
		err = s3utils.CheckValidObjectName(objectName)
	}
	if err != nil {
		return err
	}

	// Only the error part of the result is exposed by this entry point.
	return c.removeObject(ctx, bucketName, objectName, opts).Err
}
// removeObject issues a single DELETE request for objectName in bucketName
// and reports the outcome as a RemoveObjectResult.
//
// opts.VersionID, when set, is sent as the "versionId" query parameter; the
// remaining options are translated into request headers.
//
// On failure (request error, or a response status other than 204 No Content)
// the returned result has only Err set. On success the result carries the
// object name, the requested version ID and the delete-marker details read
// from the response headers.
func (c *Client) removeObject(ctx context.Context, bucketName, objectName string, opts RemoveObjectOptions) RemoveObjectResult {
	// Get resources properly escaped and lined up before
	// using them in http request.
	urlValues := make(url.Values)
	if opts.VersionID != "" {
		urlValues.Set("versionId", opts.VersionID)
	}

	// Build headers.
	headers := make(http.Header)
	if opts.GovernanceBypass {
		// Set the bypass governance retention header.
		headers.Set(amzBypassGovernance, "true")
	}
	if opts.Internal.ReplicationDeleteMarker {
		headers.Set(minIOBucketReplicationDeleteMarker, "true")
	}
	if !opts.Internal.ReplicationMTime.IsZero() {
		headers.Set(minIOBucketSourceMTime, opts.Internal.ReplicationMTime.Format(time.RFC3339Nano))
	}
	if !opts.Internal.ReplicationStatus.Empty() {
		headers.Set(amzBucketReplicationStatus, string(opts.Internal.ReplicationStatus))
	}
	if opts.Internal.ReplicationRequest {
		headers.Set(minIOBucketReplicationRequest, "")
	}
	if opts.ForceDelete {
		headers.Set(minIOForceDelete, "true")
	}

	// Execute DELETE on objectName.
	resp, err := c.executeMethod(ctx, http.MethodDelete, requestMetadata{
		bucketName:       bucketName,
		objectName:       objectName,
		contentSHA256Hex: emptySHA256Hex,
		queryValues:      urlValues,
		customHeader:     headers,
	})
	defer closeResponse(resp)
	if err != nil {
		return RemoveObjectResult{Err: err}
	}

	if resp == nil {
		// No error and no response: there are no headers to inspect. Report
		// the delete without delete-marker details instead of dereferencing
		// a nil response below.
		return RemoveObjectResult{
			ObjectName:      objectName,
			ObjectVersionID: opts.VersionID,
		}
	}

	// If some unexpected error happened and max retry is reached, we want to
	// let the client know.
	if resp.StatusCode != http.StatusNoContent {
		return RemoveObjectResult{Err: httpRespToErrorResponse(resp, bucketName, objectName)}
	}

	// DeleteObject always responds with http '204' even for
	// objects which do not exist. So no need to handle them
	// specifically.
	return RemoveObjectResult{
		ObjectName:            objectName,
		ObjectVersionID:       opts.VersionID,
		DeleteMarker:          resp.Header.Get("x-amz-delete-marker") == "true",
		DeleteMarkerVersionID: resp.Header.Get("x-amz-version-id"),
	}
}
// RemoveObjectError - container of Multi Delete S3 API error.
// RemoveObjects sends one of these for every failed removal.
type RemoveObjectError struct {
// ObjectName is the name of the object whose removal failed; it is empty
// when the failure is not tied to a single object.
ObjectName string
// VersionID is the version of the object whose removal failed, if any.
VersionID string
// Err is the failure itself.
Err error
}
// RemoveObjectResult - container of Multi Delete S3 API result.
// A nil Err denotes a successful removal.
type RemoveObjectResult struct {
// ObjectName is the name (key) of the object the result refers to.
ObjectName string
// ObjectVersionID is the object version the result refers to; only filled
// with versioned buckets.
ObjectVersionID string
// DeleteMarker is the delete-marker flag reported by the server for this
// removal.
DeleteMarker bool
// DeleteMarkerVersionID is the version ID reported by the server alongside
// the delete-marker flag.
DeleteMarkerVersionID string
// Err is non-nil when the removal failed.
Err error
}
// generateRemoveMultiObjectsRequest builds the XML document sent as the body
// of a remove-multi-objects request: one entry (key plus version ID) per
// element of objects, with Quiet set to false.
func generateRemoveMultiObjectsRequest(objects []ObjectInfo) []byte {
	// One request entry per object, in the same order as the input.
	entries := make([]deleteObject, len(objects))
	for i := range objects {
		entries[i] = deleteObject{
			Key:       objects[i].Key,
			VersionID: objects[i].VersionID,
		}
	}

	payload := deleteMultiObjects{Objects: entries, Quiet: false}
	// The marshalling error is deliberately discarded.
	encoded, _ := xml.Marshal(payload)
	return encoded
}
// processRemoveMultiObjectsResponse decodes the XML body of a
// remove-multi-objects response and sends one RemoveObjectResult per reported
// object on resultCh: first every successful deletion, then every failure.
//
// A body that cannot be decoded yields a single result with an empty object
// name and the decode error. Failures whose code is "InvalidArgument" or
// "NoSuchVersion" are not reported. The objects parameter is not used.
func processRemoveMultiObjectsResponse(body io.Reader, objects []ObjectInfo, resultCh chan<- RemoveObjectResult) {
	// Parse multi delete XML response.
	var parsed deleteMultiObjectsResult
	if decodeErr := xmlDecoder(body, &parsed); decodeErr != nil {
		resultCh <- RemoveObjectResult{ObjectName: "", Err: decodeErr}
		return
	}

	// Report the deletions that succeeded.
	for _, deleted := range parsed.DeletedObjects {
		resultCh <- RemoveObjectResult{
			ObjectName: deleted.Key,
			// Only filled with versioned buckets.
			ObjectVersionID:       deleted.VersionID,
			DeleteMarker:          deleted.DeleteMarker,
			DeleteMarkerVersionID: deleted.DeleteMarkerVersionID,
		}
	}

	// Report the deletions that failed.
	for _, failed := range parsed.UnDeletedObjects {
		// A version that does not exist is not treated as an error.
		if failed.Code == "InvalidArgument" || failed.Code == "NoSuchVersion" {
			continue
		}
		resultCh <- RemoveObjectResult{
			ObjectName:      failed.Key,
			ObjectVersionID: failed.VersionID,
			Err: ErrorResponse{
				Code:    failed.Code,
				Message: failed.Message,
			},
		}
	}
}
// RemoveObjectsOptions represents options specified by user for RemoveObjects call
type RemoveObjectsOptions struct {
// GovernanceBypass, when true, sends the bypass-governance-retention header
// set to "true" with every delete request issued for the call.
GovernanceBypass bool
}
// RemoveObjects removes the objects received from objectsCh from the bucket
// bucketName; each received ObjectInfo may name a specific object version.
//
// Only failures are reported: every failed removal is sent on the returned
// channel, which is closed once all results have been processed. An invalid
// bucket name or a nil objectsCh yields a channel holding that single error,
// already closed.
func (c *Client) RemoveObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, opts RemoveObjectsOptions) <-chan RemoveObjectError {
	errorCh := make(chan RemoveObjectError, 1)

	// Validate the arguments: bucket name first, then the objects channel.
	var argErr error
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		argErr = err
	} else if objectsCh == nil {
		argErr = errInvalidArgument("Objects channel cannot be nil")
	}
	if argErr != nil {
		// The channel has room for one value, so this send cannot block.
		errorCh <- RemoveObjectError{Err: argErr}
		close(errorCh)
		return errorCh
	}

	resultCh := make(chan RemoveObjectResult, 1)
	go c.removeObjects(ctx, bucketName, objectsCh, resultCh, opts)

	// Forward failures only; successful removals are dropped.
	go func() {
		defer close(errorCh)
		for res := range resultCh {
			if res.Err != nil {
				errorCh <- RemoveObjectError{
					ObjectName: res.ObjectName,
					VersionID:  res.ObjectVersionID,
					Err:        res.Err,
				}
			}
		}
	}()
	return errorCh
}
// RemoveObjectsWithResult removes the objects received from objectsCh from
// the bucket bucketName; each received ObjectInfo may name a specific object
// version.
//
// Both successes and failures are sent on the returned RemoveObjectResult
// channel. An invalid bucket name or a nil objectsCh yields a channel holding
// that single error, already closed.
func (c *Client) RemoveObjectsWithResult(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, opts RemoveObjectsOptions) <-chan RemoveObjectResult {
	resultCh := make(chan RemoveObjectResult, 1)

	// Validate the arguments: bucket name first, then the objects channel.
	var argErr error
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		argErr = err
	} else if objectsCh == nil {
		argErr = errInvalidArgument("Objects channel cannot be nil")
	}
	if argErr != nil {
		// The channel has room for one value, so this send cannot block.
		resultCh <- RemoveObjectResult{Err: argErr}
		close(resultCh)
		return resultCh
	}

	// removeObjects owns resultCh from here on and closes it when done.
	go c.removeObjects(ctx, bucketName, objectsCh, resultCh, opts)
	return resultCh
}
// validXMLChar reports whether r is within the set of characters allowed in
// an XML 1.0 document. The list of allowed characters can be found here:
// https://www.w3.org/TR/xml/#charsets
func validXMLChar(r rune) (ok bool) {
	switch {
	case r == 0x09, r == 0x0A, r == 0x0D:
		// Tab, line feed and carriage return.
		return true
	case 0x20 <= r && r <= 0xD7FF:
		return true
	case 0xE000 <= r && r <= 0xFFFD:
		return true
	case 0x10000 <= r && r <= 0x10FFFF:
		return true
	}
	return false
}
// hasInvalidXMLChar reports whether str contains at least one rune that
// validXMLChar rejects.
func hasInvalidXMLChar(str string) bool {
	for _, r := range str {
		if validXMLChar(r) {
			continue
		}
		// The first rejected rune settles the answer.
		return true
	}
	return false
}
// removeObjects drains objectsCh and deletes the received objects from
// bucketName, reporting the outcomes on resultCh, which it closes before
// returning.
//
// Objects are gathered into batches of at most 1000 entries, each sent as one
// Multi-Object Delete request. An object whose key contains a character that
// is not allowed in an XML document is instead deleted with a single DELETE
// request, so that the key travels in the request URL rather than in the XML
// body. Failures with code "InvalidArgument" or "NoSuchVersion" are not
// reported.
func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, resultCh chan<- RemoveObjectResult, opts RemoveObjectsOptions) {
	// Upper bound on the number of entries placed in one multi-delete request.
	const maxEntries = 1000

	urlValues := make(url.Values)
	urlValues.Set("delete", "")

	// Close result channel when Multi delete finishes.
	defer close(resultCh)

	// Loop over entries by 1000 and call MultiDelete requests.
	for finish := false; !finish; {
		var batch []ObjectInfo

		// Try to gather 1000 entries.
		for object := range objectsCh {
			if hasInvalidXMLChar(object.Key) {
				// Use single DELETE so the object name will be in the request
				// URL instead of the multi-delete XML document.
				removeResult := c.removeObject(ctx, bucketName, object.Key, RemoveObjectOptions{
					VersionID:        object.VersionID,
					GovernanceBypass: opts.GovernanceBypass,
				})
				if err := removeResult.Err; err != nil {
					// Version does not exist is not an error ignore and continue.
					switch ToErrorResponse(err).Code {
					case "InvalidArgument", "NoSuchVersion":
						continue
					}
					// removeObject reports a failure with only Err set, so
					// identify the object here for the receiver.
					removeResult.ObjectName = object.Key
					removeResult.ObjectVersionID = object.VersionID
				}
				// Exactly one result per object, success or failure.
				resultCh <- removeResult
				continue
			}

			batch = append(batch, object)
			if len(batch) >= maxEntries {
				break
			}
		}

		if len(batch) == 0 {
			// Multi Objects Delete API doesn't accept empty object list, quit immediately.
			break
		}
		if len(batch) < maxEntries {
			// We didn't have 1000 entries, so this is the last batch.
			finish = true
		}

		c.removeObjectsBatch(ctx, bucketName, batch, urlValues, resultCh, opts)
	}
}

// removeObjectsBatch sends one Multi-Object Delete request for batch and
// reports the outcome of that request on resultCh.
func (c *Client) removeObjectsBatch(ctx context.Context, bucketName string, batch []ObjectInfo, urlValues url.Values, resultCh chan<- RemoveObjectResult, opts RemoveObjectsOptions) {
	// Build headers.
	headers := make(http.Header)
	if opts.GovernanceBypass {
		// Set the bypass governance retention header.
		headers.Set(amzBypassGovernance, "true")
	}

	// Generate remove multi objects XML request.
	removeBytes := generateRemoveMultiObjectsRequest(batch)

	// Execute POST on bucket to delete the batch of objects.
	resp, err := c.executeMethod(ctx, http.MethodPost, requestMetadata{
		bucketName:       bucketName,
		queryValues:      urlValues,
		contentBody:      bytes.NewReader(removeBytes),
		contentLength:    int64(len(removeBytes)),
		contentMD5Base64: sumMD5Base64(removeBytes),
		contentSHA256Hex: sum256Hex(removeBytes),
		customHeader:     headers,
	})
	// Release the response on every path, including the error paths.
	defer closeResponse(resp)

	badStatus := resp != nil && resp.StatusCode != http.StatusOK
	if badStatus {
		e := httpRespToErrorResponse(resp, bucketName, "")
		resultCh <- RemoveObjectResult{ObjectName: "", Err: e}
	}

	if err != nil {
		// The request as a whole failed: every object of the batch failed.
		for _, b := range batch {
			resultCh <- RemoveObjectResult{
				ObjectName:      b.Key,
				ObjectVersionID: b.VersionID,
				Err:             err,
			}
		}
		return
	}

	if resp == nil || badStatus {
		// Either there is nothing to parse, or the failure has already been
		// reported above and the body is not a delete result document.
		return
	}

	// Process multiobjects remove xml response.
	processRemoveMultiObjectsResponse(resp.Body, batch, resultCh)
}
// RemoveIncompleteUpload aborts a partially uploaded object.
//
// It looks up the multipart upload IDs of objectName in bucketName and aborts
// them one by one, stopping at and returning the first error encountered.
// Name validation failures (bucket first, then object) are returned before
// any request is made.
func (c *Client) RemoveIncompleteUpload(ctx context.Context, bucketName, objectName string) error {
	// Validate both names, bucket first, stopping at the first failure.
	nameErr := s3utils.CheckValidBucketName(bucketName)
	if nameErr == nil {
		nameErr = s3utils.CheckValidObjectName(objectName)
	}
	if nameErr != nil {
		return nameErr
	}

	// Find the multipart upload IDs of the object to be aborted.
	ids, lookupErr := c.findUploadIDs(ctx, bucketName, objectName)
	if lookupErr != nil {
		return lookupErr
	}

	// Abort each incomplete multipart upload by its ID.
	for _, id := range ids {
		if abortErr := c.abortMultipartUpload(ctx, bucketName, objectName, id); abortErr != nil {
			return abortErr
		}
	}
	return nil
}
// abortMultipartUpload aborts the multipart upload identified by uploadID
// for objectName in bucketName; all previously uploaded parts are deleted.
//
// It returns a name validation error, the error reported by the request, or
// an error describing any response whose status is not 204 No Content. A
// 404 Not Found response is reported as a "NoSuchUpload" ErrorResponse.
func (c *Client) abortMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string) error {
	// Validate both names, bucket first, stopping at the first failure.
	nameErr := s3utils.CheckValidBucketName(bucketName)
	if nameErr == nil {
		nameErr = s3utils.CheckValidObjectName(objectName)
	}
	if nameErr != nil {
		return nameErr
	}

	// The upload to abort is selected through the uploadId query parameter.
	query := url.Values{}
	query.Set("uploadId", uploadID)

	// Execute DELETE on multipart upload.
	resp, err := c.executeMethod(ctx, http.MethodDelete, requestMetadata{
		bucketName:       bucketName,
		objectName:       objectName,
		queryValues:      query,
		contentSHA256Hex: emptySHA256Hex,
	})
	defer closeResponse(resp)
	if err != nil {
		return err
	}

	if resp == nil || resp.StatusCode == http.StatusNoContent {
		return nil
	}

	// Abort has no response body, handle it for any errors.
	if resp.StatusCode != http.StatusNotFound {
		return httpRespToErrorResponse(resp, bucketName, objectName)
	}

	// This is needed specifically for abort and it cannot be converged into
	// the generic error conversion above.
	return ErrorResponse{
		Code:       "NoSuchUpload",
		Message:    "The specified multipart upload does not exist.",
		BucketName: bucketName,
		Key:        objectName,
		RequestID:  resp.Header.Get("x-amz-request-id"),
		HostID:     resp.Header.Get("x-amz-id-2"),
		Region:     resp.Header.Get("x-amz-bucket-region"),
	}
}