All demos
Data IntelligenceLlamaIndex

Telemetry Fusion

Fuse telemetry + service manuals into a maintenance diagnosis brief

Ready to replay
0.0s / 9.9s

Security Pipeline

Input
Sandbox
Network
PII Scan
Injection
Vault
LLM Call
Result
Real run · LlamaIndex · python run.py · captured 2026-06-30 · SDK 1.3.0
Run it yourself
View the agent code· with Declawdata-intelligence-workflows/sandboxed/02-telemetry-fusion-llamaindex/run.py
"""W2 sandboxed — Telemetry + Manuals Fusion inside one declaw microVM.

Untrusted content (service manuals) can embed indirect prompt-injection,
so this policy turns `InjectionDefenseConfig(action='log_only')` on —
audit detections without blocking the workflow.
"""
from __future__ import annotations

import sys
import textwrap
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(REPO_ROOT))
sys.path.insert(0, str(REPO_ROOT / "sandboxed"))
from shared.mock_manuals import MANUALS, fetch_workorder_history  # noqa: E402
from shared.mock_timeseries import READINGS  # noqa: E402
from shared.declaw_helpers import (  # noqa: E402
    llm_envs, run_python_in_sandbox, wisdomai_analytics_policy,
)


FUSION_SCRIPT = textwrap.dedent("""
    import asyncio, json
    from llama_index.core.agent.workflow import FunctionAgent
    from llama_index.core.tools import FunctionTool
    from llama_index.llms.openai import OpenAI as LlamaOpenAI

    with open('/tmp/in.json') as f: inp = json.load(f)
    READINGS = inp['readings']
    MANUALS = inp['manuals']
    WORKORDERS = inp['workorders']
    PUMP_ID = inp['pump_id']

    def telemetry_summary(pump_id: str) -> dict:
        \"\"\"Vibration + temp stats and overnight-spike flag for a pump.\"\"\"
        rows = READINGS.get(pump_id, [])
        if not rows: return {'pump_id': pump_id, 'error': 'unknown pump'}
        vib = [r['vibration_mm_s'] for r in rows]
        temp = [r['temp_c'] for r in rows]
        overnight = [r for r in rows if r['ts'][11:13] in ('02','03','04')]
        ov = sum(r['vibration_mm_s'] for r in overnight)/len(overnight)
        day = [r for r in rows if r['ts'][11:13] not in ('02','03','04')]
        dy = sum(r['vibration_mm_s'] for r in day)/len(day)
        return {'pump_id': pump_id, 'n': len(rows),
                'vibration_avg': round(sum(vib)/len(vib),2),
                'temp_avg': round(sum(temp)/len(temp),2),
                'overnight_vibration_avg': round(ov,2),
                'daytime_vibration_avg': round(dy,2),
                'overnight_spike_detected': ov > dy*1.5}

    def manuals_search(keywords: str) -> list:
        \"\"\"Search service-manual corpus for sections matching keywords.\"\"\"
        toks = [t.lower() for t in keywords.split() if len(t)>=4]
        hits = []
        for mid, m in MANUALS.items():
            text = m['sections'].lower()
            if any(t in text for t in toks):
                hits.append({'manual_id': mid, 'title': m['title'],
                             'excerpt': m['sections'][:900]})
        return hits

    def workorder_history(pump_id: str) -> list:
        \"\"\"CMMS work-order history for a pump.\"\"\"
        return WORKORDERS.get(pump_id, [])

    async def main():
        agent = FunctionAgent(
            tools=[
                FunctionTool.from_defaults(fn=telemetry_summary),
                FunctionTool.from_defaults(fn=manuals_search),
                FunctionTool.from_defaults(fn=workorder_history),
            ],
            llm=LlamaOpenAI(model='gpt-4.1'),
            system_prompt=(
                'You are a biomed engineering analyst. Step 1: call '
                f'telemetry_summary(\"{PUMP_ID}\"). Step 2: if '
                'overnight_spike_detected, call manuals_search. Step 3: '
                f'call workorder_history(\"{PUMP_ID}\"). Step 4: emit a '
                'plain-text brief with diagnosis (one sentence), '
                'recommended work-order code, and manual section cited.'
            ),
        )
        resp = await agent.run(user_msg=f'Diagnose {PUMP_ID}.')
        with open('/tmp/out.json','w') as f: json.dump({'brief': str(resp)}, f)

    asyncio.run(main())
""")


def main() -> None:
    pump_id = "pump-14"
    print("=== W2 Telemetry Fusion (sandboxed, 1 microVM, injection scan ON) ===\n")
    out = run_python_in_sandbox(
        "telemetry-fusion", FUSION_SCRIPT,
        wisdomai_analytics_policy(enable_injection_scan=True),
        payload={
            "pump_id": pump_id,
            "readings": READINGS,
            "manuals": MANUALS,
            "workorders": {"pump-14": fetch_workorder_history("pump-14")},
        },
        envs=llm_envs(),
        timeout=300,
    )
    print("\n--- Diagnosis brief ---")
    print(out.get("brief", ""))


if __name__ == "__main__":
    main()
View raw audit JSON
[
  {
    "atMs": 450,
    "kind": "stage",
    "payload": {
      "stage": "input",
      "status": "done",
      "detail": "read input files in-VM"
    }
  },
  {
    "atMs": 2160,
    "kind": "stage",
    "payload": {
      "stage": "sandbox",
      "status": "done",
      "detail": "1 Firecracker microVM(s) · own kernel · egress-locked"
    }
  },
  {
    "atMs": 3870,
    "kind": "network",
    "payload": {
      "event": "egress_allowed",
      "detail": {
        "host": "api.openai.com",
        "port": 443,
        "reason": "allowlist"
      }
    }
  },
  {
    "atMs": 5580,
    "kind": "security",
    "payload": {
      "event": "vault_brokered",
      "detail": {
        "keys": "OPENAI_API_KEY",
        "host": "api.openai.com",
        "injected_at": "egress proxy",
        "exposure_to_vm": "none (declaw:vault-managed placeholder)"
      }
    }
  },
  {
    "atMs": 7290,
    "kind": "stage",
    "payload": {
      "stage": "llm",
      "status": "done",
      "detail": "model called from inside the microVM (PII redacted on the wire) · 20.0s real",
      "durationMs": 2460
    }
  },
  {
    "atMs": 9000,
    "kind": "decision",
    "payload": {
      "text": "Diagnosis brief + recommended work-order code drafted — for a technician"
    }
  }
]