diff --git a/internal/methods.go b/internal/methods.go index b9c3402..857084c 100644 --- a/internal/methods.go +++ b/internal/methods.go @@ -101,6 +101,82 @@ type ConsumeParams struct { OnEmptySleep time.Duration } +type ConsumerParams[T any] struct { + Queue string + PoolSize int + Worker func(context.Context, T) error + VisibilityTimeout int64 + OnEmptySleep time.Duration +} + +type Consumer[T any] struct { + q *Queries + params ConsumerParams[T] +} + +func (c Consumer[T]) Consume(ctx context.Context) error { + workers := pond.New(c.params.PoolSize, c.params.PoolSize) + sleep := c.params.OnEmptySleep + if sleep == 0 { + sleep = 1 * time.Second + } + for { + // If the context gets canceled for example, stop consuming + if ctx.Err() != nil { + return nil + } + + if c.params.VisibilityTimeout > 0 { + _, err := c.q.ResetJobs(ctx, ResetJobsParams{ + Queue: c.params.Queue, + ConsumerFetchedAt: time.Now().Unix() - c.params.VisibilityTimeout, + }) + + if err != nil { + return fmt.Errorf("error resetting jobs: %w", err) + } + } + + jobs, err := c.q.GrabJobs(ctx, GrabJobsParams{ + Queue: c.params.Queue, + Count: int64(c.params.PoolSize), + }) + + if err != nil { + return fmt.Errorf("error grabbing jobs: %w", err) + } + + if len(jobs) == 0 { + time.Sleep(sleep) + continue + } + + for _, job := range jobs { + j := new(T) + err := json.Unmarshal([]byte(job.Job), j) + if err != nil { + c.q.FailJob(ctx, FailJobParams{ + ID: job.ID, + Errors: ErrorList(append(job.Errors, err.Error())), + }) + } + workers.Submit(func() { + err := c.params.Worker(ctx, *j) + if err != nil { + c.q.FailJob(ctx, FailJobParams{ + ID: job.ID, + Errors: ErrorList(append(job.Errors, err.Error())), + }) + return + } + + c.q.CompleteJob(ctx, job.ID) + }) + } + } + +} + func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error { workers := pond.New(params.PoolSize, params.PoolSize) sleep := params.OnEmptySleep