Files
momentry_core/src/api/search.rs
T
Accusys 2cfcfdd1af feat: Phase 2.6 edges migration to Qdrant (TKG-only architecture)
Phase 2.6.1: co_occurrence_edges migration
- build_co_occurrence_edges_from_qdrant()
- Qdrant embeddings → frame grouping → YOLO objects
- Result: 6679 edges (vs 6701 PostgreSQL)

Phase 2.6.2: face_face_edges migration
- build_face_face_edges_from_qdrant()
- Qdrant embeddings → frame grouping → face pairs
- mutual_gaze detection preserved
- Result: 6 edges (exact match)

Phase 2.6.3: speaker_face_edges migration
- build_speaker_face_edges_from_qdrant()
- Qdrant embeddings → trace_id frame ranges
- SPEAKS_AS edge creation

Architecture:
- All edges use Qdrant payload (no face_detections queries)
- PostgreSQL fallback for empty Qdrant
- Estimated 3.6x performance improvement

Testing:
- Playground (3003): ✓ All Phase 2.6 logs verified
- Edge counts: ✓ Close match with PostgreSQL
- Fallback: ✓ Working

Docs:
- docs_v1.0/DESIGN/TKG_PHASE2_6_EDGES_MIGRATION.md
- docs_v1.0/M4_workspace/2026-06-21_phase2_6_test.md
2026-06-21 04:47:49 +08:00

510 lines
17 KiB
Rust

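//! Smart Search API
//! Hybrid search: semantic (Qdrant) + keyword (PG `search_bm25`) + video title (PG ILIKE on
//! file names) + identity (person name → chunks).
//! Hits are deduplicated by (file_uuid, chunk_id) and ranked by the highest score any source
//! gave them (keyword, title and identity hits carry fixed scores).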
use axum::{extract::State, http::StatusCode, response::Json, routing::post, Router};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::core::db::postgres_db::SemanticSearchResult;
use crate::core::embedding::Embedder;
// --- Request / Response Structures ---
/// JSON body of `POST /api/v1/search/smart`.
#[derive(Debug, Deserialize)]
pub struct SmartSearchRequest {
    /// Optional file filter; when set, `smart_search` restricts every search source to
    /// hits from this `file_uuid`.
    #[serde(default)]
    pub file_uuid: Option<String>,
    /// Free-text search query (embedded for semantic search and matched against keywords,
    /// video file names and identity names).
    pub query: String,
    /// 1-based page number; `smart_search` treats a missing value as 1 and echoes it back.
    pub page: Option<usize>,
    /// Results per page. When absent, `smart_search` falls back to `limit` (only if `page`
    /// is also absent), otherwise to 5.
    pub page_size: Option<usize>,
    /// Caps the number of results per page; doubles as the page size when neither
    /// `page_size` nor `page` is given.
    pub limit: Option<usize>,
}
/// One chunk returned by the search API.
///
/// Field order is the JSON serialization order.
#[derive(Debug, Clone, Serialize)]
pub struct SearchResult {
    /// Always 0 when built in this module.
    pub id: i32,
    /// File the chunk belongs to.
    pub file_uuid: Option<String>,
    /// Set to the chunk's `scene_order` by the constructors in this module.
    pub parent_id: i32,
    /// Scene ordinal of the chunk.
    pub scene_order: Option<i32>,
    /// First frame of the chunk.
    pub start_frame: i64,
    /// Last frame of the chunk (inclusive or exclusive is not visible here — TODO confirm).
    pub end_frame: i64,
    /// Frame rate of the source file.
    pub fps: f64,
    /// Start time of the chunk (units not shown here; presumably seconds — verify).
    pub start_time: f64,
    /// End time of the chunk (same units as `start_time`).
    pub end_time: f64,
    /// Raw text; left `None` by the constructors in this module.
    pub raw_text: Option<String>,
    /// Chunk summary text from PostgreSQL.
    pub summary: Option<String>,
    /// Free-form metadata copied from PostgreSQL.
    pub metadata: Option<serde_json::Value>,
    /// Relevance score: the merged score in `smart_search`, or the source search score in
    /// the helper constructors.
    pub similarity: Option<f64>,
    /// Video file name, filled from the videos table by `smart_search`.
    pub file_name: Option<String>,
    /// Public URL of the media file, derived by `smart_search` from its storage path.
    pub serve_url: Option<String>,
    /// Chunk thumbnail endpoint URL, set by `smart_search`.
    pub thumbnail_url: Option<String>,
}
/// JSON response of `POST /api/v1/search/smart`.
#[derive(Debug, Serialize)]
pub struct SmartSearchResponse {
    /// The query string as received.
    pub query: String,
    /// Ranked results for the requested page.
    pub results: Vec<SearchResult>,
    /// Effective 1-based page number.
    pub page: usize,
    /// Effective page size.
    pub page_size: usize,
    /// `hybrid_` followed by the `+`-joined list of strategies that contributed candidates.
    pub strategy: String,
}
/// Internal merged result with score-based merge.
///
/// One entry per `(file_uuid, chunk_id)`; `score` is the maximum over every search source
/// that returned this chunk.
#[derive(Debug)]
struct MergedResult {
    file_uuid: String,
    chunk_id: String,
    /// Ranking score: max of all per-source scores.
    score: f64,
    /// Qdrant cosine score when semantic search found this chunk; `smart_search` keeps
    /// such hits even if the keyword post-filter fails.
    semantic_score: Option<f64>,
    /// Fixed keyword score when keyword search found this chunk.
    keyword_score: Option<f64>,
    /// Fixed identity score when an identity lookup found this chunk.
    identity_score: Option<f64>,
    /// Underscore-joined labels of the strategies that found the chunk (diagnostic; not
    /// read when building the response).
    source: String,
}
/// Look up one chunk in PostgreSQL and turn it into a [`SearchResult`], attaching
/// `qdrant_score` as its similarity.
///
/// Returns `None` when the chunk does not exist or the lookup fails; failures are logged at
/// `warn` level. `parent_id` mirrors `scene_order`. `id` is 0, and `raw_text`, `file_name`,
/// `serve_url` and `thumbnail_url` are left empty.
async fn enrich_from_pg(
    db: &crate::core::db::PostgresDb,
    file_uuid: &str,
    chunk_id: &str,
    qdrant_score: f32,
) -> Option<SearchResult> {
    let lookup = db.get_chunk_by_file_and_chunk_id(file_uuid, chunk_id).await;
    let chunk = match lookup {
        // A missing row short-circuits to `None` via `?`.
        Ok(found) => found?,
        Err(err) => {
            tracing::warn!("PG enrichment failed for {} {}: {}", file_uuid, chunk_id, err);
            return None;
        }
    };

    let scene = chunk.scene_order;
    Some(SearchResult {
        id: 0,
        parent_id: scene,
        scene_order: Some(scene),
        file_uuid: chunk.file_uuid,
        start_frame: chunk.start_frame,
        end_frame: chunk.end_frame,
        fps: chunk.fps,
        start_time: chunk.start_time,
        end_time: chunk.end_time,
        raw_text: None,
        summary: Some(chunk.summary),
        metadata: chunk.metadata,
        similarity: Some(f64::from(qdrant_score)),
        file_name: None,
        serve_url: None,
        thumbnail_url: None,
    })
}
/// Convert a PostgreSQL semantic-search row into the API's [`SearchResult`].
///
/// The row's own `similarity` is carried over unchanged and `parent_id` mirrors
/// `scene_order`. Fields the row cannot supply are left zero or empty: `id`, `raw_text`,
/// `file_name`, `serve_url` and `thumbnail_url`.
fn pg_result_to_search(p: &SemanticSearchResult) -> SearchResult {
    let scene = p.scene_order;
    let summary = p.summary.clone();
    SearchResult {
        id: 0,
        parent_id: scene,
        scene_order: Some(scene),
        file_uuid: p.file_uuid.clone(),
        start_frame: p.start_frame,
        end_frame: p.end_frame,
        fps: p.fps,
        start_time: p.start_time,
        end_time: p.end_time,
        raw_text: None,
        summary: Some(summary),
        metadata: p.metadata.clone(),
        similarity: p.similarity,
        file_name: None,
        serve_url: None,
        thumbnail_url: None,
    }
}
// --- API Handler ---
/// Escape `\`, `%` and `_` so user text is matched literally by PostgreSQL `ILIKE`
/// (whose default escape character is `\`).
fn escape_like(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        if matches!(c, '\\' | '%' | '_') {
            out.push('\\');
        }
        out.push(c);
    }
    out
}

/// True if `s` contains CJK ideographs, Hiragana, Katakana or Hangul syllables.
fn contains_cjk(s: &str) -> bool {
    s.chars().any(|c| {
        matches!(
            c,
            '\u{4E00}'..='\u{9FFF}'
                | '\u{3040}'..='\u{309F}'
                | '\u{30A0}'..='\u{30FF}'
                | '\u{AC00}'..='\u{D7AF}'
        )
    })
}

/// True if `word` occurs in `haystack` with a non-alphanumeric character (or the string
/// edge) on both sides, i.e. as a whole word. Punctuation and newlines count as boundaries.
fn contains_word(haystack: &str, word: &str) -> bool {
    if word.is_empty() {
        return true;
    }
    let mut from = 0;
    while let Some(pos) = haystack[from..].find(word) {
        let start = from + pos;
        let end = start + word.len();
        let before_ok = haystack[..start]
            .chars()
            .next_back()
            .map_or(true, |c| !c.is_alphanumeric());
        let after_ok = haystack[end..]
            .chars()
            .next()
            .map_or(true, |c| !c.is_alphanumeric());
        if before_ok && after_ok {
            return true;
        }
        // Step one character so overlapping candidates are still examined.
        from = start + haystack[start..].chars().next().map_or(1, char::len_utf8);
    }
    false
}

/// Keyword post-filter: every query word must appear in the lowercased summary.
///
/// If either side contains CJK text, plain substring matching is used, because CJK has no
/// spaces between words. Otherwise whole-word matching is used. An empty summary never
/// matches.
fn summary_matches(summary_lower: &str, query_words: &[&str], query_has_cjk: bool) -> bool {
    if summary_lower.is_empty() {
        return false;
    }
    if query_has_cjk || contains_cjk(summary_lower) {
        query_words.iter().all(|&w| summary_lower.contains(w))
    } else {
        query_words.iter().all(|&w| contains_word(summary_lower, w))
    }
}

/// Origin of a candidate hit; decides which per-source score field gets recorded.
#[derive(Debug, Clone, Copy)]
enum HitSource {
    Semantic,
    Keyword,
    Title,
    Identity,
}

impl HitSource {
    /// Label appended to `MergedResult::source` (e.g. `semantic_keyword`).
    fn label(self) -> &'static str {
        match self {
            HitSource::Semantic => "semantic",
            HitSource::Keyword => "keyword",
            HitSource::Title => "title",
            HitSource::Identity => "identity",
        }
    }
}

/// Deduplicates hits by `(file_uuid, chunk_id)` and remembers first-seen order.
///
/// Equal scores therefore rank deterministically (semantic rank first, then keyword, title,
/// identity) instead of by `HashMap` iteration order.
#[derive(Default)]
struct HitMerger {
    positions: HashMap<(String, String), usize>,
    entries: Vec<MergedResult>,
}

impl HitMerger {
    /// Record one hit; the merged score is the max over all sources that found the chunk.
    fn add(&mut self, file_uuid: String, chunk_id: String, score: f64, source: HitSource) {
        let key = (file_uuid, chunk_id);
        let pos = match self.positions.get(&key) {
            Some(&pos) => pos,
            None => {
                let pos = self.entries.len();
                self.entries.push(MergedResult {
                    file_uuid: key.0.clone(),
                    chunk_id: key.1.clone(),
                    score,
                    semantic_score: None,
                    keyword_score: None,
                    identity_score: None,
                    source: String::new(),
                });
                self.positions.insert(key, pos);
                pos
            }
        };
        let entry = &mut self.entries[pos];
        entry.score = entry.score.max(score);
        match source {
            HitSource::Semantic => entry.semantic_score = Some(score),
            HitSource::Keyword => entry.keyword_score = Some(score),
            HitSource::Identity => entry.identity_score = Some(score),
            HitSource::Title => {}
        }
        let label = source.label();
        if entry.source.is_empty() {
            entry.source.push_str(label);
        } else if !entry.source.split('_').any(|part| part == label) {
            entry.source.push('_');
            entry.source.push_str(label);
        }
    }

    /// Consume the merger and return entries by descending score.
    fn into_ranked(mut self) -> Vec<MergedResult> {
        // `sort_by` is stable, so ties keep first-seen order. `total_cmp` is a total order
        // (NaN-safe), which the std sort requires of its comparator.
        self.entries.sort_by(|a, b| b.score.total_cmp(&a.score));
        self.entries
    }
}

/// `POST /api/v1/search/smart`: hybrid search over video chunks.
///
/// The handler runs four sources: semantic (Qdrant), keyword (`search_bm25`), video-title
/// (ILIKE on file names) and identity (ILIKE on person names). It merges their hits by
/// `(file_uuid, chunk_id)`, keeping each chunk's best score. It then enriches the hits from
/// PostgreSQL, drops keyword-only hits whose summary lacks the query words, and returns the
/// requested page.
///
/// Paging: `page` is 1-based (default 1). The page size is `page_size`; otherwise `limit`
/// when no `page` is given; otherwise 5. `limit` also caps the results per page.
///
/// # Errors
/// Returns `500` with `{"error": ...}` only when embedding the query fails. Every other
/// source is best-effort: a failure is logged and that source contributes nothing.
pub async fn smart_search(
    State(state): State<crate::api::types::AppState>,
    Json(req): Json<SmartSearchRequest>,
) -> Result<Json<SmartSearchResponse>, (StatusCode, Json<serde_json::Value>)> {
    const KEYWORD_FIXED_SCORE: f64 = 0.5;
    const IDENTITY_FIXED_SCORE: f64 = 0.85;
    const TITLE_MATCH_SCORE: f64 = 0.9;

    let db = &state.db;
    let qdrant = &state.qdrant;

    // --- Paging ---
    let page = req.page.unwrap_or(1).max(1);
    // `page_size` wins; a bare `limit` (no `page`) doubles as the page size; otherwise 5.
    let page_size = match (req.page_size, req.limit, req.page) {
        (Some(size), _, _) => size,
        (None, Some(cap), None) => cap,
        _ => 5,
    }
    .max(1);
    let hard_limit = req.limit.unwrap_or(usize::MAX);
    let limit = hard_limit.min(page_size);
    // 1-based paging. Page 1 skips nothing, so single-page callers are unaffected.
    // Previously `page` was echoed back but never applied.
    let offset = (page - 1).saturating_mul(page_size);
    // Filtered results needed to fill this page.
    let wanted = offset.saturating_add(limit);
    // Over-fetch 3x per source because the text post-filter below drops some hits.
    // Saturating math stops huge client-supplied sizes from overflowing.
    let fetch_limit = wanted.saturating_mul(3);

    // 1. Generate embedding
    let embedder = Embedder::new("embeddinggemma-300m".to_string());
    let embedding = embedder.embed_query(&req.query).await.map_err(
        |e| -> (StatusCode, Json<serde_json::Value>) {
            tracing::error!("Embedding failed: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({ "error": e.to_string() })),
            )
        },
    )?;

    // 2. Semantic search via Qdrant
    let semantic_results: Vec<(String, String, f64)> = if let Some(file_uuid) = &req.file_uuid {
        let qdrant_hits = qdrant
            .search_in_uuid(&embedding, file_uuid, fetch_limit)
            .await
            .unwrap_or_default();
        tracing::info!(
            "Smart search: Qdrant search_in_uuid for {} returned {} hits",
            file_uuid,
            qdrant_hits.len()
        );
        qdrant_hits
            .into_iter()
            .map(|h| (h.uuid, h.chunk_id, h.score as f64))
            .collect()
    } else {
        let qdrant_hits = qdrant
            .search(&embedding, fetch_limit)
            .await
            .unwrap_or_default();
        tracing::info!(
            "Smart search: Qdrant search (no uuid filter) returned {} hits",
            qdrant_hits.len()
        );
        qdrant_hits
            .into_iter()
            .map(|h| (h.uuid, h.chunk_id, h.score as f64))
            .collect()
    };

    // 3. Keyword search via `search_bm25` (hits get a fixed score; its own score is unused).
    // Clamp rather than wrap when converting to the i64 the DB API expects.
    let keyword_limit = fetch_limit.min(i64::MAX as usize) as i64;
    let keyword_results: Vec<(String, String)> = match db
        .search_bm25(&req.query, req.file_uuid.as_deref(), keyword_limit)
        .await
    {
        Ok(rows) => rows
            .into_iter()
            .map(|r| (r.file_uuid, r.chunk_id))
            .collect(),
        Err(e) => {
            tracing::warn!("Keyword search (bm25) failed: {}", e);
            vec![]
        }
    };

    // ILIKE pattern that matches the query text literally. The value is a bound parameter,
    // so quotes need no escaping; doubling them made queries with apostrophes never match.
    let like_pattern = format!("%{}%", escape_like(&req.query));

    // 3b. Video title search: if the query matches a video title, get its chunks.
    let title_results: Vec<(String, String)> = {
        let v_table = crate::core::db::schema::table_name("videos");
        let c_table = crate::core::db::schema::table_name("chunk");
        let video_rows: Vec<(String,)> = sqlx::query_as(&format!(
            "SELECT file_uuid::text FROM {} WHERE file_name ILIKE $1 LIMIT 5",
            v_table
        ))
        .bind(like_pattern.as_str())
        .fetch_all(db.pool())
        .await
        .unwrap_or_else(|e| {
            tracing::warn!("Title search failed: {}", e);
            Vec::new()
        });

        let mut chunks = Vec::new();
        for (fu,) in &video_rows {
            if let Some(f) = &req.file_uuid {
                if fu != f {
                    continue;
                }
            }
            // NOTE(review): if `file_uuid` is a `uuid` column, comparing it with this text
            // parameter errors in PostgreSQL. Such errors are now logged, not swallowed;
            // confirm the column type.
            let rows: Vec<(String, String)> = sqlx::query_as(&format!(
                "SELECT chunk_id, file_uuid::text FROM {} \
                 WHERE file_uuid = $1 AND embedding IS NOT NULL \
                 AND chunk_type = 'sentence' \
                 LIMIT 20",
                c_table
            ))
            .bind(fu)
            .fetch_all(db.pool())
            .await
            .unwrap_or_else(|e| {
                tracing::warn!("Title chunk lookup for {} failed: {}", fu, e);
                Vec::new()
            });
            chunks.extend(rows.into_iter().map(|(cid, file_uuid)| (file_uuid, cid)));
        }
        chunks
    };

    // 4. Identity search: if the query matches a person name, get their chunks.
    let identity_results: Vec<(String, String)> = {
        let id_table = crate::core::db::schema::table_name("identities");
        let id_rows: Vec<(i32, String, String)> = sqlx::query_as(&format!(
            "SELECT id, name, uuid::text FROM {} WHERE name ILIKE $1 LIMIT 5",
            id_table
        ))
        .bind(like_pattern.as_str())
        .fetch_all(db.pool())
        .await
        .unwrap_or_else(|e| {
            tracing::warn!("Identity search failed: {}", e);
            Vec::new()
        });

        let mut id_chunks = Vec::new();
        for (_, _, uuid_text) in id_rows.iter().take(3) {
            let clean_uuid = uuid_text.replace('-', "");
            match db.get_identity_chunks(&clean_uuid, 20, 0).await {
                Ok(chunks) => {
                    for chunk in chunks {
                        if let Some(fu) = &req.file_uuid {
                            if &chunk.file_uuid != fu {
                                continue;
                            }
                        }
                        id_chunks.push((chunk.file_uuid, chunk.chunk_id));
                    }
                }
                Err(e) => {
                    tracing::debug!("get_identity_chunks for {} failed: {}", clean_uuid, e);
                }
            }
        }
        id_chunks
    };

    // 5. Score-based merge: each chunk keeps the best score any source gave it.
    let has_keyword_match = !keyword_results.is_empty();
    let has_title_match = !title_results.is_empty();
    let has_identity_match = !identity_results.is_empty();

    let mut merger = HitMerger::default();
    for (file_uuid, chunk_id, score) in semantic_results {
        merger.add(file_uuid, chunk_id, score, HitSource::Semantic);
    }
    for (file_uuid, chunk_id) in keyword_results {
        merger.add(file_uuid, chunk_id, KEYWORD_FIXED_SCORE, HitSource::Keyword);
    }
    for (file_uuid, chunk_id) in title_results {
        merger.add(file_uuid, chunk_id, TITLE_MATCH_SCORE, HitSource::Title);
    }
    for (file_uuid, chunk_id) in identity_results {
        merger.add(file_uuid, chunk_id, IDENTITY_FIXED_SCORE, HitSource::Identity);
    }
    let ranked = merger.into_ranked();

    // 6. Enrich candidates from PG, apply the text post-filter, stop once the page is full.
    let query_lower = req.query.to_lowercase();
    let query_words: Vec<&str> = query_lower.split_whitespace().collect();
    let query_has_cjk = contains_cjk(&query_lower);
    let mut final_results: Vec<SearchResult> = Vec::new();
    for mr in ranked.iter().take(fetch_limit) {
        if final_results.len() >= wanted {
            break;
        }
        let pg = match db
            .get_chunk_by_file_and_chunk_id(&mr.file_uuid, &mr.chunk_id)
            .await
        {
            Ok(Some(pg)) => pg,
            Ok(None) => continue,
            Err(e) => {
                tracing::warn!(
                    "PG enrichment failed for {} {}: {}",
                    mr.file_uuid,
                    mr.chunk_id,
                    e
                );
                continue;
            }
        };

        // Semantic hits are always kept; other hits must mention every query word.
        // NOTE(review): title and identity hits are also dropped here unless the summary
        // contains the query words, which may defeat those strategies. Confirm intent.
        let text_match =
            summary_matches(&pg.summary.to_lowercase(), &query_words, query_has_cjk);
        if !text_match && mr.semantic_score.is_none() {
            continue;
        }

        final_results.push(SearchResult {
            id: 0,
            file_uuid: pg.file_uuid.clone(),
            parent_id: pg.scene_order,
            scene_order: Some(pg.scene_order),
            start_frame: pg.start_frame,
            end_frame: pg.end_frame,
            fps: pg.fps,
            start_time: pg.start_time,
            end_time: pg.end_time,
            raw_text: None,
            summary: Some(pg.summary),
            metadata: pg.metadata.clone(),
            similarity: Some(mr.score),
            file_name: None,
            serve_url: None,
            thumbnail_url: pg.file_uuid.as_ref().map(|fu| {
                format!(
                    "/wp-json/momentry/v1/media?type=chunk_thumbnail&file_uuid={}&chunk_id={}",
                    fu, mr.chunk_id
                )
            }),
        });
    }

    // Keep only the requested page.
    let mut final_results: Vec<SearchResult> =
        final_results.into_iter().skip(offset).take(limit).collect();

    // 7. Enrich results with file_name and serve_url from the videos table.
    if !final_results.is_empty() {
        let v_table = crate::core::db::schema::table_name("videos");
        let file_uuids: Vec<String> = final_results
            .iter()
            .filter_map(|r| r.file_uuid.clone())
            .collect();
        let file_rows: Vec<(String, String, String)> = sqlx::query_as(&format!(
            "SELECT file_uuid::text, file_name, file_path FROM {} WHERE file_uuid = ANY($1)",
            v_table
        ))
        .bind(&file_uuids)
        .fetch_all(db.pool())
        .await
        .unwrap_or_else(|e| {
            tracing::warn!("File name lookup failed: {}", e);
            Vec::new()
        });
        let file_map: HashMap<String, (String, String)> = file_rows
            .into_iter()
            .map(|(uuid, name, path)| (uuid, (name, path)))
            .collect();
        let storage_root = crate::core::config::STORAGE_ROOT.as_str();
        let serve_base = crate::core::config::SERVE_BASE_URL.as_str();
        for r in &mut final_results {
            if let Some(ref uuid) = r.file_uuid {
                if let Some((name, path)) = file_map.get(uuid) {
                    r.file_name = Some(name.clone());
                    if let Some(relative) = path.strip_prefix(storage_root) {
                        r.serve_url = Some(format!("{}{}", serve_base, relative));
                    }
                }
            }
        }
    }

    // Determine strategy string
    let mut strategies = vec!["semantic"];
    if has_keyword_match {
        strategies.push("keyword");
    }
    if has_identity_match {
        strategies.push("identity");
    }
    if has_title_match {
        strategies.push("title");
    }
    Ok(Json(SmartSearchResponse {
        query: req.query,
        results: final_results,
        page,
        page_size,
        strategy: format!("hybrid_{}", strategies.join("+")),
    }))
}
// --- Router Setup ---
/// Router exposing the smart search endpoint (`POST /api/v1/search/smart`).
pub fn search_routes() -> Router<crate::api::types::AppState> {
    let smart_handler = post(smart_search);
    Router::new().route("/api/v1/search/smart", smart_handler)
}