From 2c5911fd2359e7dfa05ea58fc6c2be57cc049a82 Mon Sep 17 00:00:00 2001 From: nyanotech Date: Sun, 7 Apr 2024 03:15:48 -0700 Subject: [PATCH] implement a goroutine pool turns out b2 doesn't like it if you use too many threads --- main.go | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index e98d751..fe73edf 100644 --- a/main.go +++ b/main.go @@ -24,7 +24,7 @@ var secretAccessKey = flag.String("secret-access-key", "", "aws secret key") var updateExpiresWithin = flag.Int("update-expires-within", 0, "only update objects whose lock expires within this many seconds (default 0)") var lockFor = flag.Int("lock-for", 90*24*3600, "how many seconds to renew the object lock for (default 90 days)") -var wg sync.WaitGroup +var threadCount = flag.Int("threads", 1024, "how many objects to operate on at a time") func main() { flag.Parse() @@ -52,6 +52,17 @@ func main() { Bucket: bucket, }) + objectQueue := make(chan string) + + var wg sync.WaitGroup + for i := 0; i < *threadCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + queueWorker(svc, objectQueue) + }() + } + for paginator.HasMorePages() { page, err := paginator.NextPage(context.TODO()) if err != nil { @@ -59,17 +70,24 @@ func main() { } for _, item := range page.Contents { - wg.Add(1) - go func(svc *s3.Client, key string) { - defer wg.Done() - checkAndRenewObjectLock(svc, key) - }(svc, *item.Key) + objectQueue <- *item.Key } } + close(objectQueue) wg.Wait() } +func queueWorker(svc *s3.Client, inQueue chan string) { + for { + object, more := <-inQueue + if !more { + return + } + checkAndRenewObjectLock(svc, object) + } +} + func checkAndRenewObjectLock(svc *s3.Client, object string) { updateHold := false if *updateExpiresWithin == 0 {