Created
January 17, 2025 07:06
-
-
Save prabirshrestha/d9868e1180140003427514b04f3c2f39 to your computer and use it in GitHub Desktop.
A simple SQLite-backed job queue in Rust, created by Claude.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "sqlite-queue"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.39"
rusqlite = "0.32.1"
serde = { version = "1.0.217", features = ["serde_derive"] }
serde_json = "1.0.135"
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["full"] }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use rusqlite::{params, Connection, OptionalExtension, Result as SqlResult}; | |
use serde::{Deserialize, Serialize}; | |
use std::time::{SystemTime, UNIX_EPOCH}; | |
use thiserror::Error; | |
#[derive(Debug, Error)] | |
pub enum QueueError { | |
#[error("Database error: {0}")] | |
Database(#[from] rusqlite::Error), | |
#[error("Job not found: {0}")] | |
JobNotFound(i64), | |
#[error("Job already claimed: {0}")] | |
JobAlreadyClaimed(i64), | |
#[error("Serialization error: {0}")] | |
Serialization(#[from] serde_json::Error), | |
} | |
#[derive(Debug, Serialize, Deserialize)] | |
pub struct Job { | |
pub id: i64, | |
pub job_type: String, | |
pub payload: serde_json::Value, | |
pub status: JobStatus, | |
pub expire_time: i64, | |
pub attempts: i32, | |
pub last_error: Option<String>, | |
pub created_at: i64, | |
pub updated_at: i64, | |
} | |
#[derive(Debug, Serialize, Deserialize)] | |
pub enum JobStatus { | |
Pending, | |
Processing, | |
Completed, | |
Failed, | |
} | |
pub struct Queue { | |
conn: Connection, | |
} | |
impl Queue { | |
pub fn new(db_path: &str) -> SqlResult<Self> { | |
let conn = Connection::open(db_path)?; | |
conn.execute( | |
"CREATE TABLE IF NOT EXISTS jobs ( | |
id INTEGER PRIMARY KEY, | |
job_type TEXT NOT NULL, | |
payload TEXT NOT NULL, | |
status TEXT NOT NULL, | |
expire_time INTEGER NOT NULL, | |
attempts INTEGER NOT NULL DEFAULT 0, | |
last_error TEXT, | |
created_at INTEGER NOT NULL, | |
updated_at INTEGER NOT NULL | |
)", | |
[], | |
)?; | |
Ok(Queue { conn }) | |
} | |
pub fn enqueue_job<T: Serialize>( | |
&self, | |
job_type: &str, | |
payload: &T, | |
) -> Result<i64, QueueError> { | |
let now = SystemTime::now() | |
.duration_since(UNIX_EPOCH) | |
.unwrap() | |
.as_secs() as i64; | |
let payload_json = serde_json::to_string(payload)?; | |
let id = self.conn.execute( | |
"INSERT INTO jobs ( | |
job_type, payload, status, expire_time, attempts, | |
created_at, updated_at | |
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", | |
params![ | |
job_type, | |
payload_json, | |
"Pending", | |
0, // initial expire_time | |
0, // initial attempts | |
now, | |
now, | |
], | |
)?; | |
Ok(id as i64) | |
} | |
pub fn claim_job(&self) -> Result<Option<Job>, QueueError> { | |
let now = SystemTime::now() | |
.duration_since(UNIX_EPOCH) | |
.unwrap() | |
.as_secs() as i64; | |
let expire_time = now + 3600; // 1 hour lease | |
let mut stmt = self.conn.prepare( | |
"UPDATE jobs | |
SET status = 'Processing', | |
expire_time = ?1, | |
attempts = attempts + 1, | |
updated_at = ?2 | |
WHERE id IN ( | |
SELECT id FROM jobs | |
WHERE status = 'Pending' | |
OR (status = 'Processing' AND expire_time < ?3) | |
LIMIT 1 | |
) | |
RETURNING *", | |
)?; | |
let job = stmt | |
.query_row(params![expire_time, now, now], |row| { | |
Ok(Job { | |
id: row.get(0)?, | |
job_type: row.get(1)?, | |
payload: serde_json::from_str(&row.get::<_, String>(2)?).unwrap(), | |
status: JobStatus::Processing, | |
expire_time: row.get(4)?, | |
attempts: row.get(5)?, | |
last_error: row.get(6)?, | |
created_at: row.get(7)?, | |
updated_at: row.get(8)?, | |
}) | |
}) | |
.optional()?; | |
Ok(job) | |
} | |
pub fn complete_job(&self, job_id: i64) -> Result<(), QueueError> { | |
let now = SystemTime::now() | |
.duration_since(UNIX_EPOCH) | |
.unwrap() | |
.as_secs() as i64; | |
let rows = self.conn.execute( | |
"UPDATE jobs | |
SET status = 'Completed', | |
updated_at = ?1 | |
WHERE id = ?2 AND status = 'Processing'", | |
params![now, job_id], | |
)?; | |
if rows == 0 { | |
return Err(QueueError::JobNotFound(job_id)); | |
} | |
Ok(()) | |
} | |
pub fn fail_job(&self, job_id: i64, error: &str) -> Result<(), QueueError> { | |
let now = SystemTime::now() | |
.duration_since(UNIX_EPOCH) | |
.unwrap() | |
.as_secs() as i64; | |
let rows = self.conn.execute( | |
"UPDATE jobs | |
SET status = 'Failed', | |
last_error = ?1, | |
updated_at = ?2 | |
WHERE id = ?3 AND status = 'Processing'", | |
params![error, now, job_id], | |
)?; | |
if rows == 0 { | |
return Err(QueueError::JobNotFound(job_id)); | |
} | |
Ok(()) | |
} | |
} | |
// Example usage | |
fn main() -> Result<(), QueueError> { | |
let queue = Queue::new("jobs.db")?; | |
// Enqueue a job | |
let payload = serde_json::json!({ | |
"email": "[email protected]", | |
"template": "welcome" | |
}); | |
let job_id = queue.enqueue_job("send_email", &payload)?; | |
println!("Enqueued job: {}", job_id); | |
// Enqueue a job | |
let payload = serde_json::json!({ | |
"email": "[email protected]", | |
"template": "welcome" | |
}); | |
let job_id = queue.enqueue_job("send_email", &payload)?; | |
println!("Enqueued job: {}", job_id); | |
println!("-----------------------------------"); | |
// Process jobs | |
loop { | |
if let Some(job) = queue.claim_job()? { | |
println!("Processing job: {:?}", job); | |
// Simulate processing | |
match process_job(&job) { | |
Ok(_) => queue.complete_job(job.id)?, | |
Err(e) => queue.fail_job(job.id, &e.to_string())?, | |
} | |
} else { | |
// Sleep for a while before checking for more jobs | |
std::thread::sleep(std::time::Duration::from_secs(1)); | |
} | |
} | |
} | |
// Example job processor | |
fn process_job(job: &Job) -> Result<(), Box<dyn std::error::Error>> { | |
match job.job_type.as_str() { | |
"send_email" => { | |
println!("Sending email with payload: {:?}", job.payload); | |
Ok(()) | |
} | |
_ => Err("Unknown job type".into()), | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment