Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91ddecb739 | ||
|
|
a0757c7ec6 | ||
|
|
043c5c2703 | ||
|
|
8c4c128a68 |
25
.github/workflows/go.yml
vendored
Normal file
25
.github/workflows/go.yml
vendored
Normal file
@@ -0,0 +1,25 @@
|
||||
# This workflow will build a golang project
|
||||
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
|
||||
|
||||
name: Go
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ "main" ]
|
||||
pull_request:
|
||||
branches: [ "main" ]
|
||||
|
||||
jobs:
|
||||
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: '1.22'
|
||||
|
||||
- name: Test
|
||||
run: go test -v ./...
|
||||
21
LICENSE
Normal file
21
LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2024 Sebastien Armand
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -224,7 +224,7 @@ func Test_Consume(t *testing.T) {
|
||||
go func() {
|
||||
jqueue.Consume(ctx, internal.ConsumeParams{
|
||||
Queue: "q1",
|
||||
PoolSize: 10,
|
||||
PoolSize: 1,
|
||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||
q1 <- job.Job
|
||||
if job.Job == "q1-99" {
|
||||
@@ -243,7 +243,7 @@ func Test_Consume(t *testing.T) {
|
||||
go func() {
|
||||
jqueue.Consume(ctx2, internal.ConsumeParams{
|
||||
Queue: "q2",
|
||||
PoolSize: 7,
|
||||
PoolSize: 1,
|
||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||
q2 <- job.Job
|
||||
if job.Job == "q2-99" {
|
||||
@@ -357,7 +357,7 @@ func Test_VisibilityTimeout(t *testing.T) {
|
||||
go func() {
|
||||
jqueue.Consume(ctx, internal.ConsumeParams{
|
||||
Queue: "",
|
||||
PoolSize: 6,
|
||||
PoolSize: 1,
|
||||
VisibilityTimeout: 1,
|
||||
OnEmptySleep: 100 * time.Millisecond,
|
||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||
|
||||
@@ -101,6 +101,82 @@ type ConsumeParams struct {
|
||||
OnEmptySleep time.Duration
|
||||
}
|
||||
|
||||
type ConsumerParams[T any] struct {
|
||||
Queue string
|
||||
PoolSize int
|
||||
Worker func(context.Context, T) error
|
||||
VisibilityTimeout int64
|
||||
OnEmptySleep time.Duration
|
||||
}
|
||||
|
||||
type Consumer[T any] struct {
|
||||
q *Queries
|
||||
params ConsumerParams[T]
|
||||
}
|
||||
|
||||
func (c Consumer[T]) Consume(ctx context.Context) error {
|
||||
workers := pond.New(c.params.PoolSize, c.params.PoolSize)
|
||||
sleep := c.params.OnEmptySleep
|
||||
if sleep == 0 {
|
||||
sleep = 1 * time.Second
|
||||
}
|
||||
for {
|
||||
// If the context gets canceled for example, stop consuming
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.params.VisibilityTimeout > 0 {
|
||||
_, err := c.q.ResetJobs(ctx, ResetJobsParams{
|
||||
Queue: c.params.Queue,
|
||||
ConsumerFetchedAt: time.Now().Unix() - c.params.VisibilityTimeout,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("error resetting jobs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
jobs, err := c.q.GrabJobs(ctx, GrabJobsParams{
|
||||
Queue: c.params.Queue,
|
||||
Count: int64(c.params.PoolSize),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("error grabbing jobs: %w", err)
|
||||
}
|
||||
|
||||
if len(jobs) == 0 {
|
||||
time.Sleep(sleep)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
j := new(T)
|
||||
err := json.Unmarshal([]byte(job.Job), j)
|
||||
if err != nil {
|
||||
c.q.FailJob(ctx, FailJobParams{
|
||||
ID: job.ID,
|
||||
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||
})
|
||||
}
|
||||
workers.Submit(func() {
|
||||
err := c.params.Worker(ctx, *j)
|
||||
if err != nil {
|
||||
c.q.FailJob(ctx, FailJobParams{
|
||||
ID: job.ID,
|
||||
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.q.CompleteJob(ctx, job.ID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
||||
workers := pond.New(params.PoolSize, params.PoolSize)
|
||||
sleep := params.OnEmptySleep
|
||||
|
||||
Reference in New Issue
Block a user