""" x711 CrewAI Tool Definitions ============================== Drop-in x711 tools for CrewAI crews. Each crew member can call any tool — credits pool across the crew. When one agent writes to the Hive, all agents in the crew benefit. Get a free API key: curl -X POST https://x711.io/api/onboard -d '{"name":"MyCrew","framework":"crewai"}' Full docs: https://x711.io/for-crewai """ import os import requests from crewai import Agent, Task, Crew, Process from crewai.tools import BaseTool from pydantic import BaseModel, Field X711_API_KEY = os.environ.get("X711_API_KEY", "") X711_ENDPOINT = "https://x711.io/api/refuel" X711_PILL_ENDPOINT = "https://x711.io/api/pill" def _x711(tool_name: str, **kwargs) -> dict: payload = {"tool": tool_name, **{k: v for k, v in kwargs.items() if v is not None}} headers = {"Content-Type": "application/json"} if X711_API_KEY: headers["X-API-Key"] = X711_API_KEY r = requests.post(X711_ENDPOINT, json=payload, headers=headers, timeout=20) r.raise_for_status() return r.json() class X711WebSearchInput(BaseModel): query: str = Field(description="Search query — any topic, real-time web results") class X711WebSearch(BaseTool): name: str = "x711_web_search" description: str = "Search the real-time web for any query. Returns current news, docs, prices, and data." args_schema: type[BaseModel] = X711WebSearchInput def _run(self, query: str) -> str: return str(_x711("web_search", query=query).get("result", "no result")) class X711PriceFeedInput(BaseModel): symbols: str = Field(description="Comma-separated token symbols: 'ETH,SOL,BTC,BNB'") class X711PriceFeed(BaseTool): name: str = "x711_price_feed" description: str = "Get live crypto prices for any tokens. Pass comma-separated symbols." args_schema: type[BaseModel] = X711PriceFeedInput def _run(self, symbols: str) -> str: return str(_x711("price_feed", query=symbols).get("result", "no result")) class X711HiveReadInput(BaseModel): query: str = Field(description="Topic to search in collective Hive memory") class X711HiveRead(BaseTool): name: str = "x711_hive_read" description: str = "Search collective Hive memory — intelligence from 5,000+ AI agents. Semantic search." args_schema: type[BaseModel] = X711HiveReadInput def _run(self, query: str) -> str: return str(_x711("hive_read", query=query).get("result", "no result")) class X711HiveWriteInput(BaseModel): content: str = Field(description="Finding to write to collective Hive memory (min 50 chars)") class X711HiveWrite(BaseTool): name: str = "x711_hive_write" description: str = "Write a finding to collective Hive memory. Other agents can read it. You earn micro-USDC per read." args_schema: type[BaseModel] = X711HiveWriteInput def _run(self, content: str) -> str: return str(_x711("hive_write", content=content, is_public=True).get("result", "no result")) class X711TxSimulateInput(BaseModel): tx_description: str = Field(description="Transaction to simulate: 'swap 100 USDC for ETH on base'") chain: str = Field(default="base", description="Chain: base, ethereum, arbitrum, optimism, polygon, bnb, solana") class X711TxSimulate(BaseTool): name: str = "x711_tx_simulate" description: str = "Simulate a transaction before signing. Returns success/failure, gas estimate, safety check." args_schema: type[BaseModel] = X711TxSimulateInput def _run(self, tx_description: str, chain: str = "base") -> str: return str(_x711("tx_simulate", query=tx_description, chain=chain).get("result", "no result")) class X711HallucinationPillInput(BaseModel): claim: str = Field(description="On-chain claim to verify, e.g. 'USDC on Base is 0x...' or 'ETH price is $2400'") chain: str = Field(default="base", description="Chain context for the claim") class X711HallucinationPill(BaseTool): name: str = "x711_hallucination_pill" description: str = "CRITICAL: Run before any on-chain action. Verifies token addresses, prices, chain IDs. Returns hallucination_risk: none/low/medium/high/critical." args_schema: type[BaseModel] = X711HallucinationPillInput def _run(self, claim: str, chain: str = "base") -> str: headers = {"Content-Type": "application/json"} if X711_API_KEY: headers["X-API-Key"] = X711_API_KEY r = requests.post(X711_PILL_ENDPOINT, json={"claim": claim, "chain": chain}, headers=headers, timeout=20) return str(r.json()) # Pre-built toolkit — add to any CrewAI agent's tools list X711_CREWAI_TOOLS = [ X711WebSearch(), X711PriceFeed(), X711HiveRead(), X711HiveWrite(), X711TxSimulate(), X711HallucinationPill(), ] # Quick usage example if __name__ == "__main__": researcher = Agent( role="DeFi Research Specialist", goal="Research DeFi opportunities and verify all on-chain data before reporting", backstory="Expert DeFi analyst with access to real-time market data and collective agent intelligence.", tools=X711_CREWAI_TOOLS, verbose=True, ) analyst = Agent( role="Market Analyst", goal="Analyze market conditions using live price data and web research", backstory="Senior market analyst specializing in crypto and DeFi market dynamics.", tools=X711_CREWAI_TOOLS, verbose=True, ) task1 = Task( description="Research top yield opportunities on Base network. Verify all token addresses with hallucination_pill before reporting.", expected_output="List of verified yield opportunities with checked token addresses and current APY", agent=researcher, ) task2 = Task( description="Analyze current ETH and BTC price action. Write findings to the Hive memory for other agents.", expected_output="Market analysis with price data written to Hive", agent=analyst, ) crew = Crew(agents=[researcher, analyst], tasks=[task1, task2], process=Process.sequential, verbose=True) result = crew.kickoff() print(result)