feat: Phase 3 API (Identity, Files, Candidates) and pre_chunks migration

This commit is contained in:
Warren
2026-04-25 22:19:12 +08:00
parent 1f84e5469f
commit e84982e7d9
5 changed files with 454 additions and 0 deletions

View File

@@ -0,0 +1,47 @@
-- ============================================================================
-- Migration 017: Create pre_chunks table (Processor Output)
-- ============================================================================
-- Purpose:
-- 1. Give raw processor outputs (YOLO frames, Face detections, etc.)
-- a dedicated 'pre_chunks' table instead of the 'chunks' table.
-- This script only creates the table; it does not copy any existing rows.
-- 2. Support coordinate_type (frame for video, time for audio, page for PDF)
-- to allow future expansion for PDF/Audio files.
-- 3. Support Identity linking directly on pre_chunks (Face -> Identity).
-- ============================================================================
-- 0. Clean up existing conflicting table (if any)
-- NOTE(review): this drops any existing pre_chunks table together with objects
-- that depend on it, so re-running the migration discards stored rows.
DROP TABLE IF EXISTS pre_chunks CASCADE;
-- 1. Create pre_chunks table
CREATE TABLE pre_chunks (
id BIGSERIAL PRIMARY KEY,
file_uuid UUID NOT NULL,
processor_type VARCHAR(32) NOT NULL, -- 'yolo', 'face', 'asr', 'ocr', 'pose'...
-- Coordinate system (supports Video, Audio, Text...)
-- coordinate_type has a default but no NOT NULL constraint, so an explicit NULL is accepted.
coordinate_type VARCHAR(20) DEFAULT 'frame', -- 'frame', 'time', 'page'
coordinate_index BIGINT NOT NULL, -- Frame number, or paragraph index
timestamp FLOAT, -- Time in seconds
data JSONB NOT NULL, -- Raw processor output (objects, bboxes, etc.)
-- Identity linkage (Face -> Identity, or Speaker -> Identity)
-- If NULL, this Face/Speaker is a "Candidate"
-- Note: FK removed temporarily due to schema migration in progress,
-- so identity_id is not checked against any identities table here.
identity_id UUID,
confidence FLOAT, -- Match confidence
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- 2. Indexes
-- Single-column indexes for lookups by file, by processor and by linked identity.
CREATE INDEX idx_pre_chunks_file ON pre_chunks(file_uuid);
CREATE INDEX idx_pre_chunks_processor ON pre_chunks(processor_type);
CREATE INDEX idx_pre_chunks_identity ON pre_chunks(identity_id);
-- Composite index for locating one processor's rows within a file by coordinate.
CREATE INDEX idx_pre_chunks_coord ON pre_chunks(file_uuid, processor_type, coordinate_index);
-- 3. Comment
COMMENT ON TABLE pre_chunks IS 'Raw output from Processors (Frames, Segments). Candidates are rows where identity_id IS NULL.';
COMMENT ON COLUMN pre_chunks.coordinate_type IS 'Coordinate unit: frame (Video), time (Audio), page (PDF)...';

231
src/api/identity_api.rs Normal file
View File

@@ -0,0 +1,231 @@
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::core::db::{Database, PostgresDb};
/// Builds the router exposing the people, candidate and file endpoints.
///
/// Registered routes:
/// - `GET  /api/v1/people`            -> `list_people`
/// - `POST /api/v1/people/search`     -> `search_people`
/// - `GET  /api/v1/people/candidates` -> `list_candidates`
/// - `GET  /api/v1/files`             -> `list_files`
/// - `GET  /api/v1/files/{uuid}`      -> `get_file_detail`
pub fn identity_routes() -> Router<crate::api::server::AppState> {
    let router: Router<crate::api::server::AppState> = Router::new();

    // People / identity endpoints.
    let router = router.route("/api/v1/people", get(list_people));
    let router = router.route("/api/v1/people/search", post(search_people));
    let router = router.route("/api/v1/people/candidates", get(list_candidates));

    // File endpoints.
    let router = router.route("/api/v1/files", get(list_files));
    router.route("/api/v1/files/{uuid}", get(get_file_detail))
}
// --- People / Identity Endpoints ---
/// Query-string parameters read by the `list_people` handler.
#[derive(Debug, Deserialize)]
pub struct PeopleQuery {
/// Requested page number; the handler uses 1 when it is absent.
page: Option<usize>,
/// Requested number of rows per page; the handler uses 20 when it is absent.
page_size: Option<usize>,
}
/// JSON body returned by the `list_people` and `search_people` handlers.
#[derive(Debug, Serialize)]
pub struct PeopleResponse {
/// Set to `true` by both handlers on the success path.
pub success: bool,
/// Reported row count. NOTE(review): neither handler computes a real total yet;
/// `list_people` sends a placeholder and `search_people` sends the size of the returned page.
pub total: i64,
/// Page number the response corresponds to.
pub page: usize,
/// Page size the response corresponds to.
pub page_size: usize,
/// Identities on this page.
pub data: Vec<PeopleItem>,
}
/// One identity as serialized in a `PeopleResponse`.
#[derive(Debug, Serialize)]
pub struct PeopleItem {
/// Filled from the identity record's `uuid` field.
pub identity_id: Uuid,
/// Name stored on the identity record.
pub name: String,
/// Metadata JSON stored on the identity record, passed through unchanged.
pub metadata: serde_json::Value,
/// Creation time of the identity record, if one is stored.
pub created_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Handler for `GET /api/v1/people`: returns one page of identities.
///
/// `page` is 1-based and defaults to 1; `page=0` is treated as the first page.
/// `page_size` defaults to 20 and is capped at `i32::MAX`, the largest limit
/// the database layer accepts. The response echoes the effective values.
///
/// # Errors
/// Responds with `500 Internal Server Error` and the error text when the
/// database query fails.
async fn list_people(
    State(state): State<crate::api::server::AppState>,
    Query(params): Query<PeopleQuery>,
) -> Result<Json<PeopleResponse>, (StatusCode, String)> {
    // Clamp before subtracting: `page - 1` on an unsigned zero underflows.
    let page = params.page.unwrap_or(1).max(1);
    // Capped so the narrowing cast to `i32` below cannot truncate.
    let page_size = params.page_size.unwrap_or(20).min(i32::MAX as usize);
    let skipped_pages = (page - 1).min(i64::MAX as usize) as i64;
    // Saturate instead of overflowing for absurdly large page numbers.
    let offset = skipped_pages.saturating_mul(page_size as i64);

    let records = state
        .db
        .list_people(page_size as i32, offset)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    // TODO: Get total count
    // NOTE(review): this is a fixed placeholder, not the number of identities.
    let total = 100;

    let data = records
        .into_iter()
        .map(|r| PeopleItem {
            identity_id: r.uuid,
            name: r.name,
            metadata: r.metadata,
            created_at: r.created_at,
        })
        .collect();

    Ok(Json(PeopleResponse {
        success: true,
        total,
        page,
        page_size,
        data,
    }))
}
/// JSON request body read by the `search_people` handler.
#[derive(Debug, Deserialize)]
pub struct SearchPeopleRequest {
/// Search text; the handler passes it to the database layer's `search_people`.
pub query: String,
/// Requested page number; the handler uses 1 when it is absent.
pub page: Option<usize>,
/// Requested number of rows per page; the handler uses 20 when it is absent.
pub page_size: Option<usize>,
}
/// Handler for `POST /api/v1/people/search`: returns one page of identities
/// matching the request's `query` text.
///
/// `page` is 1-based and defaults to 1; `page=0` is treated as the first page.
/// `page_size` defaults to 20 and is capped at `i32::MAX`, the largest limit
/// the database layer accepts. The response echoes the effective values.
///
/// # Errors
/// Responds with `500 Internal Server Error` and the error text when the
/// database query fails.
async fn search_people(
    State(state): State<crate::api::server::AppState>,
    Json(req): Json<SearchPeopleRequest>,
) -> Result<Json<PeopleResponse>, (StatusCode, String)> {
    // Clamp before subtracting: `page - 1` on an unsigned zero underflows.
    let page = req.page.unwrap_or(1).max(1);
    // Capped so the narrowing cast to `i32` below cannot truncate.
    let page_size = req.page_size.unwrap_or(20).min(i32::MAX as usize);
    let skipped_pages = (page - 1).min(i64::MAX as usize) as i64;
    // Saturate instead of overflowing for absurdly large page numbers.
    let offset = skipped_pages.saturating_mul(page_size as i64);

    let records = state
        .db
        .search_people(&req.query, page_size as i32, offset)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    let data: Vec<PeopleItem> = records
        .into_iter()
        .map(|r| PeopleItem {
            identity_id: r.uuid,
            name: r.name,
            metadata: r.metadata,
            created_at: r.created_at,
        })
        .collect();

    Ok(Json(PeopleResponse {
        success: true,
        // Approximation: this is the size of the returned page, not the
        // number of identities matching the query.
        total: data.len() as i64,
        page,
        page_size,
        data,
    }))
}
/// Query-string parameters read by the `list_candidates` handler.
#[derive(Debug, Deserialize)]
pub struct CandidatesQuery {
/// Requested page number; the handler uses 1 when it is absent.
page: Option<usize>,
/// Requested number of rows per page; the handler uses 20 when it is absent.
page_size: Option<usize>,
}
/// JSON body returned by the `list_candidates` handler.
#[derive(Debug, Serialize)]
pub struct CandidatesResponse {
/// Set to `true` by the handler on the success path.
pub success: bool,
/// Reported row count. NOTE(review): the handler currently always sends 0 (marked TODO there).
pub total: i64,
/// Page number the response corresponds to.
pub page: usize,
/// Page size the response corresponds to.
pub page_size: usize,
/// Candidates on this page.
pub data: Vec<CandidateItem>,
}
/// One candidate as serialized in a `CandidatesResponse`.
#[derive(Debug, Serialize)]
pub struct CandidateItem {
/// Filled from the candidate record's `id` field.
pub pre_chunk_id: i64,
/// File the candidate record belongs to.
pub file_uuid: Uuid,
/// The candidate record's `data` JSON, passed through unchanged.
pub data: serde_json::Value,
}
/// Handler for `GET /api/v1/people/candidates`: returns one page of the
/// records produced by the database layer's `get_people_candidates`.
///
/// `page` is 1-based and defaults to 1; `page=0` is treated as the first page.
/// `page_size` defaults to 20 and is capped at `i32::MAX`, the largest limit
/// the database layer accepts. The response echoes the effective values.
///
/// # Errors
/// Responds with `500 Internal Server Error` and the error text when the
/// database query fails.
async fn list_candidates(
    State(state): State<crate::api::server::AppState>,
    Query(params): Query<CandidatesQuery>,
) -> Result<Json<CandidatesResponse>, (StatusCode, String)> {
    // Clamp before subtracting: `page - 1` on an unsigned zero underflows.
    let page = params.page.unwrap_or(1).max(1);
    // Capped so the narrowing cast to `i32` below cannot truncate.
    let page_size = params.page_size.unwrap_or(20).min(i32::MAX as usize);
    let skipped_pages = (page - 1).min(i64::MAX as usize) as i64;
    // Saturate instead of overflowing for absurdly large page numbers.
    let offset = skipped_pages.saturating_mul(page_size as i64);

    let records = state
        .db
        .get_people_candidates(page_size as i32, offset)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    let data = records
        .into_iter()
        .map(|r| CandidateItem {
            pre_chunk_id: r.id,
            file_uuid: r.file_uuid,
            data: r.data,
        })
        .collect();

    Ok(Json(CandidatesResponse {
        success: true,
        total: 0, // TODO: no count query exists yet, so this is not a real total.
        page,
        page_size,
        data,
    }))
}
// --- Files Endpoints ---
/// Query-string parameters read by the `list_files` handler.
#[derive(Debug, Deserialize)]
pub struct FilesQuery {
/// Requested page number; the handler uses 1 when it is absent.
page: Option<usize>,
/// Requested number of rows per page; the handler uses 20 when it is absent.
page_size: Option<usize>,
}
/// JSON body returned by the `list_files` handler.
#[derive(Debug, Serialize)]
pub struct FilesResponse {
/// Set to `true` by the handler on the success path.
pub success: bool,
/// Reported row count. NOTE(review): the handler currently always sends 0 (marked TODO there).
pub total: i64,
/// Page number the response corresponds to.
pub page: usize,
/// Page size the response corresponds to.
pub page_size: usize,
/// Files on this page.
pub data: Vec<FileItem>,
}
/// One file as serialized in a `FilesResponse`.
#[derive(Debug, Serialize)]
pub struct FileItem {
/// Filled from the file record's `uuid` field (kept as a string).
pub file_uuid: String,
/// File name stored on the file record.
pub file_name: String,
/// File path stored on the file record.
pub file_path: String,
/// NOTE(review): `list_files` currently hard-codes this to "ready" for every row.
pub status: String, // From probe or processing status
}
/// Handler for `GET /api/v1/files`: returns one page of file records.
///
/// `page` is 1-based and defaults to 1; `page=0` is treated as the first page.
/// `page_size` defaults to 20 and is capped at `i32::MAX`, the largest limit
/// the database layer accepts. The response echoes the effective values.
///
/// # Errors
/// Responds with `500 Internal Server Error` and the error text when the
/// database query fails.
async fn list_files(
    State(state): State<crate::api::server::AppState>,
    Query(params): Query<FilesQuery>,
) -> Result<Json<FilesResponse>, (StatusCode, String)> {
    // Clamp before subtracting: `page - 1` on an unsigned zero underflows.
    let page = params.page.unwrap_or(1).max(1);
    // Capped so the narrowing cast to `i32` below cannot truncate.
    let page_size = params.page_size.unwrap_or(20).min(i32::MAX as usize);
    let skipped_pages = (page - 1).min(i64::MAX as usize) as i64;
    // Saturate instead of overflowing for absurdly large page numbers.
    let offset = skipped_pages.saturating_mul(page_size as i64);

    let records = state
        .db
        .list_files(page_size as i32, offset)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    let data = records
        .into_iter()
        .map(|r| FileItem {
            file_uuid: r.uuid,
            file_name: r.file_name,
            file_path: r.file_path,
            // NOTE(review): fixed value; no processing status is looked up here.
            status: "ready".to_string(),
        })
        .collect();

    Ok(Json(FilesResponse {
        success: true,
        total: 0, // TODO: no count query exists yet, so this is not a real total.
        page,
        page_size,
        data,
    }))
}
/// JSON body returned by the `get_file_detail` handler.
#[derive(Debug, Serialize)]
pub struct FileDetailResponse {
/// File identifier; the handler echoes the path segment it received.
pub file_uuid: String,
/// NOTE(review): the handler currently sends the fixed text "Unknown".
pub file_name: String,
/// NOTE(review): the handler currently sends a fixed placeholder path.
pub file_path: String,
/// NOTE(review): the handler currently sends an empty JSON object.
pub metadata: serde_json::Value,
}
/// Handler for `GET /api/v1/files/{uuid}`.
///
/// Placeholder: the database layer has no single-file lookup yet, so this
/// echoes the requested identifier together with fixed dummy values and never
/// fails. The application state is accepted but not used.
async fn get_file_detail(
    State(_state): State<crate::api::server::AppState>,
    Path(uuid): Path<String>,
) -> Result<Json<FileDetailResponse>, (StatusCode, String)> {
    // Need a method to get single file; until then every field except the
    // identifier is a stand-in.
    let placeholder = FileDetailResponse {
        file_uuid: uuid,
        file_name: String::from("Unknown"),
        file_path: String::from("/path/to/file"),
        metadata: serde_json::json!({}),
    };

    Ok(Json(placeholder))
}

View File

@@ -1,5 +1,6 @@
pub mod face_recognition;
pub mod identities;
pub mod identity_api;
pub mod identity_binding;
pub mod middleware;
pub mod n8n_search;

View File

@@ -23,6 +23,7 @@ use crate::{Embedder, FileManager};
use super::face_recognition;
use super::identities;
use super::identity_binding;
use super::identity_api;
use super::middleware::api_key_validation;
use super::n8n_search;
use super::person_identity;
@@ -2480,6 +2481,7 @@ pub async fn start_server(host: &str, port: u16) -> anyhow::Result<()> {
"/api/v1/search/visual/combination",
post(search_visual_chunks_by_combination),
)
.merge(identity_api::identity_routes()) // Phase 3 Routes
.merge(protected_routes)
.layer(cors)
.with_state(state);

View File

@@ -15,6 +15,32 @@ use crate::core::text::{
tokenizer::{contains_chinese, tokenize_chinese_text},
};
/// Row shape produced by the `list_people` and `search_people` queries, which
/// select `id, uuid, name, metadata, created_at` from `identities`.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct IdentityRecord {
/// Value of the `id` column.
pub id: i32,
/// Value of the `uuid` column.
pub uuid: Uuid,
/// Value of the `name` column.
pub name: String,
/// Value of the `metadata` column, as JSON.
pub metadata: serde_json::Value,
/// Value of the `created_at` column; `None` when the column is NULL.
pub created_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Row shape produced by the `list_files` query, which selects
/// `uuid, file_path, file_name, probe_json, created_at` from `videos`.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct FileRecord {
/// Value of the `uuid` column, kept as a string.
pub uuid: String,
/// Value of the `file_path` column.
pub file_path: String,
/// Value of the `file_name` column.
pub file_name: String,
/// Value of the `probe_json` column; `None` when the column is NULL.
pub probe_json: Option<serde_json::Value>,
/// Value of the `created_at` column; `None` when the column is NULL.
pub created_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// Row shape produced by the `get_people_candidates` query, which selects
/// `id, file_uuid, data, created_at` from face rows of `pre_chunks` that have
/// no `identity_id`.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct CandidateRecord {
/// Value of the `id` column.
pub id: i64,
/// Value of the `file_uuid` column.
pub file_uuid: Uuid,
/// Value of the `data` column, as JSON.
pub data: serde_json::Value, // Face data
/// Value of the `created_at` column; `None` when the column is NULL.
pub created_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StorageStatus {
pub fs_video: bool,
@@ -1704,6 +1730,153 @@ impl PostgresDb {
Ok(())
}
/// Store a raw pre-chunk from a processor (e.g., YOLO frame, Face detection).
/// This replaces the old direct-to-chunks approach for trace data.
///
/// The row is always written with `coordinate_type = 'frame'`.
///
/// # Arguments
/// * `file_uuid` - textual UUID of the source file; cast to `uuid` in SQL.
/// * `processor_type` - processor that produced the row (e.g. `"yolo"`, `"face"`).
/// * `coordinate_index` - position within the file, stored as given.
/// * `timestamp` - optional time value stored in the `timestamp` column.
/// * `data` - raw processor output, stored as JSON.
/// * `identity_id` - optional linked identity; `None` stores NULL.
/// * `confidence` - optional match confidence.
///
/// # Errors
/// Returns an error when the insert fails, which includes `file_uuid` not
/// being valid UUID text.
pub async fn store_raw_pre_chunk(
    &self,
    file_uuid: &str,
    processor_type: &str,
    coordinate_index: i64,
    timestamp: Option<f64>,
    data: &serde_json::Value,
    identity_id: Option<Uuid>,
    confidence: Option<f64>,
) -> Result<()> {
    let table = schema::table_name("pre_chunks");
    // `file_uuid` is bound as text while migration 017 declares the column as
    // UUID; Postgres does not assign text to uuid implicitly, so cast it.
    let query = format!(
        r#"
        INSERT INTO {} (
            file_uuid, processor_type, coordinate_type, coordinate_index,
            timestamp, data, identity_id, confidence
        ) VALUES ($1::uuid, $2, 'frame', $3, $4, $5, $6, $7)
        "#,
        table
    );

    sqlx::query(&query)
        .bind(file_uuid)
        .bind(processor_type)
        .bind(coordinate_index)
        .bind(timestamp)
        .bind(data)
        .bind(identity_id)
        .bind(confidence)
        .execute(self.pool())
        .await
        .map_err(|e| anyhow::anyhow!("Failed to store raw pre_chunk: {}", e))?;

    Ok(())
}
/// Batch store pre-chunks for better performance (e.g. bulk insert of frames).
///
/// All rows share `file_uuid` and `processor_type` and are written with
/// `coordinate_type = 'frame'` inside a single transaction, so either every
/// row is committed or none is. An empty `chunks` slice is a no-op.
///
/// # Arguments
/// * `file_uuid` - textual UUID of the source file; cast to `uuid` in SQL.
/// * `processor_type` - processor that produced the rows.
/// * `chunks` - one tuple per row:
///   `(coordinate_index, timestamp, data, identity_id, confidence)`.
///
/// # Errors
/// Returns the first database error encountered; the transaction is then
/// dropped without being committed.
pub async fn store_raw_pre_chunks_batch(
    &self,
    file_uuid: &str,
    processor_type: &str,
    chunks: &[(i64, Option<f64>, serde_json::Value, Option<Uuid>, Option<f64>)],
) -> Result<()> {
    // Nothing to write: skip opening a transaction altogether.
    if chunks.is_empty() {
        return Ok(());
    }

    // Rows are inserted one by one inside a transaction rather than via COPY.
    let mut tx = self.pool().begin().await?;
    let table = schema::table_name("pre_chunks");
    // `file_uuid` is bound as text while migration 017 declares the column as
    // UUID; Postgres does not assign text to uuid implicitly, so cast it.
    let query = format!(
        r#"
        INSERT INTO {} (
            file_uuid, processor_type, coordinate_type, coordinate_index,
            timestamp, data, identity_id, confidence
        ) VALUES ($1::uuid, $2, 'frame', $3, $4, $5, $6, $7)
        "#,
        table
    );

    for (coord_idx, ts, data, id, conf) in chunks {
        sqlx::query(&query)
            .bind(file_uuid)
            .bind(processor_type)
            .bind(*coord_idx)
            .bind(*ts)
            .bind(data)
            .bind(*id)
            .bind(*conf)
            .execute(&mut *tx)
            .await?;
    }

    tx.commit().await?;
    Ok(())
}
/// Returns one page of rows from `identities`, newest first.
///
/// # Arguments
/// * `limit` - maximum number of rows to return.
/// * `offset` - number of rows to skip before the first returned row.
///
/// # Errors
/// Returns an error when the query fails or a row cannot be decoded into
/// `IdentityRecord`.
pub async fn list_people(&self, limit: i32, offset: i64) -> Result<Vec<IdentityRecord>> {
    // `id` breaks ties between rows with the same `created_at`; without a
    // unique ordering, LIMIT/OFFSET pages may repeat or skip rows.
    let query = r#"
        SELECT id, uuid, name, metadata, created_at
        FROM identities
        ORDER BY created_at DESC, id DESC
        LIMIT $1 OFFSET $2
    "#;

    let rows = sqlx::query_as(query)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

    Ok(rows)
}
/// Returns one page of rows from `identities` whose name contains `query`
/// (case-insensitive), ordered by name.
///
/// `query` is matched literally: the LIKE metacharacters `%`, `_` and `\`
/// are escaped before the pattern is built. An empty `query` matches every
/// name.
///
/// # Arguments
/// * `query` - text to look for inside the name.
/// * `limit` - maximum number of rows to return.
/// * `offset` - number of rows to skip before the first returned row.
///
/// # Errors
/// Returns an error when the query fails or a row cannot be decoded into
/// `IdentityRecord`.
pub async fn search_people(&self, query: &str, limit: i32, offset: i64) -> Result<Vec<IdentityRecord>> {
    // Backslash is the default LIKE escape character in PostgreSQL; escaping
    // keeps user-supplied `%` and `_` from acting as wildcards.
    let mut pattern = String::with_capacity(query.len() + 2);
    pattern.push('%');
    for ch in query.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            pattern.push('\\');
        }
        pattern.push(ch);
    }
    pattern.push('%');

    // `id` breaks ties between identical names so that LIMIT/OFFSET pages
    // are stable.
    let sql = r#"
        SELECT id, uuid, name, metadata, created_at
        FROM identities
        WHERE name ILIKE $1
        ORDER BY name ASC, id ASC
        LIMIT $2 OFFSET $3
    "#;

    let rows = sqlx::query_as(sql)
        .bind(pattern)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

    Ok(rows)
}
pub async fn get_people_candidates(&self, limit: i32, offset: i64) -> Result<Vec<CandidateRecord>> {
let query = r#"
SELECT id, file_uuid, data, created_at
FROM pre_chunks
WHERE processor_type = 'face' AND identity_id IS NULL
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
"#;
let rows = sqlx::query_as(query)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}
/// Returns one page of rows from `videos`, newest first.
///
/// # Arguments
/// * `limit` - maximum number of rows to return.
/// * `offset` - number of rows to skip before the first returned row.
///
/// # Errors
/// Returns an error when the query fails or a row cannot be decoded into
/// `FileRecord`.
pub async fn list_files(&self, limit: i32, offset: i64) -> Result<Vec<FileRecord>> {
    // `uuid` breaks ties between rows with the same `created_at`; without a
    // unique ordering, LIMIT/OFFSET pages may repeat or skip rows.
    let query = r#"
        SELECT uuid, file_path, file_name, probe_json, created_at
        FROM videos
        ORDER BY created_at DESC, uuid DESC
        LIMIT $1 OFFSET $2
    "#;

    let rows = sqlx::query_as(query)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;

    Ok(rows)
}
pub async fn store_chunk(&self, chunk: &Chunk) -> Result<()> {
let table = schema::table_name("chunks");
let content_with_rule = serde_json::json!({