Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91ddecb739 |
25
.github/workflows/go.yml
vendored
Normal file
25
.github/workflows/go.yml
vendored
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
# This workflow will build a golang project
|
||||||
|
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
|
||||||
|
|
||||||
|
name: Go
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [ "main" ]
|
||||||
|
pull_request:
|
||||||
|
branches: [ "main" ]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
|
||||||
|
build:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Set up Go
|
||||||
|
uses: actions/setup-go@v4
|
||||||
|
with:
|
||||||
|
go-version: '1.22'
|
||||||
|
|
||||||
|
- name: Test
|
||||||
|
run: go test -v ./...
|
||||||
@@ -14,14 +14,14 @@ liteq allows to run tens of thousands of jobs per second if needed. It can also
|
|||||||
### Install
|
### Install
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
go get git.jadud.com/jadudm/liteq
|
go get github.com/khepin/liteq
|
||||||
```
|
```
|
||||||
|
|
||||||
### Setup and DB creation
|
### Setup and DB creation
|
||||||
|
|
||||||
```go
|
```go
|
||||||
import (
|
import (
|
||||||
"git.jadud.com/jadudmliteq"
|
"github.com/khepin/liteq"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.jadud.com/jadudm/liteq"
|
"github.com/khepin/liteq"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -1,4 +1,4 @@
|
|||||||
module git.jadud.com/jadudm/liteq
|
module github.com/khepin/liteq
|
||||||
|
|
||||||
go 1.22.0
|
go 1.22.0
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.jadud.com/jadudm/liteq/internal"
|
"github.com/khepin/liteq/internal"
|
||||||
"github.com/matryer/is"
|
"github.com/matryer/is"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -101,6 +101,82 @@ type ConsumeParams struct {
|
|||||||
OnEmptySleep time.Duration
|
OnEmptySleep time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ConsumerParams[T any] struct {
|
||||||
|
Queue string
|
||||||
|
PoolSize int
|
||||||
|
Worker func(context.Context, T) error
|
||||||
|
VisibilityTimeout int64
|
||||||
|
OnEmptySleep time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
type Consumer[T any] struct {
|
||||||
|
q *Queries
|
||||||
|
params ConsumerParams[T]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c Consumer[T]) Consume(ctx context.Context) error {
|
||||||
|
workers := pond.New(c.params.PoolSize, c.params.PoolSize)
|
||||||
|
sleep := c.params.OnEmptySleep
|
||||||
|
if sleep == 0 {
|
||||||
|
sleep = 1 * time.Second
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
// If the context gets canceled for example, stop consuming
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.params.VisibilityTimeout > 0 {
|
||||||
|
_, err := c.q.ResetJobs(ctx, ResetJobsParams{
|
||||||
|
Queue: c.params.Queue,
|
||||||
|
ConsumerFetchedAt: time.Now().Unix() - c.params.VisibilityTimeout,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error resetting jobs: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs, err := c.q.GrabJobs(ctx, GrabJobsParams{
|
||||||
|
Queue: c.params.Queue,
|
||||||
|
Count: int64(c.params.PoolSize),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error grabbing jobs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(jobs) == 0 {
|
||||||
|
time.Sleep(sleep)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
j := new(T)
|
||||||
|
err := json.Unmarshal([]byte(job.Job), j)
|
||||||
|
if err != nil {
|
||||||
|
c.q.FailJob(ctx, FailJobParams{
|
||||||
|
ID: job.ID,
|
||||||
|
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
workers.Submit(func() {
|
||||||
|
err := c.params.Worker(ctx, *j)
|
||||||
|
if err != nil {
|
||||||
|
c.q.FailJob(ctx, FailJobParams{
|
||||||
|
ID: job.ID,
|
||||||
|
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.q.CompleteJob(ctx, job.ID)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
||||||
workers := pond.New(params.PoolSize, params.PoolSize)
|
workers := pond.New(params.PoolSize, params.PoolSize)
|
||||||
sleep := params.OnEmptySleep
|
sleep := params.OnEmptySleep
|
||||||
|
|||||||
Reference in New Issue
Block a user