Compare commits
3 Commits
23923b7be4
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f53639af2f
|
||
|
|
06cdc68be7
|
||
|
|
d212a354fe
|
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
*.db*
|
||||||
|
*.sqlite*
|
||||||
4
Makefile
4
Makefile
@@ -4,3 +4,7 @@ test: generate
|
|||||||
generate:
|
generate:
|
||||||
cd internal/domain64 ; make generate
|
cd internal/domain64 ; make generate
|
||||||
cd internal/liteq ; sqlc generate
|
cd internal/liteq ; sqlc generate
|
||||||
|
cd internal/liteq ; make schema-const
|
||||||
|
|
||||||
|
run: generate
|
||||||
|
go run cmd/api/*.go
|
||||||
@@ -32,3 +32,7 @@ The use-case is (essentially) single-user.
|
|||||||
* https://github.com/jpillora/go-tld
|
* https://github.com/jpillora/go-tld
|
||||||
* https://pkg.go.dev/modernc.org/sqlite
|
* https://pkg.go.dev/modernc.org/sqlite
|
||||||
* https://github.com/sqlc-dev/sqlc
|
* https://github.com/sqlc-dev/sqlc
|
||||||
|
* https://github.com/spf13/afero
|
||||||
|
* https://github.com/tidwall/gjson
|
||||||
|
|
||||||
|
//go:generate stringer -type=UpdateFrequency
|
||||||
|
|||||||
78
cmd/api/init.go
Normal file
78
cmd/api/init.go
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.jadud.com/jadudm/grosbeak/internal/domain64"
|
||||||
|
"git.jadud.com/jadudm/grosbeak/internal/engine"
|
||||||
|
liteq "git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||||
|
"git.jadud.com/jadudm/grosbeak/internal/types"
|
||||||
|
_ "modernc.org/sqlite"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setupDB() *sql.DB {
|
||||||
|
|
||||||
|
// FIXME: This path needs to come from the env.
|
||||||
|
db, err := sql.Open("sqlite", "grosbeak.sqlite")
|
||||||
|
db.SetMaxOpenConns(1)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return db
|
||||||
|
}
|
||||||
|
|
||||||
|
func runQ(queue *liteq.JobQueue, queueName string, worker types.QueueWorker) {
|
||||||
|
for {
|
||||||
|
log.Printf("runQ %s\n", queueName)
|
||||||
|
|
||||||
|
err := queue.Consume(context.Background(), liteq.ConsumeParams{
|
||||||
|
Queue: queueName,
|
||||||
|
PoolSize: 3,
|
||||||
|
VisibilityTimeout: 20,
|
||||||
|
Worker: worker,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("runQ/%s: %s", queueName, err.Error())
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupLiteQ(db *sql.DB, d64m *domain64.Domain64Map) *liteq.JobQueue {
|
||||||
|
liteq.Setup(db)
|
||||||
|
queue := liteq.New(db)
|
||||||
|
// The queue processes as long as this context is not cancelled.
|
||||||
|
|
||||||
|
log.Println("setting up worker queues...")
|
||||||
|
queues := []struct {
|
||||||
|
queueName string
|
||||||
|
worker types.QueueWorker
|
||||||
|
}{
|
||||||
|
{"fetch", engine.Fetch(d64m)},
|
||||||
|
}
|
||||||
|
for _, q := range queues {
|
||||||
|
go runQ(queue, q.queueName, q.worker)
|
||||||
|
}
|
||||||
|
return queue
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupDomain64Map(db *sql.DB) *domain64.Domain64Map {
|
||||||
|
d64m, err := domain64.NewDomain64Map()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("newdomain64map err: %s", err.Error())
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
d64m.Setup(db)
|
||||||
|
return d64m
|
||||||
|
}
|
||||||
@@ -1,84 +1,40 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
|
||||||
"os"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_ "modernc.org/sqlite"
|
_ "modernc.org/sqlite"
|
||||||
|
|
||||||
liteq "git.jadud.com/grosbeak/internal/liteq"
|
"git.jadud.com/jadudm/grosbeak/internal/engine"
|
||||||
|
base "git.jadud.com/jadudm/grosbeak/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Fetch(ctx context.Context, job *liteq.Job) error {
|
|
||||||
n := rand.Intn(50)
|
|
||||||
|
|
||||||
time.Sleep(time.Duration(n) * time.Millisecond)
|
|
||||||
log.Println("Fetching", job.Job)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func runWorkers(queue *liteq.JobQueue) {
|
|
||||||
go queue.Consume(context.Background(), liteq.ConsumeParams{
|
|
||||||
Queue: "fetch",
|
|
||||||
PoolSize: 3,
|
|
||||||
VisibilityTimeout: 200000,
|
|
||||||
Worker: Fetch,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func Entre(queue *liteq.JobQueue, chUrl <-chan string) {
|
|
||||||
ctx := context.Background()
|
|
||||||
for {
|
|
||||||
url := <-chUrl
|
|
||||||
log.Println("Entre", url)
|
|
||||||
// Don't duplicate jobs on the same day of the year.
|
|
||||||
n := time.Now()
|
|
||||||
queue.QueueJob(ctx, liteq.QueueJobParams{
|
|
||||||
Queue: "fetch",
|
|
||||||
// This only works for things in the `queued` state
|
|
||||||
DedupingKey: liteq.IgnoreDuplicate(fmt.Sprintf("%s:%d:%d", url, n.Year(), n.YearDay())),
|
|
||||||
Job: url,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// Don't let `main()` exit
|
// Don't let `main()` exit
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
// FIXME: This path needs to come from the env.
|
db := setupDB()
|
||||||
liteqDB, err := sql.Open("sqlite", "liteq.db")
|
d64m := setupDomain64Map(db)
|
||||||
if err != nil {
|
queue := setupLiteQ(db, d64m)
|
||||||
fmt.Println(err)
|
|
||||||
os.Exit(1)
|
// Enqueue URLs
|
||||||
|
urls := []struct {
|
||||||
|
url string
|
||||||
|
uf base.UpdateFrequency
|
||||||
|
}{
|
||||||
|
{"https://jadud.com/", base.UPDATE_DAILY},
|
||||||
|
{"https://berea.us/", base.UPDATE_WEEKLY},
|
||||||
}
|
}
|
||||||
liteq.Setup(liteqDB)
|
|
||||||
queue := liteq.New(liteqDB)
|
|
||||||
// The queue processes as long as this context is not cancelled.
|
|
||||||
|
|
||||||
log.Println("Setting up workers...")
|
for _, u := range urls {
|
||||||
go runWorkers(queue)
|
engine.Entre(queue, &base.EntreJob{URL: u.url, UpdateFrequency: u.uf})
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
log.Println("Building network...")
|
|
||||||
// Create the network for the search engine.
|
|
||||||
chUrl := make(chan string)
|
|
||||||
|
|
||||||
go Entre(queue, chUrl)
|
|
||||||
for {
|
|
||||||
chUrl <- "https://jadud.com/"
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
chUrl <- "https://berea.us/"
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't exit.
|
// Don't exit.
|
||||||
log.Println("Waiting...")
|
log.Println("Waiting for godot...")
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|||||||
5
go.mod
5
go.mod
@@ -1,4 +1,4 @@
|
|||||||
module git.jadud.com/grosbeak
|
module git.jadud.com/jadudm/grosbeak
|
||||||
|
|
||||||
go 1.24.0
|
go 1.24.0
|
||||||
|
|
||||||
@@ -8,6 +8,7 @@ require (
|
|||||||
github.com/alitto/pond v1.9.2
|
github.com/alitto/pond v1.9.2
|
||||||
github.com/jpillora/go-tld v1.2.1
|
github.com/jpillora/go-tld v1.2.1
|
||||||
github.com/matryer/is v1.4.1
|
github.com/matryer/is v1.4.1
|
||||||
|
github.com/tidwall/gjson v1.18.0
|
||||||
modernc.org/sqlite v1.40.1
|
modernc.org/sqlite v1.40.1
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -17,6 +18,8 @@ require (
|
|||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
github.com/ncruces/go-strftime v1.0.0 // indirect
|
github.com/ncruces/go-strftime v1.0.0 // indirect
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||||
|
github.com/tidwall/match v1.2.0 // indirect
|
||||||
|
github.com/tidwall/pretty v1.2.1 // indirect
|
||||||
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect
|
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 // indirect
|
||||||
golang.org/x/net v0.47.0 // indirect
|
golang.org/x/net v0.47.0 // indirect
|
||||||
golang.org/x/sys v0.38.0 // indirect
|
golang.org/x/sys v0.38.0 // indirect
|
||||||
|
|||||||
8
go.sum
8
go.sum
@@ -18,6 +18,14 @@ github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOF
|
|||||||
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||||
|
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
|
||||||
|
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||||
|
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||||
|
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
|
||||||
|
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||||
|
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||||
|
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
|
||||||
|
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||||
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY=
|
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY=
|
||||||
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
|
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
|
||||||
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
|
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
|
||||||
|
|||||||
@@ -3,30 +3,43 @@ package domain64
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
_ "embed"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
sqlc "git.jadud.com/grosbeak/internal/domain64/sqlc"
|
sqlc "git.jadud.com/jadudm/grosbeak/internal/domain64/sqlc"
|
||||||
"github.com/jpillora/go-tld"
|
"github.com/jpillora/go-tld"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//go:embed sqlc/schema.sql
|
||||||
|
var ddl string
|
||||||
|
|
||||||
type Domain64Map struct {
|
type Domain64Map struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
m map[string]int
|
m map[string]int
|
||||||
DB *sql.DB
|
DB *sql.DB
|
||||||
Flushed bool
|
Queries *sqlc.Queries
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDomain64Map(db *sql.DB) (*Domain64Map, error) {
|
func NewDomain64Map() (*Domain64Map, error) {
|
||||||
d64m := &Domain64Map{}
|
d64m := &Domain64Map{}
|
||||||
d64m.DB = db
|
d64m.DB = nil
|
||||||
d64m.Flushed = true
|
d64m.Queries = nil
|
||||||
// Init the tables if a DB pointer is passed in.
|
|
||||||
return d64m, nil
|
return d64m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d64m *Domain64Map) Setup(db *sql.DB) {
|
||||||
|
log.Printf("creating domain64 tables\n")
|
||||||
|
if _, err := db.ExecContext(context.Background(), ddl); err != nil {
|
||||||
|
log.Printf("setup err: %s", err.Error())
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
d64m.DB = db
|
||||||
|
d64m.Queries = sqlc.New(db)
|
||||||
|
}
|
||||||
|
|
||||||
func (d64m *Domain64Map) URLToRFQDN(url *tld.URL) string {
|
func (d64m *Domain64Map) URLToRFQDN(url *tld.URL) string {
|
||||||
s := ""
|
s := ""
|
||||||
if url.TLD != "" {
|
if url.TLD != "" {
|
||||||
@@ -140,21 +153,21 @@ func (d64m *Domain64Map) URLToDomain64(url *tld.URL) (*Domain64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
d64 := &Domain64{}
|
d64 := &Domain64{}
|
||||||
queries := sqlc.New(d64m.DB)
|
|
||||||
// These manipulate both the DB and the Domain64 struct
|
// These manipulate both the DB and the Domain64 struct
|
||||||
err := _get_or_insert_tld(queries, d64, url)
|
err := _get_or_insert_tld(d64m.Queries, d64, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = _get_or_insert_domain(queries, d64, url)
|
err = _get_or_insert_domain(d64m.Queries, d64, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = _get_or_insert_subdomain(queries, d64, url)
|
err = _get_or_insert_subdomain(d64m.Queries, d64, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = _get_or_insert_path(queries, d64, url)
|
err = _get_or_insert_path(d64m.Queries, d64, url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,16 +3,12 @@ package domain64
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
_ "embed"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/jpillora/go-tld"
|
"github.com/jpillora/go-tld"
|
||||||
_ "modernc.org/sqlite"
|
_ "modernc.org/sqlite"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:embed sqlc/schema.sql
|
|
||||||
var ddl string
|
|
||||||
|
|
||||||
func setup() *sql.DB {
|
func setup() *sql.DB {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
@@ -32,7 +28,8 @@ func setup() *sql.DB {
|
|||||||
|
|
||||||
func TestNewDomain64Map(t *testing.T) {
|
func TestNewDomain64Map(t *testing.T) {
|
||||||
db := setup()
|
db := setup()
|
||||||
M, err := NewDomain64Map(db)
|
M, err := NewDomain64Map()
|
||||||
|
M.Setup(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO
|
// TODO
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@@ -43,7 +40,8 @@ func TestNewDomain64Map(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestURLToRFQDN(t *testing.T) {
|
func TestURLToRFQDN(t *testing.T) {
|
||||||
M, _ := NewDomain64Map(nil)
|
M, _ := NewDomain64Map()
|
||||||
|
// M.Setup(db)
|
||||||
simple, _ := tld.Parse("https://jadud.com/")
|
simple, _ := tld.Parse("https://jadud.com/")
|
||||||
rfqdn := M.URLToRFQDN(simple)
|
rfqdn := M.URLToRFQDN(simple)
|
||||||
if rfqdn != "com.jadud/" {
|
if rfqdn != "com.jadud/" {
|
||||||
@@ -53,7 +51,8 @@ func TestURLToRFQDN(t *testing.T) {
|
|||||||
|
|
||||||
func TestURLToDomain64(t *testing.T) {
|
func TestURLToDomain64(t *testing.T) {
|
||||||
db := setup()
|
db := setup()
|
||||||
M, _ := NewDomain64Map(db)
|
M, _ := NewDomain64Map()
|
||||||
|
M.Setup(db)
|
||||||
simple, _ := tld.Parse("https://jadud.com/")
|
simple, _ := tld.Parse("https://jadud.com/")
|
||||||
d64, _ := M.URLToDomain64(simple)
|
d64, _ := M.URLToDomain64(simple)
|
||||||
if d64.TLD != 1 {
|
if d64.TLD != 1 {
|
||||||
@@ -67,7 +66,8 @@ func TestURLToDomain64(t *testing.T) {
|
|||||||
|
|
||||||
func TestURLToDomain64_02(t *testing.T) {
|
func TestURLToDomain64_02(t *testing.T) {
|
||||||
db := setup()
|
db := setup()
|
||||||
M, _ := NewDomain64Map(db)
|
M, _ := NewDomain64Map()
|
||||||
|
M.Setup(db)
|
||||||
simple1, _ := tld.Parse("https://jadud.com/")
|
simple1, _ := tld.Parse("https://jadud.com/")
|
||||||
simple2, _ := tld.Parse("https://another.com/")
|
simple2, _ := tld.Parse("https://another.com/")
|
||||||
d64_1, _ := M.URLToDomain64(simple1)
|
d64_1, _ := M.URLToDomain64(simple1)
|
||||||
@@ -93,7 +93,8 @@ func TestURLToDomain64_02(t *testing.T) {
|
|||||||
|
|
||||||
func TestURLToDomain64_03(t *testing.T) {
|
func TestURLToDomain64_03(t *testing.T) {
|
||||||
db := setup()
|
db := setup()
|
||||||
M, _ := NewDomain64Map(db)
|
M, _ := NewDomain64Map()
|
||||||
|
M.Setup(db)
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
url string
|
url string
|
||||||
tld int64
|
tld int64
|
||||||
|
|||||||
39
internal/engine/entre.go
Normal file
39
internal/engine/entre.go
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
package engine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||||
|
base "git.jadud.com/jadudm/grosbeak/internal/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Entre(queue *liteq.JobQueue, ej *base.EntreJob) error {
|
||||||
|
n := time.Now()
|
||||||
|
|
||||||
|
var ignore_tag string
|
||||||
|
|
||||||
|
switch ej.UpdateFrequency {
|
||||||
|
case base.UPDATE_DAILY:
|
||||||
|
ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay())
|
||||||
|
case base.UPDATE_WEEKLY:
|
||||||
|
ignore_tag = fmt.Sprintf("%s|y%d-w%d", ej.URL, n.Year(), n.YearDay()/7)
|
||||||
|
case base.UPDATE_MONTHLY:
|
||||||
|
ignore_tag = fmt.Sprintf("%s|y%d-m%d", ej.URL, n.Year(), n.Month())
|
||||||
|
default:
|
||||||
|
ignore_tag = fmt.Sprintf("%s|y%d-yd%d", ej.URL, n.Year(), n.YearDay())
|
||||||
|
}
|
||||||
|
|
||||||
|
err := queue.QueueJob(context.Background(), liteq.QueueJobParams{
|
||||||
|
Queue: "fetch",
|
||||||
|
// This only works for things in the `queued` state
|
||||||
|
DedupingKey: liteq.IgnoreDuplicate(ignore_tag),
|
||||||
|
Job: ej.AsJson(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("entre err %s: %s\n", ej.URL, err.Error())
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
34
internal/engine/fetch.go
Normal file
34
internal/engine/fetch.go
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
package engine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"git.jadud.com/jadudm/grosbeak/internal/domain64"
|
||||||
|
"git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||||
|
"git.jadud.com/jadudm/grosbeak/internal/types"
|
||||||
|
"github.com/jpillora/go-tld"
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Fetch(d64m *domain64.Domain64Map) types.QueueWorker {
|
||||||
|
_f := func(ctx context.Context, job *liteq.Job) error {
|
||||||
|
url := gjson.Get(job.Job, "url").String()
|
||||||
|
|
||||||
|
turl, err := tld.Parse(url)
|
||||||
|
if err != nil {
|
||||||
|
// If we can't parse it, shut the job down as completed.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
d64, err := d64m.URLToDomain64(turl)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("urltodomain64 err %s: %s\n", url, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("fetching %s 0x%016x\n", url, d64.ToInt64())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return _f
|
||||||
|
}
|
||||||
@@ -20,10 +20,9 @@ VALUES
|
|||||||
unixepoch(),
|
unixepoch(),
|
||||||
?,
|
?,
|
||||||
?
|
?
|
||||||
) ON CONFLICT (deduping_key, job_status)
|
) ON CONFLICT (deduping_key)
|
||||||
WHERE
|
WHERE
|
||||||
deduping_key != ''
|
deduping_key != '' DO NOTHING;
|
||||||
AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed') DO NOTHING;
|
|
||||||
|
|
||||||
-- name: doQueueJobReplaceDupe :exec
|
-- name: doQueueJobReplaceDupe :exec
|
||||||
INSERT INTO
|
INSERT INTO
|
||||||
|
|||||||
@@ -21,10 +21,9 @@ WHERE
|
|||||||
job_status = 'queued'
|
job_status = 'queued'
|
||||||
OR job_status = 'fetched';
|
OR job_status = 'fetched';
|
||||||
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key, job_status)
|
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
|
||||||
WHERE
|
WHERE
|
||||||
deduping_key != ''
|
deduping_key != '';
|
||||||
AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed');
|
|
||||||
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
|
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
|
||||||
WHERE
|
WHERE
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"database/sql/driver"
|
"database/sql/driver"
|
||||||
@@ -110,6 +111,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
|||||||
for {
|
for {
|
||||||
// If the context gets canceled for example, stop consuming
|
// If the context gets canceled for example, stop consuming
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
|
log.Println("context cancelled in Consume()")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,6 +122,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Println("error resetting jobs", err.Error())
|
||||||
return fmt.Errorf("error resetting jobs: %w", err)
|
return fmt.Errorf("error resetting jobs: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -143,6 +146,7 @@ func (q *Queries) Consume(ctx context.Context, params ConsumeParams) error {
|
|||||||
workers.Submit(func() {
|
workers.Submit(func() {
|
||||||
err := params.Worker(ctx, job)
|
err := params.Worker(ctx, job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Println("worker failled", err.Error())
|
||||||
q.FailJob(ctx, FailJobParams{
|
q.FailJob(ctx, FailJobParams{
|
||||||
ID: job.ID,
|
ID: job.ID,
|
||||||
Errors: ErrorList(append(job.Errors, err.Error())),
|
Errors: ErrorList(append(job.Errors, err.Error())),
|
||||||
|
|||||||
@@ -205,10 +205,9 @@ VALUES
|
|||||||
unixepoch(),
|
unixepoch(),
|
||||||
?,
|
?,
|
||||||
?
|
?
|
||||||
) ON CONFLICT (deduping_key, job_status)
|
) ON CONFLICT (deduping_key)
|
||||||
WHERE
|
WHERE
|
||||||
deduping_key != ''
|
deduping_key != '' DO NOTHING
|
||||||
AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed') DO NOTHING
|
|
||||||
`
|
`
|
||||||
|
|
||||||
type doQueueJobIgnoreDupeParams struct {
|
type doQueueJobIgnoreDupeParams struct {
|
||||||
|
|||||||
@@ -25,10 +25,9 @@ WHERE
|
|||||||
job_status = 'queued'
|
job_status = 'queued'
|
||||||
OR job_status = 'fetched';
|
OR job_status = 'fetched';
|
||||||
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key, job_status)
|
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_ignore ON jobs (deduping_key)
|
||||||
WHERE
|
WHERE
|
||||||
deduping_key != ''
|
deduping_key != '';
|
||||||
AND (job_status = 'queued' OR job_status = 'fetched' OR job_status = 'completed');
|
|
||||||
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
|
CREATE UNIQUE INDEX IF NOT EXISTS dedupe_replace ON jobs (deduping_key, job_status)
|
||||||
WHERE
|
WHERE
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.jadud.com/grosbeak/internal/liteq/internal"
|
"git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
|
||||||
"github.com/matryer/is"
|
"github.com/matryer/is"
|
||||||
_ "modernc.org/sqlite"
|
_ "modernc.org/sqlite"
|
||||||
)
|
)
|
||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
internal "git.jadud.com/grosbeak/internal/liteq/internal"
|
internal "git.jadud.com/jadudm/grosbeak/internal/liteq/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Creates the db file with the tables and indexes
|
// Creates the db file with the tables and indexes
|
||||||
|
|||||||
9
internal/types/base.go
Normal file
9
internal/types/base.go
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.jadud.com/jadudm/grosbeak/internal/liteq"
|
||||||
|
)
|
||||||
|
|
||||||
|
type QueueWorker func(ctx context.Context, job *liteq.Job) error
|
||||||
9
internal/types/constants.go
Normal file
9
internal/types/constants.go
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
package types
|
||||||
|
|
||||||
|
type UpdateFrequency string
|
||||||
|
|
||||||
|
const (
|
||||||
|
UPDATE_DAILY UpdateFrequency = "DAILY"
|
||||||
|
UPDATE_WEEKLY UpdateFrequency = "WEEKLY"
|
||||||
|
UPDATE_MONTHLY UpdateFrequency = "MONTHLY"
|
||||||
|
)
|
||||||
22
internal/types/jobs.go
Normal file
22
internal/types/jobs.go
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
package types
|
||||||
|
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
|
type Job interface {
|
||||||
|
AsJson() string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Used by all of the jobs.
|
||||||
|
func _as_json(j Job) string {
|
||||||
|
as_json, _ := json.Marshal(j)
|
||||||
|
return string(as_json)
|
||||||
|
}
|
||||||
|
|
||||||
|
type EntreJob struct {
|
||||||
|
URL string `json:"url"`
|
||||||
|
UpdateFrequency UpdateFrequency `json:"update_frequency"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ej *EntreJob) AsJson() string {
|
||||||
|
return _as_json(ej)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user