STEP 1 / 5
Intake · Step 1

Which vertical are you in?

We build industry-specific stacks, not generic ones. Start by telling us where you operate — the next questions adjust to your world.

Intake · Step 2

A bit about the business.

Enough context to calibrate the diagnostic. Revenue bands and team size only — no pressure for specifics.

Intake · Step 3

Where does the friction live?

Select any that apply. The diagnostic will zero in on the top opportunities for your context.

Pick as many as are relevant. You'll add more detail next.
Intake · Step 4

What's already running?

Helps us separate "add more" from "wire up what you already bought."

Honest answer, not aspirational. This changes the first engagement shape.
Intake · Step 5

How do we reach you?

The diagnostic runs in real time and appears below. A copy is sent to us so we can follow up — within one business day, human-sent.

Received

Thank you. We'll be in touch.

Your diagnostic has been logged and sent to our team. We respond to every qualifying inquiry personally within one business day. If you need to add anything before then, reply to the confirmation email.

Return to Tacavar
# Tacavar Blog Module A lightweight content system that unifies two sources: 1. **Your existing hand-written blog posts** — imported into Postgres, edited from the admin panel 2. **Research engine's weekly briefs** — auto-published by R-02 when it emits a brief One feed, one design, two content types. Readers see a single, coherent blog. You see two pipelines on the back end. --- ## 1. Schema addition Append to `sql/001_init.sql` (or make a new migration file): ```sql -- Posts: unified for hand-written + auto-generated briefs CREATE TABLE IF NOT EXISTS posts ( id BIGSERIAL PRIMARY KEY, slug TEXT UNIQUE NOT NULL, title TEXT NOT NULL, subtitle TEXT, kind TEXT NOT NULL DEFAULT 'article', -- 'article' | 'brief' category TEXT, -- free-form: AI Ops, Healthcare, etc. body_html TEXT NOT NULL, excerpt TEXT, cover_image TEXT, -- optional URL author TEXT NOT NULL DEFAULT 'Tacavar', read_minutes INT, published BOOLEAN NOT NULL DEFAULT FALSE, published_at TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- For briefs, preserve provenance back into the research pipeline source_run_id TEXT, source_themes TEXT[] -- array of theme tags ); CREATE INDEX IF NOT EXISTS idx_posts_published ON posts (published, published_at DESC); CREATE INDEX IF NOT EXISTS idx_posts_kind ON posts (kind); CREATE INDEX IF NOT EXISTS idx_posts_slug ON posts (slug); -- Trigger to keep updated_at fresh CREATE OR REPLACE FUNCTION touch_updated_at() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = NOW(); RETURN NEW; END; $$ LANGUAGE plpgsql; DROP TRIGGER IF EXISTS trg_posts_updated_at ON posts; CREATE TRIGGER trg_posts_updated_at BEFORE UPDATE ON posts FOR EACH ROW EXECUTE FUNCTION touch_updated_at(); ``` Run: ```bash psql "$DATABASE_URL" -f sql/002_posts.sql ``` --- ## 2. API — `app/routes/posts.py` ```python """ Posts API. Public routes: list + detail of PUBLISHED posts only. Admin routes: full CRUD + publish on token auth. Ingest route: R-02 pushes weekly briefs here. """ from fastapi import APIRouter, Header, HTTPException, Query from pydantic import BaseModel, Field, constr from datetime import datetime from typing import Optional import re import html as html_lib from ..db import pool from ..config import settings router = APIRouter(tags=["posts"]) def _require_internal(token: str): if token != settings.api_internal_token: raise HTTPException(status_code=401, detail="unauthorized") def slugify(s: str) -> str: s = s.lower().strip() s = re.sub(r"[^a-z0-9\s-]", "", s) s = re.sub(r"\s+", "-", s) s = re.sub(r"-+", "-", s) return s[:100].strip("-") def strip_html(s: str) -> str: """Very light HTML strip for building excerpts.""" txt = re.sub(r"<[^>]+>", " ", s or "") txt = html_lib.unescape(txt) return re.sub(r"\s+", " ", txt).strip() def estimate_read_minutes(html_body: str) -> int: words = len(strip_html(html_body).split()) return max(1, round(words / 220)) # ========== Schemas ========== class PostIn(BaseModel): slug: Optional[constr(max_length=120)] = None title: constr(min_length=1, max_length=300) subtitle: Optional[constr(max_length=400)] = None kind: Optional[str] = "article" category: Optional[constr(max_length=80)] = None body_html: constr(min_length=1) excerpt: Optional[constr(max_length=500)] = None cover_image: Optional[constr(max_length=500)] = None author: Optional[constr(max_length=80)] = "Tacavar" source_run_id: Optional[str] = None source_themes: Optional[list[str]] = None class PostOut(BaseModel): id: int slug: str title: str subtitle: Optional[str] = None kind: str category: Optional[str] = None body_html: Optional[str] = None # omitted on list excerpt: Optional[str] = None cover_image: Optional[str] = None author: str read_minutes: Optional[int] = None published: bool published_at: Optional[datetime] = None created_at: datetime source_themes: Optional[list[str]] = None # ========== Public routes ========== @router.get("/v1/posts", response_model=list[PostOut]) async def list_published( limit: int = Query(30, le=100), offset: int = 0, kind: Optional[str] = None, category: Optional[str] = None, ): where = ["published = TRUE"] params: list = [] if kind: params.append(kind) where.append(f"kind = ${len(params)}") if category: params.append(category) where.append(f"category = ${len(params)}") where_sql = " AND ".join(where) params.extend([limit, offset]) sql = f""" SELECT id, slug, title, subtitle, kind, category, excerpt, cover_image, author, read_minutes, published, published_at, created_at, source_themes FROM posts WHERE {where_sql} ORDER BY published_at DESC NULLS LAST, created_at DESC LIMIT ${len(params)-1} OFFSET ${len(params)} """ async with pool().acquire() as conn: rows = await conn.fetch(sql, *params) return [PostOut(**dict(r)) for r in rows] @router.get("/v1/posts/{slug}", response_model=PostOut) async def get_published(slug: str): async with pool().acquire() as conn: row = await conn.fetchrow( """ SELECT id, slug, title, subtitle, kind, category, body_html, excerpt, cover_image, author, read_minutes, published, published_at, created_at, source_themes FROM posts WHERE slug = $1 AND published = TRUE """, slug, ) if not row: raise HTTPException(status_code=404, detail="not_found") return PostOut(**dict(row)) # ========== Admin routes ========== @router.get("/v1/admin/posts", response_model=list[PostOut]) async def admin_list(x_internal_token: str = Header(default="")): _require_internal(x_internal_token) async with pool().acquire() as conn: rows = await conn.fetch( """ SELECT id, slug, title, subtitle, kind, category, excerpt, cover_image, author, read_minutes, published, published_at, created_at, source_themes FROM posts ORDER BY created_at DESC """ ) return [PostOut(**dict(r)) for r in rows] @router.get("/v1/admin/posts/{post_id}", response_model=PostOut) async def admin_get(post_id: int, x_internal_token: str = Header(default="")): _require_internal(x_internal_token) async with pool().acquire() as conn: row = await conn.fetchrow( "SELECT * FROM posts WHERE id = $1", post_id ) if not row: raise HTTPException(status_code=404, detail="not_found") return PostOut(**dict(row)) @router.post("/v1/admin/posts", response_model=PostOut) async def admin_create(payload: PostIn, x_internal_token: str = Header(default="")): _require_internal(x_internal_token) slug = payload.slug or slugify(payload.title) excerpt = payload.excerpt or strip_html(payload.body_html)[:280] + "…" read_mins = estimate_read_minutes(payload.body_html) async with pool().acquire() as conn: try: row = await conn.fetchrow( """ INSERT INTO posts (slug, title, subtitle, kind, category, body_html, excerpt, cover_image, author, read_minutes, source_run_id, source_themes) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12) RETURNING * """, slug, payload.title, payload.subtitle, payload.kind or "article", payload.category, payload.body_html, excerpt, payload.cover_image, payload.author or "Tacavar", read_mins, payload.source_run_id, payload.source_themes, ) except Exception as e: raise HTTPException(status_code=409, detail=f"create_failed: {e}") return PostOut(**dict(row)) @router.patch("/v1/admin/posts/{post_id}", response_model=PostOut) async def admin_update( post_id: int, payload: dict, x_internal_token: str = Header(default=""), ): _require_internal(x_internal_token) allowed = {"title", "subtitle", "kind", "category", "body_html", "excerpt", "cover_image", "author", "published", "published_at", "source_themes"} fields = {k: v for k, v in payload.items() if k in allowed} if not fields: raise HTTPException(status_code=422, detail="no_valid_fields") # If publishing and no published_at, set it now if fields.get("published") is True and "published_at" not in fields: fields["published_at"] = datetime.utcnow() if "body_html" in fields: fields["read_minutes"] = estimate_read_minutes(fields["body_html"]) sets = ", ".join(f"{k} = ${i+2}" for i, k in enumerate(fields.keys())) values = list(fields.values()) async with pool().acquire() as conn: row = await conn.fetchrow( f"UPDATE posts SET {sets} WHERE id = $1 RETURNING *", post_id, *values, ) if not row: raise HTTPException(status_code=404, detail="not_found") return PostOut(**dict(row)) @router.delete("/v1/admin/posts/{post_id}") async def admin_delete(post_id: int, x_internal_token: str = Header(default="")): _require_internal(x_internal_token) async with pool().acquire() as conn: result = await conn.execute("DELETE FROM posts WHERE id = $1", post_id) if result.endswith(" 0"): raise HTTPException(status_code=404, detail="not_found") return {"ok": True} # ========== Research-engine ingest ========== class BriefIn(BaseModel): title: constr(min_length=1, max_length=300) subtitle: Optional[str] = None body_html: constr(min_length=1) themes: Optional[list[str]] = None run_id: Optional[str] = None auto_publish: bool = False # default to draft — safer @router.post("/v1/admin/briefs", response_model=PostOut) async def ingest_brief( payload: BriefIn, x_internal_token: str = Header(default=""), ): """ Called by R-02 (Synthesizer) when a weekly brief is ready. By default the brief lands as a draft. You approve in the admin panel before it goes public. This matches the house rule: nothing the agents produce publishes without a human touch. """ _require_internal(x_internal_token) slug = slugify(payload.title) excerpt = strip_html(payload.body_html)[:280] + "…" read_mins = estimate_read_minutes(payload.body_html) # Disambiguate duplicate slugs from multiple briefs in the same week async with pool().acquire() as conn: existing = await conn.fetchval("SELECT 1 FROM posts WHERE slug = $1", slug) if existing: slug = f"{slug}-{datetime.utcnow().strftime('%Y%m%d-%H%M')}" published_at = datetime.utcnow() if payload.auto_publish else None row = await conn.fetchrow( """ INSERT INTO posts (slug, title, subtitle, kind, category, body_html, excerpt, author, read_minutes, published, published_at, source_run_id, source_themes) VALUES ($1,$2,$3,'brief','Weekly Brief',$4,$5,'Tacavar Research', $6,$7,$8,$9,$10) RETURNING * """, slug, payload.title, payload.subtitle, payload.body_html, excerpt, read_mins, payload.auto_publish, published_at, payload.run_id, payload.themes, ) return PostOut(**dict(row)) ``` Wire into `app/main.py`: ```python from .routes import leads, audit, posts app.include_router(leads.router) app.include_router(audit.router) app.include_router(posts.router) ``` --- ## 3. R-02 emitter for publishing briefs Drop this into your research engine alongside the existing `emit()`: ```python # tacavar_publish.py import os, httpx API = "https://api.tacavar.com/v1/admin/briefs" TOKEN = os.environ["TACAVAR_API_INTERNAL_TOKEN"] def publish_brief(title: str, body_html: str, subtitle: str | None = None, themes: list[str] | None = None, run_id: str | None = None, auto_publish: bool = False) -> int | None: """ Push a weekly brief to the blog as a DRAFT by default. The post won't appear publicly until you approve in the admin panel. """ try: r = httpx.post( API, json={ "title": title, "subtitle": subtitle, "body_html": body_html, "themes": themes or [], "run_id": run_id, "auto_publish": auto_publish, }, headers={"X-Internal-Token": TOKEN}, timeout=10.0, ) if r.status_code == 200: return r.json().get("id") except Exception: pass return None ``` Then in R-02 where you finalize a weekly brief: ```python from tacavar_publish import publish_brief from tacavar_audit import emit def finalize_weekly_brief(title, html_body, themes, run_id): post_id = publish_brief( title=title, subtitle=f"Weekly research synthesis · {len(themes)} themes", body_html=html_body, themes=themes, run_id=run_id, auto_publish=False, # lands as draft; you approve ) emit("R-02", f"Weekly brief synthesized · {len(themes)} themes", "OK", public=True) return post_id ``` --- ## 4. Migrating your existing blog posts Two paths depending on how many you have. ### Path A — small number (under 20), copy by hand From the admin panel (coming in section 6) you just paste each post's title, body HTML, and category. Takes maybe 3 minutes per post. ### Path B — many posts, bulk import script If you have many, dump them from wherever they live now (WordPress export, Markdown files, a Notion export, etc.) and run: ```python # import_legacy_posts.py import os, json, httpx, sys, pathlib API = "https://api.tacavar.com/v1/admin/posts" PATCH = "https://api.tacavar.com/v1/admin/posts/{id}" TOKEN = os.environ["TACAVAR_API_INTERNAL_TOKEN"] HEADERS = {"X-Internal-Token": TOKEN, "Content-Type": "application/json"} # posts.json shape: # [{"title": "...", "body_html": "...", "category": "...", "published_at": "2025-09-01T..."}] posts = json.loads(pathlib.Path(sys.argv[1]).read_text()) for p in posts: # Create r = httpx.post(API, json={ "title": p["title"], "body_html": p["body_html"], "category": p.get("category"), "subtitle": p.get("subtitle"), "cover_image": p.get("cover_image"), "author": p.get("author", "Tacavar"), }, headers=HEADERS, timeout=30) r.raise_for_status() post = r.json() # Publish immediately (preserves original published_at if provided) pub_fields = {"published": True} if "published_at" in p: pub_fields["published_at"] = p["published_at"] r2 = httpx.patch(PATCH.format(id=post["id"]), json=pub_fields, headers=HEADERS, timeout=30) r2.raise_for_status() print("✓", post["slug"]) ``` Run: `python import_legacy_posts.py posts.json` If your posts are in Markdown, convert first: `pip install markdown` then `markdown.markdown(md_text)` to get HTML. If they're on the current tacavar.com/blog, the simplest approach is to send me the export (or the direct URLs to the posts I should pull) and I'll generate the posts.json for you. --- ## 5. Public blog page — `blog.html` This replaces/supplements whatever is at `tacavar.com/blog` now. Same design language as advisory + stack pages. Host it the same way as the others (Next.js page, static file, whatever your current site uses). It's dependency-free. ```html