Background task processor in Go with persistence support using BadgerDB
- 🏷 badgerdb
- 🏷 golang
- 🏷 task queue
Goroutines can run tasks concurrently. However, in most practical scenarios, you also have to keep track of the status of those tasks: if the process exits, is killed, or is power cycled, some mechanism should restart the unfinished work. For example, imagine you moved order status emailing to a goroutine. If the process is terminated or restarted, there is no way to recover the emails that were in progress. A background task manager keeps track of tasks in progress, retries them if required, and also manages scheduled and recurring tasks. qUp (read "queue up") is such a background task manager. It persists task status and tracking information to disk, so restarting the process does not lose the tasks that were in progress. BadgerDB is used for persistence.
Simple Usage
Start up the job queue and keep the worker routines ready:
jq := qup.NewJobQueue().Workers(10)
//Bring up the worker pool
jq.Start()
Register task executors:
jq.Register(&OrderEmail{}, sender)
A task executor must implement an interface with an Execute method:
func (s *OrderSender) Execute(t interface{}) error {
	orderEmail, ok := t.(*OrderEmail)
	if !ok {
		return errors.New("unexpected task type")
	}
	return sendOrderEmail(orderEmail)
}
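For reference, here is a minimal sketch of the types the snippets above assume. The OrderID field and the sendOrderEmail helper are illustrative assumptions, not part of qUp:
// OrderEmail is the task payload persisted with the job; the
// OrderID field is an assumption for illustration.
type OrderEmail struct {
	OrderID string
}

// OrderSender is the executor registered for OrderEmail tasks.
type OrderSender struct{}

// sendOrderEmail is a hypothetical helper that does the actual delivery.
func sendOrderEmail(e *OrderEmail) error {
	// ... compose and send the order status email for e.OrderID ...
	return nil
}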
Push tasks to the queue:
//Add a delayed job
j := qup.NewJob(&OrderEmail{orderID}).After(5 * time.Minute)
jq.QueueUp(j)
The OrderEmail task is set to run after 5 minutes. Suppose the process is terminated after 2 minutes. The email will still be sent when the process is restarted. This wouldn't happen if, say, you had implemented the task as a goroutine using time.AfterFunc().
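For contrast, this is roughly what the in-memory version would look like; if the process dies during the wait, the timer and the pending email disappear with it (sendOrderEmail is the hypothetical helper from the executor sketch above):
// Nothing is persisted: a restart within the 5 minutes loses this email.
time.AfterFunc(5*time.Minute, func() {
	sendOrderEmail(&OrderEmail{OrderID: orderID})
})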
Asynchronous task execution
RESTful API endpoints are expected to return immediately. The client typically has a timeout so that it does not wait endlessly. So you have to delegate long-running work to a background task manager.
j := qup.NewJob(&SendNotification{UserID: id, Email:email})
jq.QueueUp(j)
Here we initialize the task information, push it to the job queue, and return immediately, without waiting for the task to finish. qUp executes the task in the background, and the task originator can check its status later.
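A typical place for this pattern is an HTTP handler. A minimal sketch, assuming jq is reachable from the handler and that userIDFrom and emailFrom are hypothetical request parsers:
func notifyHandler(w http.ResponseWriter, r *http.Request) {
	// Enqueue the long-running work instead of doing it inline.
	// userIDFrom and emailFrom are hypothetical helpers.
	j := qup.NewJob(&SendNotification{UserID: userIDFrom(r), Email: emailFrom(r)})
	jq.QueueUp(j)

	// Respond right away; the task runs in the background.
	w.WriteHeader(http.StatusAccepted)
}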
Recurring tasks
Most applications need to run some tasks every x minutes or hours: cleaning up temporary tables and files, indexing to make searches faster, and so on. The background task processor helps in such cases too. Here is an example:
j := qup.NewJob(&CleanupTables{}).Every(1 * time.Hour)
jq.QueueUp(j)
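As with any other task, a recurring task needs an executor registered for it. A minimal sketch, where TableCleaner and its cleanup logic are assumptions for illustration:
// TableCleaner executes CleanupTables tasks. The name and the
// body below are illustrative assumptions, not part of qUp.
type TableCleaner struct{}

func (c *TableCleaner) Execute(t interface{}) error {
	// Drop expired rows, remove stale temp files, rebuild indexes, etc.
	return nil
}
Register it the same way as before:
jq.Register(&CleanupTables{}, &TableCleaner{})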
Inner Workings of qUp
On startup, qUp launches N worker routines.
func (d *JobQueue) Start() error {
	for i := 0; i < d.NumWorkers; i++ {
		go d.worker(i + 1)
	}
	return nil
}
The worker routines wait for a task to arrive.
func (d *JobQueue) worker(wid int) {
	d.log.Logf("Worker %d starting\n", wid)
	d.wg.Add(1)
	defer d.wg.Done()
	for {
		select {
		case jid := <-d.jobs:
			d.runTask(wid, jid)
		case <-d.ticker.C:
			d.periodicChecks()
		case <-d.close:
			d.log.Logf("Worker %d stopping", wid)
			return
		}
	}
}
The routine that receives a job ID on the jobs channel executes it by calling d.runTask().
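A rough sketch of what runTask might do, assuming hypothetical helpers (loadJob, executorFor, markJobDone, markJobFailed are illustrative names, not qUp's actual API): load the persisted job, hand the task to its registered executor, and record the result.
func (d *JobQueue) runTask(wid int, jid string) {
	// Illustrative only: loadJob, executorFor, markJobDone and
	// markJobFailed are assumed helpers, not qUp's actual API.
	job, err := d.loadJob(jid)
	if err != nil {
		d.log.Logf("Worker %d: could not load job %s: %v", wid, jid, err)
		return
	}
	if err := d.executorFor(job.Task).Execute(job.Task); err != nil {
		d.markJobFailed(jid, err)
		return
	}
	d.markJobDone(jid)
}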
Adding a job to the queue
To add a new job, qUp first inserts the job into a “ready” virtual table. Then it sends a signal through the jobs channel.
func (d *JobQueue) addJobToReadyQueue(job *Job) error {
	jid, err := d.store.Table("jobqueue.ready").Insert(job)
	if err != nil {
		d.log.Logf("Error adding job to ready queue %v", err)
		return err
	}
	d.log.Logf("Signalling taskID %s Job ID is %v", job.Task.GetTaskID(), jid)
	d.signalNewJob(jid)
	return nil
}
func (d *JobQueue) signalNewJob(jid string) bool {
	d.log.Logf("signalNewJob %s ", jid)
	// This is to keep track of when the job was signalled ready.
	// We will signal again if the job is not picked up even after some time.
	d.markJobReady(jid)
	// Check whether we are already shutting down and the close channel is signalled.
	select {
	case <-d.close:
		return false
	default:
		d.log.Logf("signalling job %s ", jid)
		select {
		case d.jobs <- jid:
		default:
			// The queue is full; the job got dropped.
			d.log.Logf("signal dropped %s ", jid)
			d.markJobDropped(jid)
		}
	}
	return true
}
Queue full!
One thing can happen while writing to the jobs channel: all the worker routines are busy and nobody is free to pick up the job. In that case, we mark the job as dropped. That doesn't mean we have given up on the job, though.
Remember periodicChecks(), which keeps running in the main worker loop? There, qUp tries to pick up any dropped jobs.
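A sketch of what that recovery step could look like; droppedJobIDs is an assumed lookup against the store, not qUp's actual API, while signalNewJob is the function shown above:
func (d *JobQueue) periodicChecks() {
	// Illustrative only: droppedJobIDs is an assumed helper that
	// returns jobs previously marked as dropped.
	for _, jid := range d.droppedJobIDs() {
		d.log.Logf("re-signalling dropped job %s", jid)
		d.signalNewJob(jid)
	}
}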
Shutdown as soon as possible
If a shutdown is triggered by signalling the close channel, the queue tries to stop as soon as possible, so the check for the close channel is placed at every possible point.
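A plausible shape for the shutdown path, assuming a Stop method that closes the close channel and waits on the WaitGroup seen in the worker code; this is a sketch, not necessarily qUp's actual implementation:
func (d *JobQueue) Stop() error {
	// Every select on d.close in the workers and in signalNewJob
	// takes the shutdown branch once the channel is closed.
	close(d.close)
	// Wait for all worker goroutines to return.
	d.wg.Wait()
	return nil
}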