Skip to main content

Why Track Entities?

Entity attributes change over time:
  • Injury statuses update
  • Approval ratings shift
  • Positions change
  • News breaks
These changes often precede market price movements. By tracking entities, you can:
  • Detect signals before markets react
  • Understand which markets are affected
  • Build event-driven trading systems

Polling Pattern

For real-time notifications, B2B API subscribers can use webhooks. For custom tracking logic, use polling:
import time
import requests
from typing import Dict, Any

class EntityTracker:
    def __init__(self, api_key: str = None):
        self.base_url = "https://api.marketmotion.xyz/api"
        self.headers = {"X-API-Key": api_key} if api_key else {}
        self.cache: Dict[str, Any] = {}

    def fetch_entity(self, slug: str) -> dict:
        """Fetch current entity state."""
        response = requests.get(
            f"{self.base_url}/entities/{slug}",
            headers=self.headers
        )
        return response.json()["entity"]

    def detect_changes(self, slug: str) -> list:
        """Compare current state to cached state."""
        current = self.fetch_entity(slug)
        changes = []

        if slug in self.cache:
            previous = self.cache[slug]

            # Compare attributes
            for key, value in current.get("attributes", {}).items():
                prev_value = previous.get("attributes", {}).get(key)
                if prev_value != value:
                    changes.append({
                        "type": "attribute_change",
                        "entity": slug,
                        "attribute": key,
                        "old_value": prev_value,
                        "new_value": value,
                        "markets": current.get("marketExposures", [])
                    })

        # Update cache
        self.cache[slug] = current
        return changes

    def track(self, slugs: list, interval: int = 60):
        """Continuously track entities for changes."""
        print(f"Tracking {len(slugs)} entities...")

        # Initial fetch to populate cache
        for slug in slugs:
            self.cache[slug] = self.fetch_entity(slug)

        while True:
            time.sleep(interval)

            for slug in slugs:
                changes = self.detect_changes(slug)
                for change in changes:
                    self.on_change(change)

    def on_change(self, change: dict):
        """Override this to handle changes."""
        print(f"Change detected: {change['entity']}")
        print(f"  {change['attribute']}: {change['old_value']} -> {change['new_value']}")
        print(f"  Affected markets: {len(change['markets'])}")

Category Monitoring

Track all entities in a category:
def get_category_entities(category: str, subcategory: str = None) -> list:
    """Get all entity slugs in a category."""
    params = {"category": category, "limit": 100}
    if subcategory:
        params["subcategory"] = subcategory

    response = requests.get(
        f"{BASE_URL}/entities",
        params=params
    )
    return [e["slug"] for e in response.json()["items"]]

# Track all NFL players
tracker = EntityTracker(api_key="your_key")
nfl_players = get_category_entities("sports", "nfl")
tracker.track(nfl_players, interval=300)  # Every 5 minutes

Event-Driven Architecture

from dataclasses import dataclass
from typing import Callable

@dataclass
class EntityEvent:
    entity_slug: str
    event_type: str
    data: dict

class EventDrivenTracker(EntityTracker):
    def __init__(self, api_key: str = None):
        super().__init__(api_key)
        self.handlers: Dict[str, list] = {}

    def on(self, event_type: str, handler: Callable):
        """Register an event handler."""
        if event_type not in self.handlers:
            self.handlers[event_type] = []
        self.handlers[event_type].append(handler)

    def emit(self, event: EntityEvent):
        """Emit an event to handlers."""
        for handler in self.handlers.get(event.event_type, []):
            handler(event)
        for handler in self.handlers.get("*", []):  # Catch-all
            handler(event)

    def on_change(self, change: dict):
        """Convert change to event and emit."""
        event = EntityEvent(
            entity_slug=change["entity"],
            event_type=f"attribute:{change['attribute']}",
            data=change
        )
        self.emit(event)

# Usage
tracker = EventDrivenTracker(api_key="your_key")

# Handle injury status changes
def on_injury_change(event: EntityEvent):
    new_status = event.data["new_value"]
    markets = event.data["markets"]

    if new_status == "out":
        print(f"ALERT: {event.entity_slug} is OUT")
        for market in markets:
            print(f"  Consider: {market['market']['title']}")

tracker.on("attribute:injury_status", on_injury_change)

# Handle any attribute change
def log_all_changes(event: EntityEvent):
    print(f"[{event.event_type}] {event.entity_slug}")

tracker.on("*", log_all_changes)

# Start tracking
tracker.track(["patrick-mahomes", "travis-kelce", "isaiah-pacheco"])

Tracking Specific Attributes

INJURY_SEVERITY = {
    "healthy": 0,
    "questionable": 1,
    "doubtful": 2,
    "out": 3
}

def on_injury_change(event):
    old = INJURY_SEVERITY.get(event.data["old_value"], 0)
    new = INJURY_SEVERITY.get(event.data["new_value"], 0)

    if new > old:
        print(f"Injury worsened: {event.entity_slug}")
        # Signal to sell/short related markets
    elif new < old:
        print(f"Injury improved: {event.entity_slug}")
        # Signal to buy related markets

Efficient Polling

Don’t fetch entities one by one. Use the list endpoint with filtering.
  • Sports injuries: 5-15 minutes
  • Political approval: 1 hour
  • Crypto prices: 1 minute (use dedicated feeds)
Store entity state locally. Only fetch when you need to check for changes.
With a Live API key (300 req/min), you can track ~300 entities per minute at 1-minute intervals.

Webhooks (B2B API)

B2B API subscribers with Pro tier keys can register webhooks for real-time notifications on injury alerts, arbitrage opportunities, market gaps, and more.