Using graph-flow to identify objects in a camera stream
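The program below opens the default webcam, previews the stream in a window, saves a frame every few seconds, and runs each saved frame through a two-task graph-flow workflow that asks a vision LLM (via OpenRouter) whether a teapot is in view. As a rough sketch, the manifest below lists the dependencies the file pulls in; the crate version numbers are assumptions inferred from the imports, not taken from the gist, and may need adjusting.

# Hypothetical Cargo.toml dependencies; versions are guesses.
[dependencies]
async-trait = "0.1"
base64 = "0.22"
chrono = "0.4"
graph-flow = "0.2"
image = "0.25"
minifb = "0.27"
nokhwa = { version = "0.10", features = ["input-native"] }
reqwest = { version = "0.12", features = ["json"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1", features = ["v4"] }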
use async_trait::async_trait;
use chrono::Local;
use graph_flow::{
    Context, ExecutionStatus, FlowRunner, GraphBuilder, GraphStorage, InMemoryGraphStorage,
    InMemorySessionStorage, NextAction, Session, SessionStorage, Task, TaskResult,
};
use image::{ImageBuffer, Rgb};
use minifb::{Key, Window, WindowOptions};
use nokhwa::{
    pixel_format::RgbFormat,
    utils::{CameraIndex, RequestedFormat, RequestedFormatType},
    Camera,
};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fs, path::Path, thread};
use tracing::{error, info};
// Display window size, capture cadence, and output location for saved frames.
const WINDOW_WIDTH: usize = 640;
const WINDOW_HEIGHT: usize = 480;
const CAPTURE_INTERVAL_SECS: u64 = 5;
const OUTPUT_FOLDER: &str = "stream";
// First task in the workflow: verifies that the captured image and its
// timestamp are present in the session context, then continues straight
// into the next task in the graph.
struct CaptureTask;

#[async_trait]
impl Task for CaptureTask {
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // Fetched only to validate presence; the detection task reads it again.
        let _image_data: Vec<u8> = context
            .get("image_data")
            .await
            .ok_or_else(|| graph_flow::GraphError::ContextError("image_data not found".to_string()))?;
        let timestamp: String = context
            .get("timestamp")
            .await
            .ok_or_else(|| graph_flow::GraphError::ContextError("timestamp not found".to_string()))?;
        info!("Processing capture from {}", timestamp);
        Ok(TaskResult::new(
            Some(format!("Captured image at {}", timestamp)),
            NextAction::ContinueAndExecute,
        ))
    }
}
// Sends the JPEG bytes to a vision model via OpenRouter's OpenAI-compatible
// chat completions API and returns the model's text reply.
async fn detect_teapot_in_image(
    image_data: &[u8],
    api_key: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    use base64::Engine as _;
    // The image is passed inline as a base64 data URL.
    let base64_image = base64::engine::general_purpose::STANDARD.encode(image_data);
    let client = reqwest::Client::new();
    let request_body = serde_json::json!({
        "model": "openai/gpt-4o-mini",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Analyze this image and tell me if you see a teapot. Look carefully for any teapots, tea pots, or ceramic vessels used for brewing tea. Respond with exactly 'TEAPOT DETECTED' if you see any teapot, otherwise respond with exactly 'NO TEAPOT'."
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": format!("data:image/jpeg;base64,{}", base64_image)
                        }
                    }
                ]
            }
        ],
        "max_tokens": 50
    });
    let response = client
        .post("https://openrouter.ai/api/v1/chat/completions")
        .header("Authorization", format!("Bearer {}", api_key))
        .header("Content-Type", "application/json")
        .json(&request_body)
        .send()
        .await?;
    let response_text = response.text().await?;
    let json_resp = serde_json::from_str::<serde_json::Value>(&response_text)?;
    if let Some(content) = json_resp["choices"][0]["message"]["content"].as_str() {
        Ok(content.to_string())
    } else {
        Err(format!("Failed to parse LLM response: {}", response_text).into())
    }
}
// Second task in the workflow: runs the LLM detection, or a mock response
// when no API key is configured, and ends the workflow either way.
struct TeapotDetectionTask;

#[async_trait]
impl Task for TeapotDetectionTask {
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let image_data: Vec<u8> = context
            .get("image_data")
            .await
            .ok_or_else(|| graph_flow::GraphError::ContextError("image_data not found".to_string()))?;
        let timestamp: String = context
            .get("timestamp")
            .await
            .ok_or_else(|| graph_flow::GraphError::ContextError("timestamp not found".to_string()))?;
        let api_key = std::env::var("OPENROUTER_API_KEY").unwrap_or_else(|_| {
            error!("OPENROUTER_API_KEY not set - using mock response");
            String::new()
        });
        if api_key.is_empty() {
            info!("[{}] Mock LLM analysis: No teapot detected", timestamp);
            return Ok(TaskResult::new(
                Some("Mock analysis: No teapot detected".to_string()),
                NextAction::End,
            ));
        }
        match detect_teapot_in_image(&image_data, &api_key).await {
            Ok(response) => {
                info!("[{}] LLM Response: {}", timestamp, response);
                if response.to_uppercase().contains("TEAPOT DETECTED") {
                    info!("🫖 TEAPOT DETECTED at {}!", timestamp);
                }
                Ok(TaskResult::new(Some(response), NextAction::End))
            }
            Err(e) => {
                error!("[{}] Error detecting teapot: {}", timestamp, e);
                // Report the failure as the task result rather than failing the workflow.
                Ok(TaskResult::new(
                    Some(format!("Error analyzing image: {}", e)),
                    NextAction::End,
                ))
            }
        }
    }
}
// JPEG bytes plus capture timestamp, sent from the capture loop
// to the async processing task over a channel.
#[derive(Clone)]
struct ImageMessage {
    data: Vec<u8>,
    timestamp: String,
}
// Opens the first available camera and starts streaming RGB frames.
fn initialize_camera() -> Result<Camera, Box<dyn std::error::Error>> {
    let index = CameraIndex::Index(0);
    let requested =
        RequestedFormat::new::<RgbFormat>(RequestedFormatType::AbsoluteHighestFrameRate);
    let mut camera = Camera::new(index, requested)?;
    camera.open_stream()?;
    Ok(camera)
}

fn create_window() -> Result<Window, Box<dyn std::error::Error>> {
    let mut window = Window::new(
        "Camera Stream - Press ESC or Q to exit",
        WINDOW_WIDTH,
        WINDOW_HEIGHT,
        WindowOptions::default(),
    )?;
    window.set_target_fps(30);
    Ok(window)
}

fn ensure_output_folder() -> Result<(), Box<dyn std::error::Error>> {
    if !Path::new(OUTPUT_FOLDER).exists() {
        fs::create_dir(OUTPUT_FOLDER)?;
        println!("Created output folder: {OUTPUT_FOLDER}");
    }
    Ok(())
}
// Nearest-neighbor scales the camera frame to the window size and packs
// each RGB pixel into the 0x00RRGGBB u32 layout that minifb expects.
fn convert_rgb_to_buffer(rgb_data: &[u8], width: u32, height: u32) -> Vec<u32> {
    let scale_x = width as f32 / WINDOW_WIDTH as f32;
    let scale_y = height as f32 / WINDOW_HEIGHT as f32;
    let mut buffer = vec![0u32; WINDOW_WIDTH * WINDOW_HEIGHT];
    for y in 0..WINDOW_HEIGHT {
        for x in 0..WINDOW_WIDTH {
            let src_x = (x as f32 * scale_x) as usize;
            let src_y = (y as f32 * scale_y) as usize;
            let src_idx = (src_y * width as usize + src_x) * 3;
            if src_idx + 2 < rgb_data.len() {
                let r = rgb_data[src_idx] as u32;
                let g = rgb_data[src_idx + 1] as u32;
                let b = rgb_data[src_idx + 2] as u32;
                buffer[y * WINDOW_WIDTH + x] = (r << 16) | (g << 8) | b;
            }
        }
    }
    buffer
}
// Writes the frame as a timestamped JPEG to disk and also returns the
// JPEG bytes in memory so they can be sent to the detection workflow.
fn save_frame(
    rgb_data: Vec<u8>,
    width: u32,
    height: u32,
) -> Result<(String, Vec<u8>), Box<dyn std::error::Error>> {
    let timestamp = Local::now().format("%Y%m%d_%H%M%S").to_string();
    let filename = format!("capture_{timestamp}.jpg");
    let filepath = Path::new(OUTPUT_FOLDER).join(&filename);
    let img = ImageBuffer::<Rgb<u8>, Vec<u8>>::from_raw(width, height, rgb_data)
        .ok_or("Failed to create image buffer")?;
    img.save(&filepath)?;
    let mut jpeg_data = Vec::new();
    img.write_to(&mut std::io::Cursor::new(&mut jpeg_data), image::ImageFormat::Jpeg)?;
    Ok((filepath.to_string_lossy().to_string(), jpeg_data))
}
// Consumes captured frames from the channel and runs the two-task
// graph-flow workflow on each one, with a fresh session per frame.
async fn process_images(receiver: std::sync::mpsc::Receiver<ImageMessage>) {
    let session_storage = Arc::new(InMemorySessionStorage::new());
    let graph_storage = Arc::new(InMemoryGraphStorage::new());
    let capture_task = Arc::new(CaptureTask);
    let capture_task_id = capture_task.id().to_string();
    let detection_task = Arc::new(TeapotDetectionTask);
    let detection_task_id = detection_task.id().to_string();
    // Build the graph: capture -> detection.
    let graph = Arc::new(
        GraphBuilder::new("teapot_detection_workflow")
            .add_task(capture_task)
            .add_task(detection_task)
            .add_edge(&capture_task_id, &detection_task_id)
            .build(),
    );
    graph_storage
        .save("teapot_detection_workflow".to_string(), graph.clone())
        .await
        .expect("Failed to save graph");
    let runner = FlowRunner::new(graph.clone(), session_storage.clone());
    // NOTE: recv() is a blocking std channel call, so it ties up a tokio
    // worker thread; acceptable for this demo, though tokio::sync::mpsc
    // would be the more idiomatic choice inside an async task.
    while let Ok(msg) = receiver.recv() {
        let session_id = format!("session_{}", uuid::Uuid::new_v4());
        let session = Session::new_from_task(session_id.clone(), &capture_task_id);
        session.context.set("image_data", msg.data).await;
        session.context.set("timestamp", msg.timestamp).await;
        session_storage
            .save(session.clone())
            .await
            .expect("Failed to save session");
        info!(
            "Starting workflow for capture at {}",
            session.context.get::<String>("timestamp").await.unwrap()
        );
        match runner.run(&session_id).await {
            Ok(execution_result) => {
                if let Some(response) = &execution_result.response {
                    info!("Task response: {}", response);
                }
                match execution_result.status {
                    ExecutionStatus::Completed => {
                        info!("Workflow completed for session {}", session_id);
                    }
                    _ => {
                        info!("Workflow status: {:?}", execution_result.status);
                    }
                }
            }
            Err(e) => {
                error!("Runner error: {}", e);
                break;
            }
        }
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    info!("Starting camera capture application with teapot detection");
    ensure_output_folder()?;
    // Frames flow from the capture loop to the workflow processor over a channel.
    let (tx, rx) = channel::<ImageMessage>();
    let processor_handle = tokio::spawn(async move {
        process_images(rx).await;
    });
    let mut camera = initialize_camera()?;
    let mut window = create_window()?;
    let resolution = camera.resolution();
    let width = resolution.width_x;
    let height = resolution.height_y;
    println!("Camera resolution: {width}x{height}");
    println!("Display window: {WINDOW_WIDTH}x{WINDOW_HEIGHT}");
    println!("Capturing images every {CAPTURE_INTERVAL_SECS} seconds");
    println!("Saving images to: {OUTPUT_FOLDER}/");
    println!("Press ESC or Q to exit");
    println!();
    println!("Teapot detection enabled - Show a teapot to the camera!");
    if std::env::var("OPENROUTER_API_KEY").is_err() {
        println!("OPENROUTER_API_KEY not set - using mock responses");
    }
    let mut last_capture = Instant::now();
    let capture_interval = Duration::from_secs(CAPTURE_INTERVAL_SECS);
    while window.is_open() && !window.is_key_down(Key::Escape) && !window.is_key_down(Key::Q) {
        // Render every frame to the preview window.
        let frame = camera.frame()?;
        let decoded = frame.decode_image::<RgbFormat>()?;
        let buffer = convert_rgb_to_buffer(&decoded, width, height);
        window.update_with_buffer(&buffer, WINDOW_WIDTH, WINDOW_HEIGHT)?;
        // Every CAPTURE_INTERVAL_SECS, save the frame and send it for analysis.
        if last_capture.elapsed() >= capture_interval {
            match save_frame(decoded.to_vec(), width, height) {
                Ok((filename, jpeg_data)) => {
                    println!("Saved: {filename}");
                    let timestamp = Local::now().format("%Y%m%d_%H%M%S").to_string();
                    let msg = ImageMessage {
                        data: jpeg_data,
                        timestamp: timestamp.clone(),
                    };
                    if let Err(e) = tx.send(msg) {
                        error!("Failed to send image to processor: {}", e);
                    }
                }
                Err(e) => error!("Failed to save frame: {}", e),
            }
            last_capture = Instant::now();
        }
        thread::sleep(Duration::from_millis(1));
    }
    camera.stop_stream()?;
    println!("Camera stream stopped");
    // Closing the sender lets process_images drain its channel and exit cleanly.
    drop(tx);
    processor_handle.await?;
    Ok(())
}
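To try it, assuming the dependency sketch above and a machine with a webcam, something like the following should work; without the key, the detection task falls back to the mock response path shown in TeapotDetectionTask.

# Hypothetical invocation; the key value is a placeholder.
export OPENROUTER_API_KEY=...   # omit to exercise the mock path
cargo run --release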