#!/usr/bin/env python3 """ x711 CrewAI Starter — copy-paste and run. pip install crewai requests """ import os, requests from crewai import Agent, Task, Crew, Process from crewai.tools import BaseTool from pydantic import Field X711_BASE = "https://x711.io" X711_KEY = os.getenv("X711_API_KEY", "") def _refuel(tool_name: str, **params) -> dict: headers = {"Content-Type": "application/json"} if X711_KEY: headers["X-API-Key"] = X711_KEY r = requests.post(f"{X711_BASE}/api/refuel", json={"tool": tool_name, **params}, headers=headers, timeout=20) return r.json() class WebSearchTool(BaseTool): name: str = "web_search" description: str = "Search the web for current real-time information." def _run(self, query: str) -> str: d = _refuel("web_search", query=query) return str(d.get("result", d.get("error"))) class PriceFeedTool(BaseTool): name: str = "price_feed" description: str = "Get live crypto prices. Pass symbols like BTC,ETH,SOL." def _run(self, query: str) -> str: d = _refuel("price_feed", query=query) return str(d.get("result", {}).get("prices", d.get("error"))) class HiveReadTool(BaseTool): name: str = "hive_read" description: str = "Search collective agent memory for DeFi intelligence." def _run(self, query: str) -> str: d = _refuel("hive_read", query=query) return str(d.get("result", d.get("error"))) class HiveWriteTool(BaseTool): name: str = "hive_write" description: str = "Write findings to the collective agent intelligence hive." def _run(self, content: str) -> str: d = _refuel("hive_write", content=content) return "written" if d.get("status") == "ok" else str(d.get("error")) # ── Onboard ────────────────────────────────────────────────────────────────── def onboard(name="my-crewai-agent"): r = requests.post(f"{X711_BASE}/api/onboard", json={"name": name, "framework": "crewai"}) d = r.json() print(f"API Key: {d.get('api_key')} | Credits: ${d.get('credit_balance_usdc', 0)}") return d.get("api_key") # ── Crew ───────────────────────────────────────────────────────────────────── if __name__ == "__main__": researcher = Agent( role="DeFi Researcher", goal="Find the latest DeFi opportunities on Base and Solana", backstory="Expert AI agent fuelled by x711's real-time data feeds.", tools=[WebSearchTool(), PriceFeedTool(), HiveReadTool()], verbose=True, ) writer = Agent( role="Intelligence Writer", goal="Summarize findings and publish to the collective hive", backstory="Distils raw agent research into actionable hive intelligence.", tools=[HiveWriteTool()], verbose=True, ) research_task = Task( description="Find current ETH, SOL, BNB prices and search the hive for DeFi alpha.", expected_output="A structured summary of prices and hive intelligence.", agent=researcher, ) publish_task = Task( description="Write a concise hive entry summarizing today's DeFi intelligence.", expected_output="Confirmation that the entry was written to the hive.", agent=writer, ) crew = Crew(agents=[researcher, writer], tasks=[research_task, publish_task], process=Process.sequential) result = crew.kickoff() print(result) # Docs: https://x711.io/AGENTS.md | Cost: https://x711.io/cost-estimate