OpenArg Backend: The AI Engine Making Argentine Government Data Actually Usable
What if every citizen could interrogate their government with a simple question? Not through FOIA requests that take months. Not through bureaucratic portals designed in 2003. But by typing "How much did public infrastructure spending grow in Córdoba versus Buenos Aires last year?" and getting a cited, charted, confidence-scored answer in seconds.
This isn't a pitch for some Silicon Valley startup selling vaporware to VCs. This is already running at openarg.org, powered by an open-source engine that might be the most sophisticated civic technology project you've never heard of.
Here's the painful truth developers know: government open data is a graveyard of good intentions. Portals dump CSVs with cryptic column names. APIs return XML from the Bush administration. Datasets contradict each other. And the tools meant to "democratize" this data? They're built for data scientists with PhDs, not journalists, activists, or curious citizens.
Enter OpenArg Backend — the open-source analysis engine that orchestrates multiple AI agents, nine data connectors, natural-language-to-SQL translation, and real-time streaming to turn Argentina's fragmented public data into coherent, verifiable answers. Built with FastAPI, PostgreSQL + pgvector, Celery, and Claude LLMs, this isn't just another RAG demo. It's production-grade civic infrastructure with hexagonal architecture, semantic caching, and a spec-driven development process that puts most enterprise codebases to shame.
Ready to see how they pulled this off? Let's dissect what makes this system tick.
What Is OpenArg Backend?
OpenArg Backend is the AI-powered analysis engine behind OpenArg, a platform that answers natural-language questions about Argentine public data. Created by Luciano Carreno and Dante De Agostino under the ColossusLab research organization, it represents a rare intersection of cutting-edge AI engineering and genuine civic technology.
The project emerged from a frustrating reality: Argentina, like many nations, publishes substantial open government data through initiatives like datos.gob.ar, but the data remains largely inaccessible to non-technical users. Economic time series, legislative records, geographic databases, transparency declarations — they're all "open" in theory, locked behind technical barriers in practice.
What makes OpenArg Backend trending now is its architectural ambition. While most LLM applications slap a chat interface on a vector database and call it "AI," this system implements a multi-agent pipeline using LangGraph, where specialized AI collaborators decompose questions, gather evidence in parallel, validate SQL execution, and synthesize cited responses. It's the difference between a search engine and a research assistant.
The repository has gained traction among developers interested in production LLM systems, civic tech architecture, and LangGraph patterns beyond simple chatbots. Its hexagonal architecture, comprehensive spec-driven design documentation, and real-world deployment at scale make it a reference implementation for anyone building serious data analysis pipelines with language models.
Key Features That Separate It From the Pack
OpenArg Backend isn't a prototype — it's a production system with engineering decisions that reveal deep battle scars. Here's what stands out:
Multi-Agent LangGraph Pipeline with Specialized Roles. The system implements four (optionally five) distinct AI agents that collaborate through a state machine: a Strategist decomposes questions and selects data sources; Researchers dispatch to nine connectors concurrently; an Analyst synthesizes findings with citations and confidence scoring; a Policy Analyst (user-activated) evaluates government programs against DNFCG criteria; and a Writer assembles the final formatted response. This isn't prompt chaining — it's structured collaboration with replanning loops when data proves insufficient.
Nine Data Connectors with Intelligent Routing. The engine connects to economic time series (30,000+ from INDEC/BCRA), dollar rates, central bank indicators, legislative session transcripts (with vector search), patrimonial declarations, congressional staff data, geographic entities, 3,000+ CKAN datasets across 20 portals, and an internal SQL sandbox. The Strategist agent selects which connectors to invoke, avoiding the brute-force "search everything" approach that kills latency.
NL2SQL with Three-Layer Validation. Natural language queries get translated to SQL against cached dataset tables, but with defense in depth: syntax validation, semantic checks against schema metadata, and read-only execution constraints. This isn't your demo-day "ask questions of your CSV" toy — it's production SQL generation with guardrails.
Semantic Caching via Redis + pgvector. Before burning LLM tokens on a query, the system checks for semantically similar recent questions using vector similarity search. For repetitive analytical patterns, this cuts costs and latency dramatically.
Real-Time WebSocket Streaming. The /api/v1/query/ws/smart endpoint streams pipeline progress to the frontend, so users see their answer being constructed in real-time — which agent is running, which connectors returned data, when analysis begins.
Hexagonal Architecture with Dishka DI. The codebase enforces clean separation through domain ports (IDataSource, ILLMProvider, IVectorSearch, ISQLSandbox, ICacheService) and infrastructure adapters, all wired by Dishka. This isn't architectural astronautism — it enables swapping Claude for another LLM, PostgreSQL for another vector store, without touching business logic.
Celery Worker Army with Tuned Concurrency. Eight specialized worker types run on dedicated queues with concurrency tuned to their workload: scrapers at 2 (rate-limited by portals), embedding generators at 8 (CPU-bound vectorization), analysts at 2 (LLM API-throttled). This is operations engineering most AI projects ignore.
Where OpenArg Backend Actually Shines: Real Use Cases
Let's move beyond feature lists to concrete scenarios where this architecture proves its worth:
Investigative Journalism on Public Spending. A journalist needs to trace infrastructure investment across Argentine provinces over five years, comparing execution rates against budget allocations. Without OpenArg: download dozens of CSVs from datos.gob.ar, manually reconcile inconsistent province name formats, write pivot tables, hope the datasets use compatible inflation indices. With OpenArg: "Compare infrastructure budget execution in Mendoza, Córdoba and Santa Fe from 2019-2024, adjusted for inflation" — the Strategist selects Series Tiempo and CKAN connectors, Researchers pull the relevant datasets, Analyst detects the journalist's implicit need for real versus nominal values, and Writer returns cited charts with confidence intervals.
Academic Policy Analysis. A researcher evaluates a social program's effectiveness using DNFCG criteria (pertinence, efficacy, efficiency, impact, sustainability). The Policy Analyst agent activates, specifically designed to apply this framework with evidence-based scoring — something generic LLM chatbots can't do without extensive prompting. The system cites specific datasets, flags data gaps that weaken conclusions, and maintains methodological transparency.
Real-Time Legislative Monitoring. Staffers track congressional session transcripts for mentions of specific policy topics. The vector search connector on "Sesiones" enables semantic retrieval — finding discussions about "inflation control" even when speakers use terms like "price stability" or "monetary erosion." Combined with temporal filtering, this replaces manual transcript review that previously took days.
Anti-Corruption Due Diligence. Investigators cross-reference patrimonial declarations (DDJJ) from 195 deputies against congressional staff data and public contracts. The SQL sandbox enables precise structured queries — "Show deputies whose declared net worth increased over 200% during their term, with staff members who appear in transparency datasets" — while the multi-agent pipeline handles the fuzzy matching and synthesis that pure SQL can't express.
Economic Indicator Dashboards. Financial analysts need composite views combining BCRA reserves, country risk (EMBI), multiple dollar exchange rates, and monetary base — updated daily. The ingestion pipeline's Celery workers automate this refresh, with embedding workers keeping vector search current and scrapers detecting new dataset publications across 20 government portals.
Step-by-Step: Getting OpenArg Backend Running
The maintainers optimized for Docker deployment and local development with sensible defaults. Here's how to get started:
Docker Deployment (Recommended for Production)
# Clone the repository
git clone https://github.com/colossus-lab/openarg_backend.git
cd openarg_backend
# Copy and configure environment variables
cp .env.example .env
# Edit .env with your credentials:
# - AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY (for Bedrock Claude + Cohere embeddings)
# - DATABASE_URL (PostgreSQL 16 with pgvector extension)
# - REDIS_URL (for caching and Celery broker)
# - ANTHROPIC_API_KEY (fallback if Bedrock fails)
# Launch the full stack
docker compose up -d
The Docker Compose configuration orchestrates: FastAPI application servers, PostgreSQL 16 with pgvector, Redis 7, Celery workers across all eight specializations, Celery Beat scheduler, and Flower monitoring UI.
Local Development Environment
For active development with hot reload:
# Install dependencies (requires uv — the Rust-based Python package manager)
make install
# Start PostgreSQL and Redis containers
make db.up
# Run database migrations (Alembic)
make db.migrate
# Start API with hot reload on port 8080
make dev
The Makefile provides comprehensive development commands:
# Start individual worker types for debugging
make workers.scraper # CKAN portal scraping
make workers.collector # Dataset download and parsing
make workers.embedding # Vector embedding generation
make workers.analyst # Query pipeline execution
make workers.transparency # Budget and transparency data
make workers.ingest # Structured source ingestion
make workers.s3 # Large dataset S3 operations
make beat # Periodic task scheduler
make flower # Celery monitoring dashboard
# Code quality
make code.format # Ruff formatting
make code.lint # Ruff + mypy
make code.test # Pytest with coverage
make code.check # Full validation pipeline
Critical configuration notes: The system requires PostgreSQL 16 with the pgvector extension enabled and HNSW indexes configured for 1024-dimensional vectors. Redis 7 serves dual purpose as cache backend and Celery message broker. AWS credentials need Bedrock access for Claude 3 and Cohere Embed Multilingual v3 — the Anthropic API key provides resilience if AWS regions experience issues.
Real Code: Inside the Multi-Agent Pipeline
Let's examine actual implementation patterns from the repository, starting with the core pipeline orchestration:
Pipeline State Definition
The LangGraph pipeline operates on a structured state object that carries context across agent boundaries:
# From specs/001-query-pipeline/ — state carries execution context
# through all 16 pipeline nodes
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from enum import Enum
class QueryCategory(Enum):
CASUAL = "casual" # Simple greetings, chitchat
META = "meta" # Questions about the system itself
INJECTION = "injection" # Prompt injection attempts (blocked)
OFF_TOPIC = "off_topic" # Outside scope
ANALYTICAL = "analytical" # Requires data analysis
@dataclass
class QueryState:
"""Mutable state passed between LangGraph nodes.
Each agent reads its inputs and writes its outputs here,
enabling checkpointing and replanning loops.
"""
query_id: str # Unique trace ID for observability
raw_question: str # Original user input
category: Optional[QueryCategory] = None # Classification result (0 LLM calls — heuristic-based)
# Semantic cache results
cache_hit: bool = False
cached_response: Optional[str] = None
# Planning phase (Strategist agent — 1 LLM call)
execution_plan: Optional[Dict[str, Any]] = None # Which connectors, in what order
required_connectors: List[str] = field(default_factory=list)
# Data collection phase (Researchers — parallel, 0 LLM calls)
connector_results: Dict[str, Any] = field(default_factory=dict)
sql_results: Optional[List[Dict]] = None # NL2SQL sandbox output
# Analysis phase (Analyst agent — 1 LLM call)
synthesized_analysis: Optional[str] = None
confidence_score: float = 0.0 # 0.0-1.0 based on data quality
citations: List[Dict[str, str]] = field(default_factory=list)
chart_data: Optional[List[Dict]] = None # Structured for frontend rendering
# Policy analysis (optional — user-activated, 1 LLM call)
policy_evaluation: Optional[Dict[str, Any]] = None
# Replanning loop
replan_count: int = 0 # Prevent infinite loops
needs_replan: bool = False
# Final assembly (Writer agent — streaming)
final_markdown: Optional[str] = None
sources: List[Dict[str, str]] = field(default_factory=list)
This state design enables checkpointing and recovery — if the Analyst agent detects insufficient data, it sets needs_replan=True, triggering the Strategist to generate an alternative approach with different connectors or a relaxed query interpretation.
Domain Port for Data Sources
The hexagonal architecture's power shows in abstract interfaces that hide infrastructure complexity:
# From domain layer — ports define contracts, adapters implement them
from abc import ABC, abstractmethod
from typing import AsyncIterator, Dict, Any, Optional
from dataclasses import dataclass
@dataclass(frozen=True)
class DataSourceResult:
"""Standardized output from any data connector.
The Analyst agent consumes this uniformly regardless of
whether data came from CKAN, SQL, or an API.
"""
source_name: str # "series_tiempo", "ckan", "nl2sql", etc.
data: Dict[str, Any] # Structured payload
metadata: Dict[str, Any] # Provenance, freshness, coverage gaps
raw_query: str # What was actually sent to the source
latency_ms: int # For performance monitoring
# Critical for trust: explicit confidence in this data slice
reliability_score: float # 0.0-1.0 based on source quality, recency, gaps
class IDataSource(ABC):
"""Port: Abstract interface for all data connectors.
Infrastructure adapters implement this for each of the 9 sources.
The Application layer's SmartQueryService routes to adapters
based on the Strategist's execution plan.
"""
@property
@abstractmethod
def source_name(self) -> str:
"""Unique identifier for routing and metrics."""
pass
@abstractmethod
async def search(self, query: str, context: Dict[str, Any]) -> DataSourceResult:
"""Execute a search/query against this data source.
Implementations handle: API authentication, rate limiting,
response parsing, error recovery, and normalization to
the standard DataSourceResult format.
"""
pass
@abstractmethod
async def health_check(self) -> Dict[str, Any]:
"""Return connectivity and freshness status.
Used by /health endpoint for dependency monitoring.
"""
pass
This abstraction lets the system add new government data sources without modifying the pipeline logic. A developer implements IDataSource, registers the adapter with Dishka, and the Strategist can immediately route queries to it.
NL2SQL with Validation Layers
The natural-language-to-SQL component demonstrates production safety patterns:
# From infrastructure layer — SQL sandbox with defense in depth
import re
from typing import Optional, Tuple
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
class SQLSandboxAdapter:
"""Read-only SQL execution with three-layer validation.
Layer 1: Syntactic — parse and validate SQL structure
Layer 2: Semantic — match against known schema metadata
Layer 3: Runtime — execute in read-only transaction with timeouts
"""
# Blacklist of destructive operations — explicit deny
FORBIDDEN_PATTERNS = [
r'\b(DROP|DELETE|TRUNCATE|INSERT|UPDATE|ALTER|CREATE|GRANT)\b',
r';\s*\w+', # Multiple statements
r'--', # Comments that might hide injection
r'/\*', # Block comments
]
# Whitelist of allowed table prefixes — explicit allow
ALLOWED_TABLES = [
'cached_dataset_', # Ingested CKAN datasets
'series_tiempo_', # Economic indicators
'transparency_', # Budget and declarations
'georef_' # Geographic entities
]
async def execute_nl_query(
self,
natural_language: str,
schema_context: str, # Relevant table schemas for LLM context
session: AsyncSession
) -> Tuple[Optional[list], Optional[str]]:
"""Convert NL to SQL and execute with full validation.
Returns: (results, error_message) — error_message is user-safe
"""
# Step 1: Generate SQL via LLM with schema grounding
generated_sql = await self._llm_generate_sql(
question=natural_language,
schema=schema_context,
dialect="postgresql",
examples=self._few_shot_examples() # In-context learning
)
# Step 2: Layer 1 — Syntactic validation
if not self._is_safe_sql(generated_sql):
return None, "Query contains unsupported operations. Please rephrase."
# Step 3: Layer 2 — Semantic validation against metadata
validation = self._validate_against_schema(generated_sql)
if not validation.is_valid:
return None, f"Query references unavailable data: {validation.reason}"
# Step 4: Layer 3 — Execute with read-only guardrails
try:
# SET TRANSACTION READ ONLY ensures no modifications
await session.execute(text("SET TRANSACTION READ ONLY"))
# Statement timeout prevents runaway queries
await session.execute(text("SET LOCAL statement_timeout = '30000'")) # 30s
result = await session.execute(text(generated_sql))
rows = result.mappings().all()
# Size limit prevents excessive memory consumption
if len(rows) > 10_000:
return rows[:10_000], "Results truncated to 10,000 rows"
return list(rows), None
except Exception as e:
# Sanitize error — never expose internal details
self._log_security_event(generated_sql, str(e))
return None, "Unable to execute query. The data may be temporarily unavailable."
def _is_safe_sql(self, sql: str) -> bool:
"""Layer 1: Pattern-based rejection of dangerous operations."""
upper_sql = sql.upper()
for pattern in self.FORBIDDEN_PATTERNS:
if re.search(pattern, upper_sql, re.IGNORECASE):
return False
# Verify table references are in whitelist
# Extract table names via simple parsing (production uses SQL parser)
for table in self._extract_table_references(sql):
if not any(table.startswith(allowed) for allowed in self.ALLOWED_TABLES):
return False
return True
This three-layer approach — generate, validate, execute with constraints — is what separates production NL2SQL from dangerous demos. The explicit allow-list of table prefixes, read-only transaction setting, and sanitized error messages demonstrate security thinking that most "AI SQL" projects lack entirely.
Celery Worker Configuration
The worker specialization shows operational maturity:
# From worker configuration — tuned concurrency per workload type
from celery import Celery
from kombu import Queue
app = Celery('openarg')
# Dedicated queues prevent head-of-line blocking
app.conf.task_queues = (
Queue('scraper', routing_key='scraper.#'), # I/O bound, rate-limited by portals
Queue('collector', routing_key='collector.#'), # Mixed I/O + CPU (pandas parsing)
Queue('embedding', routing_key='embedding.#'), # CPU/GPU bound (vectorization)
Queue('analyst', routing_key='analyst.#'), # LLM API bound (throttle-sensitive)
Queue('transparency', routing_key='transparency.#'), # Batch processing
Queue('ingest', routing_key='ingest.#'), # Structured data pipelines
Queue('s3', routing_key='s3.#'), # Large file operations
)
# Concurrency tuned to workload characteristics
app.conf.worker_concurrency = {
'scraper': 2, # Portal rate limits: aggressive concurrency gets you blocked
'collector': 4, # pandas parsing: some parallelism, memory-constrained
'embedding': 8, # Bedrock Cohere: batch-friendly, CPU waits on API
'analyst': 2, # Claude API: expensive, rate-limited, quality over speed
'transparency': 2, # Complex joins: database-bound
'ingest': 2, # Series Tiempo API: moderate rate limits
's3': 2, # Network throughput: diminishing returns above 2
}.get(os.environ['WORKER_TYPE'], 2)
# Critical: separate prefetch to prevent slow tasks from starving queue
app.conf.worker_prefetch_multiplier = 1 # Each worker takes only 1 task at a time
The queue-per-worker-type pattern with tuned concurrency is a lesson in operational reality. Embedding workers run at 8x because Bedrock Cohere handles batching efficiently; analyst workers stay at 2x because Claude API costs and rate limits make aggressive concurrency expensive and error-prone.
Advanced Usage & Pro Tips
Leverage Semantic Cache Warming. For anticipated high-traffic queries (election periods, budget releases), pre-populate the Redis + pgvector cache by running representative questions through the pipeline. The POST /api/v1/query/ async endpoint lets you batch these efficiently.
Monitor Connector Reliability Scores. Each DataSourceResult includes a reliability_score. Build dashboards on GET /api/v1/metrics to detect when government APIs degrade — the BCRA endpoint, for instance, has historically had uptime issues during currency volatility.
Use the Policy Analyst Sparingly. The DNFCG evaluation agent adds significant latency (extra LLM call) and works best with explicit user activation. Consider A/B testing whether users prefer always-on versus opt-in policy analysis.
Extend via Spec-Driven Design. The specs/ directory is the project's secret weapon. Before modifying behavior, read the relevant spec.md (what/why) and plan.md (how). New features require new spec folders — this discipline prevents the architecture decay that kills most LLM projects after month six.
Implement Custom Connectors. The IDataSource port and Dishka registration make adding provincial or municipal data sources straightforward. Mendoza and Córdoba publish open data; a custom adapter would extend coverage without pipeline changes.
How OpenArg Backend Compares to Alternatives
| Dimension | OpenArg Backend | Generic RAG (LlamaIndex/LangChain) | Traditional BI (Tableau/PowerBI) | Direct API Queries |
|---|---|---|---|---|
| Natural Language | Multi-agent decomposition with replanning | Single-retrieval or simple chain | None — requires SQL/dashboard building | None |
| Data Source Variety | 9 specialized connectors + extensible port system | Usually 1-2 vector stores | Requires manual ETL per source | 1 per query |
| Citation & Provenance | Structured citations with confidence scoring | Often missing or unstructured | Manual annotation | None |
| Policy Analysis | Dedicated DNFCG evaluation agent | Not available | Not available | Not available |
| Real-time Streaming | WebSocket progress streaming | Rarely implemented | N/A | N/A |
| Semantic Cache | Redis + pgvector with similarity matching | Basic exact-match or none | Query result caching | None |
| SQL Generation | NL2SQL with 3-layer validation | Often unvalidated or read-only not enforced | Requires manual modeling | Manual writing |
| Civic Data Focus | Argentine government data optimized | Generic | Generic | N/A |
| Open Source | Full stack, MIT license | Varies | Proprietary | N/A |
| Operational Maturity | 8 worker types, health checks, metrics | Usually single-process | Enterprise-grade | Fragile |
The key insight: OpenArg Backend isn't competing with general AI frameworks — it's what you build when those frameworks prove insufficient for real-world complexity. The multi-agent decomposition, specialized civic connectors, and policy evaluation layer are domain-specific investments that generic tools can't replicate.
Frequently Asked Questions
Is OpenArg Backend only useful for Argentine data?
No — while connectors target Argentine government sources, the hexagonal architecture lets you swap IDataSource implementations for any country's open data. The LangGraph pipeline, NL2SQL sandbox, and multi-agent pattern are fully generalizable.
What LLM costs should I expect at scale? The semantic cache and query classification (0 LLM calls for casual/off-topic) significantly reduce costs. Typical analytical queries use 2-3 Claude API calls (Strategist + Analyst + optional Policy Analyst). AWS Bedrock offers reserved throughput pricing for predictable workloads.
How does the system handle hallucination? Multiple defenses: structured citations require source attribution, confidence scores flag uncertain synthesis, the SQL sandbox executes against real data (not LLM generation), and replanning loops trigger when data is insufficient rather than fabricating answers.
Can I run this without AWS?
The primary LLM path requires AWS Bedrock for Claude and Cohere embeddings. However, the Anthropic API fallback and pluggable ILLMProvider port mean you could implement adapters for OpenAI, local models, or other providers. The core system has no AWS-specific dependencies beyond current adapters.
What's the learning curve for contributing?
Moderate — the spec-driven design process adds upfront reading but prevents architectural drift. Start with specs/README.md and specs/constitution.md. The make code.check command validates your changes before PR submission.
How does this differ from ChatGPT with browsing? ChatGPT browsing performs single-step web retrieval with no structured decomposition, no SQL execution against cached datasets, no policy evaluation framework, and no provenance tracking. OpenArg Backend is engineered for verifiable analytical workflows, not conversational convenience.
Is there a frontend, or is this API-only? The openarg_frontend repository provides the web interface. The backend is fully API-driven, enabling custom clients, mobile apps, or integration into existing workflows.
The Verdict: Why OpenArg Backend Matters
In a landscape of AI demos and prototype-grade "solutions," OpenArg Backend stands out as production civic infrastructure with architectural integrity. The hexagonal design, spec-driven development, multi-agent pipeline with replanning, and operational maturity of its Celery worker ecosystem demonstrate what happens when skilled engineers treat government data accessibility as a serious systems problem — not a hackathon project.
The choice of LangGraph over simpler chains, the investment in nine specialized connectors rather than generic RAG, and the explicit policy analysis agent for DNFCG criteria all signal domain expertise that can't be faked. This is technology built by people who understand both AI engineering and the messy reality of government data.
For developers building production LLM systems: study the query pipeline specification, the port-adapter patterns, and the semantic caching implementation. For civic technologists: this is a blueprint for making open data actually open. For Argentines: this is your tax data, made intelligible.
Clone it, extend it, deploy it. The code is waiting at github.com/colossus-lab/openarg_backend.