A Dead Simple Work Queue Using SQLite
Curious how asynchronous task queues like Celery work? I am too. In this article, I explore how to implement something like Celery using only SQLite. This is what I came up with. The full code can be found here.
I am sure this implementation has issues. Please leave a comment with any problems you find, race conditions included. This implementation supports running tasks defined in tasks.py and passing them *args and **kwargs. It is left as an exercise for the reader to store results (hint: add a new field to the sqlite db).
I am assuming some familiarity with the sqlite3 package in Python.
We need a way to pass tasks with arguments from one Python process to another. I am choosing SQLite here because it is a great piece of technology and it is built into the standard library. I create a new database table with only five fields:
id
name
args
kwargs
running
Did you know that sqlite does not require you to add types to your fields? In fact, it mostly ignores them unless you use the STRICT directive. Kinda cool, huh? The documentation has a page on The Advantages of Flexible Typing, which is worth a read.
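Here is a quick sketch of flexible typing in action: without STRICT, a single untyped column happily stores an integer, a string, and a float, and each value keeps its type. (The table and column names here are just for illustration.)

```python
import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()
# No type on the "value" column -- sqlite accepts anything
cur.execute("CREATE TABLE demo(value)")
cur.executemany("INSERT INTO demo(value) VALUES (?)", [(42,), ("hello",), (3.14,)])
# Each value comes back with the Python type it went in as
rows = [type(v).__name__ for (v,) in cur.execute("SELECT value FROM demo")]
print(rows)  # -> ['int', 'str', 'float']
```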
On to the implementation. We have three Python scripts:
init.py to initialize the database:
import sqlite3
con = sqlite3.connect("queue.db")
cur = con.cursor()
cur.execute("DROP TABLE IF EXISTS queue")
cur.execute("CREATE TABLE IF NOT EXISTS queue(id INTEGER NOT NULL PRIMARY KEY, name, args, kwargs, running)")
con.commit()
pusher.py to push tasks into the queue:
import json
import sqlite3
con = sqlite3.connect("queue.db")
cur = con.cursor()
name = 'hello'
running = 0
args = ['tom']
kwargs = {"all_caps": True}
args = json.dumps(args)
kwargs = json.dumps(kwargs)
cur.execute("INSERT INTO queue(name, running, args, kwargs) values (?,?,?,?)", (name, running, args, kwargs))
con.commit()
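As a sanity check, here is a self-contained sketch that repeats the same insert against an in-memory database with the same schema, then reads the row back, so you can see exactly what pusher.py stores:

```python
import json
import sqlite3

# Same schema as init.py, but in memory for demonstration
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS queue(id INTEGER NOT NULL PRIMARY KEY, name, args, kwargs, running)")
# Push one task, exactly as pusher.py does
cur.execute(
    "INSERT INTO queue(name, running, args, kwargs) values (?,?,?,?)",
    ("hello", 0, json.dumps(["tom"]), json.dumps({"all_caps": True})),
)
con.commit()
row = cur.execute("SELECT * FROM queue").fetchone()
print(row)  # -> (1, 'hello', '["tom"]', '{"all_caps": true}', 0)
```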
puller.py to pull tasks and run them in a loop:
import importlib
import json
import sqlite3
import sys
import time
WAIT_TIME = 0.1
con = sqlite3.connect("queue.db")
cur = con.cursor()
# Dynamically import the module name
if len(sys.argv) != 2:
    print(f"usage: {sys.argv[0]} tasks_module_name")
    sys.exit(1)
task_module_name = sys.argv[1]
module = importlib.import_module(task_module_name)
while True:
    try:
        con.execute("BEGIN TRANSACTION")
        # Get the first row to be put into the queue, ordered by id
        cur.execute("SELECT id, name, args, kwargs FROM queue WHERE running = 0 ORDER BY id LIMIT 1")
        row = cur.fetchone()
        if row is None:
            # No row available; continue to the next iteration
            con.execute("ROLLBACK")
            time.sleep(WAIT_TIME)
            continue
        # Parse out the values of the row
        _id, name, args, kwargs = row
        # Here we are using JSON to serialize/deserialize the args and kwargs
        # since there isn't an sqlite3 data type that they naturally fit in
        args = json.loads(args)
        kwargs = json.loads(kwargs)
        # Let other workers know that we are working on this task.
        # The "AND running = 0" guard makes the rowcount check below
        # meaningful: without it, the UPDATE always matches the row.
        cur.execute("UPDATE queue SET running = 1 WHERE id = ? AND running = 0", (_id,))
        if cur.rowcount == 0:
            # Another worker beat us to this row; rollback and continue
            con.execute("ROLLBACK")
            time.sleep(WAIT_TIME)
            continue
        con.commit()
        # Finally, run the task
        task = getattr(module, name)
        task(*args, **kwargs)
        # Mark it complete
        con.execute("UPDATE queue SET running = 2 WHERE id = ?", (_id,))
        con.commit()
    except sqlite3.OperationalError:
        # Often, we get a "database is locked" error here
        con.execute("ROLLBACK")
    except Exception as e:
        con.execute("ROLLBACK")
        print(f"Error processing queue: {e}")
This is what the output of running puller.py looks like:
$ python3 puller.py
usage: puller.py tasks_module_name
$ python3 puller.py tasks
HELLO, TOM
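The article doesn't show tasks.py itself, but based on the arguments pushed by pusher.py and the output above, a minimal version might look like this (the hello name and its parameters come from the pusher; the body is my guess, with a return value added for convenience):

```python
# tasks.py -- a guess at the task module used above
def hello(name, all_caps=False):
    """Print a greeting; upper-case it when all_caps is set."""
    greeting = f"hello, {name}"
    if all_caps:
        greeting = greeting.upper()
    print(greeting)
    return greeting
```

Calling `hello("tom", all_caps=True)` prints `HELLO, TOM`, matching the puller output shown above.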
I decided to "serialize" *args and **kwargs using the json library since sqlite does not have native field types for them. I then use getattr to look up the task by name in tasks.py and run it. Finally, I mark the task as running by setting running to 1, and as completed by setting it to 2.
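The serialization round trip is just two json calls. One caveat worth knowing: JSON turns tuples into lists and only allows string keys, so what you get back is not always exactly what you put in:

```python
import json

args = ("tom",)                 # a tuple on the pusher side
kwargs = {"all_caps": True}
# Stored as TEXT in sqlite...
stored_args, stored_kwargs = json.dumps(args), json.dumps(kwargs)
# ...and decoded on the worker side
loaded_args = json.loads(stored_args)      # the tuple comes back as a list
loaded_kwargs = json.loads(stored_kwargs)
print(loaded_args, loaded_kwargs)  # -> ['tom'] {'all_caps': True}
```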
I hope this implementation was clear and concise enough to understand. Please let me know if there's more I can do to explain things. There are certainly better queues out there; one implementation I really like is litequeue. When it comes down to it, though, passing data from one process to another is not all that difficult. And since sqlite allows only one writer at a time, I think there's less to worry about in the way of race conditions than with a db like Postgres. I could be wrong though.
Hopefully you got something out of this article. Please let me know in the comments how I can make this better or other topics that you are interested in!