Skip to content

Instantly share code, notes, and snippets.

@prabirshrestha
Created January 17, 2025 07:06
Show Gist options
  • Save prabirshrestha/d9868e1180140003427514b04f3c2f39 to your computer and use it in GitHub Desktop.
Save prabirshrestha/d9868e1180140003427514b04f3c2f39 to your computer and use it in GitHub Desktop.
Simple SQLite job queue in Rust, created by Claude
# Cargo manifest for the `sqlite-queue` example crate.
[package]
name = "sqlite-queue"
version = "0.1.0"
edition = "2021"
[dependencies]
# NOTE(review): `chrono` is not referenced by the Rust code shown alongside this
# manifest — confirm whether it is still needed.
chrono = "0.4.39"
# SQLite bindings; the queue uses `Connection`, `params!` and `OptionalExtension`.
rusqlite = "0.32.1"
# Provides the `Serialize` / `Deserialize` derives used on `Job` and `JobStatus`.
serde = { version = "1.0.217", features = ["serde_derive"] }
# Job payloads are stored as JSON text and exposed as `serde_json::Value`.
serde_json = "1.0.135"
# Provides the `Error` derive used on `QueueError`.
thiserror = "2.0.11"
# NOTE(review): `tokio` is not referenced by the Rust code shown alongside this
# manifest (the example loop uses `std::thread::sleep`) — confirm whether it is needed.
tokio = { version = "1.43.0", features = ["full"] }
use rusqlite::{params, Connection, OptionalExtension, Result as SqlResult};
use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
use thiserror::Error;
/// Errors returned by the queue operations in this file.
#[derive(Debug, Error)]
pub enum QueueError {
/// A `rusqlite` call failed; converted automatically via `?`.
#[error("Database error: {0}")]
Database(#[from] rusqlite::Error),
/// Returned by `complete_job` / `fail_job` when their `UPDATE` matched no row,
/// i.e. no job with this id is currently in the `Processing` state.
#[error("Job not found: {0}")]
JobNotFound(i64),
/// NOTE(review): not constructed by any code shown in this file.
#[error("Job already claimed: {0}")]
JobAlreadyClaimed(i64),
/// A payload could not be converted to or from JSON; converted automatically via `?`.
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
}
/// One row of the `jobs` table, as handed to a worker by `Queue::claim_job`.
///
/// All timestamps are Unix times in whole seconds.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Job {
    /// Row id of the job (SQLite `INTEGER PRIMARY KEY`).
    pub id: i64,
    /// Caller-defined tag used to pick a handler, e.g. `"send_email"`.
    pub job_type: String,
    /// Arbitrary JSON payload supplied when the job was enqueued.
    pub payload: serde_json::Value,
    /// Current lifecycle state of the job.
    pub status: JobStatus,
    /// Time at which the current lease runs out; `0` for a job that has never been claimed.
    pub expire_time: i64,
    /// Number of times the job has been claimed so far.
    pub attempts: i32,
    /// Error message recorded by the most recent `Queue::fail_job` call, if any.
    pub last_error: Option<String>,
    /// Time the job was enqueued.
    pub created_at: i64,
    /// Time the row was last modified.
    pub updated_at: i64,
}

/// Lifecycle state of a [`Job`].
///
/// The queue stores these in the `status` column as the variant name
/// (`'Pending'`, `'Processing'`, `'Completed'`, `'Failed'`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum JobStatus {
    /// Enqueued and waiting to be claimed.
    Pending,
    /// Claimed by a worker and holding a lease.
    Processing,
    /// Finished successfully.
    Completed,
    /// Finished with an error; see [`Job::last_error`].
    Failed,
}
pub struct Queue {
conn: Connection,
}
impl Queue {
    /// Lease length, in seconds, granted by [`Queue::claim_job`] (one hour).
    const DEFAULT_LEASE_SECS: i64 = 3600;

