Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
596196cdc0
|
||
| c4b7b8ffe1 | |||
| 3f639fe803 |
25
.github/workflows/go.yml
vendored
25
.github/workflows/go.yml
vendored
@@ -1,25 +0,0 @@
|
|||||||
# This workflow will build a golang project
|
|
||||||
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
|
|
||||||
|
|
||||||
name: Go
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches: [ "main" ]
|
|
||||||
pull_request:
|
|
||||||
branches: [ "main" ]
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
|
|
||||||
build:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v3
|
|
||||||
|
|
||||||
- name: Set up Go
|
|
||||||
uses: actions/setup-go@v4
|
|
||||||
with:
|
|
||||||
go-version: '1.22'
|
|
||||||
|
|
||||||
- name: Test
|
|
||||||
run: go test -v ./...
|
|
||||||
@@ -14,14 +14,14 @@ liteq allows to run tens of thousands of jobs per second if needed. It can also
|
|||||||
### Install
|
### Install
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
go get github.com/khepin/liteq
|
go get git.jadud.com/jadudm/liteq
|
||||||
```
|
```
|
||||||
|
|
||||||
### Setup and DB creation
|
### Setup and DB creation
|
||||||
|
|
||||||
```go
|
```go
|
||||||
import (
|
import (
|
||||||
"github.com/khepin/liteq"
|
"git.jadud.com/jadudmliteq"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/khepin/liteq"
|
"git.jadud.com/jadudm/liteq"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -1,4 +1,4 @@
|
|||||||
module github.com/khepin/liteq
|
module git.jadud.com/jadudm/liteq
|
||||||
|
|
||||||
go 1.22.0
|
go 1.22.0
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/khepin/liteq/internal"
|
"git.jadud.com/jadudm/liteq/internal"
|
||||||
"github.com/matryer/is"
|
"github.com/matryer/is"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -101,82 +101,6 @@ type ConsumeParams struct {
|
|||||||
OnEmptySleep time.Duration
|
OnEmptySleep time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConsumerParams[T any] struct {
|
|
||||||
Queue string
|
|
||||||
PoolSize int
|
|
||||||
Worker func(context.Context, T) error
|
|
||||||
VisibilityTimeout int64
|
|
||||||
OnEmptySleep time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
type Consumer[T any] struct {
|
|
||||||
q *Queries
|
|
||||||
params ConsumerParams[T]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c Consumer[T]) Consume(ctx context.Context) error {
|
|
||||||
workers := pond.New(c.params.PoolSize, c.params.PoolSize)
|
|
||||||
sleep := c.params.OnEmptySleep
|
|
||||||
if sleep == 0 {
|
|
||||||
sleep = 1 * time.Second
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
// If the context gets canceled for example, stop consuming
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.params.VisibilityTimeout > 0 {
|
|
||||||
_, err := c.q.ResetJobs(ctx, ResetJobsParams{
|
|
||||||
Queue: c.params.Queue,
|
|
||||||
ConsumerFetchedAt: time.Now().Unix() - c.params.VisibilityTimeout,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error resetting jobs: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jobs, err := c.q.GrabJobs(ctx, GrabJobsParams{
|
|
||||||
Queue: c.params.Queue,
|
|
||||||
Count: int64(c.params.PoolSize),
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error grabbing jobs: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(jobs) == 0 {
|
|
||||||
time.Sleep(sleep)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, job := range jobs {
|
|
||||||
j := new(T)
|
|
||||||
err := json.Unmarshal([]byte(job.Job), j)
|
|
||||||
if err != nil {
|
|
||||||
c.q.FailJob(ctx, FailJobParams{
|
|
||||||
ID: job.ID,
|
|
||||||
Errors: ErrorList(append(job.Errors, err.Error())),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
workers.Submit(func() {
|
|
||||||
err := c.params.Worker(ctx, *j)
|
|
||||||
if err != nil {
|
|
||||||
c.q.FailJob(ctx, FailJobParams{
|
|
||||||
ID: job.ID,
|
|
||||||
Errors: ErrorList(append(job.Errors, err.Error())),
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.q.CompleteJob(ctx, job.ID)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
||||||
workers := pond.New(params.PoolSize, params.PoolSize)
|
workers := pond.New(params.PoolSize, params.PoolSize)
|
||||||
sleep := params.OnEmptySleep
|
sleep := params.OnEmptySleep
|
||||||
|
|||||||
Reference in New Issue
Block a user