@a-agmon
Created August 3, 2025 18:17
Using graph-flow to identify an object (a teapot) in a live camera stream
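
//! Capture frames from a webcam, show a live preview window, and every few
//! seconds run the captured JPEG through a two-task graph-flow workflow:
//! capture validation -> LLM-based teapot detection via OpenRouter.
//!
//! A minimal sketch of the Cargo dependencies this file appears to assume
//! (crate names taken from the `use` statements below; versions and feature
//! flags are assumptions, not taken from the gist):
//!
//! [dependencies]
//! graph-flow = "*"
//! nokhwa = { version = "*", features = ["input-native"] }
//! minifb = "*"
//! image = "*"
//! chrono = "*"
//! reqwest = { version = "*", features = ["json"] }
//! serde_json = "*"
//! base64 = "*"
//! uuid = { version = "*", features = ["v4"] }
//! async-trait = "*"
//! tokio = { version = "*", features = ["full"] }
//! tracing = "*"
//! tracing-subscriber = "*"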
use async_trait::async_trait;
use chrono::Local;
use graph_flow::{
    Context, ExecutionStatus, FlowRunner, GraphBuilder, GraphStorage, InMemoryGraphStorage,
    InMemorySessionStorage, NextAction, Session, SessionStorage, Task, TaskResult,
};
use image::{ImageBuffer, Rgb};
use minifb::{Key, Window, WindowOptions};
use nokhwa::{
    pixel_format::RgbFormat,
    utils::{CameraIndex, RequestedFormat, RequestedFormatType},
    Camera,
};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fs, path::Path, thread};
use tracing::{error, info};
const WINDOW_WIDTH: usize = 640;
const WINDOW_HEIGHT: usize = 480;
const CAPTURE_INTERVAL_SECS: u64 = 5;
const OUTPUT_FOLDER: &str = "stream";
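
/// First node in the workflow: checks that the image payload and timestamp
/// are present in the shared graph-flow context, logs the capture, and hands
/// off to the next task via `NextAction::ContinueAndExecute`.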
struct CaptureTask;
#[async_trait]
impl Task for CaptureTask {
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let _image_data: Vec<u8> = context
            .get("image_data")
            .await
            .ok_or_else(|| graph_flow::GraphError::ContextError("image_data not found".to_string()))?;
        let timestamp: String = context
            .get("timestamp")
            .await
            .ok_or_else(|| graph_flow::GraphError::ContextError("timestamp not found".to_string()))?;
        info!("Processing capture from {}", timestamp);
        Ok(TaskResult::new(
            Some(format!("Captured image at {}", timestamp)),
            NextAction::ContinueAndExecute,
        ))
    }
}
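
/// Sends the JPEG bytes to OpenRouter's chat completions endpoint as a
/// base64 data URL and returns the model's raw text reply, which the prompt
/// constrains to "TEAPOT DETECTED" or "NO TEAPOT".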
async fn detect_teapot_in_image(
    image_data: &[u8],
    api_key: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    use base64::Engine as _;
    let base64_image = base64::engine::general_purpose::STANDARD.encode(image_data);
    let client = reqwest::Client::new();
    let request_body = serde_json::json!({
        "model": "openai/gpt-4o-mini",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Analyze this image and tell me if you see a teapot. Look carefully for any teapots, tea pots, or ceramic vessels used for brewing tea. Respond with exactly 'TEAPOT DETECTED' if you see any teapot, otherwise respond with exactly 'NO TEAPOT'."
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": format!("data:image/jpeg;base64,{}", base64_image)
                        }
                    }
                ]
            }
        ],
        "max_tokens": 50
    });
    let response = client
        .post("https://openrouter.ai/api/v1/chat/completions")
        .header("Authorization", format!("Bearer {}", api_key))
        .header("Content-Type", "application/json")
        .json(&request_body)
        .send()
        .await?;
    let response_text = response.text().await?;
    let json_resp = serde_json::from_str::<serde_json::Value>(&response_text)?;
    if let Some(content) = json_resp["choices"][0]["message"]["content"].as_str() {
        Ok(content.to_string())
    } else {
        Err(format!("Failed to parse LLM response: {}", response_text).into())
    }
}
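
/// Second node in the workflow: runs the LLM check, falling back to a mock
/// response when OPENROUTER_API_KEY is unset so the demo still runs offline.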
struct TeapotDetectionTask;
#[async_trait]
impl Task for TeapotDetectionTask {
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let image_data: Vec<u8> = context
            .get("image_data")
            .await
            .ok_or_else(|| graph_flow::GraphError::ContextError("image_data not found".to_string()))?;
        let timestamp: String = context
            .get("timestamp")
            .await
            .ok_or_else(|| graph_flow::GraphError::ContextError("timestamp not found".to_string()))?;
        let api_key = std::env::var("OPENROUTER_API_KEY").unwrap_or_else(|_| {
            error!("OPENROUTER_API_KEY not set - using mock response");
            String::new()
        });
        if api_key.is_empty() {
            info!("[{}] Mock LLM analysis: No teapot detected", timestamp);
            return Ok(TaskResult::new(
                Some("Mock analysis: No teapot detected".to_string()),
                NextAction::End,
            ));
        }
        match detect_teapot_in_image(&image_data, &api_key).await {
            Ok(response) => {
                info!("[{}] LLM Response: {}", timestamp, response);
                if response.to_uppercase().contains("TEAPOT DETECTED") {
                    info!("🫖 TEAPOT DETECTED at {}!", timestamp);
                }
                Ok(TaskResult::new(Some(response), NextAction::End))
            }
            Err(e) => {
                error!("[{}] Error detecting teapot: {}", timestamp, e);
                Ok(TaskResult::new(
                    Some(format!("Error analyzing image: {}", e)),
                    NextAction::End,
                ))
            }
        }
    }
}
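
/// Message passed from the capture loop on the main thread to the async
/// processing task over a std mpsc channel.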
#[derive(Clone)]
struct ImageMessage {
    data: Vec<u8>,
    timestamp: String,
}
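
/// Opens camera 0, requesting RGB frames at the highest frame rate nokhwa
/// can negotiate, and starts the stream.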
fn initialize_camera() -> Result<Camera, Box<dyn std::error::Error>> {
    let index = CameraIndex::Index(0);
    let requested =
        RequestedFormat::new::<RgbFormat>(RequestedFormatType::AbsoluteHighestFrameRate);
    let mut camera = Camera::new(index, requested)?;
    camera.open_stream()?;
    Ok(camera)
}
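
/// Creates the fixed-size preview window, capped at ~30 FPS.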
fn create_window() -> Result<Window, Box<dyn std::error::Error>> {
    let mut window = Window::new(
        "Camera Stream - Press ESC or Q to exit",
        WINDOW_WIDTH,
        WINDOW_HEIGHT,
        WindowOptions::default(),
    )?;
    window.set_target_fps(30);
    Ok(window)
}
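
/// Creates the folder for captured JPEGs if it does not already exist.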
fn ensure_output_folder() -> Result<(), Box<dyn std::error::Error>> {
    if !Path::new(OUTPUT_FOLDER).exists() {
        fs::create_dir(OUTPUT_FOLDER)?;
        println!("Created output folder: {OUTPUT_FOLDER}");
    }
    Ok(())
}
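
/// Nearest-neighbour downscale of the raw RGB frame into the fixed-size
/// window buffer. minifb expects one u32 per pixel packed as 0x00RRGGBB,
/// e.g. r=0xAB, g=0xCD, b=0xEF -> 0x00ABCDEF.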
fn convert_rgb_to_buffer(rgb_data: &[u8], width: u32, height: u32) -> Vec<u32> {
    let scale_x = width as f32 / WINDOW_WIDTH as f32;
    let scale_y = height as f32 / WINDOW_HEIGHT as f32;
    let mut buffer = vec![0u32; WINDOW_WIDTH * WINDOW_HEIGHT];
    for y in 0..WINDOW_HEIGHT {
        for x in 0..WINDOW_WIDTH {
            let src_x = (x as f32 * scale_x) as usize;
            let src_y = (y as f32 * scale_y) as usize;
            let src_idx = (src_y * width as usize + src_x) * 3;
            if src_idx + 2 < rgb_data.len() {
                let r = rgb_data[src_idx] as u32;
                let g = rgb_data[src_idx + 1] as u32;
                let b = rgb_data[src_idx + 2] as u32;
                buffer[y * WINDOW_WIDTH + x] = (r << 16) | (g << 8) | b;
            }
        }
    }
    buffer
}
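
/// Writes the frame to disk as a JPEG and also encodes it into an in-memory
/// buffer for the LLM request. Note that the frame is JPEG-encoded twice
/// (once by `save`, once by `write_to`); encoding once and writing those
/// bytes to disk would avoid the duplicate work.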
fn save_frame(
    rgb_data: Vec<u8>,
    width: u32,
    height: u32,
) -> Result<(String, Vec<u8>), Box<dyn std::error::Error>> {
    let timestamp = Local::now().format("%Y%m%d_%H%M%S").to_string();
    let filename = format!("capture_{timestamp}.jpg");
    let filepath = Path::new(OUTPUT_FOLDER).join(&filename);
    let img = ImageBuffer::<Rgb<u8>, Vec<u8>>::from_raw(width, height, rgb_data)
        .ok_or("Failed to create image buffer")?;
    img.save(&filepath)?;
    let mut jpeg_data = Vec::new();
    img.write_to(&mut std::io::Cursor::new(&mut jpeg_data), image::ImageFormat::Jpeg)?;
    Ok((filepath.to_string_lossy().to_string(), jpeg_data))
}
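
/// Consumer side: builds the two-task graph once, then creates a fresh
/// graph-flow session per captured frame and runs it to completion.
/// Note: `receiver.recv()` is the blocking std-mpsc call, so this future
/// parks a tokio worker thread while idle; a `tokio::sync::mpsc` channel
/// would be the non-blocking alternative.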
async fn process_images(receiver: std::sync::mpsc::Receiver<ImageMessage>) {
    let session_storage = Arc::new(InMemorySessionStorage::new());
    let graph_storage = Arc::new(InMemoryGraphStorage::new());
    let capture_task = Arc::new(CaptureTask);
    let capture_task_id = capture_task.id().to_string();
    let detection_task = Arc::new(TeapotDetectionTask);
    let detection_task_id = detection_task.id().to_string();
    let graph = Arc::new(
        GraphBuilder::new("teapot_detection_workflow")
            .add_task(capture_task)
            .add_task(detection_task)
            .add_edge(&capture_task_id, &detection_task_id)
            .build(),
    );
    graph_storage
        .save("teapot_detection_workflow".to_string(), graph.clone())
        .await
        .expect("Failed to save graph");
    let runner = FlowRunner::new(graph.clone(), session_storage.clone());
    while let Ok(msg) = receiver.recv() {
        let session_id = format!("session_{}", uuid::Uuid::new_v4());
        let session = Session::new_from_task(session_id.clone(), &capture_task_id);
        session.context.set("image_data", msg.data).await;
        session.context.set("timestamp", msg.timestamp).await;
        session_storage
            .save(session.clone())
            .await
            .expect("Failed to save session");
        info!(
            "Starting workflow for capture at {}",
            session.context.get::<String>("timestamp").await.unwrap()
        );
        match runner.run(&session_id).await {
            Ok(execution_result) => {
                if let Some(response) = &execution_result.response {
                    info!("Task response: {}", response);
                }
                match execution_result.status {
                    ExecutionStatus::Completed => {
                        info!("Workflow completed for session {}", session_id);
                    }
                    _ => {
                        info!("Workflow status: {:?}", execution_result.status);
                    }
                }
            }
            Err(e) => {
                error!("Runner error: {}", e);
                break;
            }
        }
    }
}
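
/// Producer side: the main thread owns the camera and the preview window,
/// saves a frame every CAPTURE_INTERVAL_SECS, and ships the JPEG bytes to
/// `process_images`. Dropping `tx` after the loop closes the channel so the
/// processor task can exit and be awaited.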
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    info!("Starting camera capture application with teapot detection");
    ensure_output_folder()?;
    let (tx, rx) = channel::<ImageMessage>();
    let processor_handle = tokio::spawn(async move {
        process_images(rx).await;
    });
    let mut camera = initialize_camera()?;
    let mut window = create_window()?;
    let resolution = camera.resolution();
    let width = resolution.width_x;
    let height = resolution.height_y;
    println!("Camera resolution: {width}x{height}");
    println!("Display window: {WINDOW_WIDTH}x{WINDOW_HEIGHT}");
    println!("Capturing images every {CAPTURE_INTERVAL_SECS} seconds");
    println!("Saving images to: {OUTPUT_FOLDER}/");
    println!("Press ESC or Q to exit");
    println!();
    println!("Teapot detection enabled - Show a teapot to the camera!");
    if std::env::var("OPENROUTER_API_KEY").is_err() {
        println!("OPENROUTER_API_KEY not set - using mock responses");
    }
    let mut last_capture = Instant::now();
    let capture_interval = Duration::from_secs(CAPTURE_INTERVAL_SECS);
    while window.is_open() && !window.is_key_down(Key::Escape) && !window.is_key_down(Key::Q) {
        let frame = camera.frame()?;
        let decoded = frame.decode_image::<RgbFormat>()?;
        let buffer = convert_rgb_to_buffer(&decoded, width, height);
        window.update_with_buffer(&buffer, WINDOW_WIDTH, WINDOW_HEIGHT)?;
        if last_capture.elapsed() >= capture_interval {
            match save_frame(decoded.to_vec(), width, height) {
                Ok((filename, jpeg_data)) => {
                    println!("Saved: {filename}");
                    let timestamp = Local::now().format("%Y%m%d_%H%M%S").to_string();
                    let msg = ImageMessage {
                        data: jpeg_data,
                        timestamp: timestamp.clone(),
                    };
                    if let Err(e) = tx.send(msg) {
                        error!("Failed to send image to processor: {}", e);
                    }
                }
                Err(e) => error!("Failed to save frame: {}", e),
            }
            last_capture = Instant::now();
        }
        thread::sleep(Duration::from_millis(1));
    }
    camera.stop_stream()?;
    println!("Camera stream stopped");
    drop(tx);
    processor_handle.await?;
    Ok(())
}