    /// Opens (or creates) the SQLite database at `db_path` and makes sure the
    /// `jobs` table exists.
    ///
    /// # Errors
    ///
    /// Returns the underlying `rusqlite` error if the database cannot be opened
    /// or the table cannot be created.
    pub fn new(db_path: &str) -> SqlResult<Self> {
        let conn = Connection::open(db_path)?;
        conn.execute(
            "CREATE TABLE IF NOT EXISTS jobs (
                id INTEGER PRIMARY KEY,
                job_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                status TEXT NOT NULL,
                expire_time INTEGER NOT NULL,
                attempts INTEGER NOT NULL DEFAULT 0,
                last_error TEXT,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            )",
            [],
        )?;
        Ok(Queue { conn })
    }

    /// Current Unix time in whole seconds.
    ///
    /// Never panics: a clock set before the epoch yields `0`, and a value that
    /// does not fit in `i64` saturates to `i64::MAX`.
    fn unix_now() -> i64 {
        let secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0);
        i64::try_from(secs).unwrap_or(i64::MAX)
    }

    /// Serializes `payload` to JSON and inserts a new `Pending` job of type `job_type`.
    ///
    /// Returns the id of the newly inserted job.
    ///
    /// # Errors
    ///
    /// * [`QueueError::Serialization`] if `payload` cannot be serialized.
    /// * [`QueueError::Database`] if the insert fails.
    pub fn enqueue_job<T: Serialize>(
        &self,
        job_type: &str,
        payload: &T,
    ) -> Result<i64, QueueError> {
        let now = Self::unix_now();
        let payload_json = serde_json::to_string(payload)?;
        // A fresh job has no lease (expire_time = 0) and no attempts yet.
        self.conn.execute(
            "INSERT INTO jobs (
                job_type, payload, status, expire_time, attempts,
                created_at, updated_at
            ) VALUES (?1, ?2, 'Pending', 0, 0, ?3, ?3)",
            params![job_type, payload_json, now],
        )?;
        // `execute` returns the number of affected rows (always 1 here), not the
        // new row's id, so the id has to be read from the connection.
        Ok(self.conn.last_insert_rowid())
    }

    /// Claims the next available job with the default one-hour lease.
    ///
    /// See [`Queue::claim_job_with_lease`] for the exact semantics.
    pub fn claim_job(&self) -> Result<Option<Job>, QueueError> {
        self.claim_job_with_lease(Self::DEFAULT_LEASE_SECS)
    }

    /// Claims the next available job, leasing it for `lease_secs` seconds.
    ///
    /// A job is available when it is `Pending`, or when it is `Processing` but
    /// its lease has already expired. Among the available jobs the one with the
    /// lowest id is taken. The claimed row is switched to `Processing`, its
    /// `attempts` counter is incremented and its lease expiry is set to
    /// `now + lease_secs`.
    ///
    /// Returns `Ok(None)` when no job is available.
    ///
    /// # Errors
    ///
    /// * [`QueueError::Database`] if a statement fails.
    /// * [`QueueError::Serialization`] if the stored payload is not valid JSON.
    ///   In that case the job is marked `Failed` (with the parse error recorded
    ///   in `last_error`) so it is not handed out again once its lease expires.
    pub fn claim_job_with_lease(&self, lease_secs: i64) -> Result<Option<Job>, QueueError> {
        let now = Self::unix_now();
        let lease_expiry = now.saturating_add(lease_secs);

        // The statement lives in its own scope so it is finalized before any
        // follow-up statement runs on the same connection.
        let claimed = {
            let mut stmt = self.conn.prepare(
                "UPDATE jobs
                 SET status = 'Processing',
                     expire_time = ?1,
                     attempts = attempts + 1,
                     updated_at = ?2
                 WHERE id IN (
                     SELECT id FROM jobs
                     WHERE status = 'Pending'
                        OR (status = 'Processing' AND expire_time < ?2)
                     ORDER BY id
                     LIMIT 1
                 )
                 RETURNING id, job_type, payload, expire_time, attempts,
                           last_error, created_at, updated_at",
            )?;
            stmt.query_row(params![lease_expiry, now], |row| {
                // The payload is returned as raw text and parsed outside the
                // closure, where a JSON error can be reported as
                // `QueueError::Serialization` instead of panicking.
                let raw_payload: String = row.get(2)?;
                let job = Job {
                    id: row.get(0)?,
                    job_type: row.get(1)?,
                    payload: serde_json::Value::Null,
                    status: JobStatus::Processing,
                    expire_time: row.get(3)?,
                    attempts: row.get(4)?,
                    last_error: row.get(5)?,
                    created_at: row.get(6)?,
                    updated_at: row.get(7)?,
                };
                Ok((raw_payload, job))
            })
            .optional()?
        };

        let Some((raw_payload, mut job)) = claimed else {
            return Ok(None);
        };

        match serde_json::from_str(&raw_payload) {
            Ok(payload) => {
                job.payload = payload;
                Ok(Some(job))
            }
            Err(err) => {
                // A payload that cannot be parsed will never succeed; retire the
                // job instead of re-leasing it every time the lease runs out.
                self.fail_job(job.id, &format!("invalid payload JSON: {err}"))?;
                Err(err.into())
            }
        }
    }

    /// Marks the `Processing` job `job_id` as `Completed`.
    ///
    /// # Errors
    ///
    /// * [`QueueError::JobNotFound`] if no row was updated, i.e. the job does
    ///   not exist or is not currently `Processing`.
    /// * [`QueueError::Database`] if the update fails.
    pub fn complete_job(&self, job_id: i64) -> Result<(), QueueError> {
        let now = Self::unix_now();
        let rows = self.conn.execute(
            "UPDATE jobs
             SET status = 'Completed',
                 updated_at = ?1
             WHERE id = ?2 AND status = 'Processing'",
            params![now, job_id],
        )?;
        if rows == 0 {
            return Err(QueueError::JobNotFound(job_id));
        }
        Ok(())
    }

    /// Marks the `Processing` job `job_id` as `Failed` and records `error` in
    /// its `last_error` column.
    ///
    /// # Errors
    ///
    /// * [`QueueError::JobNotFound`] if no row was updated, i.e. the job does
    ///   not exist or is not currently `Processing`.
    /// * [`QueueError::Database`] if the update fails.
    pub fn fail_job(&self, job_id: i64, error: &str) -> Result<(), QueueError> {
        let now = Self::unix_now();
        let rows = self.conn.execute(
            "UPDATE jobs
             SET status = 'Failed',
                 last_error = ?1,
                 updated_at = ?2
             WHERE id = ?3 AND status = 'Processing'",
            params![error, now, job_id],
        )?;
        if rows == 0 {
            return Err(QueueError::JobNotFound(job_id));
        }
        Ok(())
    }
}
// Example usage: enqueue two identical welcome-email jobs, then work the queue forever.
fn main() -> Result<(), QueueError> {
    let queue = Queue::new("jobs.db")?;

    // Enqueue the same job twice.
    for _ in 0..2 {
        let payload = serde_json::json!({
            "email": "[email protected]",
            "template": "welcome"
        });
        let job_id = queue.enqueue_job("send_email", &payload)?;
        println!("Enqueued job: {}", job_id);
    }
    println!("-----------------------------------");

    // Worker loop: claim a job, run it, and record the outcome.
    loop {
        match queue.claim_job()? {
            None => {
                // Nothing to do right now; poll again in a second.
                std::thread::sleep(std::time::Duration::from_secs(1));
            }
            Some(job) => {
                println!("Processing job: {:?}", job);
                if let Err(e) = process_job(&job) {
                    queue.fail_job(job.id, &e.to_string())?;
                } else {
                    queue.complete_job(job.id)?;
                }
            }
        }
    }
}
// Example job processor: handles "send_email" jobs by printing their payload
// and rejects every other job type.
fn process_job(job: &Job) -> Result<(), Box<dyn std::error::Error>> {
    if job.job_type != "send_email" {
        return Err("Unknown job type".into());
    }
    println!("Sending email with payload: {:?}", job.payload);
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment