Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91ddecb739 | ||
|
|
a0757c7ec6 | ||
|
|
043c5c2703 | ||
|
|
8c4c128a68 |
25
.github/workflows/go.yml
vendored
Normal file
25
.github/workflows/go.yml
vendored
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
# This workflow will build a golang project
|
||||||
|
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
|
||||||
|
|
||||||
|
name: Go
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [ "main" ]
|
||||||
|
pull_request:
|
||||||
|
branches: [ "main" ]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
|
||||||
|
build:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Set up Go
|
||||||
|
uses: actions/setup-go@v4
|
||||||
|
with:
|
||||||
|
go-version: '1.22'
|
||||||
|
|
||||||
|
- name: Test
|
||||||
|
run: go test -v ./...
|
||||||
21
LICENSE
Normal file
21
LICENSE
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
MIT License
|
||||||
|
|
||||||
|
Copyright (c) 2024 Sebastien Armand
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
||||||
@@ -224,7 +224,7 @@ func Test_Consume(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
jqueue.Consume(ctx, internal.ConsumeParams{
|
jqueue.Consume(ctx, internal.ConsumeParams{
|
||||||
Queue: "q1",
|
Queue: "q1",
|
||||||
PoolSize: 10,
|
PoolSize: 1,
|
||||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||||
q1 <- job.Job
|
q1 <- job.Job
|
||||||
if job.Job == "q1-99" {
|
if job.Job == "q1-99" {
|
||||||
@@ -243,7 +243,7 @@ func Test_Consume(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
jqueue.Consume(ctx2, internal.ConsumeParams{
|
jqueue.Consume(ctx2, internal.ConsumeParams{
|
||||||
Queue: "q2",
|
Queue: "q2",
|
||||||
PoolSize: 7,
|
PoolSize: 1,
|
||||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||||
q2 <- job.Job
|
q2 <- job.Job
|
||||||
if job.Job == "q2-99" {
|
if job.Job == "q2-99" {
|
||||||
@@ -357,7 +357,7 @@ func Test_VisibilityTimeout(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
jqueue.Consume(ctx, internal.ConsumeParams{
|
jqueue.Consume(ctx, internal.ConsumeParams{
|
||||||
Queue: "",
|
Queue: "",
|
||||||
PoolSize: 6,
|
PoolSize: 1,
|
||||||
VisibilityTimeout: 1,
|
VisibilityTimeout: 1,
|
||||||
OnEmptySleep: 100 * time.Millisecond,
|
OnEmptySleep: 100 * time.Millisecond,
|
||||||
Worker: func(ctx context.Context, job *internal.Job) error {
|
Worker: func(ctx context.Context, job *internal.Job) error {
|
||||||
|
|||||||
@@ -101,6 +101,82 @@ type ConsumeParams struct {
|
|||||||
OnEmptySleep time.Duration
|
OnEmptySleep time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ConsumerParams[T any] struct {
|
||||||
|
Queue string
|
||||||
|
PoolSize int
|
||||||
|
Worker func(context.Context, T) error
|
||||||
|
VisibilityTimeout int64
|
||||||
|
OnEmptySleep time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
type Consumer[T any] struct {
|
||||||
|
q *Queries
|
||||||
|
params ConsumerParams[T]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Consumer[T]) Consume(ctx context.Context) error {
|
||||||
|
workers := pond.New(c.params.PoolSize, c.params.PoolSize)
|
||||||
|
sleep := c.params.OnEmptySleep
|
||||||
|
if sleep == 0 {
|
||||||
|
sleep = 1 * time.Second
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
// If the context gets canceled for example, stop consuming
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.params.VisibilityTimeout > 0 {
|
||||||
|
_, err := c.q.ResetJobs(ctx, ResetJobsParams{
|
||||||
|
Queue: c.params.Queue,
|
||||||
|
ConsumerFetchedAt: time.Now().Unix() - c.params.VisibilityTimeout,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error resetting jobs: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs, err := c.q.GrabJobs(ctx, GrabJobsParams{
|
||||||
|
Queue: c.params.Queue,
|
||||||
|
Count: int64(c.params.PoolSize),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error grabbing jobs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(jobs) == 0 {
|
||||||
|
time.Sleep(sleep)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
j := new(T)
|
||||||
|
err := json.Unmarshal([]byte(job.Job), j)
|
||||||
|
if err != nil {
|
||||||
|
c.q.FailJob(ctx, FailJobParams{
|
||||||
|
ID: job.ID,
|
||||||
|
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
workers.Submit(func() {
|
||||||
|
err := c.params.Worker(ctx, *j)
|
||||||
|
if err != nil {
|
||||||
|
c.q.FailJob(ctx, FailJobParams{
|
||||||
|
ID: job.ID,
|
||||||
|
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.q.CompleteJob(ctx, job.ID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
||||||
workers := pond.New(params.PoolSize, params.PoolSize)
|
workers := pond.New(params.PoolSize, params.PoolSize)
|
||||||
sleep := params.OnEmptySleep
|
sleep := params.OnEmptySleep
|
||||||
|
|||||||
Reference in New Issue
Block a user