mirror of
https://github.com/nyanotech/object-holder.git
synced 2025-12-16 03:20:25 -08:00
160 lines
4.4 KiB
Go
160 lines
4.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
)
|
|
|
|
// Command-line flags. Each variable is a pointer that holds the flag's
// default until flag.Parse is called.
var (
	// Where to connect.
	endpoint = flag.String("endpoint", "", "s3 endpoint")
	region   = flag.String("region", "us-east-1", "s3 region")

	// Static credentials; main only installs them when both are non-empty.
	accessKeyId     = flag.String("access-key-id", "", "aws access key")
	secretAccessKey = flag.String("secret-access-key", "", "aws secret key")

	// Which objects to touch.
	bucket = flag.String("bucket", "", "bucket name")
	prefix = flag.String("prefix", "", "only operate on objects starting with this prefix")

	// Timing, both expressed in seconds.
	updateExpiresWithin = flag.Int("update-expires-within", 0, "only update objects whose lock expires within this many seconds (default 0)")
	lockFor             = flag.Int("lock-for", 90*24*3600, "how many seconds to renew the object lock for (default 90 days)")

	// Execution settings.
	threadCount = flag.Int("threads", 1024, "how many objects to operate on at a time")
	lockMode    = flag.String("lock-mode", "compliance", "object lock mode (governance or compliance)")
)
|
|
|
|
type objectLockOptions struct {
|
|
CheckExistingHold bool
|
|
UpdateExpiry time.Time
|
|
LockExpiry time.Time
|
|
Mode types.ObjectLockRetentionMode
|
|
}
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
var mode types.ObjectLockRetentionMode
|
|
switch strings.ToLower(*lockMode) {
|
|
case "governance":
|
|
mode = types.ObjectLockRetentionModeGovernance
|
|
case "compliance":
|
|
mode = types.ObjectLockRetentionModeCompliance
|
|
default:
|
|
log.Fatalln("Invalid lock mode. Must be 'governance' or 'compliance'")
|
|
}
|
|
|
|
objectLockArguments := objectLockOptions{
|
|
CheckExistingHold: *updateExpiresWithin != 0,
|
|
UpdateExpiry: time.Now().Add(time.Second * time.Duration(*updateExpiresWithin)),
|
|
LockExpiry: time.Now().Add(time.Second * time.Duration(*lockFor)),
|
|
Mode: mode,
|
|
}
|
|
|
|
options := []func(*config.LoadOptions) error{}
|
|
|
|
if *region != "" {
|
|
options = append(options, config.WithRegion(*region))
|
|
}
|
|
|
|
if *endpoint != "" {
|
|
if !strings.HasPrefix(*endpoint, "http://") && !strings.HasPrefix(*endpoint, "https://") {
|
|
*endpoint = "https://" + *endpoint
|
|
}
|
|
|
|
options = append(options, config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(
|
|
func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
|
return aws.Endpoint{
|
|
URL: *endpoint,
|
|
}, nil
|
|
},
|
|
)))
|
|
}
|
|
|
|
if *accessKeyId != "" && *secretAccessKey != "" {
|
|
options = append(options, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(*accessKeyId, *secretAccessKey, "")))
|
|
}
|
|
|
|
cfg, err := config.LoadDefaultConfig(context.TODO(), options...)
|
|
if err != nil {
|
|
log.Fatalln("Failed to create AWS config", err)
|
|
}
|
|
|
|
svc := s3.NewFromConfig(cfg)
|
|
|
|
paginator := s3.NewListObjectsV2Paginator(svc, &s3.ListObjectsV2Input{
|
|
Bucket: bucket,
|
|
Prefix: prefix,
|
|
})
|
|
|
|
objectQueue := make(chan string)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < *threadCount; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
queueWorker(svc, objectLockArguments, objectQueue)
|
|
}()
|
|
}
|
|
|
|
for paginator.HasMorePages() {
|
|
page, err := paginator.NextPage(context.TODO())
|
|
if err != nil {
|
|
log.Fatalln("Failed to list objects", err)
|
|
}
|
|
|
|
for _, item := range page.Contents {
|
|
objectQueue <- *item.Key
|
|
}
|
|
}
|
|
|
|
close(objectQueue)
|
|
wg.Wait()
|
|
}
|
|
|
|
func queueWorker(svc *s3.Client, options objectLockOptions, inQueue chan string) {
|
|
for {
|
|
object, more := <-inQueue
|
|
if !more {
|
|
return
|
|
}
|
|
checkAndRenewObjectLock(svc, options, object)
|
|
}
|
|
}
|
|
|
|
func checkAndRenewObjectLock(svc *s3.Client, options objectLockOptions, object string) {
|
|
updateHold := !options.CheckExistingHold
|
|
if !updateHold {
|
|
retention, _ := svc.GetObjectRetention(context.TODO(), &s3.GetObjectRetentionInput{
|
|
Bucket: bucket,
|
|
Key: &object,
|
|
})
|
|
if retention == nil || retention.Retention.RetainUntilDate.Before(options.UpdateExpiry) {
|
|
updateHold = true
|
|
}
|
|
}
|
|
|
|
if updateHold {
|
|
log.Println("Renewing object lock for object", object)
|
|
_, err := svc.PutObjectRetention(context.TODO(), &s3.PutObjectRetentionInput{
|
|
Bucket: bucket,
|
|
Key: &object,
|
|
Retention: &types.ObjectLockRetention{
|
|
Mode: options.Mode,
|
|
RetainUntilDate: aws.Time(options.LockExpiry),
|
|
},
|
|
})
|
|
if err != nil {
|
|
// TODO: handle 403 for when object already has a longer-lasting hold
|
|
log.Fatalln("Failed to update retention for", object, err)
|
|
}
|
|
}
|
|
}
|