feat: progressive multi-round face matching + pending person API
- Identity agent: per-face max matching, multi-round with derived seeds from high-confidence faces, angle diversity filter (cosine sim < 0.90) - Pending person API: POST /file/:file_uuid/pending-person + GET /file/:file_uuid/pending-persons with status=pending, source=manual - Update API docs (07_identity.md)
This commit is contained in:
+13
-10
@@ -166,18 +166,21 @@ async fn list_identities(
|
||||
|
||||
let id_table = crate::core::db::schema::table_name("identities");
|
||||
|
||||
let total: i64 = sqlx::query_scalar(&format!("SELECT COUNT(*) FROM {}", id_table))
|
||||
.fetch_one(db.pool())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("Count error: {}", e),
|
||||
)
|
||||
})?;
|
||||
let total: i64 = sqlx::query_scalar(&format!(
|
||||
"SELECT COUNT(*) FROM {} WHERE status IS NULL OR status != 'merged'",
|
||||
id_table
|
||||
))
|
||||
.fetch_one(db.pool())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
format!("Count error: {}", e),
|
||||
)
|
||||
})?;
|
||||
|
||||
let sql = format!(
|
||||
"SELECT id::int, uuid, name, metadata FROM {} ORDER BY id DESC LIMIT $1 OFFSET $2",
|
||||
"SELECT id::int, uuid, name, metadata FROM {} WHERE status IS NULL OR status != 'merged' ORDER BY id DESC LIMIT $1 OFFSET $2",
|
||||
id_table
|
||||
);
|
||||
|
||||
|
||||
+541
-141
@@ -23,6 +23,14 @@ pub fn identity_agent_routes() -> Router<AppState> {
|
||||
"/api/v1/agents/identity/match-from-trace",
|
||||
post(match_from_trace),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/agents/identity/generate-seeds",
|
||||
post(generate_seeds_handler),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/agents/identity/run",
|
||||
post(run_identity_handler),
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -619,198 +627,373 @@ fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
|
||||
}
|
||||
}
|
||||
|
||||
/// 迭代多角度 face embedding 比對 + 傳播 (Qdrant version)
|
||||
/// Round 1: 用 TMDb seed face_embedding 比對 Qdrant embeddings (threshold 0.50)
|
||||
/// Round 2+: 用已匹配 trace 的所有 face 作為 seed,傳播到未匹配 trace
|
||||
fn average_embeddings<'a>(embeddings: impl Iterator<Item = &'a Vec<f32>>) -> Vec<f32> {
|
||||
let mut count = 0usize;
|
||||
let mut sum: Option<Vec<f32>> = None;
|
||||
for emb in embeddings {
|
||||
if emb.len() != 512 {
|
||||
continue;
|
||||
}
|
||||
match &mut sum {
|
||||
None => sum = Some(emb.clone()),
|
||||
Some(s) => {
|
||||
for (i, v) in emb.iter().enumerate() {
|
||||
s[i] += v;
|
||||
}
|
||||
}
|
||||
}
|
||||
count += 1;
|
||||
}
|
||||
if let Some(mut s) = sum {
|
||||
let c = count as f32;
|
||||
for v in &mut s {
|
||||
*v /= c;
|
||||
}
|
||||
s
|
||||
} else {
|
||||
vec![0.0f32; 512]
|
||||
}
|
||||
}
|
||||
|
||||
/// Cluster: trace centroid + seeds from Qdrant + stranger clustering.
|
||||
/// Round 1: centroid vs seeds (TH=0.55)
|
||||
/// Round 2+: propagate from matched (TH=0.50)
|
||||
/// Unknown: greedy stranger clustering (TH=0.40)
|
||||
/// Writes identity_ref/stranger_ref to Qdrant payload, TKG nodes, and face_detections.
|
||||
async fn match_faces_iterative(pool: &sqlx::PgPool, file_uuid: &str) -> anyhow::Result<usize> {
|
||||
use crate::core::db::face_embedding_db::FaceEmbeddingDb;
|
||||
use std::collections::HashMap;
|
||||
|
||||
// Step 1: 載入 TMDb identities (source='tmdb' 且有 face_embedding)
|
||||
let identities_table = schema::table_name("identities");
|
||||
let tmdb_rows = sqlx::query_as::<_, (i32, String, Vec<f32>)>(
|
||||
&format!("SELECT id, name, face_embedding::real[] FROM {} WHERE source='tmdb' AND face_embedding IS NOT NULL", identities_table)
|
||||
)
|
||||
.fetch_all(pool).await?;
|
||||
let face_db = FaceEmbeddingDb::new();
|
||||
|
||||
if tmdb_rows.is_empty() {
|
||||
tracing::warn!("[FaceMatch] No TMDb identities with face embeddings");
|
||||
return Ok(0);
|
||||
}
|
||||
// Step 1: Load seeds from Qdrant (type=identity_seed)
|
||||
let seeds = face_db.get_seed_embeddings().await?;
|
||||
tracing::info!(
|
||||
"[FaceMatch-Qdrant] Loaded {} TMDb seed identities",
|
||||
tmdb_rows.len()
|
||||
"[FaceMatch] Loaded {} seeds from Qdrant",
|
||||
seeds.len()
|
||||
);
|
||||
|
||||
// Step 2: Load embeddings from Qdrant
|
||||
let face_db = FaceEmbeddingDb::new();
|
||||
// Step 2: Preload identity internal IDs (uuid → (id, name))
|
||||
let id_table = schema::table_name("identities");
|
||||
let seed_identity_map: HashMap<String, (i32, String)> = if !seeds.is_empty() {
|
||||
let uuids: Vec<String> = seeds.iter().map(|(uuid, _, _)| uuid.clone()).collect();
|
||||
if uuids.is_empty() {
|
||||
HashMap::new()
|
||||
} else {
|
||||
let rows = sqlx::query_as::<_, (i32, String, String)>(&format!(
|
||||
"SELECT id, uuid::text, name FROM {} WHERE uuid::text = ANY($1)",
|
||||
id_table
|
||||
))
|
||||
.bind(&uuids)
|
||||
.fetch_all(pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(id, uuid, name)| (uuid, (id, name)))
|
||||
.collect();
|
||||
rows
|
||||
}
|
||||
} else {
|
||||
HashMap::new()
|
||||
};
|
||||
|
||||
// Step 3: Load face embeddings from Qdrant for this file
|
||||
let qdrant_embeddings = face_db.get_all_embeddings_for_file(file_uuid).await?;
|
||||
|
||||
if qdrant_embeddings.is_empty() {
|
||||
tracing::warn!(
|
||||
"[FaceMatch-Qdrant] No face embeddings in Qdrant for {}",
|
||||
file_uuid
|
||||
);
|
||||
return match_faces_iterative_pg(pool, file_uuid).await; // Fallback to PG
|
||||
tracing::warn!("[FaceMatch] No face embeddings in Qdrant for {}", file_uuid);
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Group: trace_id → Vec<(frame, embedding)>
|
||||
let mut face_track_faces_raw: HashMap<i32, Vec<(i64, Vec<f32>)>> = HashMap::new();
|
||||
// Step 4: Group embeddings by trace_id, keeping confidence
|
||||
let mut trace_faces: HashMap<i32, Vec<(i64, Vec<f32>, f64)>> = HashMap::new();
|
||||
for (_, emb, payload) in &qdrant_embeddings {
|
||||
face_track_faces_raw
|
||||
trace_faces
|
||||
.entry(payload.trace_id)
|
||||
.or_default()
|
||||
.push((payload.frame, emb.clone()));
|
||||
.push((payload.frame, emb.clone(), payload.confidence));
|
||||
}
|
||||
|
||||
// Sample 3 embeddings per trace (front, mid, back)
|
||||
let mut face_track_samples: HashMap<i32, Vec<Vec<f32>>> = HashMap::new();
|
||||
for (tid, mut faces) in face_track_faces_raw {
|
||||
faces.sort_by_key(|(frame, _)| *frame);
|
||||
let n = faces.len();
|
||||
let indices = if n <= 3 {
|
||||
(0..n).collect::<Vec<_>>()
|
||||
} else {
|
||||
vec![0, n / 2, n - 1]
|
||||
};
|
||||
let samples: Vec<Vec<f32>> = indices.iter().map(|&i| faces[i].1.clone()).collect();
|
||||
face_track_samples.insert(tid, samples);
|
||||
}
|
||||
// Step 5: Progressive multi-round matching with derived seeds
|
||||
// Each round: choose a face with best seed sim for matching; separately,
|
||||
// collect the highest-confidence face per trace for building derived seeds.
|
||||
const TH_MIN: f32 = 0.35;
|
||||
const DERIVED_CONF: f64 = 0.90;
|
||||
const MAX_DERIVED_PER_ID: usize = 9;
|
||||
const MAX_FACES_PER_TRACE: usize = 3;
|
||||
const ANGLE_SIM_THRESHOLD: f32 = 0.90;
|
||||
const TH_STRANGER: f32 = 0.40;
|
||||
|
||||
let total_traces = face_track_samples.len();
|
||||
let sample_count: usize = face_track_samples.values().map(|v| v.len()).sum();
|
||||
let total_traces = trace_faces.len();
|
||||
let total_embeddings: usize = trace_faces.values().map(|v| v.len()).sum();
|
||||
tracing::info!(
|
||||
"[FaceMatch-Qdrant] Loaded {} traces, sampled {} embeddings",
|
||||
"[FaceMatch] Loaded {} traces ({} face embeddings) from Qdrant for {}",
|
||||
total_traces,
|
||||
sample_count
|
||||
total_embeddings,
|
||||
file_uuid
|
||||
);
|
||||
|
||||
// Step 3: Match against TMDb seeds
|
||||
const TH: f32 = 0.50;
|
||||
let tmdb_seeds: Vec<(i32, String, Vec<f32>)> = tmdb_rows;
|
||||
let mut matched: HashMap<i32, String> = HashMap::new();
|
||||
let mut matched: HashMap<i32, (String, i32)> = HashMap::new();
|
||||
let mut trace_face_count: HashMap<i32, usize> = HashMap::new();
|
||||
|
||||
for (&tid, samples) in &face_track_samples {
|
||||
let mut best_name = String::new();
|
||||
let mut best_sim = 0.0f32;
|
||||
for (_, ref name, ref tmdb_emb) in &tmdb_seeds {
|
||||
for face_emb in samples {
|
||||
let s = cosine_similarity(face_emb, tmdb_emb);
|
||||
if s > best_sim {
|
||||
best_sim = s;
|
||||
best_name = name.clone();
|
||||
}
|
||||
}
|
||||
// All reference embeddings: start with original TMDb seeds
|
||||
let mut all_refs: Vec<(String, String, Vec<f32>)> = seeds.clone();
|
||||
let thresholds = [0.55f32, 0.50, 0.45, 0.40, 0.35];
|
||||
let mut prev_total = 0usize;
|
||||
|
||||
for (round_idx, &th) in thresholds.iter().enumerate() {
|
||||
if th < TH_MIN {
|
||||
break;
|
||||
}
|
||||
if best_sim >= TH {
|
||||
matched.insert(tid, best_name);
|
||||
}
|
||||
}
|
||||
tracing::info!(
|
||||
"[FaceMatch-Qdrant] Round 1: matched {} traces (threshold={})",
|
||||
matched.len(),
|
||||
TH
|
||||
);
|
||||
|
||||
// Round 2+: Propagate
|
||||
let mut round = 2;
|
||||
while matched.len() < face_track_samples.len() {
|
||||
let prev_count = matched.len();
|
||||
let mut new_matches: HashMap<i32, (String, i32)> = HashMap::new();
|
||||
let mut seed_candidates: Vec<(i32, String, i32, Vec<f32>, f64)> = Vec::new();
|
||||
|
||||
// Collect new matches in separate HashMap
|
||||
let mut new_matches: HashMap<i32, String> = HashMap::new();
|
||||
|
||||
for (&tid, samples) in &face_track_samples {
|
||||
for (&tid, faces) in &trace_faces {
|
||||
if matched.contains_key(&tid) {
|
||||
continue;
|
||||
}
|
||||
trace_face_count.entry(tid).or_insert(faces.len());
|
||||
|
||||
for (matched_tid, matched_name) in &matched {
|
||||
if let Some(matched_embs) = face_track_samples.get(matched_tid) {
|
||||
for face_emb in samples {
|
||||
for ref_emb in matched_embs {
|
||||
let s = cosine_similarity(face_emb, ref_emb);
|
||||
if s >= TH {
|
||||
new_matches.insert(tid, matched_name.clone());
|
||||
break;
|
||||
let mut best_sim = 0.0f32;
|
||||
let mut best_name = String::new();
|
||||
let mut best_id = 0i32;
|
||||
// Collect all high-confidence faces in this trace for derived seeds
|
||||
let mut trace_candidates: Vec<(Vec<f32>, f64)> = Vec::new();
|
||||
|
||||
for (_, emb, conf) in faces {
|
||||
for (ref_uuid, ref_name, ref_emb) in &all_refs {
|
||||
let s = cosine_similarity(emb, ref_emb);
|
||||
if s > best_sim {
|
||||
best_sim = s;
|
||||
best_name = ref_name.clone();
|
||||
if let Some(id_str) = ref_uuid.strip_prefix("derived:") {
|
||||
if let Ok(parsed) = id_str.parse::<i32>() {
|
||||
best_id = parsed;
|
||||
}
|
||||
} else if let Some((id, _)) = seed_identity_map.get(ref_uuid) {
|
||||
best_id = *id;
|
||||
}
|
||||
}
|
||||
}
|
||||
if *conf >= DERIVED_CONF {
|
||||
trace_candidates.push((emb.clone(), *conf));
|
||||
}
|
||||
}
|
||||
|
||||
if best_sim >= th && best_id > 0 {
|
||||
new_matches.insert(tid, (best_name.clone(), best_id));
|
||||
|
||||
// Top MAX_FACES_PER_TRACE highest-confidence faces with angular diversity
|
||||
trace_candidates.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
|
||||
let mut selected: Vec<Vec<f32>> = Vec::new();
|
||||
for (emb, conf) in trace_candidates {
|
||||
if selected.len() >= MAX_FACES_PER_TRACE {
|
||||
break;
|
||||
}
|
||||
if selected.iter().any(|e| cosine_similarity(e, &emb) >= ANGLE_SIM_THRESHOLD) {
|
||||
continue;
|
||||
}
|
||||
selected.push(emb.clone());
|
||||
seed_candidates.push((best_id, best_name.clone(), tid, emb, conf));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Merge new matches
|
||||
matched.extend(new_matches);
|
||||
|
||||
if matched.len() == prev_count {
|
||||
let new_count = new_matches.len();
|
||||
if new_count == 0 && round_idx > 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
matched.extend(new_matches);
|
||||
|
||||
// Build derived seeds: pick up to MAX_DERIVED_PER_ID per identity
|
||||
// (max MAX_FACES_PER_TRACE from each trace), sorted by confidence descending
|
||||
seed_candidates.sort_by(|a, b| b.4.partial_cmp(&a.4).unwrap());
|
||||
let mut per_id: HashMap<i32, usize> = HashMap::new();
|
||||
let mut trace_used_faces: HashMap<i32, usize> = HashMap::new();
|
||||
let mut added_seeds = 0usize;
|
||||
for (id, name, tid, emb, _) in &seed_candidates {
|
||||
let cnt = per_id.entry(*id).or_insert(0);
|
||||
if *cnt >= MAX_DERIVED_PER_ID {
|
||||
continue;
|
||||
}
|
||||
let trace_cnt = trace_used_faces.entry(*tid).or_insert(0);
|
||||
if *trace_cnt >= MAX_FACES_PER_TRACE {
|
||||
continue;
|
||||
}
|
||||
*trace_cnt += 1;
|
||||
*cnt += 1;
|
||||
all_refs.push((format!("derived:{}", id), name.clone(), emb.clone()));
|
||||
added_seeds += 1;
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"[FaceMatch-Qdrant] Round {}: matched {} total",
|
||||
round,
|
||||
matched.len()
|
||||
"[FaceMatch] Round {}: matched {}+{}={} total (TH={}, {} new derived seeds)",
|
||||
round_idx + 1,
|
||||
prev_total,
|
||||
new_count,
|
||||
matched.len(),
|
||||
th,
|
||||
added_seeds
|
||||
);
|
||||
round += 1;
|
||||
|
||||
prev_total = matched.len();
|
||||
}
|
||||
|
||||
// Update face_detections.identity_id AND tkg_nodes.properties (Phase 3)
|
||||
let fd_table = schema::table_name("face_detections");
|
||||
let nodes_table = schema::table_name("tkg_nodes");
|
||||
let id_table = schema::table_name("identities");
|
||||
let identities_map: HashMap<String, i32> = tmdb_seeds
|
||||
.iter()
|
||||
.map(|(id, name, _)| (name.clone(), *id))
|
||||
// Step 7: Stranger clustering for unmatched traces
|
||||
let unmatched_ids: Vec<i32> = trace_faces
|
||||
.keys()
|
||||
.filter(|tid| !matched.contains_key(tid))
|
||||
.copied()
|
||||
.collect();
|
||||
|
||||
// Batch query identity names
|
||||
let identity_names: HashMap<i32, String> = sqlx::query_as::<_, (i32, String)>(&format!(
|
||||
"SELECT id, name FROM {} WHERE id = ANY($1)",
|
||||
id_table
|
||||
))
|
||||
.bind(identities_map.values().collect::<Vec<_>>())
|
||||
.fetch_all(pool)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
let mut stranger_map: HashMap<i32, String> = HashMap::new();
|
||||
let mut assigned_stranger: std::collections::HashSet<i32> = std::collections::HashSet::new();
|
||||
let mut stranger_count = 0usize;
|
||||
|
||||
let mut updated = 0usize;
|
||||
for (tid, name) in &matched {
|
||||
let identity_id = identities_map.get(name);
|
||||
if let Some(id) = identity_id {
|
||||
let rows = sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND face_track_id = $3",
|
||||
fd_table
|
||||
))
|
||||
.bind(*id)
|
||||
.bind(file_uuid)
|
||||
.bind(*tid)
|
||||
.execute(pool)
|
||||
.await?
|
||||
.rows_affected();
|
||||
updated += rows as usize;
|
||||
// Sort by face count descending (most reliable first)
|
||||
let mut sorted_unmatched: Vec<i32> = unmatched_ids.clone();
|
||||
sorted_unmatched.sort_by(|a, b| {
|
||||
trace_face_count
|
||||
.get(b)
|
||||
.unwrap_or(&0)
|
||||
.cmp(trace_face_count.get(a).unwrap_or(&0))
|
||||
});
|
||||
|
||||
// Phase 3: Also update TKG node
|
||||
let external_id = format!("face_track_{}", tid);
|
||||
let identity_name = identity_names.get(id);
|
||||
let _ = sqlx::query(&format!(
|
||||
"UPDATE {} SET properties = jsonb_set(\
|
||||
jsonb_set(properties, '{{identity_id}}', $1::jsonb, false),\
|
||||
'{{identity_name}}', $2::jsonb, false)\
|
||||
WHERE file_uuid = $3 AND node_type = 'face_track' AND external_id = $4",
|
||||
nodes_table
|
||||
))
|
||||
.bind(*id)
|
||||
.bind(identity_name.as_deref())
|
||||
.bind(file_uuid)
|
||||
.bind(&external_id)
|
||||
.execute(pool)
|
||||
.await;
|
||||
for &tid in &sorted_unmatched {
|
||||
if assigned_stranger.contains(&tid) {
|
||||
continue;
|
||||
}
|
||||
let centroid_a = if let Some(faces) = trace_faces.get(&tid) {
|
||||
average_embeddings(faces.iter().map(|(_, emb, _)| emb))
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
stranger_count += 1;
|
||||
let stranger_id = format!("{}:stranger_{}", file_uuid, stranger_count);
|
||||
assigned_stranger.insert(tid);
|
||||
stranger_map.insert(tid, stranger_id.clone());
|
||||
|
||||
for &other_tid in &sorted_unmatched {
|
||||
if assigned_stranger.contains(&other_tid) || other_tid == tid {
|
||||
continue;
|
||||
}
|
||||
if let Some(faces_b) = trace_faces.get(&other_tid) {
|
||||
let centroid_b = average_embeddings(faces_b.iter().map(|(_, emb, _)| emb));
|
||||
let s = cosine_similarity(¢roid_a, ¢roid_b);
|
||||
if s >= TH_STRANGER {
|
||||
assigned_stranger.insert(other_tid);
|
||||
stranger_map.insert(other_tid, stranger_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("[FaceMatch-Qdrant] Updated {} face_detections", updated);
|
||||
Ok(updated)
|
||||
let stranger_trace_count = stranger_map.len();
|
||||
tracing::info!(
|
||||
"[FaceMatch] Stranger clusters: {} groups, {} traces",
|
||||
stranger_count,
|
||||
stranger_trace_count
|
||||
);
|
||||
|
||||
// Step 8: Write results to TKG nodes + Qdrant payload + face_detections
|
||||
let fd_table = schema::table_name("face_detections");
|
||||
let nodes_table = schema::table_name("tkg_nodes");
|
||||
let mut pg_updated = 0usize;
|
||||
|
||||
// Clear old identity assignments before writing new ones
|
||||
let _ = sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = NULL WHERE file_uuid = $1",
|
||||
fd_table
|
||||
))
|
||||
.bind(file_uuid)
|
||||
.execute(pool)
|
||||
.await;
|
||||
|
||||
// 8a: Matched traces → identity_ref
|
||||
for (&tid, (name, identity_id)) in &matched {
|
||||
// Skip if identity_id is invalid (FK constraint would fail)
|
||||
if *identity_id <= 0 {
|
||||
tracing::warn!(
|
||||
"[FaceMatch] Skipping trace {}: invalid identity_id={}",
|
||||
tid, identity_id
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let identity_ref = format!("{}:{}", file_uuid, identity_id);
|
||||
|
||||
// TKG node
|
||||
let external_id = format!("face_track_{}", tid);
|
||||
if let Err(e) = sqlx::query(&format!(
|
||||
"UPDATE {} SET properties = jsonb_set(\
|
||||
jsonb_set(properties, '{{identity_ref}}', to_jsonb($1), true),\
|
||||
'{{identity_name}}', to_jsonb($2), true)\
|
||||
WHERE file_uuid = $3 AND node_type = 'face_track' AND external_id = $4",
|
||||
nodes_table
|
||||
))
|
||||
.bind(&identity_ref)
|
||||
.bind(name)
|
||||
.bind(file_uuid)
|
||||
.bind(&external_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("[FaceMatch] TKG update failed for trace {}: {:?}", tid, e);
|
||||
}
|
||||
|
||||
// Qdrant payload
|
||||
let _ = face_db
|
||||
.update_identity_ref_by_trace(file_uuid, tid, &identity_ref)
|
||||
.await;
|
||||
|
||||
// PostgreSQL face_detections (backward compat)
|
||||
let rows = sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = $3",
|
||||
fd_table
|
||||
))
|
||||
.bind(identity_id)
|
||||
.bind(file_uuid)
|
||||
.bind(tid)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map(|r| r.rows_affected())
|
||||
.unwrap_or(0);
|
||||
pg_updated += rows as usize;
|
||||
}
|
||||
|
||||
// 8b: Stranger traces → stranger_ref
|
||||
for (&tid, stranger_ref) in &stranger_map {
|
||||
// TKG node
|
||||
let external_id = format!("face_track_{}", tid);
|
||||
if let Err(e) = sqlx::query(&format!(
|
||||
"UPDATE {} SET properties = jsonb_set(\
|
||||
properties, '{{stranger_ref}}', to_jsonb($1), true)\
|
||||
WHERE file_uuid = $2 AND node_type = 'face_track' AND external_id = $3",
|
||||
nodes_table
|
||||
))
|
||||
.bind(stranger_ref)
|
||||
.bind(file_uuid)
|
||||
.bind(&external_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("[FaceMatch] TKG stranger update failed for trace {}: {:?}", tid, e);
|
||||
}
|
||||
|
||||
// Qdrant payload
|
||||
let _ = face_db
|
||||
.update_stranger_ref_by_trace(file_uuid, tid, stranger_ref)
|
||||
.await;
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"[FaceMatch] Done: {} matched, {} strangers — {} face_detections updated",
|
||||
matched.len(),
|
||||
stranger_trace_count,
|
||||
pg_updated
|
||||
);
|
||||
Ok(pg_updated)
|
||||
}
|
||||
|
||||
/// Fallback: PostgreSQL-based matching (original implementation)
|
||||
@@ -1312,3 +1495,220 @@ pub async fn run_identity_agent(db: &PostgresDb, file_uuid: &str) -> anyhow::Res
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// API handler: POST /api/v1/agents/identity/generate-seeds
|
||||
async fn generate_seeds_handler(
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
|
||||
let db = &state.db;
|
||||
let pool = db.pool();
|
||||
|
||||
let count = generate_seed_embeddings(db)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"success": false, "message": format!("{}", e)})),
|
||||
)
|
||||
})?;
|
||||
|
||||
// Auto-trigger identity agent for all ready files
|
||||
if count > 0 {
|
||||
let ready_files = find_ready_files(pool).await.unwrap_or_default();
|
||||
if !ready_files.is_empty() {
|
||||
tracing::info!(
|
||||
"[GenerateSeeds] Auto-triggering identity agent for {} files: {:?}",
|
||||
ready_files.len(),
|
||||
ready_files
|
||||
);
|
||||
for file_uuid in &ready_files {
|
||||
let db = state.db.clone();
|
||||
let fid = file_uuid.clone();
|
||||
tokio::spawn(async move {
|
||||
match run_identity_agent(&db, &fid).await {
|
||||
Ok(_) => tracing::info!(
|
||||
"[GenerateSeeds] Identity agent completed for {}",
|
||||
fid
|
||||
),
|
||||
Err(e) => tracing::warn!(
|
||||
"[GenerateSeeds] Identity agent failed for {}: {}",
|
||||
fid,
|
||||
e
|
||||
),
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Json(serde_json::json!({
|
||||
"success": true,
|
||||
"message": format!("Generated {} seed embeddings", count),
|
||||
"count": count
|
||||
})))
|
||||
}
|
||||
|
||||
/// Find videos that are ready for identity processing (have face embeddings).
|
||||
async fn find_ready_files(pool: &sqlx::PgPool) -> anyhow::Result<Vec<String>> {
|
||||
let fd_table = crate::core::db::schema::table_name("face_detections");
|
||||
let rows: Vec<(String,)> = sqlx::query_as(&format!(
|
||||
"SELECT DISTINCT file_uuid FROM {} WHERE embedding IS NOT NULL AND identity_id IS NULL",
|
||||
fd_table
|
||||
))
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
Ok(rows.into_iter().map(|r| r.0).collect())
|
||||
}
|
||||
|
||||
/// API handler: POST /api/v1/agents/identity/run
|
||||
async fn run_identity_handler(
|
||||
State(state): State<AppState>,
|
||||
axum::Json(body): axum::Json<serde_json::Value>,
|
||||
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
|
||||
let file_uuid = body
|
||||
.get("file_uuid")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| {
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"success": false, "message": "file_uuid required"})),
|
||||
)
|
||||
})?;
|
||||
|
||||
match run_identity_agent(&state.db, file_uuid).await {
|
||||
Ok(()) => Ok(Json(serde_json::json!({
|
||||
"success": true,
|
||||
"message": format!("Identity agent completed for {}", file_uuid),
|
||||
}))),
|
||||
Err(e) => Ok(Json(serde_json::json!({
|
||||
"success": false,
|
||||
"message": format!("Identity agent failed: {}", e),
|
||||
}))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Read all TMDb identities with profile photos, extract face embeddings, store in Qdrant as seeds.
|
||||
pub async fn generate_seed_embeddings(db: &PostgresDb) -> anyhow::Result<usize> {
|
||||
use crate::core::db::face_embedding_db::FaceEmbeddingDb;
|
||||
use std::path::Path;
|
||||
|
||||
let pool = db.pool();
|
||||
let id_table = schema::table_name("identities");
|
||||
|
||||
let rows = sqlx::query_as::<_, (i32, String, String, i32, String)>(&format!(
|
||||
"SELECT id, name, uuid::text, tmdb_id, tmdb_profile FROM {} \
|
||||
WHERE source='tmdb' AND tmdb_profile IS NOT NULL",
|
||||
id_table
|
||||
))
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
if rows.is_empty() {
|
||||
tracing::warn!("[GenerateSeeds] No TMDb identities with profile photos");
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let scripts_dir = std::env::var("MOMENTRY_SCRIPTS_DIR")
|
||||
.unwrap_or_else(|_| "/Users/accusys/momentry_core_0.1/scripts".to_string());
|
||||
let python_path = std::env::var("MOMENTRY_PYTHON_PATH")
|
||||
.unwrap_or_else(|_| "/opt/homebrew/bin/python3.11".to_string());
|
||||
|
||||
let extract_script = Path::new(&scripts_dir).join("extract_face_embedding.py");
|
||||
let face_db = FaceEmbeddingDb::new();
|
||||
|
||||
let mut success = 0usize;
|
||||
for (id, name, uuid, tmdb_id, profile_url) in &rows {
|
||||
tracing::info!("[GenerateSeeds] Processing {} ({})", name, uuid);
|
||||
|
||||
// Download profile image
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.build()
|
||||
.unwrap_or_else(|_| reqwest::Client::new());
|
||||
let resp = client.get(profile_url).send().await;
|
||||
let image_bytes = match resp {
|
||||
Ok(r) if r.status().is_success() => r.bytes().await.unwrap_or_default(),
|
||||
_ => {
|
||||
tracing::warn!("[GenerateSeeds] Failed to download: {} from {}", name, profile_url);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if image_bytes.is_empty() {
|
||||
tracing::warn!("[GenerateSeeds] Empty image for {}", name);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Save to temp file
|
||||
let temp_dir = std::env::temp_dir().join("momentry_seed_faces");
|
||||
std::fs::create_dir_all(&temp_dir)?;
|
||||
let temp_img = temp_dir.join(format!("{}.jpg", uuid));
|
||||
std::fs::write(&temp_img, &image_bytes)?;
|
||||
|
||||
// Extract embedding with timeout
|
||||
use tokio::time::timeout;
|
||||
let output = timeout(
|
||||
std::time::Duration::from_secs(180),
|
||||
tokio::process::Command::new(&python_path)
|
||||
.arg(&extract_script)
|
||||
.arg(&temp_img)
|
||||
.output(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| anyhow::anyhow!("Extract embedding timed out for {}", name))??;
|
||||
|
||||
let _ = std::fs::remove_file(&temp_img);
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
tracing::warn!(
|
||||
"[GenerateSeeds] Extraction failed for {}: {}",
|
||||
name,
|
||||
stderr.trim()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
let extract_result: serde_json::Value = match serde_json::from_str(&stdout) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
tracing::warn!("[GenerateSeeds] Parse error for {}: {}", name, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let embedding: Vec<f64> = match serde_json::from_value(
|
||||
extract_result.get("embedding").ok_or_else(|| anyhow::anyhow!("No embedding"))?.clone(),
|
||||
) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
tracing::warn!("[GenerateSeeds] Embedding format error for {}: {}", name, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let embedding_f32: Vec<f32> = embedding.into_iter().map(|v| v as f32).collect();
|
||||
|
||||
// Store in Qdrant
|
||||
match face_db
|
||||
.upsert_seed_embedding(uuid, name, *tmdb_id, &embedding_f32)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
success += 1;
|
||||
tracing::info!("[GenerateSeeds] Stored seed for {}", name);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("[GenerateSeeds] Qdrant error for {}: {}", name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"[GenerateSeeds] Done: {}/{} seeds generated",
|
||||
success,
|
||||
rows.len()
|
||||
);
|
||||
Ok(success)
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ use axum::{
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::Row;
|
||||
use std::process::Command;
|
||||
|
||||
use crate::core::db::ResourceRecord;
|
||||
|
||||
@@ -45,6 +46,10 @@ pub fn identity_routes() -> Router<crate::api::types::AppState> {
|
||||
"/api/v1/identity/:identity_uuid/profile-image",
|
||||
post(upload_profile_image).get(get_profile_image),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/identity/:identity_uuid/profile-image/from-face",
|
||||
post(set_profile_from_face),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/identity/:identity_uuid/status",
|
||||
get(get_identity_status),
|
||||
@@ -1279,6 +1284,163 @@ async fn get_profile_image(
|
||||
Err(StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct SetProfileFromFaceRequest {
|
||||
pub file_uuid: String,
|
||||
pub face_id: Option<String>,
|
||||
pub id: Option<i64>,
|
||||
}
|
||||
|
||||
async fn set_profile_from_face(
|
||||
State(state): State<crate::api::types::AppState>,
|
||||
Path(identity_uuid): Path<String>,
|
||||
Json(req): Json<SetProfileFromFaceRequest>,
|
||||
) -> Result<Json<ProfileImageResponse>, (StatusCode, Json<serde_json::Value>)> {
|
||||
use crate::core::db::schema;
|
||||
let fd_table = schema::table_name("face_detections");
|
||||
let videos_table = schema::table_name("videos");
|
||||
|
||||
let uuid_clean = identity_uuid.replace('-', "");
|
||||
|
||||
let face_identifier = match (&req.face_id, req.id) {
|
||||
(Some(fid), _) => fid.clone(),
|
||||
(None, Some(id)) => id.to_string(),
|
||||
(None, None) => {
|
||||
return Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"success": false, "message": "Either face_id or id is required"})),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let use_id_field = req.id.is_some();
|
||||
|
||||
let row: Option<(i64, i32, i32, i32, i32, f64)> = if use_id_field {
|
||||
sqlx::query_as(&format!(
|
||||
"SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND id = $2",
|
||||
fd_table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(req.id.unwrap())
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_as(&format!(
|
||||
"SELECT frame_number, x, y, width, height, confidence FROM {} WHERE file_uuid = $1 AND face_id = $2",
|
||||
fd_table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&face_identifier)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
}
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"success": false, "message": format!("DB error: {}", e)})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let (frame_number, x, y, width, height, confidence) = row.ok_or_else(|| {
|
||||
(
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"success": false, "message": "Face not found"})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let video_row: Option<(String, Option<i32>, Option<i32>)> = sqlx::query_as(&format!(
|
||||
"SELECT file_path, width, height FROM {} WHERE file_uuid = $1",
|
||||
videos_table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"success": false, "message": format!("DB error: {}", e)})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let (file_path, video_width, video_height) = video_row.ok_or_else(|| {
|
||||
(
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"success": false, "message": "Video file not found"})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let vw = video_width.unwrap_or(1920);
|
||||
let vh = video_height.unwrap_or(1080);
|
||||
|
||||
crate::core::thumbnail::validator::validate_crop(x, y, width, height, vw, vh).map_err(|e| {
|
||||
(
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"success": false, "message": format!("Crop validation failed: {}", e)})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let select = format!("select=eq(n\\,{})", frame_number);
|
||||
let vf = format!("{},crop={}:{}:{}:{}", select, width, height, x, y);
|
||||
|
||||
let output = Command::new("ffmpeg")
|
||||
.args([
|
||||
"-i",
|
||||
&file_path,
|
||||
"-vf",
|
||||
&vf,
|
||||
"-frames:v",
|
||||
"1",
|
||||
"-f",
|
||||
"image2pipe",
|
||||
"-vcodec",
|
||||
"mjpeg",
|
||||
"-",
|
||||
])
|
||||
.output()
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"success": false, "message": format!("FFmpeg failed: {}", e)})),
|
||||
)
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"success": false, "message": "FFmpeg extraction failed"})),
|
||||
));
|
||||
}
|
||||
|
||||
crate::core::thumbnail::validator::validate_jpeg(&output.stdout).map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"success": false, "message": format!("JPEG validation failed: {}", e)})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let dir = crate::core::identity::storage::identity_dir(&uuid_clean);
|
||||
std::fs::create_dir_all(&dir).map_err(|e| {
|
||||
(StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"success": false, "message": format!("Failed to create dir: {}", e)})))
|
||||
})?;
|
||||
|
||||
let file_name = "profile.jpg";
|
||||
let file_path = dir.join(file_name);
|
||||
std::fs::write(&file_path, &output.stdout).map_err(|e| {
|
||||
(StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"success": false, "message": format!("Failed to write file: {}", e)})))
|
||||
})?;
|
||||
|
||||
let pool = state.db.pool().clone();
|
||||
let uuid_clone = uuid_clean.clone();
|
||||
let _ = crate::core::identity::storage::save_identity_file_by_pool(&pool, &uuid_clone).await;
|
||||
|
||||
Ok(Json(ProfileImageResponse {
|
||||
success: true,
|
||||
identity_uuid: uuid_clean,
|
||||
path: file_path.to_string_lossy().to_string(),
|
||||
message: format!("Profile image set from face {} (frame {}, confidence {:.2})", face_identifier, frame_number, confidence),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn get_identity_json(
|
||||
State(state): State<crate::api::types::AppState>,
|
||||
Path(identity_uuid): Path<String>,
|
||||
|
||||
+447
-62
@@ -93,15 +93,38 @@ pub async fn bind_identity(
|
||||
)
|
||||
})?;
|
||||
|
||||
// Capture old identity_id before bind
|
||||
let old_identity_id: Option<i32> = sqlx::query_scalar(&format!(
|
||||
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&req.face_id)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
let face_identifier = match (&req.face_id, req.id) {
|
||||
(Some(fid), _) => fid.clone(),
|
||||
(None, Some(id)) => id.to_string(),
|
||||
(None, None) => {
|
||||
return Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Either face_id or id is required"})),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let use_id_field = req.id.is_some();
|
||||
|
||||
let old_identity_id: Option<i32> = if use_id_field {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(req.id.unwrap())
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&face_identifier)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
}
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
@@ -110,16 +133,27 @@ pub async fn bind_identity(
|
||||
})?
|
||||
.flatten();
|
||||
|
||||
// Direct UPDATE face_detections.identity_id
|
||||
let result = sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND face_id = $3",
|
||||
table
|
||||
))
|
||||
.bind(identity_id)
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&req.face_id)
|
||||
.execute(state.db.pool())
|
||||
.await
|
||||
let result = if use_id_field {
|
||||
sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND id = $3",
|
||||
table
|
||||
))
|
||||
.bind(identity_id)
|
||||
.bind(&req.file_uuid)
|
||||
.bind(req.id.unwrap())
|
||||
.execute(state.db.pool())
|
||||
.await
|
||||
} else {
|
||||
sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND face_id = $3",
|
||||
table
|
||||
))
|
||||
.bind(identity_id)
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&face_identifier)
|
||||
.execute(state.db.pool())
|
||||
.await
|
||||
}
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
@@ -127,6 +161,67 @@ pub async fn bind_identity(
|
||||
)
|
||||
})?;
|
||||
|
||||
let trace_id: Option<i32> = if use_id_field {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT trace_id FROM {} WHERE file_uuid = $1 AND id = $2 LIMIT 1",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(req.id.unwrap())
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT trace_id FROM {} WHERE file_uuid = $1 AND face_id = $2 LIMIT 1",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&face_identifier)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
}
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": e.to_string()})),
|
||||
)
|
||||
})?
|
||||
.flatten();
|
||||
|
||||
// Update Qdrant + TKG if trace_id exists
|
||||
if let Some(tid) = trace_id {
|
||||
// 1. Update Qdrant payload
|
||||
let face_db = crate::core::db::FaceEmbeddingDb::new();
|
||||
if let Err(e) = face_db
|
||||
.update_identity_by_trace(&req.file_uuid, tid, &uuid_clean)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
"[bind] Failed to update Qdrant identity_uuid for trace {}: {}",
|
||||
tid, e
|
||||
);
|
||||
}
|
||||
|
||||
// 2. Update TKG face_track node (dual-field design)
|
||||
let tkg_table = crate::core::db::schema::table_name("tkg_nodes");
|
||||
let ext_id = format!("face_track_{}", tid);
|
||||
let identity_ref = format!("{}:identity_{}", req.file_uuid, identity_id);
|
||||
|
||||
let _ = sqlx::query(&format!(
|
||||
"UPDATE {} SET properties = properties || $1::jsonb - 'stranger_ref' \
|
||||
WHERE file_uuid = $2 AND node_type = 'face_track' AND external_id = $3",
|
||||
tkg_table
|
||||
))
|
||||
.bind(serde_json::json!({
|
||||
"identity_uuid": uuid_clean,
|
||||
"identity_ref": identity_ref
|
||||
}))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&ext_id)
|
||||
.execute(state.db.pool())
|
||||
.await;
|
||||
}
|
||||
|
||||
// Clear bind redo stack
|
||||
let _ = sqlx::query(&format!(
|
||||
"DELETE FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')",
|
||||
@@ -144,10 +239,10 @@ pub async fn bind_identity(
|
||||
crate::api::middleware::AuthSource::ApiKey => "api_key",
|
||||
};
|
||||
let before = serde_json::json!({
|
||||
"file_uuid": req.file_uuid, "face_id": req.face_id, "identity_id_before": old_identity_id
|
||||
"file_uuid": req.file_uuid, "face_id": face_identifier, "identity_id_before": old_identity_id
|
||||
});
|
||||
let after = serde_json::json!({
|
||||
"file_uuid": req.file_uuid, "face_id": req.face_id, "identity_id_after": identity_id
|
||||
"file_uuid": req.file_uuid, "face_id": face_identifier, "identity_id_after": identity_id
|
||||
});
|
||||
let _ = sqlx::query(&format!(
|
||||
"INSERT INTO {} (identity_id, operation, before_snapshot, after_snapshot, is_undone, user_id, user_source) VALUES ($1, 'bind', $2, $3, false, $4, $5)",
|
||||
@@ -161,7 +256,6 @@ pub async fn bind_identity(
|
||||
.execute(state.db.pool())
|
||||
.await;
|
||||
|
||||
// Sync identity JSON file
|
||||
if let Err(e) =
|
||||
crate::core::identity::storage::save_identity_file_by_pool(state.db.pool(), &uuid_clean)
|
||||
.await
|
||||
@@ -177,7 +271,7 @@ pub async fn bind_identity(
|
||||
success: true,
|
||||
message: format!(
|
||||
"Bound face {} of {} to {}",
|
||||
req.face_id, req.file_uuid, name
|
||||
face_identifier, req.file_uuid, name
|
||||
),
|
||||
data: Some(serde_json::json!({"rows_affected": result.rows_affected()})),
|
||||
}))
|
||||
@@ -193,15 +287,38 @@ pub async fn unbind_identity(
|
||||
let id_table = crate::core::db::schema::table_name("identities");
|
||||
let history_table = crate::core::db::schema::table_name("identity_history");
|
||||
|
||||
// Capture old identity_id before unbind
|
||||
let old_identity_id: Option<i32> = sqlx::query_scalar(&format!(
|
||||
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&req.face_id)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
let face_identifier = match (&req.face_id, req.id) {
|
||||
(Some(fid), _) => fid.clone(),
|
||||
(None, Some(id)) => id.to_string(),
|
||||
(None, None) => {
|
||||
return Err((
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Either face_id or id is required"})),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let use_id_field = req.id.is_some();
|
||||
|
||||
let old_identity_id: Option<i32> = if use_id_field {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(req.id.unwrap())
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT identity_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&face_identifier)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
}
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
@@ -210,14 +327,25 @@ pub async fn unbind_identity(
|
||||
})?
|
||||
.flatten();
|
||||
|
||||
let result = sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = NULL WHERE file_uuid = $1 AND face_id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&req.face_id)
|
||||
.execute(state.db.pool())
|
||||
.await
|
||||
let result = if use_id_field {
|
||||
sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = NULL WHERE file_uuid = $1 AND id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(req.id.unwrap())
|
||||
.execute(state.db.pool())
|
||||
.await
|
||||
} else {
|
||||
sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = NULL WHERE file_uuid = $1 AND face_id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&face_identifier)
|
||||
.execute(state.db.pool())
|
||||
.await
|
||||
}
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
@@ -225,15 +353,85 @@ pub async fn unbind_identity(
|
||||
)
|
||||
})?;
|
||||
|
||||
// Phase 2.3: Also update TKG node (find face_track_id first)
|
||||
let trace_id_opt: Option<i32> = sqlx::query_scalar(&format!(
|
||||
"SELECT trace_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&req.face_id)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
let trace_id: Option<i32> = if use_id_field {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT trace_id FROM {} WHERE file_uuid = $1 AND id = $2 LIMIT 1",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(req.id.unwrap())
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT trace_id FROM {} WHERE file_uuid = $1 AND face_id = $2 LIMIT 1",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&face_identifier)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
}
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": e.to_string()})),
|
||||
)
|
||||
})?
|
||||
.flatten();
|
||||
|
||||
// Clear Qdrant + TKG if trace_id exists
|
||||
if let Some(tid) = trace_id {
|
||||
// 1. Clear Qdrant payload
|
||||
let face_db = crate::core::db::FaceEmbeddingDb::new();
|
||||
if let Err(e) = face_db
|
||||
.clear_identity_by_trace(&req.file_uuid, tid)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(
|
||||
"[unbind] Failed to clear Qdrant identity_uuid for trace {}: {}",
|
||||
tid, e
|
||||
);
|
||||
}
|
||||
|
||||
// 2. Update TKG face_track node (restore stranger_ref)
|
||||
let tkg_table = crate::core::db::schema::table_name("tkg_nodes");
|
||||
let ext_id = format!("face_track_{}", tid);
|
||||
let stranger_ref = format!("{}:stranger_trace_{}", req.file_uuid, tid);
|
||||
|
||||
let _ = sqlx::query(&format!(
|
||||
"UPDATE {} SET properties = properties || $1::jsonb - 'identity_uuid' - 'identity_ref' \
|
||||
WHERE file_uuid = $2 AND node_type = 'face_track' AND external_id = $3",
|
||||
tkg_table
|
||||
))
|
||||
.bind(serde_json::json!({
|
||||
"stranger_ref": stranger_ref
|
||||
}))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&ext_id)
|
||||
.execute(state.db.pool())
|
||||
.await;
|
||||
}
|
||||
|
||||
let trace_id_opt: Option<i32> = if use_id_field {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT trace_id FROM {} WHERE file_uuid = $1 AND id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(req.id.unwrap())
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
} else {
|
||||
sqlx::query_scalar(&format!(
|
||||
"SELECT trace_id FROM {} WHERE file_uuid = $1 AND face_id = $2",
|
||||
table
|
||||
))
|
||||
.bind(&req.file_uuid)
|
||||
.bind(&face_identifier)
|
||||
.fetch_optional(state.db.pool())
|
||||
.await
|
||||
}
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
@@ -251,9 +449,7 @@ pub async fn unbind_identity(
|
||||
.await;
|
||||
}
|
||||
|
||||
// Record history if there was a binding
|
||||
if let Some(identity_id) = old_identity_id {
|
||||
// Clear bind redo stack
|
||||
let _ = sqlx::query(&format!(
|
||||
"DELETE FROM {} WHERE identity_id = $1 AND is_undone = true AND operation IN ('bind','unbind','bind_trace')",
|
||||
history_table
|
||||
@@ -262,7 +458,6 @@ pub async fn unbind_identity(
|
||||
.execute(state.db.pool())
|
||||
.await;
|
||||
|
||||
// Insert history record
|
||||
let uid = auth.user_id.to_string();
|
||||
let usrc = match auth.source {
|
||||
crate::api::middleware::AuthSource::Jwt => "jwt",
|
||||
@@ -270,10 +465,10 @@ pub async fn unbind_identity(
|
||||
crate::api::middleware::AuthSource::ApiKey => "api_key",
|
||||
};
|
||||
let before = serde_json::json!({
|
||||
"file_uuid": req.file_uuid, "face_id": req.face_id, "identity_id_before": old_identity_id
|
||||
"file_uuid": req.file_uuid, "face_id": face_identifier, "identity_id_before": old_identity_id
|
||||
});
|
||||
let after = serde_json::json!({
|
||||
"file_uuid": req.file_uuid, "face_id": req.face_id, "identity_id_after": null
|
||||
"file_uuid": req.file_uuid, "face_id": face_identifier, "identity_id_after": null
|
||||
});
|
||||
let _ = sqlx::query(&format!(
|
||||
"INSERT INTO {} (identity_id, operation, before_snapshot, after_snapshot, is_undone, user_id, user_source) VALUES ($1, 'unbind', $2, $3, false, $4, $5)",
|
||||
@@ -315,7 +510,7 @@ pub async fn unbind_identity(
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
success: true,
|
||||
message: format!("Unbound face {} from {}", req.face_id, req.file_uuid),
|
||||
message: format!("Unbound face {} from {}", face_identifier, req.file_uuid),
|
||||
data: Some(serde_json::json!({"rows_affected": result.rows_affected()})),
|
||||
}))
|
||||
}
|
||||
@@ -933,14 +1128,14 @@ pub async fn get_identity_traces(
|
||||
COUNT(*)::bigint AS frame_count,
|
||||
MIN(fd.frame_number)::int AS first_frame,
|
||||
MAX(fd.frame_number)::int AS last_frame,
|
||||
ROUND(MIN(fd.frame_number)::numeric / NULLIF(v.fps, 0)::numeric, 1)::float8 AS first_sec,
|
||||
ROUND(MAX(fd.frame_number)::numeric / NULLIF(v.fps, 0)::numeric, 1)::float8 AS last_sec,
|
||||
COALESCE(ROUND(MIN(fd.frame_number)::numeric / NULLIF(v.fps, 0)::numeric, 1), 0)::float8 AS first_sec,
|
||||
COALESCE(ROUND(MAX(fd.frame_number)::numeric / NULLIF(v.fps, 0)::numeric, 1), 0)::float8 AS last_sec,
|
||||
ROUND(AVG(fd.confidence)::numeric, 4)::float8 AS avg_confidence
|
||||
FROM {} fd
|
||||
LEFT JOIN dev.videos v ON fd.file_uuid = v.file_uuid
|
||||
WHERE fd.identity_id = $1
|
||||
GROUP BY trace_id, v.fps
|
||||
ORDER BY trace_id
|
||||
LEFT JOIN videos v ON fd.file_uuid = v.file_uuid
|
||||
WHERE fd.identity_id = $1 AND fd.trace_id IS NOT NULL
|
||||
GROUP BY fd.file_uuid, fd.trace_id, v.fps
|
||||
ORDER BY fd.trace_id
|
||||
LIMIT $2 OFFSET $3"#,
|
||||
fd_table
|
||||
))
|
||||
@@ -953,7 +1148,7 @@ pub async fn get_identity_traces(
|
||||
|
||||
// Get total count for pagination
|
||||
let total: (i64,) = sqlx::query_as(&format!(
|
||||
"SELECT COUNT(*) FROM (SELECT 1 FROM {} fd WHERE trace_id) sub",
|
||||
"SELECT COUNT(*) FROM (SELECT 1 FROM {} fd WHERE fd.identity_id = $1 AND fd.trace_id IS NOT NULL GROUP BY fd.trace_id) sub",
|
||||
fd_table
|
||||
))
|
||||
.bind(identity_id)
|
||||
@@ -1864,6 +2059,188 @@ pub async fn bind_history(
|
||||
}))
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Pending Person API (file-scoped)
|
||||
// ============================================================================
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CreatePendingPersonRequest {
|
||||
#[serde(default)]
|
||||
pub trace_ids: Vec<i32>,
|
||||
pub name: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PendingPersonItem {
|
||||
pub identity_uuid: String,
|
||||
pub identity_id: i32,
|
||||
pub name: String,
|
||||
pub created_at: String,
|
||||
pub trace_count: i64,
|
||||
pub bound_traces: Option<Vec<i32>>,
|
||||
}
|
||||
|
||||
/// Create a pending person under a file, optionally binding traces.
|
||||
pub async fn create_pending_person(
|
||||
State(state): State<crate::api::types::AppState>,
|
||||
Extension(_auth): Extension<crate::api::middleware::UserAuth>,
|
||||
Path(file_uuid): Path<String>,
|
||||
Json(req): Json<CreatePendingPersonRequest>,
|
||||
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, Json<serde_json::Value>)> {
|
||||
let id_table = crate::core::db::schema::table_name("identities");
|
||||
let fd_table = crate::core::db::schema::table_name("face_detections");
|
||||
let nodes_table = crate::core::db::schema::table_name("tkg_nodes");
|
||||
|
||||
// Auto-generate name if not provided
|
||||
let name = if let Some(n) = &req.name {
|
||||
n.clone()
|
||||
} else {
|
||||
let count: i64 = sqlx::query_scalar(&format!(
|
||||
"SELECT COUNT(*) FROM {} WHERE file_uuid = $1 AND status = 'pending'",
|
||||
id_table
|
||||
))
|
||||
.bind(&file_uuid)
|
||||
.fetch_one(state.db.pool())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": e.to_string()})),
|
||||
)
|
||||
})?;
|
||||
format!("Person {}", count + 1)
|
||||
};
|
||||
|
||||
// Create identity with pending status
|
||||
let identity_row: (i32, String) = sqlx::query_as(&format!(
|
||||
"INSERT INTO {} (name, identity_type, source, status, file_uuid) VALUES ($1, 'people', 'manual', 'pending', $2) RETURNING id, uuid::text",
|
||||
id_table
|
||||
))
|
||||
.bind(&name)
|
||||
.bind(&file_uuid)
|
||||
.fetch_one(state.db.pool())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to create identity: {}", e)})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let (identity_id, identity_uuid): (i32, String) = identity_row;
|
||||
|
||||
// Bind traces if provided
|
||||
let bound_traces = if !req.trace_ids.is_empty() {
|
||||
// Update face_detections
|
||||
let _ = sqlx::query(&format!(
|
||||
"UPDATE {} SET identity_id = $1 WHERE file_uuid = $2 AND trace_id = ANY($3)",
|
||||
fd_table
|
||||
))
|
||||
.bind(identity_id)
|
||||
.bind(&file_uuid)
|
||||
.bind(&req.trace_ids)
|
||||
.execute(state.db.pool())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to bind traces: {}", e)})),
|
||||
)
|
||||
})?;
|
||||
|
||||
// Update TKG nodes
|
||||
for &tid in &req.trace_ids {
|
||||
let external_id = format!("face_track_{}", tid);
|
||||
let _ = sqlx::query(&format!(
|
||||
"UPDATE {} SET properties = jsonb_set(\
|
||||
jsonb_set(properties, '{{identity_id}}', $1::jsonb, false),\
|
||||
'{{identity_name}}', $2::jsonb, false)\
|
||||
WHERE file_uuid = $3 AND node_type = 'face_track' AND external_id = $4",
|
||||
nodes_table
|
||||
))
|
||||
.bind(identity_id)
|
||||
.bind(&name)
|
||||
.bind(&file_uuid)
|
||||
.bind(&external_id)
|
||||
.execute(state.db.pool())
|
||||
.await;
|
||||
}
|
||||
Some(req.trace_ids.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Sync identity file
|
||||
let _ = crate::core::identity::storage::save_identity_file_by_pool(
|
||||
state.db.pool(),
|
||||
&identity_uuid,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
success: true,
|
||||
message: format!("Created pending person: {} (uuid: {})", name, identity_uuid),
|
||||
data: Some(serde_json::json!({
|
||||
"identity_uuid": identity_uuid,
|
||||
"identity_id": identity_id,
|
||||
"name": name,
|
||||
"bound_traces": bound_traces.map(|v| v.len()).unwrap_or(0),
|
||||
})),
|
||||
}))
|
||||
}
|
||||
|
||||
/// List pending persons for a file.
|
||||
pub async fn list_pending_persons(
|
||||
State(state): State<crate::api::types::AppState>,
|
||||
Extension(_auth): Extension<crate::api::middleware::UserAuth>,
|
||||
Path(file_uuid): Path<String>,
|
||||
) -> Result<Json<ApiResponse<Vec<PendingPersonItem>>>, (StatusCode, Json<serde_json::Value>)> {
|
||||
let id_table = crate::core::db::schema::table_name("identities");
|
||||
let fd_table = crate::core::db::schema::table_name("face_detections");
|
||||
|
||||
let rows: Vec<(i32, String, String, chrono::NaiveDateTime)> = sqlx::query_as(&format!(
|
||||
"SELECT id, uuid::text, name, created_at FROM {} WHERE file_uuid = $1 AND status = 'pending' ORDER BY created_at DESC",
|
||||
id_table
|
||||
))
|
||||
.bind(&file_uuid)
|
||||
.fetch_all(state.db.pool())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": e.to_string()})),
|
||||
)
|
||||
})?;
|
||||
|
||||
let mut items = Vec::new();
|
||||
for (id, uuid, name, created_at) in rows {
|
||||
let trace_count: i64 = sqlx::query_scalar(&format!(
|
||||
"SELECT COUNT(DISTINCT trace_id) FROM {} WHERE identity_id = $1 AND file_uuid = $2",
|
||||
fd_table
|
||||
))
|
||||
.bind(id)
|
||||
.bind(&file_uuid)
|
||||
.fetch_one(state.db.pool())
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
|
||||
items.push(PendingPersonItem {
|
||||
identity_uuid: uuid,
|
||||
identity_id: id,
|
||||
name,
|
||||
created_at: created_at.format("%Y-%m-%d %H:%M:%S").to_string(),
|
||||
trace_count,
|
||||
bound_traces: None,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
success: true,
|
||||
message: format!("Found {} pending persons for {}", items.len(), file_uuid),
|
||||
data: Some(items),
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn identity_binding_routes() -> Router<crate::api::types::AppState> {
|
||||
Router::new()
|
||||
.route("/api/v1/identity/:identity_uuid/bind", post(bind_identity))
|
||||
@@ -1892,4 +2269,12 @@ pub fn identity_binding_routes() -> Router<crate::api::types::AppState> {
|
||||
.route("/api/v1/identity/merge/:merge_id/undo", post(undo_merge))
|
||||
.route("/api/v1/identity/merge/:merge_id/redo", post(redo_merge))
|
||||
.route("/api/v1/identity/merge/history", get(get_merge_history))
|
||||
.route(
|
||||
"/api/v1/file/:file_uuid/pending-person",
|
||||
post(create_pending_person),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/file/:file_uuid/pending-persons",
|
||||
get(list_pending_persons),
|
||||
)
|
||||
}
|
||||
|
||||
+64
-22
@@ -59,6 +59,7 @@ struct JobDetailResponse {
|
||||
created_at: String,
|
||||
started_at: Option<String>,
|
||||
updated_at: Option<String>,
|
||||
queue_position: Option<i32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -286,6 +287,31 @@ async fn trigger_processing(
|
||||
tracing::error!("[TRIGGER] Failed to update monitor job for {}: {}", file_uuid, e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
|
||||
// Update videos.processing_status to PROCESSING immediately
|
||||
let processor_names_upper: Vec<String> = processors_to_run.iter().map(|p| p.to_uppercase()).collect();
|
||||
let progress: serde_json::Map<String, serde_json::Value> = processors_to_run.iter().map(|p| {
|
||||
(p.to_uppercase(), serde_json::json!({
|
||||
"current_frame": 0, "total_frames": 0, "percentage": 0, "status": "pending"
|
||||
}))
|
||||
}).collect();
|
||||
let status = serde_json::json!({
|
||||
"phase": "PROCESSING",
|
||||
"active_processors": processor_names_upper,
|
||||
"total_frames": 0,
|
||||
"progress": progress
|
||||
});
|
||||
sqlx::query(&format!(
|
||||
"UPDATE {videos_table} SET processing_status = $1, updated_at = CURRENT_TIMESTAMP WHERE file_uuid = $2"
|
||||
))
|
||||
.bind(&status)
|
||||
.bind(&file_uuid)
|
||||
.execute(state.db.pool())
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("[TRIGGER] Failed to update processing status for {}: {}", file_uuid, e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
|
||||
let processors_to_run_refs: Vec<&str> = processors_to_run.iter().map(|s| s.as_str()).collect();
|
||||
|
||||
@@ -531,6 +557,21 @@ async fn get_job(Path(uuid): Path<String>) -> Result<Json<JobDetailResponse>, St
|
||||
started_at,
|
||||
updated_at,
|
||||
) = job.ok_or(StatusCode::NOT_FOUND)?;
|
||||
|
||||
// Calculate queue position if status is 'pending'
|
||||
let queue_position = if status == "pending" {
|
||||
sqlx::query_scalar::<_, i64>(&format!(
|
||||
"SELECT COUNT(*) + 1 FROM {} WHERE status = 'pending' AND created_at < (SELECT created_at FROM {} WHERE uuid = $1)",
|
||||
jobs_table, jobs_table
|
||||
))
|
||||
.bind(&uuid)
|
||||
.fetch_one(pg.pool())
|
||||
.await
|
||||
.ok()
|
||||
.map(|pos| pos as i32)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Json(JobDetailResponse {
|
||||
id,
|
||||
@@ -543,6 +584,7 @@ async fn get_job(Path(uuid): Path<String>) -> Result<Json<JobDetailResponse>, St
|
||||
created_at,
|
||||
started_at,
|
||||
updated_at,
|
||||
queue_position,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -655,28 +697,27 @@ async fn get_processor_counts(
|
||||
}
|
||||
|
||||
if let Ok(content) = std::fs::read_to_string(&json_path) {
|
||||
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&content) {
|
||||
// CUT: prioritize scenes count over frame_count
|
||||
if proc_name == "cut" {
|
||||
frame_count = json
|
||||
.get("scenes")
|
||||
.and_then(|v| v.as_array())
|
||||
.map(|arr| arr.len() as u32);
|
||||
} else {
|
||||
// Standard frame_count field
|
||||
frame_count = json
|
||||
.get("frame_count")
|
||||
.and_then(|v| v.as_u64())
|
||||
.map(|v| v as u32);
|
||||
|
||||
// YOLO: frames array
|
||||
if frame_count.is_none() {
|
||||
frame_count = json
|
||||
.get("frames")
|
||||
.and_then(|v| v.as_array())
|
||||
.map(|arr| arr.len() as u32);
|
||||
}
|
||||
}
|
||||
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&content) {
|
||||
// CUT: prioritize scenes count over frame_count
|
||||
if proc_name == "cut" {
|
||||
frame_count = json
|
||||
.get("scenes")
|
||||
.and_then(|v| v.as_array())
|
||||
.map(|arr| arr.len() as u32);
|
||||
} else if proc_name == "yolo" {
|
||||
// YOLO: use metadata.total_frames (avoids parsing huge frames array)
|
||||
frame_count = json
|
||||
.get("metadata")
|
||||
.and_then(|m| m.get("total_frames"))
|
||||
.and_then(|v| v.as_u64())
|
||||
.map(|v| v as u32);
|
||||
} else {
|
||||
// Standard frame_count field
|
||||
frame_count = json
|
||||
.get("frame_count")
|
||||
.and_then(|v| v.as_u64())
|
||||
.map(|v| v as u32);
|
||||
}
|
||||
|
||||
segment_count = json
|
||||
.get("segments")
|
||||
@@ -738,6 +779,7 @@ pub fn processing_routes() -> Router<AppState> {
|
||||
)
|
||||
.route("/api/v1/progress/:file_uuid", post(get_progress))
|
||||
.route("/api/v1/jobs", post(list_jobs))
|
||||
.route("/api/v1/job/:uuid", get(get_job))
|
||||
.route("/api/v1/config/cache", post(cache_toggle))
|
||||
.route("/api/v1/config/auto-pipeline", post(auto_pipeline_toggle))
|
||||
.route(
|
||||
|
||||
Reference in New Issue
Block a user