// Command main is a small driver for the liteq-backed crawl queue: it seeds
// URLs onto the "fetch" queue and runs a worker pool that consumes them.
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"math/rand"
	"os"
	"sync"
	"time"

	_ "modernc.org/sqlite"

	liteq "git.jadud.com/grosbeak/internal/liteq"
)

// Fetch is the worker function for the "fetch" queue. It is a stand-in for
// real work: it waits a random 0-49ms, then logs the job payload.
//
// It returns ctx.Err() if ctx is cancelled before the wait finishes, and nil
// otherwise.
func Fetch(ctx context.Context, job *liteq.Job) error {
	delay := time.Duration(rand.Intn(50)) * time.Millisecond

	// Use a timer instead of time.Sleep so cancellation is honoured promptly.
	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
	}

	log.Println("Fetching", job.Job)
	return nil
}

// runWorkers starts consuming the "fetch" queue in a background goroutine and
// returns immediately. The consumer runs under context.Background(), so it is
// never cancelled from here; if Consume returns an error it is logged.
func runWorkers(queue *liteq.JobQueue) {
	go func() {
		err := queue.Consume(context.Background(), liteq.ConsumeParams{
			Queue:    "fetch",
			PoolSize: 3,
			// NOTE(review): units are not visible in this file (upstream
			// liteq documents seconds) -- confirm 200000 is intended.
			VisibilityTimeout: 200000,
			Worker:            Fetch,
		})
		if err != nil {
			log.Println("consuming fetch queue:", err)
		}
	}()
}

// Entre receives URLs from chUrl and enqueues each one on the "fetch" queue.
// It returns when chUrl is closed. Enqueue failures are logged and the loop
// continues with the next URL.
func Entre(queue *liteq.JobQueue, chUrl <-chan string) {
	ctx := context.Background()

	// Ranging over the channel ends the loop on close instead of spinning on
	// zero-value receives and enqueueing empty URLs.
	for url := range chUrl {
		log.Println("Entre", url)

		// Don't duplicate jobs on the same day of the year.
		now := time.Now()
		dedupKey := fmt.Sprintf("%s:%d:%d", url, now.Year(), now.YearDay())

		err := queue.QueueJob(ctx, liteq.QueueJobParams{
			Queue: "fetch",
			// This only works for things in the `queued` state
			DedupingKey: liteq.IgnoreDuplicate(dedupKey),
			Job:         url,
		})
		if err != nil {
			log.Println("queueing", url, "failed:", err)
		}
	}
}

// main opens the queue database, starts the workers and the Entre stage, and
// then feeds a fixed set of seed URLs into the pipeline forever.
func main() {
	// FIXME: This path needs to come from the env.
	liteqDB, err := sql.Open("sqlite", "liteq.db")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	if err := liteq.Setup(liteqDB); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	queue := liteq.New(liteqDB)

	log.Println("Setting up workers...")
	// runWorkers already spawns its own goroutine, so no extra `go` is needed.
	runWorkers(queue)

	log.Println("Building network...")
	// Create the network for the search engine.
	chUrl := make(chan string)
	go Entre(queue, chUrl)

	// Seed the pipeline from a goroutine so main can reach the wait below.
	// The seeding loop never ends, so wg.Wait keeps main from exiting.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		seeds := []string{"https://jadud.com/", "https://berea.us/"}
		for {
			for _, seed := range seeds {
				chUrl <- seed
				time.Sleep(2 * time.Second)
			}
		}
	}()

	// Don't exit.
	log.Println("Waiting...")
	wg.Wait()
}