1 Commits

Author SHA1 Message Date
Seb
91ddecb739 generics maybe 2024-12-14 21:32:53 -08:00

View File

@@ -101,6 +101,82 @@ type ConsumeParams struct {
OnEmptySleep time.Duration
}
type ConsumerParams[T any] struct {
Queue string
PoolSize int
Worker func(context.Context, T) error
VisibilityTimeout int64
OnEmptySleep time.Duration
}
type Consumer[T any] struct {
q *Queries
params ConsumerParams[T]
}
func (c Consumer[T]) Consume(ctx context.Context) error {
workers := pond.New(c.params.PoolSize, c.params.PoolSize)
sleep := c.params.OnEmptySleep
if sleep == 0 {
sleep = 1 * time.Second
}
for {
// If the context gets canceled for example, stop consuming
if ctx.Err() != nil {
return nil
}
if c.params.VisibilityTimeout > 0 {
_, err := c.q.ResetJobs(ctx, ResetJobsParams{
Queue: c.params.Queue,
ConsumerFetchedAt: time.Now().Unix() - c.params.VisibilityTimeout,
})
if err != nil {
return fmt.Errorf("error resetting jobs: %w", err)
}
}
jobs, err := c.q.GrabJobs(ctx, GrabJobsParams{
Queue: c.params.Queue,
Count: int64(c.params.PoolSize),
})
if err != nil {
return fmt.Errorf("error grabbing jobs: %w", err)
}
if len(jobs) == 0 {
time.Sleep(sleep)
continue
}
for _, job := range jobs {
j := new(T)
err := json.Unmarshal([]byte(job.Job), j)
if err != nil {
c.q.FailJob(ctx, FailJobParams{
ID: job.ID,
Errors: ErrorList(append(job.Errors, err.Error())),
})
}
workers.Submit(func() {
err := c.params.Worker(ctx, *j)
if err != nil {
c.q.FailJob(ctx, FailJobParams{
ID: job.ID,
Errors: ErrorList(append(job.Errors, err.Error())),
})
return
}
c.q.CompleteJob(ctx, job.ID)
})
}
}
}
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
workers := pond.New(params.PoolSize, params.PoolSize)
sleep := params.OnEmptySleep