Prasanth Janardhanan

Background task processor in Go with persistence support using BadgerDB

Goroutines can run tasks concurrently, but in most practical scenarios you also have to keep track of the status of those tasks. If the process exits, is killed, or the machine power cycles, some mechanism should restart the unfinished work. For example, imagine you moved order-status emailing into a goroutine: if the process is terminated or restarted, there is no record of the emails that were still in progress. A background task manager can track tasks in progress, retry them if required, and also manage scheduled and recurring tasks. qUp (read as "queue up") is such a background task manager. It persists task status and tracking information to disk, so restarting the process does not lose the tasks that were in progress. BadgerDB is used for persistence.

Simple Usage

Start up the queue and keep a pool of worker routines ready:

jq := qup.NewJobQueue().Workers(10)
//Bring up the worker pool
jq.Start()

Register task executors:

jq.Register(&OrderEmail{}, sender)

A task executor must implement the executor interface's Execute method:

func (this *OrderSender) Execute(t interface{}) error {
	orderEmail, ok := t.(*OrderEmail)
	if !ok {
		return errors.New("unexpected task type")
	}
	return sendOrderEmail(orderEmail)
}
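
The snippets above reference an OrderEmail task, an OrderSender executor, and a sendOrderEmail helper. Here is a minimal sketch of what these could look like; the single OrderID field and the GetTaskID method (suggested by the internal code shown later in this post) are assumptions, not the library's required shape:

// OrderEmail is the unit of work that gets persisted, so it should only
// carry serializable data (assumption: a single order ID).
type OrderEmail struct {
	OrderID string
}

// GetTaskID is assumed from the internal code shown later, which calls
// job.Task.GetTaskID(); the real interface in qUp may differ.
func (o *OrderEmail) GetTaskID() string {
	return "order-email:" + o.OrderID
}

// OrderSender holds whatever the executor needs (SMTP client, templates, ...).
type OrderSender struct{}

// sendOrderEmail is a stand-in for the real email-sending logic.
func sendOrderEmail(e *OrderEmail) error {
	// look up the order by e.OrderID and send the status email here
	return nil
}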

Push tasks to the queue:

//Add a delayed job
j := qup.NewJob(&OrderEmail{orderID}).After(5 * time.Minute)
jq.QueueUp(j)

The OrderEmail task is set to run after 5 minutes. Suppose the process is terminated after 2 minutes: the email will still be sent when the process is restarted, because the job was persisted. That wouldn't happen if, say, you had implemented the task as a goroutine using time.AfterFunc().
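
For contrast, the non-persistent version would look roughly like this (using the same orderID variable and the hypothetical sendOrderEmail helper from above). The timer lives only in process memory, so a restart before the five minutes elapse silently drops the email:

// Non-persistent alternative: if the process dies before the timer fires,
// nothing remembers that this email was ever scheduled.
time.AfterFunc(5*time.Minute, func() {
	sendOrderEmail(&OrderEmail{OrderID: orderID})
})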

Asynchronous task execution

RESTful API endpoints are expected to return immediately, and clients use time-outs so they don't wait endlessly. Long-running work therefore has to be delegated to a background task manager.

j := qup.NewJob(&SendNotification{UserID: id, Email:email})
jq.QueueUp(j)

Here we initialize the task information, push it to the job queue, and return immediately, without waiting for the task to finish. qUp executes the task in the background, and the task originator can check its status later.
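
To make that concrete, here is one way this might sit inside an HTTP handler. The route, the form fields, and the 202 response are illustrative choices, not part of qUp, and error handling is omitted as in the snippets above:

// POST /notifications queues the work and returns immediately.
func notifyHandler(jq *qup.JobQueue) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		id := r.FormValue("user_id")
		email := r.FormValue("email")

		// Hand the long-running work to qUp instead of doing it in the request.
		j := qup.NewJob(&SendNotification{UserID: id, Email: email})
		jq.QueueUp(j)

		// 202 Accepted: the work will happen in the background.
		w.WriteHeader(http.StatusAccepted)
	}
}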

Recurring tasks

Most applications need to run some tasks every so often: cleaning up temporary tables and files, rebuilding indexes to keep searches fast, and so on. The background task processor helps in such cases as well. Here is an example:

j := qup.NewJob(&CleanupTables{}).Every(1 * time.Hour)
jq.QueueUp(j)

Inner Workings of qUp

On startup, qUp launches N worker routines.

func (d *JobQueue) Start() error {
	for i := 0; i < d.NumWorkers; i++ {
		go d.worker(i + 1)
	}
	return nil
}

The worker routines wait for a task to arrive.

func (d *JobQueue) worker(wid int) {
	d.log.Logf("Worker %d starting\n", wid)
	d.wg.Add(1)
	defer d.wg.Done()
	for {
		select {
		case jid := <-d.jobs:
			d.runTask(wid, jid)
		case <-d.ticker.C:
			d.periodicChecks()
		case <-d.close:
			d.log.Logf("Workder %d stopping", wid)
			return
		}
	}
}

The routine that receives a task ID on the jobs channel executes it by calling d.runTask().
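
The post does not show runTask() itself, but conceptually it does something like the sketch below: load the persisted job, run the registered executor, and record the outcome. The table name and the executorFor and markJobDone helpers are assumptions for illustration, not the actual implementation:

// Hypothetical sketch of runTask; only the method name comes from the
// worker loop above, the rest is assumed.
func (d *JobQueue) runTask(wid int, jid string) {
	// Load the persisted job by its ID (table name is an assumption).
	var job Job
	if err := d.store.Table("jobqueue.ready").Get(jid, &job); err != nil {
		d.log.Logf("Worker %d: job %s not found: %v", wid, jid, err)
		return
	}

	// Look up the executor registered for this task type and run it.
	exec := d.executorFor(job.Task)
	if err := exec.Execute(job.Task); err != nil {
		// On failure the job could be kept around for a retry.
		d.log.Logf("Worker %d: job %s failed: %v", wid, jid, err)
		return
	}

	// On success, move the job out of the ready table (assumed bookkeeping).
	d.markJobDone(jid)
}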

Adding a job to the queue

To add a new job, qUp first inserts it into a “ready” virtual table, then signals the workers through the jobs channel.

func (d *JobQueue) addJobToReadyQueue(job *Job) error {
	jid, err := d.store.Table("jobqueue.ready").Insert(job)
	if err != nil {
		d.log.Logf("Error adding job to ready queue %v", err)
		return err
	}

	d.log.Logf("Signalling taskID %s Job ID is  %v", job.Task.GetTaskID(), jid)
	d.signalNewJob(jid)
	return nil
}
func (d *JobQueue) signalNewJob(jid string) bool {
	d.log.Logf("signalNewJob %s ", jid)
	//This is to keep track when the job signalled ready
	// We will signal again if job is not picked even after some time
	d.markJobReady(jid)
	//check whether already shutting down and close channel is signalled
	select {
	case <-d.close:
		return false
	default:
		d.log.Logf("signalling job %s ", jid)
		select {
		case d.jobs <- jid:
		default:
			//The queue is full. job got dropped
			d.log.Logf("signal dropped %s ", jid)
			d.markJobDropped(jid)
		}

	}
	return true
}

Queue full!

One thing can go wrong while writing to the jobs channel: all the worker routines may be busy, so nobody is there to pick up the job. In that case we mark the job as dropped. That doesn't mean the job is abandoned, though.

Remember periodicChecks(), which keeps running in the worker loop? In periodicChecks() we try to pick up any dropped jobs again.
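
The body of periodicChecks() is not shown in this post, but based on the description it would do roughly the following; the droppedJobIDs and staleReadyJobIDs helpers are assumptions for illustration:

// Hypothetical sketch of periodicChecks: re-signal jobs that were dropped
// because the jobs channel was full, and jobs that were marked ready but
// never picked up.
func (d *JobQueue) periodicChecks() {
	for _, jid := range d.droppedJobIDs() {
		d.log.Logf("retrying dropped job %s", jid)
		d.signalNewJob(jid)
	}
	for _, jid := range d.staleReadyJobIDs() {
		d.log.Logf("re-signalling stale job %s", jid)
		d.signalNewJob(jid)
	}
}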

Shutdown as soon as possible

If a shutdown is triggered by signalling the close channel, the queue tries to stop as soon as possible, so a check on the close channel is placed at every point where the code could block.
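
A Stop method along these lines would trigger that shutdown and wait for the workers to drain. The sketch is inferred from the fields used in the worker loop above, not copied from the library:

// Hypothetical sketch of Stop: closing d.close makes every select in the
// workers return, and d.wg waits until each worker's deferred Done has run.
func (d *JobQueue) Stop() error {
	close(d.close)  // every worker sees <-d.close and returns
	d.ticker.Stop() // stop firing periodic checks
	d.wg.Wait()     // wait for in-flight tasks to finish
	return nil
}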