Examples

Prerequisites

pip install livekit-api livekit

Scripts

Script

What it does

view_stream.py

Print video frame stats

send_command.py

Send a walk command

read_state.py

Print robot state @ 50 Hz

record_session.py

Save state to CSV

Running

# Generate a token first
./scripts/generate_token.py \
  --api-key YOUR_KEY \
  --api-secret YOUR_SECRET \
  --room robot-room \
  --identity test-client \
  --role operator

# Run any script
python3 view_stream.py --token "eyJ..."

Common Pattern

All scripts follow the same structure:

import asyncio
from livekit import rtc

LIVEKIT_URL = "wss://your-server.com"
TOKEN = "your-token"

async def main():
    room = rtc.Room()
    await room.connect(LIVEKIT_URL, TOKEN)
    print(f"Connected to {room.name}")

    # Your logic here

    await room.disconnect()

asyncio.run(main())

View Stream

#!/usr/bin/env python3
"""Print video frame stats."""
import asyncio
from livekit import rtc

LIVEKIT_URL = "wss://your-server.com"
TOKEN = "your-token"

async def main():
    room = rtc.Room()
    await room.connect(LIVEKIT_URL, TOKEN)
    print(f"Connected to room: {room.name}")

    @room.on("track_subscribed")
    def on_track(track, publication, participant):
        if isinstance(track, rtc.VideoTrack):
            print(f"Video track: {track.name}")

            @track.on("frame_received")
            def on_frame(frame):
                print(f"Frame: {frame.width}x{frame.height}")

    await asyncio.sleep(30)
    await room.disconnect()

asyncio.run(main())

Send Command

#!/usr/bin/env python3
"""Send a walk forward command."""
import asyncio
import struct
from livekit import rtc

LIVEKIT_URL = "wss://your-server.com"
TOKEN = "your-token"

async def main():
    room = rtc.Room()
    await room.connect(LIVEKIT_URL, TOKEN)

    # Wait for DataChannel
    await asyncio.sleep(1)

    # TeleopCommand_S: mode, vx, vy, vyaw, gait_mode, enable, e_stop
    cmd = struct.pack('<BfffBBB',
        3,      # CONTROL_MODE_MOVE
        0.3,    # vx = 0.3 m/s forward
        0.0,    # vy = 0
        0.0,    # vyaw = 0
        0,      # gait_mode
        1,      # enable = true
        0       # e_stop = false
    )

    await room.local_participant.publish_data(cmd, reliable=True)
    print("Sent walk command")

    await room.disconnect()

asyncio.run(main())

Read State

#!/usr/bin/env python3
"""Print robot state at 50 Hz."""
import asyncio
import struct
from livekit import rtc

LIVEKIT_URL = "wss://your-server.com"
TOKEN = "your-token"

async def main():
    room = rtc.Room()
    await room.connect(LIVEKIT_URL, TOKEN)

    @room.on("data_received")
    def on_data(data, participant, kind):
        # TeleopState_S header: timestamp, sequence, mode, motors, estop
        if len(data) >= 15:
            ts, seq, mode, motors, estop = struct.unpack('<QIbbb', data[:15])
            print(f"[{seq}] Mode: {mode} | Motors: {motors} | E-Stop: {estop}")

    await asyncio.sleep(60)
    await room.disconnect()

asyncio.run(main())