feat(graph): add brain_entities + brain_edges store and wikilink parser
Foundation for Track A (GraphRAG on top of existing wiki). Two new packages, both unwired — service behaviour unchanged until commit 2 hooks the pipeline. - internal/graph: pure parser. Extract() walks markdown + frontmatter and emits one Entity + N wikilink Edges per doc. Dedupes per (dst, line), ignores self-references, classifies hall/concept/entity/ source/knowledge from path layout. - internal/graphstore: pgx-backed PGStore mirroring vectorstore's shape. Idempotent Init() creates brain_entities + brain_edges with indexes on src_slug, dst_slug, src_doc, wing, type. Operations: UpsertEntity, ReplaceEdgesForDoc (tx), DeleteByDoc, Neighbors, Subgraph (recursive CTE, depth ≤6), Path (shortest path, depth ≤8). Schema lives on the shared postgres18 instance alongside the brain_embeddings table — no new datastore. See docs/superpowers/specs/2026-05-homelab-training-graph-next-step.md in infra repo + infra#62. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
351
ingestion/internal/graphstore/pg.go
Normal file
351
ingestion/internal/graphstore/pg.go
Normal file
@@ -0,0 +1,351 @@
|
||||
// Package graphstore stores the brain knowledge graph (entities +
|
||||
// directed edges) in PostgreSQL on the shared postgres18 instance,
|
||||
// alongside the pgvector embeddings in [vectorstore].
|
||||
//
|
||||
// Schema (created idempotently by Init):
|
||||
//
|
||||
// brain_entities(slug PK, type, wing, hall, doc_path, title, updated_at)
|
||||
// brain_edges(id PK, src_slug FK, dst_slug, edge_type, src_doc, src_line,
|
||||
// weight, updated_at)
|
||||
//
|
||||
// Edges fan-out from a source document; calling [PGStore.ReplaceEdgesForDoc]
|
||||
// replaces every edge previously emitted from that document so re-ingest is
|
||||
// idempotent without bookkeeping.
|
||||
//
|
||||
// All slug strings are stored verbatim — callers are expected to canonicalise
|
||||
// before persisting. Dst slugs may reference entities that don't yet exist
|
||||
// (dangling edges); resolution is deferred to query time so ingestion order
|
||||
// doesn't matter.
|
||||
package graphstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/graph"
|
||||
)
|
||||
|
||||
// PGStore is the postgres-backed brain knowledge-graph store. Construct
|
||||
// with New + call Init once to create tables and indexes. Use Close to
|
||||
// release the pool.
|
||||
type PGStore struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
// New opens a pgxpool against dsn and pings to verify connectivity. The
|
||||
// caller owns the resulting PGStore and must invoke Close.
|
||||
func New(ctx context.Context, dsn string) (*PGStore, error) {
|
||||
pool, err := pgxpool.New(ctx, dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgxpool: %w", err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("ping: %w", err)
|
||||
}
|
||||
return &PGStore{pool: pool}, nil
|
||||
}
|
||||
|
||||
// Close releases the underlying connection pool.
|
||||
func (s *PGStore) Close() {
|
||||
if s.pool != nil {
|
||||
s.pool.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Init creates brain_entities + brain_edges tables and their indexes if
|
||||
// they don't yet exist. Safe to call on every startup. No-op when the
|
||||
// schema already matches.
|
||||
func (s *PGStore) Init(ctx context.Context) error {
|
||||
const ddl = `
|
||||
CREATE TABLE IF NOT EXISTS brain_entities (
|
||||
slug TEXT PRIMARY KEY,
|
||||
type TEXT NOT NULL DEFAULT 'knowledge',
|
||||
wing TEXT NOT NULL DEFAULT '',
|
||||
hall TEXT NOT NULL DEFAULT '',
|
||||
doc_path TEXT NOT NULL,
|
||||
title TEXT NOT NULL DEFAULT '',
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS brain_entities_wing_idx
|
||||
ON brain_entities (wing) WHERE wing <> '';
|
||||
CREATE INDEX IF NOT EXISTS brain_entities_type_idx
|
||||
ON brain_entities (type);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS brain_edges (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
src_slug TEXT NOT NULL,
|
||||
dst_slug TEXT NOT NULL,
|
||||
edge_type TEXT NOT NULL DEFAULT 'wikilink',
|
||||
src_doc TEXT NOT NULL,
|
||||
src_line INTEGER NOT NULL DEFAULT 0,
|
||||
weight REAL NOT NULL DEFAULT 1.0,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS brain_edges_src_idx
|
||||
ON brain_edges (src_slug, edge_type);
|
||||
CREATE INDEX IF NOT EXISTS brain_edges_dst_idx
|
||||
ON brain_edges (dst_slug, edge_type);
|
||||
CREATE INDEX IF NOT EXISTS brain_edges_src_doc_idx
|
||||
ON brain_edges (src_doc);
|
||||
`
|
||||
_, err := s.pool.Exec(ctx, ddl)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpsertEntity inserts or updates one entity by slug.
|
||||
func (s *PGStore) UpsertEntity(ctx context.Context, e graph.Entity) error {
|
||||
if e.Slug == "" {
|
||||
return errors.New("entity slug is required")
|
||||
}
|
||||
if e.Type == "" {
|
||||
e.Type = "knowledge"
|
||||
}
|
||||
_, err := s.pool.Exec(ctx, `
|
||||
INSERT INTO brain_entities (slug, type, wing, hall, doc_path, title, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, now())
|
||||
ON CONFLICT (slug) DO UPDATE
|
||||
SET type = EXCLUDED.type,
|
||||
wing = EXCLUDED.wing,
|
||||
hall = EXCLUDED.hall,
|
||||
doc_path = EXCLUDED.doc_path,
|
||||
title = EXCLUDED.title,
|
||||
updated_at = now()
|
||||
`, e.Slug, e.Type, e.Wing, e.Hall, e.DocPath, e.Title)
|
||||
if err != nil {
|
||||
return fmt.Errorf("upsert entity %q: %w", e.Slug, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReplaceEdgesForDoc deletes every edge previously emitted from docPath
|
||||
// and inserts the new set in one transaction. Caller should pass the
|
||||
// complete edge set for the doc — partial updates are not supported.
|
||||
func (s *PGStore) ReplaceEdgesForDoc(ctx context.Context, docPath string, edges []graph.Edge) error {
|
||||
if docPath == "" {
|
||||
return errors.New("doc path is required")
|
||||
}
|
||||
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin: %w", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback(ctx) }()
|
||||
|
||||
if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil {
|
||||
return fmt.Errorf("delete prior edges for %q: %w", docPath, err)
|
||||
}
|
||||
for _, e := range edges {
|
||||
if e.SrcSlug == "" || e.DstSlug == "" {
|
||||
continue
|
||||
}
|
||||
if _, err := tx.Exec(ctx, `
|
||||
INSERT INTO brain_edges (src_slug, dst_slug, edge_type, src_doc, src_line, weight)
|
||||
VALUES ($1, $2, $3, $4, $5, 1.0)
|
||||
`, e.SrcSlug, e.DstSlug, e.EdgeType, e.SrcDoc, e.SrcLine); err != nil {
|
||||
return fmt.Errorf("insert edge %s->%s: %w", e.SrcSlug, e.DstSlug, err)
|
||||
}
|
||||
}
|
||||
if err := tx.Commit(ctx); err != nil {
|
||||
return fmt.Errorf("commit: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteByDoc removes the entity at docPath and every edge it sourced.
|
||||
// Use when a wiki page is deleted on disk.
|
||||
func (s *PGStore) DeleteByDoc(ctx context.Context, docPath string) error {
|
||||
if docPath == "" {
|
||||
return errors.New("doc path is required")
|
||||
}
|
||||
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin: %w", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback(ctx) }()
|
||||
|
||||
if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil {
|
||||
return fmt.Errorf("delete edges: %w", err)
|
||||
}
|
||||
if _, err := tx.Exec(ctx, `DELETE FROM brain_entities WHERE doc_path = $1`, docPath); err != nil {
|
||||
return fmt.Errorf("delete entity: %w", err)
|
||||
}
|
||||
return tx.Commit(ctx)
|
||||
}
|
||||
|
||||
// Neighbor is one row in a Neighbors / Subgraph response.
|
||||
type Neighbor struct {
|
||||
Slug string
|
||||
Type string
|
||||
Wing string
|
||||
Hall string
|
||||
DocPath string
|
||||
Title string
|
||||
EdgeType string
|
||||
Distance int // hop count from origin; 1 for direct neighbors
|
||||
}
|
||||
|
||||
// Neighbors returns the direct (1-hop) outgoing neighbours of slug.
|
||||
// edgeType filters by relationship kind; "" returns all kinds.
|
||||
// limit defaults to 25 when <= 0.
|
||||
func (s *PGStore) Neighbors(ctx context.Context, slug, edgeType string, limit int) ([]Neighbor, error) {
|
||||
if slug == "" {
|
||||
return nil, errors.New("slug is required")
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 25
|
||||
}
|
||||
q := `
|
||||
SELECT e.dst_slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''),
|
||||
COALESCE(t.doc_path,''), COALESCE(t.title,''), e.edge_type, 1
|
||||
FROM brain_edges e
|
||||
LEFT JOIN brain_entities t ON t.slug = e.dst_slug
|
||||
WHERE e.src_slug = $1
|
||||
AND ($2 = '' OR e.edge_type = $2)
|
||||
ORDER BY e.updated_at DESC
|
||||
LIMIT $3
|
||||
`
|
||||
rows, err := s.pool.Query(ctx, q, slug, edgeType, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query neighbors: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
return scanNeighbors(rows)
|
||||
}
|
||||
|
||||
// Subgraph returns every distinct slug reachable from origin within
|
||||
// depth outgoing hops, annotated with the shortest hop distance. The
|
||||
// origin itself is omitted. depth defaults to 2 when <= 0; values
|
||||
// above 6 are clamped to 6 to bound traversal cost.
|
||||
func (s *PGStore) Subgraph(ctx context.Context, origin string, depth int) ([]Neighbor, error) {
|
||||
if origin == "" {
|
||||
return nil, errors.New("origin slug is required")
|
||||
}
|
||||
if depth <= 0 {
|
||||
depth = 2
|
||||
}
|
||||
if depth > 6 {
|
||||
depth = 6
|
||||
}
|
||||
q := `
|
||||
WITH RECURSIVE walk(slug, edge_type, distance) AS (
|
||||
SELECT e.dst_slug, e.edge_type, 1
|
||||
FROM brain_edges e
|
||||
WHERE e.src_slug = $1
|
||||
UNION
|
||||
SELECT e.dst_slug, e.edge_type, w.distance + 1
|
||||
FROM walk w
|
||||
JOIN brain_edges e ON e.src_slug = w.slug
|
||||
WHERE w.distance < $2
|
||||
)
|
||||
SELECT w.slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''),
|
||||
COALESCE(t.doc_path,''), COALESCE(t.title,''), w.edge_type, MIN(w.distance)
|
||||
FROM walk w
|
||||
LEFT JOIN brain_entities t ON t.slug = w.slug
|
||||
WHERE w.slug <> $1
|
||||
GROUP BY w.slug, t.type, t.wing, t.hall, t.doc_path, t.title, w.edge_type
|
||||
ORDER BY MIN(w.distance), w.slug
|
||||
`
|
||||
rows, err := s.pool.Query(ctx, q, origin, depth)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query subgraph: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
return scanNeighbors(rows)
|
||||
}
|
||||
|
||||
// PathStep is one hop in a Path response.
|
||||
type PathStep struct {
|
||||
FromSlug string
|
||||
ToSlug string
|
||||
EdgeType string
|
||||
}
|
||||
|
||||
// Path returns the shortest directed path from src to dst within
|
||||
// maxDepth hops, as an ordered list of edges. Empty slice means no
|
||||
// path exists. maxDepth defaults to 4 when <= 0; values above 8 are
|
||||
// clamped to 8.
|
||||
func (s *PGStore) Path(ctx context.Context, src, dst string, maxDepth int) ([]PathStep, error) {
|
||||
if src == "" || dst == "" {
|
||||
return nil, errors.New("src and dst are required")
|
||||
}
|
||||
if maxDepth <= 0 {
|
||||
maxDepth = 4
|
||||
}
|
||||
if maxDepth > 8 {
|
||||
maxDepth = 8
|
||||
}
|
||||
q := `
|
||||
WITH RECURSIVE walk(cur, path_slugs, path_edges, distance) AS (
|
||||
SELECT e.dst_slug,
|
||||
ARRAY[e.src_slug, e.dst_slug]::TEXT[],
|
||||
ARRAY[e.edge_type]::TEXT[],
|
||||
1
|
||||
FROM brain_edges e
|
||||
WHERE e.src_slug = $1
|
||||
UNION ALL
|
||||
SELECT e.dst_slug,
|
||||
w.path_slugs || e.dst_slug,
|
||||
w.path_edges || e.edge_type,
|
||||
w.distance + 1
|
||||
FROM walk w
|
||||
JOIN brain_edges e ON e.src_slug = w.cur
|
||||
WHERE w.distance < $3
|
||||
AND NOT (e.dst_slug = ANY(w.path_slugs))
|
||||
)
|
||||
SELECT path_slugs, path_edges
|
||||
FROM walk
|
||||
WHERE cur = $2
|
||||
ORDER BY distance ASC
|
||||
LIMIT 1
|
||||
`
|
||||
row := s.pool.QueryRow(ctx, q, src, dst, maxDepth)
|
||||
var (
|
||||
slugs []string
|
||||
kinds []string
|
||||
)
|
||||
if err := row.Scan(&slugs, &kinds); err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, fmt.Errorf("scan path: %w", err)
|
||||
}
|
||||
if len(slugs) < 2 || len(kinds) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
steps := make([]PathStep, 0, len(kinds))
|
||||
for i := 0; i < len(kinds) && i+1 < len(slugs); i++ {
|
||||
steps = append(steps, PathStep{
|
||||
FromSlug: slugs[i],
|
||||
ToSlug: slugs[i+1],
|
||||
EdgeType: kinds[i],
|
||||
})
|
||||
}
|
||||
return steps, nil
|
||||
}
|
||||
|
||||
// CountEdges is a debug helper — returns the total edges currently stored.
|
||||
// Used by tests and by the volume-gate diagnostic.
|
||||
func (s *PGStore) CountEdges(ctx context.Context) (int64, error) {
|
||||
var n int64
|
||||
err := s.pool.QueryRow(ctx, `SELECT count(*) FROM brain_edges`).Scan(&n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func scanNeighbors(rows pgx.Rows) ([]Neighbor, error) {
|
||||
var out []Neighbor
|
||||
for rows.Next() {
|
||||
var n Neighbor
|
||||
if err := rows.Scan(
|
||||
&n.Slug, &n.Type, &n.Wing, &n.Hall,
|
||||
&n.DocPath, &n.Title, &n.EdgeType, &n.Distance,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scan: %w", err)
|
||||
}
|
||||
out = append(out, n)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user