1 Commits

Author SHA1 Message Date
Seb
91ddecb739 generics maybe 2024-12-14 21:32:53 -08:00
7 changed files with 107 additions and 6 deletions

25
.github/workflows/go.yml vendored Normal file
View File

@@ -0,0 +1,25 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
name: Go
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.22'
- name: Test
run: go test -v ./...

View File

@@ -14,14 +14,14 @@ liteq allows to run tens of thousands of jobs per second if needed. It can also
### Install
```sh
go get git.jadud.com/jadudm/liteq
go get github.com/khepin/liteq
```
### Setup and DB creation
```go
import (
"git.jadud.com/jadudmliteq"
"github.com/khepin/liteq"
_ "github.com/mattn/go-sqlite3"
)

View File

@@ -7,7 +7,7 @@ import (
"math/rand"
"time"
"git.jadud.com/jadudm/liteq"
"github.com/khepin/liteq"
_ "github.com/mattn/go-sqlite3"
)

2
go.mod
View File

@@ -1,4 +1,4 @@
module git.jadud.com/jadudm/liteq
module github.com/khepin/liteq
go 1.22.0

View File

@@ -9,7 +9,7 @@ import (
"testing"
"time"
"git.jadud.com/jadudm/liteq/internal"
"github.com/khepin/liteq/internal"
"github.com/matryer/is"
_ "github.com/mattn/go-sqlite3"
)

View File

@@ -101,6 +101,82 @@ type ConsumeParams struct {
OnEmptySleep time.Duration
}
type ConsumerParams[T any] struct {
Queue string
PoolSize int
Worker func(context.Context, T) error
VisibilityTimeout int64
OnEmptySleep time.Duration
}
type Consumer[T any] struct {
q *Queries
params ConsumerParams[T]
}
func (c Consumer[T]) Consume(ctx context.Context) error {
workers := pond.New(c.params.PoolSize, c.params.PoolSize)
sleep := c.params.OnEmptySleep
if sleep == 0 {
sleep = 1 * time.Second
}
for {
// If the context gets canceled for example, stop consuming
if ctx.Err() != nil {
return nil
}
if c.params.VisibilityTimeout > 0 {
_, err := c.q.ResetJobs(ctx, ResetJobsParams{
Queue: c.params.Queue,
ConsumerFetchedAt: time.Now().Unix() - c.params.VisibilityTimeout,
})
if err != nil {
return fmt.Errorf("error resetting jobs: %w", err)
}
}
jobs, err := c.q.GrabJobs(ctx, GrabJobsParams{
Queue: c.params.Queue,
Count: int64(c.params.PoolSize),
})
if err != nil {
return fmt.Errorf("error grabbing jobs: %w", err)
}
if len(jobs) == 0 {
time.Sleep(sleep)
continue
}
for _, job := range jobs {
j := new(T)
err := json.Unmarshal([]byte(job.Job), j)
if err != nil {
c.q.FailJob(ctx, FailJobParams{
ID: job.ID,
Errors: ErrorList(append(job.Errors, err.Error())),
})
}
workers.Submit(func() {
err := c.params.Worker(ctx, *j)
if err != nil {
c.q.FailJob(ctx, FailJobParams{
ID: job.ID,
Errors: ErrorList(append(job.Errors, err.Error())),
})
return
}
c.q.CompleteJob(ctx, job.ID)
})
}
}
}
func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
workers := pond.New(params.PoolSize, params.PoolSize)
sleep := params.OnEmptySleep

View File

@@ -4,7 +4,7 @@ import (
"context"
"database/sql"
"git.jadud.com/jadudm/liteq/internal"
"github.com/khepin/liteq/internal"
)
// Creates the db file with the tables and indexes