Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91ddecb739 |
25
.github/workflows/go.yml
vendored
Normal file
25
.github/workflows/go.yml
vendored
Normal file
@@ -0,0 +1,25 @@
|
||||
# This workflow will build a golang project
|
||||
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
|
||||
|
||||
name: Go
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ "main" ]
|
||||
pull_request:
|
||||
branches: [ "main" ]
|
||||
|
||||
jobs:
|
||||
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: '1.22'
|
||||
|
||||
- name: Test
|
||||
run: go test -v ./...
|
||||
@@ -14,14 +14,14 @@ liteq allows to run tens of thousands of jobs per second if needed. It can also
|
||||
### Install
|
||||
|
||||
```sh
|
||||
go get git.jadud.com/jadudm/liteq
|
||||
go get github.com/khepin/liteq
|
||||
```
|
||||
|
||||
### Setup and DB creation
|
||||
|
||||
```go
|
||||
import (
|
||||
"git.jadud.com/jadudmliteq"
|
||||
"github.com/khepin/liteq"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"git.jadud.com/jadudm/liteq"
|
||||
"github.com/khepin/liteq"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
|
||||
2
go.mod
2
go.mod
@@ -1,4 +1,4 @@
|
||||
module git.jadud.com/jadudm/liteq
|
||||
module github.com/khepin/liteq
|
||||
|
||||
go 1.22.0
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.jadud.com/jadudm/liteq/internal"
|
||||
"github.com/khepin/liteq/internal"
|
||||
"github.com/matryer/is"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
@@ -101,6 +101,82 @@ type ConsumeParams struct {
|
||||
OnEmptySleep time.Duration
|
||||
}
|
||||
|
||||
type ConsumerParams[T any] struct {
|
||||
Queue string
|
||||
PoolSize int
|
||||
Worker func(context.Context, T) error
|
||||
VisibilityTimeout int64
|
||||
OnEmptySleep time.Duration
|
||||
}
|
||||
|
||||
type Consumer[T any] struct {
|
||||
q *Queries
|
||||
params ConsumerParams[T]
|
||||
}
|
||||
|
||||
func (c Consumer[T]) Consume(ctx context.Context) error {
|
||||
workers := pond.New(c.params.PoolSize, c.params.PoolSize)
|
||||
sleep := c.params.OnEmptySleep
|
||||
if sleep == 0 {
|
||||
sleep = 1 * time.Second
|
||||
}
|
||||
for {
|
||||
// If the context gets canceled for example, stop consuming
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.params.VisibilityTimeout > 0 {
|
||||
_, err := c.q.ResetJobs(ctx, ResetJobsParams{
|
||||
Queue: c.params.Queue,
|
||||
ConsumerFetchedAt: time.Now().Unix() - c.params.VisibilityTimeout,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("error resetting jobs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
jobs, err := c.q.GrabJobs(ctx, GrabJobsParams{
|
||||
Queue: c.params.Queue,
|
||||
Count: int64(c.params.PoolSize),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("error grabbing jobs: %w", err)
|
||||
}
|
||||
|
||||
if len(jobs) == 0 {
|
||||
time.Sleep(sleep)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
j := new(T)
|
||||
err := json.Unmarshal([]byte(job.Job), j)
|
||||
if err != nil {
|
||||
c.q.FailJob(ctx, FailJobParams{
|
||||
ID: job.ID,
|
||||
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||
})
|
||||
}
|
||||
workers.Submit(func() {
|
||||
err := c.params.Worker(ctx, *j)
|
||||
if err != nil {
|
||||
c.q.FailJob(ctx, FailJobParams{
|
||||
ID: job.ID,
|
||||
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
c.q.CompleteJob(ctx, job.ID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
||||
workers := pond.New(params.PoolSize, params.PoolSize)
|
||||
sleep := params.OnEmptySleep
|
||||
|
||||
Reference in New Issue
Block a user