Examples
Prerequisites
pip install livekit-api livekit
Scripts
Script |
What it does |
|---|---|
|
Print video frame stats |
|
Send a walk command |
|
Print robot state @ 50 Hz |
|
Save state to CSV |
Running
# Generate a token first
./scripts/generate_token.py \
--api-key YOUR_KEY \
--api-secret YOUR_SECRET \
--room robot-room \
--identity test-client \
--role operator
# Run any script (if a script does not accept --token, paste the token into its TOKEN constant instead)
python3 view_stream.py --token "eyJ..."
Common Pattern
All scripts follow the same structure:
import asyncio

from livekit import rtc

LIVEKIT_URL = "wss://your-server.com"
TOKEN = "your-token"


async def main() -> None:
    """Connect to the room, run the script's logic, then disconnect."""
    room = rtc.Room()
    # Register event handlers (room.on(...)) here, before connecting.
    await room.connect(LIVEKIT_URL, TOKEN)
    try:
        print(f"Connected to {room.name}")
        # Your logic here
    finally:
        # Leave the room even if the logic above raises.
        await room.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
View Stream
#!/usr/bin/env python3
"""Print video frame stats."""
import asyncio

from livekit import rtc

LIVEKIT_URL = "wss://your-server.com"
TOKEN = "your-token"


async def print_frames(track) -> None:
    """Print the size of every frame received on *track* until cancelled."""
    # Frames are delivered by iterating a VideoStream built on the track.
    stream = rtc.VideoStream(track)
    try:
        async for event in stream:
            frame = event.frame
            print(f"Frame: {frame.width}x{frame.height}")
    finally:
        await stream.aclose()


async def main() -> None:
    """Connect, print frame sizes of subscribed video tracks for 30 s, disconnect."""
    room = rtc.Room()
    # Keep a reference to every reader task so it is not garbage-collected
    # while running and can be cancelled on shutdown.
    readers = set()

    # Registered before connect() so a track subscribed right after joining
    # is not missed.
    @room.on("track_subscribed")
    def on_track(track, publication, participant):
        if track.kind == rtc.TrackKind.KIND_VIDEO:
            print(f"Video track: {track.name}")
            task = asyncio.create_task(print_frames(track))
            readers.add(task)
            task.add_done_callback(readers.discard)

    await room.connect(LIVEKIT_URL, TOKEN)
    try:
        print(f"Connected to room: {room.name}")
        await asyncio.sleep(30)
    finally:
        # Stop the frame readers first, then leave the room, even on error.
        pending = tuple(readers)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        await room.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
Send Command
#!/usr/bin/env python3
"""Send a walk forward command."""
import asyncio
import struct

from livekit import rtc

LIVEKIT_URL = "wss://your-server.com"
TOKEN = "your-token"

# TeleopCommand_S wire layout (little-endian, packed):
#   mode:u8, vx:f32, vy:f32, vyaw:f32, gait_mode:u8, enable:u8, e_stop:u8
TELEOP_COMMAND = struct.Struct("<BfffBBB")


async def main() -> None:
    """Connect, publish one reliable walk-forward command, then disconnect."""
    room = rtc.Room()
    await room.connect(LIVEKIT_URL, TOKEN)
    try:
        # Wait for DataChannel
        await asyncio.sleep(1)

        cmd = TELEOP_COMMAND.pack(
            3,    # CONTROL_MODE_MOVE
            0.3,  # vx = 0.3 m/s forward
            0.0,  # vy = 0
            0.0,  # vyaw = 0
            0,    # gait_mode
            1,    # enable = true
            0,    # e_stop = false
        )
        await room.local_participant.publish_data(cmd, reliable=True)
        print("Sent walk command")
    finally:
        # Leave the room even if publishing fails.
        await room.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
Read State
#!/usr/bin/env python3
"""Print robot state at 50 Hz."""
import asyncio
import struct

from livekit import rtc

LIVEKIT_URL = "wss://your-server.com"
TOKEN = "your-token"

# TeleopState_S header (little-endian, packed):
#   timestamp:u64, sequence:u32, mode:i8, motors:i8, estop:i8
TELEOP_STATE_HEADER = struct.Struct("<QIbbb")


async def main() -> None:
    """Connect, print the header of every received state packet for 60 s, disconnect."""
    room = rtc.Room()

    # Registered before connect() so packets arriving right after joining
    # are not missed.
    @room.on("data_received")
    def on_data(packet, *_):
        # Current SDK releases pass a single rtc.DataPacket; older ones passed
        # the raw bytes first, followed by extra arguments. Accept both.
        payload = getattr(packet, "data", packet)
        if len(payload) >= TELEOP_STATE_HEADER.size:
            ts, seq, mode, motors, estop = TELEOP_STATE_HEADER.unpack_from(payload)
            print(f"[{seq}] Mode: {mode} | Motors: {motors} | E-Stop: {estop}")

    await room.connect(LIVEKIT_URL, TOKEN)
    try:
        await asyncio.sleep(60)
    finally:
        # Leave the room even if interrupted.
        await room.disconnect()


if __name__ == "__main__":
    asyncio.run(main())