Skip to content

Python Examples

This file provides brief examples for the publicly available functions and types in the API, demonstrating how to use these interfaces.

Robot Joint Names (G1 2.2)

Joint Group List

Robot joint group names include: ["head", "left_arm", "right_arm", "leg", "left_gripper", "right_gripper", "left_suction_cup", "right_suction_cup"]

Detailed Information for Each Joint Group

Joint Group English Name Number of Joints Joint Name List
Head head 2 head_joint1, head_joint2
Legs leg 5 leg_joint1, leg_joint2, leg_joint3, leg_joint4, leg_joint5
Left Arm left_arm 7 left_arm_joint1, left_arm_joint2, left_arm_joint3, left_arm_joint4, left_arm_joint5, left_arm_joint6, left_arm_joint7
Right Arm right_arm 7 right_arm_joint1, right_arm_joint2, right_arm_joint3, right_arm_joint4, right_arm_joint5, right_arm_joint6, right_arm_joint7
Left Gripper left_gripper 1 left_gripper_joint1
Right Gripper right_gripper 1 right_gripper_joint1
Left Suction Cup left_suction_cup 1 left_suction_cup_joint1
Right Suction Cup right_suction_cup 1 right_suction_cup_joint1

Class: GalbotRobot

Tips: If you get data immediately after program startup, the data may not be ready right away. You may sleep for a few seconds as appropriate.

Get Instance and Initialize (get_instance && init)

examples/python/galbot_robot/get_instance.py
from galbot_sdk.g1 import GalbotRobot
import time

# 获取 GalbotRobot 的单例
robot = GalbotRobot.get_instance()

state = robot.init()
if not state:
    print("初始化失败")
else:
    print("初始化成功")

while robot.is_running():
    # 业务逻辑
    time.sleep(1)
    break

# 发出退出信号退出程序
robot.request_shutdown()
# 等待进入退出状态
robot.wait_for_shutdown()
# SDK相关资源释放
robot.destroy()

Log interface

examples/python/galbot_robot/logger_example.py
import galbot_sdk

cfg = {
    # 日志存放目录,如果为空,则默认生成在 ~/galbot_sdk_log/user_log
    "path": "",

    # 日志文件名,如果为空,则默认名称为 <process_name>_<current_time>_<pid>_<thread_id>.log
    "file_name": "",

    # 单个日志文件最大字节数,如果日志超过此大小,会切换到新文件
    "file_max_size": 10 * 1024 * 1024,  # 10MB

    # 循环日志文件最大数量,超过数量时会覆盖最旧的日志文件
    "file_max_num": 5,

    # 是否在控制台输出日志,默认 False
    "console_output": True,

    # 日志输出等级,可用值:debug, info, warning, error, critical
    "level": "info",
}

galbot_sdk.init_logger(cfg)

# 写日志
galbot_sdk.debug("Debug example")
galbot_sdk.info("Info example")
galbot_sdk.warning("Warning example")
galbot_sdk.error("Error example")
galbot_sdk.critical("Critical example")

Set Joint Positions (set_joint_positions)

examples/python/galbot_robot/set_joint_positions.py
import time
from galbot_sdk.g1 import GalbotRobot, ControlStatus

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()
print('初始化成功')

# 程序立即启动,稍等数据就绪时间
time.sleep(2)

# 设置头部两个关节角度为0.2,0.2,阻塞等待动作执行到位,最大超时时间为10s
joint_pos = [0.2, 0.2]
# 设置头部关节组,如为空将默认填写全身关节["leg", "head", "left_arm", "right_arm"]
joint_groups = ["head"]
# 是否阻塞等待关节运行到位
is_blocking = True
# 限制关节最大运行速度为0.1rad/s
max_speed = 0.1
# 阻塞等待最大时间
timeout_s = 10

status = robot.set_joint_positions(
    joint_pos, joint_groups, [], is_blocking, max_speed, timeout_s
)

if status != ControlStatus.SUCCESS:
    print("关节角设置失败")
else:
    print('关节角设置成功')

time.sleep(1)

# 使用特定关节名称进行控制,该参数将覆盖joint_groups关节组参数
joint_names = ["head_joint1", "head_joint2"]
joint_pos = [0.0, 0.0]

status = robot.set_joint_positions(
    joint_pos, [], joint_names, is_blocking, max_speed, timeout_s
)

if status != ControlStatus.SUCCESS:
    print("关节角设置失败")
else:
    print('关节角设置成功')

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Set Gripper Command (set_gripper_command)

examples/python/galbot_robot/set_gripper_command.py
import time
from galbot_sdk.g1 import GalbotRobot, JointGroup, ControlStatus

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()
print('初始化成功')
# 程序立即启动,稍等数据就绪时间
time.sleep(2)

# 设置左夹爪宽度为0.02m,运行速度为0.05m,力矩为10N,将阻塞等待夹爪运行到位
status = robot.set_gripper_command(
    JointGroup.LEFT_GRIPPER, 0.02, 0.05, 10, True
)
if status != ControlStatus.SUCCESS:
    print("设置夹爪失败")
else:
    print('设置夹爪成功')

# 设置左夹爪宽度为0.1m,运行速度为0.05m,力矩为10N,将阻塞等待夹爪运行到位
status = robot.set_gripper_command(
    JointGroup.LEFT_GRIPPER, 0.1, 0.05, 10, True
)

if status != ControlStatus.SUCCESS:
    print("设置夹爪失败")
else:
    print('设置夹爪成功')

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Set Suction Cup Command (set_suction_cup_command)

examples/python/galbot_robot/set_suction_cup_command.py
from galbot_sdk.g1 import GalbotRobot, JointGroup, ControlStatus
import time

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()
time.sleep(1)
print('初始化成功')

# 设置吸盘所属的关节组(右吸盘)
joint_group = JointGroup.RIGHT_SUCTION_CUP

# 是否激活吸盘
activate = True  # True:激活吸盘,False:关闭吸盘

# 发送吸盘控制指令
status = robot.set_suction_cup_command(
    joint_group,
    activate
)

# 检查执行结果
if status != ControlStatus.SUCCESS:
    print("设置吸盘失败")
else:
    print("设置吸盘成功")

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Stop Trajectory Execution (stop_trajectory_execution)

examples/python/galbot_robot/stop_trajectory_execution.py
from galbot_sdk.g1 import GalbotRobot, ControlStatus
import time

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()
time.sleep(2)
print("初始化成功")

# 发送停止轨迹执行指令
while True:
    status = robot.stop_trajectory_execution()

    # 检查执行结果
    if status == ControlStatus.SUCCESS:
        print('停止轨迹执行成功')
        break

    print("停止轨迹执行失败,重试中...")

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Set Trajectory (execute_joint_trajectory)

examples/python/galbot_robot/execute_joint_trajectory.py
from galbot_sdk.g1 import GalbotRobot, JointGroup, ControlStatus, Trajectory, TrajectoryPoint,JointCommand
import time
import numpy as np
from typing import List

# 生成轨迹单点,包含位置与时间信息
def generate_target_point(q: List[float], target_time: float = 10):
    """Generate target for joints"""
    joint_position = TrajectoryPoint()
    joint_position.time_from_start_second = target_time
    joint_command_vec = []
    for joint in q:
        joint_cmd = JointCommand()
        joint_cmd.position = joint
        joint_command_vec.append(joint_cmd)
    joint_position.joint_command_vec = joint_command_vec
    return joint_position

def generate_target_trajectory(trajectory, joint_groups=[], joint_names=[], dt=0.008):
    """Generate trajectory for joints"""
    if trajectory is None or np.ndim(trajectory) != 2 or len(trajectory) == 0:
        return None

    # 创建 Trajectory
    traj = Trajectory()
    traj.joint_groups = joint_groups
    traj.joint_names = joint_names

    time = 0.0
    points = []
    for state in trajectory:
        time += dt
        # 创建单个轨迹点
        traj_point = generate_target_point(state, time)
        points.append(traj_point)

    traj.points = points
    return traj

# 轨迹执行函数
def traj_exec():
    # 获取 GalbotRobot 的单例并初始化,仅需初始化一次
    robot = GalbotRobot.get_instance()
    robot.init()
    time.sleep(1)
    print("初始化成功")

    head_traj = np.linspace(
        [0.0, 0.0],
        [0.5, 0.0],
        num=200,
    )
    # 是否阻塞等待轨迹执行完毕
    is_block = True
    # 指定执行哪个关节组轨迹
    joint_groups = ["head"]
    # 执行指定关节轨迹,如填充将覆盖joint_groups关节组参数
    joint_names = []
    status = robot.execute_joint_trajectory(generate_target_trajectory(head_traj.tolist(), joint_groups, joint_names), is_block)

    # 检查执行结果
    if status != ControlStatus.SUCCESS:
        print("轨迹执行失败")
    else:
        print("轨迹执行成功")

    # 主动发出SIGINT退出信号
    robot.request_shutdown()
    # 等待进入shutdown状态
    robot.wait_for_shutdown()
    # 进行SDK资源释放
    robot.destroy()
    print('资源释放成功')

traj_exec()

Set Joint Commands (set_joint_commands)

examples/python/galbot_robot/set_joint_commands_example.py
import time
import math
from galbot_sdk.g1 import GalbotRobot, JointCommand

def head_high_frequency_control():
    """
    头部高频控制示例
    """

    control_frequency = 100.0  # Hz
    dt = 1.0 / control_frequency
    duration = 4.0  # 控制 4 秒

    amplitude = 0.3  # 最大摆动幅度 (rad)
    frequency = 0.5  # 正弦频率 (Hz)
    # 要控制的关节组名称
    joint_groups = ["head"]
    # 如果要控制指定关节,填充该字段,将会覆盖joint_groups参数。如不填充则默认控制joint_groups中的所有关节
    joint_names = []

    print("开始头部高频控制")

    joint_commands = [JointCommand(), JointCommand()]

    start_time = time.time()

    while True:
        current_time = time.time() - start_time
        if current_time > duration:
            break

        # 生成正弦轨迹
        target_position = amplitude * math.sin(
            2 * math.pi * frequency * current_time
        )

        # 设置头部关节角度
        joint_commands[0].position = target_position
        joint_commands[1].position = target_position
        print(f"当前时间: {current_time:.2f}s, 目标位置: {target_position:.3f} rad")

        # 期望到达时间
        time_from_start_sec = 0.0

        execution_status = GalbotRobot.get_instance().set_joint_commands(
            joint_commands,
            joint_groups,
            joint_names,
            time_from_start_sec
        )

        # 固定周期 sleep
        time.sleep(dt)

    print("头部控制结束")

def main():
    # 获取 GalbotRobot 的单例并初始化
    robot = GalbotRobot.get_instance()

    if robot.init():
        print("系统初始化成功!")
    else:
        print("系统初始化失败!")
        return

    # 程序立即启动,稍等数据就绪时间
    time.sleep(2)

    head_high_frequency_control()

    # 退出系统并进行 SDK 资源释放
    robot.request_shutdown()
    robot.wait_for_shutdown()
    robot.destroy()
    print("程序已退出")


if __name__ == "__main__":
    main()

Set Joint Commands in Batch Mode (set_joint_commands_batch)

examples/python/galbot_robot/set_joint_commands_batch.py
from galbot_sdk.g1 import GalbotRobot, JointGroup, ControlStatus, Trajectory, TrajectoryPoint, JointCommand
import time
import numpy as np
from typing import List

# 生成轨迹单点,包含位置与时间信息
def generate_target_point(q: List[float], target_time: float = 10):
    """Generate target for joints"""
    joint_position = TrajectoryPoint()
    joint_position.time_from_start_second = target_time
    joint_command_vec = []
    for joint in q:
        joint_cmd = JointCommand()
        joint_cmd.position = joint
        joint_cmd.velocity = 0.0
        joint_cmd.acceleration = 0.0
        joint_cmd.effort = 0.0
        # joint_cmd.Kp = 0.0
        # joint_cmd.Kd = 0.0
        joint_command_vec.append(joint_cmd)
    joint_position.joint_command_vec = joint_command_vec
    return joint_position

def generate_batch_trajectory(trajectory, joint_groups=[], joint_names=[], dt=0.008):
    """Generate batch trajectory for joints"""
    if trajectory is None or np.ndim(trajectory) != 2 or len(trajectory) == 0:
        return None

    # 创建 Trajectory
    traj = Trajectory()
    traj.joint_groups = joint_groups
    traj.joint_names = joint_names

    time = 0.0
    points = []
    for state in trajectory:
        time += dt
        # 创建单个轨迹点
        traj_point = generate_target_point(state, time)
        points.append(traj_point)

    traj.points = points
    return traj

# 批量设置关节指令函数
def batch_commands_exec():
    # 获取 GalbotRobot 的单例并初始化,仅需初始化一次
    robot = GalbotRobot.get_instance()
    robot.init()
    time.sleep(1)
    print("初始化成功")

    # 生成批量轨迹数据(包含多个时间点的关节指令)
    head_traj = np.linspace(
        [0.0, 0.0],
        [0.5, 0.0],
        num=10,  # 批量轨迹点数量
    )
    # 指定执行哪个关节组轨迹
    joint_groups = ["head"]
    # 执行指定关节轨迹,如填充将覆盖joint_groups关节组参数
    joint_names = []

    # 批量设置关节指令(非阻塞,立即返回)
    status = robot.set_joint_commands_batch(generate_batch_trajectory(head_traj.tolist(), joint_groups, joint_names))

    # 检查执行结果
    if status != ControlStatus.SUCCESS:
        print("批量指令提交失败")
    else:
        print("批量指令已提交,正在后台执行(非阻塞)")

    # 等待一段时间让指令执行
    time.sleep(1)

    # 主动发出SIGINT退出信号
    robot.request_shutdown()
    # 等待进入shutdown状态
    robot.wait_for_shutdown()
    # 进行SDK资源释放
    robot.destroy()
    print('资源释放成功')

batch_commands_exec()

Set Base Velocity (set_base_velocity)

examples/python/galbot_robot/set_base_velocity.py
from galbot_sdk.g1 import GalbotRobot, ControlStatus
import time

# 获取 GalbotRobot 的单例
robot = GalbotRobot.get_instance()
robot.init()
time.sleep(1)
print("初始化成功")

# 设置底盘速度
linear_velocity = [0.05, 0.0, 0.0]  # 前进 0.5 m/s
angular_velocity = [0.0, 0.0, 0.1]  # 旋转 0.1 rad/s

duration_s = 2.0  # 持续 2 秒后自动停止
status = robot.set_base_velocity(linear_velocity, angular_velocity, duration_s)

if status == ControlStatus.SUCCESS:
    print(f"底盘速度设置成功,将在 {duration_s} 秒后自动停止。")
else:
    print("设置底盘速度失败。")

time.sleep(duration_s + 0.5)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Joint States (get_joint_states)

examples/python/galbot_robot/get_joint_states.py
import time
from galbot_sdk.g1 import GalbotRobot

def print_joint_states(joint_states):
    """
    joint_state_vec: List of JointState, 每个对象具有 position, velocity, acceleration, effort, current
    """
    for js in joint_states:
        print(f" : position = {js.position} , velocity = {js.velocity} "
            f", acceleration = {js.acceleration} , effort = {js.effort} , current = {js.current}")

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()
# 程序立即启动,稍等数据就绪时间
time.sleep(1)
print("初始化成功")
# 使用关节组名称获取关节状态,为空默认返回所有关节
joint_group_names = ["left_arm"]
ret = robot.get_joint_states(joint_group_names, [])
print_joint_states(ret)

# 获取指定关节状态,如果填充将覆盖关节组输入
joint_names = ["left_arm_joint1", "left_arm_joint2"]
state_ret = robot.get_joint_states([], joint_names)
print_joint_states(state_ret)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Joint Positions (get_joint_positions)

examples/python/galbot_robot/get_joint_positions.py
import time
from galbot_sdk.g1 import GalbotRobot

def print_joint_positions(joint_positions):
    print(f"pos count is {len(joint_positions)}")
    for pos in joint_positions:
        print(pos)

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()
# 程序立即启动,稍等数据就绪时间
time.sleep(1)
print("初始化成功")

# 使用关节组名称获取关节位置,为空默认返回所有关节
joint_group_names = ["left_arm"]
ret = robot.get_joint_positions(joint_group_names, [])
print("左臂关节位置:")
print_joint_positions(ret)
# 获取指定关节位置,如果填充将覆盖关节组输入
joint_names = ["left_arm_joint1", "left_arm_joint2"]
state_ret = robot.get_joint_positions([], joint_names)
print("左臂1、2关节位置:")
print_joint_positions(state_ret)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Joint Names (get_joint_names)

examples/python/galbot_robot/get_joint_names.py
import time
from galbot_sdk.g1 import GalbotRobot

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()
# 程序立即启动,稍等数据就绪时间
time.sleep(1)
print("初始化成功")

# 使用关节组名称获取关节位置,为空默认返回所有关节
joint_group_names = ["left_arm"]
# 仅获取可活动关节
only_active_joint = True
ret = robot.get_joint_names(only_active_joint, joint_group_names)
print("Left joint names:")
for i, name in enumerate(ret):
    print(f"{i + 1}: {name}")

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Gripper State (get_gripper_state)

examples/python/galbot_robot/get_gripper_state.py
import time
from galbot_sdk.g1 import GalbotRobot, JointGroup

def print_gripper_state(joint_group, gripper_state):
    """
    joint_group: JointGroup 枚举
    gripper_state: 对象,包含 timestamp_ns, width, velocity, effort, is_moving
    """
    print(f"Timestamp (ns): {gripper_state.timestamp_ns}")
    print(
        f"width {gripper_state.width} "
        f"velocity {gripper_state.velocity} "
        f"effort {gripper_state.effort} "
        f"is moving {gripper_state.is_moving}"
    )

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()

# 程序立即启动,稍等数据就绪时间
time.sleep(1)
print("初始化成功")

# 设置夹爪关节组(左夹爪)
joint_group = JointGroup.LEFT_GRIPPER

# 获取夹爪状态
gripper_state = robot.get_gripper_state(joint_group)

if gripper_state is None:
    print("get gripper state error")
else:
    print("左夹爪状态如下:")
    print_gripper_state(joint_group, gripper_state)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Suction Cup State (get_suction_cup_state)

examples/python/galbot_robot/get_suction_cup_state.py
import time
from galbot_sdk.g1 import GalbotRobot, JointGroup

def print_suction_cup_state(suction_cup_state):
    """
    suction_cup_state: 对象,包含 timestamp_ns, pressure, activation, action_state
    """
    group_name = joint_group.name
    print(f"Timestamp (ns): {suction_cup_state.timestamp_ns}")
    print(
        f"pressure {suction_cup_state.pressure} "
        f"activation {suction_cup_state.activation} "
        f"action state {int(suction_cup_state.action_state)}"
    )

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()

# 程序立即启动,稍等数据就绪时间
time.sleep(1)
print("初始化成功")

# 设置吸盘所属的关节组(右吸盘)
joint_group = JointGroup.RIGHT_SUCTION_CUP

# 获取吸盘状态
suction_cup_state = robot.get_suction_cup_state(joint_group)

if suction_cup_state is None:
    print("get suction cup error")
else:
    print("右吸盘状态:")
    print_suction_cup_state(suction_cup_state)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Transform (get_transform)

examples/python/galbot_robot/get_transform.py
import time
from galbot_sdk.g1 import GalbotRobot

def print_pose(pose_vec):
    """
    pose_vec: list of floats
    """
    print("pose_vec = [" + ", ".join(str(p) for p in pose_vec) + "]")

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()

# 程序立即启动,稍等数据就绪时间
time.sleep(1)
print("初始化成功")

# 设置目标帧和源帧
target_frame = "base_link"
source_frame = "left_arm_link1"
timestamp_ns = 0    # 0为获取最新tf变换值

# 获取坐标变换
ret_val = robot.get_transform(target_frame, source_frame)

if not ret_val[0]:
    print("get_transform error")
else:
    print("tf_timestamp_ns:", ret_val[1])
    print_pose(ret_val[0])

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get IMU Data (get_imu_data)

examples/python/galbot_robot/get_imu_data.py
import time
from galbot_sdk.g1 import GalbotRobot, SensorType
from typing import Dict

def print_imu_data(imu_data: dict):
    """
    imu_data: dict,包含:
        - 'timestamp_ns'
        - 'accel'   : {'x', 'y', 'z'}
        - 'gyro'    : {'x', 'y', 'z'}
        - 'magnet'  : {'x', 'y', 'z'}
    """
    if not imu_data:
        print("IMU data is empty")
        return

    print(f"Timestamp (ns): {imu_data.get('timestamp_ns')}")

    accel = imu_data.get("accel", {})
    gyro = imu_data.get("gyro", {})
    magnet = imu_data.get("magnet", {})

    print(
        f"Accelerometer: x={accel.get('x')}, "
        f"y={accel.get('y')}, "
        f"z={accel.get('z')}"
    )
    print(
        f"Gyroscope:     x={gyro.get('x')}, "
        f"y={gyro.get('y')}, "
        f"z={gyro.get('z')}"
    )
    print(
        f"Magnetometer:  x={magnet.get('x')}, "
        f"y={magnet.get('y')}, "
        f"z={magnet.get('z')}"
    )

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init({SensorType.TORSO_IMU})

# 程序立即启动,稍等数据就绪时间
time.sleep(1)
print("初始化成功")

imu_data = robot.get_imu_data(SensorType.TORSO_IMU)
if not imu_data:
    print("No imu data!")
else:
    print("imu数据:")
    print_imu_data(imu_data)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Power Information(get_bms_information)

examples/python/galbot_robot/get_bms_information.py
import time
from galbot_sdk.g1 import GalbotRobot


def print_bms_information(bms_info: dict):
    """
    bms_info: dict,包含:
        - 'voltage'            : float (V)
        - 'current'            : float (A)
        - 'battery_level'      : float (0-100)
        - 'temperature'        : float (°C)
        - 'charging_status'    : str or int
        - 'health_status'      : str or int
        - 'capacity'           : float (Ah)
    """
    if not bms_info:
        print("BMS information is empty")
        return

    print(f"Voltage (V): {bms_info.get('voltage')}")
    print(f"Current (A): {bms_info.get('current')}")
    print(f"Battery level (%): {bms_info.get('battery_level')}")
    print(f"Temperature (C): {bms_info.get('temperature')}")
    print(f"Charging status: {bms_info.get('charging_status')}")
    print(f"Health status: {bms_info.get('health_status')}")
    print(f"Capacity (Ah): {bms_info.get('capacity')}")


# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()

# 程序立即启动,稍等数据就绪时间
time.sleep(3)
print("初始化成功")

bms_info = robot.get_bms_information()
print_bms_information(bms_info)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print("资源释放成功")

Get Device Information (get_device_information)

examples/python/galbot_robot/get_device_information.py
import time
from galbot_sdk.g1 import GalbotRobot
from typing import Dict

def print_device_info(device_info: dict):
    """
    device_info: dict,包含:
        - 'model': 设备型号 (str)
        - 'serial_number': 序列号 (str)
        - 'firmware_version': 固件版本 (str)
        - 'hardware_version': 硬件版本 (str)
        - 'manufacturer': 制造商 (str)
    """
    if not device_info:
        print("设备信息为空")
        return

    print("设备信息:")
    print(f"  型号: {device_info.get('model', 'N/A')}")
    print(f"  序列号: {device_info.get('serial_number', 'N/A')}")
    print(f"  固件版本: {device_info.get('firmware_version', 'N/A')}")
    print(f"  硬件版本: {device_info.get('hardware_version', 'N/A')}")
    print(f"  制造商: {device_info.get('manufacturer', 'N/A')}")

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
robot.init()

# 程序立即启动,稍等数据就绪时间
time.sleep(1)
print("初始化成功")

device_info = robot.get_device_information()
if not device_info:
    print("设备信息获取失败!")
else:
    print("设备信息获取成功!")
    print_device_info(device_info)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Camera Image Data (get_rgb_data && get_depth_data)

examples/python/galbot_robot/get_camera_data.py
try:
    from galbot_sdk.g1  import GalbotRobot, SensorType
except ImportError:
    print("import galbot_sdk failed, please install it first or check if it is in the PYTHONPATH")
    exit(1)

import os

try:
    import cv2
except ImportError:
    os.system("pip install opencv-python")
    import cv2

try:
    import numpy as np
except ImportError:
    os.system("pip install numpy")
    import numpy as np

import time
from typing import Dict

def decode_compressed_image(compressed_image):
    """
    decode CompressedImage image

    Parameters:
        compressed_image (dict): image dict, keys:[header, format, data, "depth_scale"]

    Returns:
        numpy.ndarray: decoded image
    """
    image_data = compressed_image["data"]
    if compressed_image["format"] == "rgb8":
        return decode_rgb_image(image_data)
    elif compressed_image["format"] == "16UC1":
        return decode_depth_image(compressed_image)
    else:
        raise ValueError(f"Unsupport data format: {compressed_image['format']}")

def decode_rgb_image(image_data):
    """decode rgb image"""
    nparr = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        raise ValueError("Fail to Decode RGB Image")
    return img

def decode_depth_image(image_data):
    """decode depth image"""
    depth_img = np.frombuffer(image_data["data"], dtype=np.uint16).copy()

    # 检查是否存在 height 和 width
    if "height" not in image_data or "width" not in image_data:
        raise ValueError("Missing 'height' or 'width' in depth image metadata.")
    if image_data["height"] == 0 or image_data["width"] == 0:
        raise ValueError(f"Invalid 'height' ({image_data['height']}) or 'width' ({image_data['width']}) in depth image metadata.")

    # 解析深度图像
    depth_img = depth_img.reshape((image_data["height"], image_data["width"]))
    depth_img = depth_img.astype(np.float32) / image_data["depth_scale"]

    return depth_img

def main():
    SHOW_IMAGE = False
    robot = GalbotRobot.get_instance()

    # 获取左臂的RGB图像和深度图像,右臂的深度图像,底座的激光雷达数据,躯干的IMU数据
    enable_sensor_set = {SensorType.LEFT_ARM_CAMERA, # 左臂深度相机
                        SensorType.LEFT_ARM_DEPTH_CAMERA,} # 左臂RGB相机
    robot.init(enable_sensor_set)
    print("初始化成功")
    # 程序立即启动,稍等数据就绪时间
    time.sleep(5)
    # 获取左臂的RGB图像
    rgb_image_data = robot.get_rgb_data(SensorType.LEFT_ARM_CAMERA)
    if not rgb_image_data:
        print("No rgb image data!")
    else:
        print("get rgb image suceess")
        print(rgb_image_data['header'])
        img = decode_compressed_image(rgb_image_data)

        # 保存RGB图像
        cv2.imwrite("rgb_image_data.jpg", img)
        # 可视化RGB图像
        if SHOW_IMAGE:
            cv2.namedWindow("rgb image", cv2.WINDOW_NORMAL)
            cv2.imshow("rgb image", img)
            cv2.waitKey(0)
            cv2.destroyAllWindows()

    # 获取左臂的深度图像
    depth_data = robot.get_depth_data(SensorType.LEFT_ARM_DEPTH_CAMERA)
    if not depth_data or "data" not in depth_data:
        print("Depth camera not ready")
    else:
        print("get depth data suceess")
        print(depth_data['header'])
        depth_img_raw = decode_compressed_image(depth_data)
        depth_img = cv2.normalize(depth_img_raw, None, 0, 255, cv2.NORM_MINMAX) # 归一化,将深度值映射到0-1范围
        depth_img = depth_img.astype(np.uint8)

        # 保存深度图
        cv2.imwrite("depth_data.jpg", depth_img)
        # 可视化深度图
        if SHOW_IMAGE:
            cv2.namedWindow("depth image", cv2.WINDOW_NORMAL)
            cv2.imshow("depth image", depth_img)
            cv2.waitKey(0)
            cv2.destroyAllWindows()

    # 主动发出SIGINT退出信号
    robot.request_shutdown()
    # 等待进入shutdown状态
    robot.wait_for_shutdown()
    # 进行SDK资源释放
    robot.destroy()
    print('资源释放成功')

if __name__=="__main__":
    main()

Get Sensor Parameters (get_camera_intrinsic && get_sensor_extrinsic)

examples/python/galbot_robot/get_camera_params.py
try:
    from galbot_sdk.g1  import GalbotRobot, SensorType
except ImportError:
    print("import galbot_sdk failed, please install it first or check if it is in the PYTHONPATH")
    exit(1)

import os

try:
    import cv2
except ImportError:
    os.system("pip install opencv-python")
    import cv2

try:
    import numpy as np
except ImportError:
    os.system("pip install numpy")
    import numpy as np

import time
from typing import Dict

def decode_compressed_image(compressed_image):
    """
    decode CompressedImage image

    Parameters:
        compressed_image (dict): image dict, keys:[header, format, data, "depth_scale"]

    Returns:
        numpy.ndarray: decoded image
    """
    image_data = compressed_image["data"]
    if compressed_image["format"] == "rgb8":
        return decode_rgb_image(image_data)
    elif compressed_image["format"] == "16UC1":
        return decode_depth_image(compressed_image)
    else:
        raise ValueError(f"Unsupport data format: {compressed_image['format']}")

def decode_rgb_image(image_data):
    """decode rgb image"""
    nparr = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        raise ValueError("Fail to Decode RGB Image")
    return img

def decode_depth_image(image_data):
    """decode depth image"""
    depth_img = np.frombuffer(image_data["data"], dtype=np.uint16).copy()

    # 检查是否存在 height 和 width
    if "height" not in image_data or "width" not in image_data:
        raise ValueError("Missing 'height' or 'width' in depth image metadata.")
    if image_data["height"] == 0 or image_data["width"] == 0:
        raise ValueError(f"Invalid 'height' ({image_data['height']}) or 'width' ({image_data['width']}) in depth image metadata.")

    # 解析深度图像
    depth_img = depth_img.reshape((image_data["height"], image_data["width"]))
    depth_img = depth_img.astype(np.float32) / image_data["depth_scale"]

    return depth_img

def main():
    robot = GalbotRobot.get_instance()

    # 获取左臂的RGB图像和深度图像,右臂的深度图像,底座的激光雷达数据,躯干的IMU数据
    enable_sensor_set = {SensorType.LEFT_ARM_CAMERA, # 左臂深度相机
                        SensorType.LEFT_ARM_DEPTH_CAMERA,} # 左臂RGB相机
    robot.init(enable_sensor_set)
    print("初始化成功")

    # 程序立即启动,稍等数据就绪时间
    time.sleep(5)

    # 获取左臂的RGB图像
    rgb_image_data = robot.get_rgb_data(SensorType.LEFT_ARM_CAMERA)
    if not rgb_image_data:
        print("No rgb image data!")
    else:
        print("get rgb image suceess")
        print(rgb_image_data['header'])
        img = decode_compressed_image(rgb_image_data)

        # 保存RGB图像
        cv2.imwrite("rgb_image_data.jpg", img)

    # 获取左臂的相机内参
    camera_intrinsics = robot.get_camera_intrinsic(SensorType.LEFT_ARM_CAMERA)
    if not camera_intrinsics:
        print("No camera intrinsics data!")
    else:
        print("get camera intrinsics suceess")
        print(camera_intrinsics)

    # 获取左臂的深度图像
    depth_data = robot.get_depth_data(SensorType.LEFT_ARM_DEPTH_CAMERA)
    if not depth_data or "data" not in depth_data:
        print("Depth camera not ready")
    else:
        print("get depth data suceess")
        print(depth_data['header'])
        depth_img_raw = decode_compressed_image(depth_data)
        depth_img = cv2.normalize(depth_img_raw, None, 0, 255, cv2.NORM_MINMAX) # 归一化,将深度值映射到0-1范围
        depth_img = depth_img.astype(np.uint8)

        # 保存深度图
        cv2.imwrite("depth_data.jpg", depth_img)

    # 获取左臂的深度相机内参
    camera_intrinsics = robot.get_camera_intrinsic(SensorType.LEFT_ARM_DEPTH_CAMERA)
    if not camera_intrinsics:
        print("No camera intrinsics data!")
    else:
        print("get camera intrinsics suceess")
        print(camera_intrinsics)

    # 外参
    time.sleep(2)
    camera_extrinsics, timestamp_ns = robot.get_sensor_extrinsic(SensorType.LEFT_ARM_DEPTH_CAMERA)
    if not camera_extrinsics:
        print("No camera extrinsics data!")
    else:
        print("get camera extrinsics suceess")
        print(camera_extrinsics)

    # 主动发出SIGINT退出信号
    robot.request_shutdown()
    # 等待进入shutdown状态
    robot.wait_for_shutdown()
    # 进行SDK资源释放
    robot.destroy()
    print('资源释放成功')

if __name__=="__main__":
    main()

Get Lidar Data (get_lidar_data)

examples/python/galbot_robot/get_lidar_data.py
from galbot_sdk.g1 import GalbotRobot, SensorType
from typing import Dict
import time
import numpy as np

def convert_pointcloud(cloud):
    """
    Convert cloud dict to NumPy array dictionary

    Args:
        pointcloud_msg: PointCloud2 protobuf message object

    Returns:
        Dictionary: {field_name: NumPy array}
        - Single-element fields: shape (N,)
        - Multi-element fields: shape (N, count) or (N,)
        - N = width * height (total number of points)
    """

    if not cloud:
        return {}

    num_points = cloud["height"] * cloud["width"]
    if num_points == 0:
        return {}

    DTYPE_MAP = {
        1: np.int8,
        2: np.uint8,
        3: np.int16,
        4: np.uint16,
        5: np.int32,
        6: np.uint32,
        7: np.float32,
        8: np.float64
    }
    dtype_list = []
    for field in cloud["fields"]:
        # Get base data type
        np_dtype_class = DTYPE_MAP.get(field["datatype"])
        if np_dtype_class is None:
            raise ValueError(f"Unsupported data type: {field['datatype']}")

        dtype_inst = np.dtype(np_dtype_class)

        # Handle byte order (endianness)
        if dtype_inst.itemsize > 1:
            byteorder = '>' if cloud["is_bigendian"] else '<'
            dtype_inst = dtype_inst.newbyteorder(byteorder)

        # Add to dtype list
        if field["count"] == 1:
            dtype_list.append((field["name"], dtype_inst))
        else:
            # Multi-element fields (e.g., rgb)
            dtype_list.append((field["name"], dtype_inst, field["count"]))

    # Create structured dtype
    dtype = np.dtype(dtype_list)

    # Data integrity check
    expected_size = num_points * cloud["point_step"]
    if len(cloud["data"]) < expected_size:
        raise ValueError(
            f"Insufficient data length: expected {expected_size} bytes, "
            f"actual {len(cloud['data'])} bytes"
        )

    # Create NumPy structured array from binary data
    # count parameter ensures only expected number of points are read
    arr = np.frombuffer(cloud["data"], dtype=dtype, count=num_points)

    # Convert to regular dictionary (copy data to avoid modifying original)
    result = {}
    for field in cloud["fields"]:
        field_data = arr[field["name"]]

        # Handle shape of multi-element fields
        if field["count"] == 1:
            result[field["name"]] = field_data.copy()
        else:
            # Keep original shape or flatten, choose according to needs
            result[field["name"]] = field_data.copy()

    return result


def get_xyz_array(pointcloud_dict: Dict[str, np.ndarray], 
                remove_nan: bool = False) -> np.ndarray:
    """
    Extract XYZ coordinate array from converted point cloud dictionary

    Args:
        pointcloud_dict: Dictionary returned by pointcloud2_to_numpy()
        remove_nan: Whether to remove points containing NaN (for FLOAT32/FLOAT64 types)

    Returns:
        Nx3 point coordinate array
    """
    required = ['x', 'y', 'z']
    if not all(k in pointcloud_dict for k in required):
        raise ValueError("Point cloud data missing required xyz fields")

    points = np.stack([pointcloud_dict['x'], 
                    pointcloud_dict['y'], 
                    pointcloud_dict['z']], axis=1)

    if remove_nan:
        mask = ~np.isnan(points).any(axis=1)
        points = points[mask]

    return points

def save_xyz_to_pcd(xyz_array: np.ndarray, filename: str, binary: bool = False) -> None:
    """
    Save XYZ coordinates to PCD file format (simplest option for coordinate-only data)

    Args:
        xyz_array: Nx3 array of XYZ coordinates
        filename: Output PCD file path
        binary: If True, saves in binary format; otherwise ASCII
    """
    if xyz_array.ndim != 2 or xyz_array.shape[1] != 3:
        raise ValueError(f"xyz_array must have shape (N, 3), got {xyz_array.shape}")

    num_points = xyz_array.shape[0]
    header = [
        "# .PCD v0.7 - Point Cloud Data file format",
        "VERSION 0.7",
        "FIELDS x y z",
        "SIZE 4 4 4",
        "TYPE F F F",  # F = float32
        "COUNT 1 1 1",
        f"WIDTH {num_points}",
        "HEIGHT 1",
        "VIEWPOINT 0 0 0 1 0 0 0",
        f"POINTS {num_points}",
        f"DATA {'binary' if binary else 'ascii'}"
    ]

    if binary:
        with open(filename, 'wb') as f:
            f.write(('\n'.join(header) + '\n').encode('ascii'))
            f.write(xyz_array.astype(np.float32).tobytes())
    else:
        with open(filename, 'w') as f:
            f.write('\n'.join(header) + '\n')
            np.savetxt(f, xyz_array, fmt='%f')

# 获取 GalbotRobot 的单例并初始化
robot = GalbotRobot.get_instance()
enable_sensor_set = {SensorType.BASE_LIDAR}
# 为节省资源开销,只有初始化中传入的相机与雷达传感器可获取数据
robot.init(enable_sensor_set)
print("初始化成功")
# 程序立即启动,稍等数据就绪时间
time.sleep(4)

cloud = robot.get_lidar_data(SensorType.BASE_LIDAR)
if not cloud:
    print("No lidar data!")
else:
    pointcloud_dict = convert_pointcloud(cloud)
    xyz_points = get_xyz_array(pointcloud_dict)
    save_xyz_to_pcd(xyz_points, "output_xyz.pcd")
    print(pointcloud_dict)
    print("get lidar data success")

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Whole-Body One-Shot Execution Interface – Chassis Velocity Control (whole_body_execution_vel)

examples/python/galbot_robot/whole_body_execution_vel_example.py
from galbot_sdk.g1 import GalbotRobot
import time


def main():
    robot = GalbotRobot.get_instance()
    if robot.init():
        print("系统初始化成功!")
    else:
        print("系统初始化失败!")
        return

    time.sleep(2)

    # Whole-body joints: leg(5) + head(2) + left_arm(7) + right_arm(7)
    whole_body_joint_1 = [
        0.3, 1.2, 0.85, 0.0, 0.0,        # leg
        0.5, 0.5,                        # head
        2.0, -1.55, -0.55, -1.7, -0.0, -0.8, 0.0,   # left_arm
        -2.0, 1.55, 0.55, 1.7, 0.0, 0.8, 0.0        # right_arm
    ]
    whole_body_joint_2 = [
        0.3, 1.2, 0.85, 0.0, 0.0,        # leg
        0.0, 0.0,                        # head
        2.0, -1.55, -0.55, -1.7, -0.0, -0.8, 0.0,   # left_arm
        -2.0, 1.55, 0.55, 1.7, 0.0, 0.8, 0.0        # right_arm
    ]

    # Base velocity command (twist)
    linear_velocity_1 = [0.1, 0.0, 0.0]
    angular_velocity_1 = [0.0, 0.0, 0.0]
    linear_velocity_2 = [-0.1, 0.0, 0.0]
    angular_velocity_2 = [0.0, 0.0, 0.0]

    print("=== Whole-body + base velocity ===")
    vel_status = robot.execute_whole_body_target(
        whole_body_joint_1,
        linear_velocity_1,
        angular_velocity_1,
        False,
        0.1,
        10.0,
    )
    print("execute_whole_body_target (twist) status:", vel_status)

    # 底盘运动1s后停止
    time.sleep(1)
    robot.stop_base()

    # 等待5s关节运动完成
    time.sleep(5)

    vel_status = robot.execute_whole_body_target(
        whole_body_joint_2,
        linear_velocity_2,
        angular_velocity_2,
        False,
        0.1,
        10.0,
    )
    print("execute_whole_body_target (twist) status:", vel_status)

    # 底盘运动1s后停止
    time.sleep(1)
    robot.stop_base()

    # 等待5s关节运动完成
    time.sleep(5)

    robot.request_shutdown()
    robot.wait_for_shutdown()
    robot.destroy()


if __name__ == "__main__":
    main()

Whole-Body One-Shot Execution Interface – Chassis Position Control (Odometry Frame) (whole_body_execution_odom)

examples/python/galbot_robot/whole_body_execution_odom_example.py
from galbot_sdk.g1 import GalbotRobot
import time


def main():
    robot = GalbotRobot.get_instance()
    if robot.init():
        print("系统初始化成功!")
    else:
        print("系统初始化失败!")
        return

    time.sleep(2)

    # Whole-body joints: leg(5) + head(2) + left_arm(7) + right_arm(7)
    whole_body_joint_1 = [
        0.3, 1.2, 0.85, 0.0, 0.0,        # leg
        0.5, 0.5,                        # head
        2.0, -1.55, -0.55, -1.7, -0.0, -0.8, 0.0,   # left_arm
        -2.0, 1.55, 0.55, 1.7, 0.0, 0.8, 0.0        # right_arm
    ]
    whole_body_joint_2 = [
        0.3, 1.2, 0.85, 0.0, 0.0,        # leg
        0.0, 0.0,                        # head
        2.0, -1.55, -0.55, -1.7, -0.0, -0.8, 0.0,   # left_arm
        -2.0, 1.55, 0.55, 1.7, 0.0, 0.8, 0.0        # right_arm
    ]

    # Base pose command odom(x, y, yaw)
    base_x_1 = 0.2
    base_y_1 = 0.0
    base_yaw_1 = 0.0
    base_x_2 = 0.0
    base_y_2 = 0.0
    base_yaw_2 = 0.0

    # 可选坐标系(frame_id: base_link/odom/map, reference_frame_id: odom/map)
    frame_id = "base_link"
    reference_frame_id = "odom"

    # 底盘位姿插值时间(秒),用于生成平滑底盘轨迹
    base_time_s = 15.0

    print("=== Whole-body + base pose (odom) ===")
    pose_status = robot.execute_whole_body_target(
        whole_body_joint_1,
        base_x_1,
        base_y_1,
        base_yaw_1,
        frame_id,
        reference_frame_id,
        True,
        0.1,
        base_time_s,
        base_time_s,
    )
    print("execute_whole_body_target (pose) status:", pose_status)

    time.sleep(1)

    pose_status = robot.execute_whole_body_target(
        whole_body_joint_2,
        base_x_2,
        base_y_2,
        base_yaw_2,
        frame_id,
        reference_frame_id,
        True,
        0.1,
        base_time_s,
        base_time_s,
    )
    print("execute_whole_body_target (pose) status:", pose_status)

    robot.request_shutdown()
    robot.wait_for_shutdown()
    robot.destroy()


if __name__ == "__main__":
    main()

Whole-Body One-Shot Execution Interface – Chassis Position Control (Map Frame) (whole_body_execution_map)

examples/python/galbot_robot/whole_body_execution_map_example.py
from galbot_sdk.g1 import GalbotRobot
import time


def main():
    robot = GalbotRobot.get_instance()
    if robot.init():
        print("系统初始化成功!")
    else:
        print("系统初始化失败!")
        return

    time.sleep(2)

    # Whole-body joints: leg(5) + head(2) + left_arm(7) + right_arm(7)
    whole_body_joint_1 = [
        0.3, 1.2, 0.85, 0.0, 0.0,        # leg
        0.5, 0.5,                        # head
        2.0, -1.55, -0.55, -1.7, -0.0, -0.8, 0.0,   # left_arm
        -2.0, 1.55, 0.55, 1.7, 0.0, 0.8, 0.0        # right_arm
    ]
    whole_body_joint_2 = [
        0.3, 1.2, 0.85, 0.0, 0.0,        # leg
        0.0, 0.0,                        # head
        2.0, -1.55, -0.55, -1.7, -0.0, -0.8, 0.0,   # left_arm
        -2.0, 1.55, 0.55, 1.7, 0.0, 0.8, 0.0        # right_arm
    ]

    # Base pose command map(x, y, yaw) 注意:需要根据实际环境机器人在map坐标系下的定位进行调整
    base_x_1 = -0.4
    base_y_1 = 0.226593
    base_yaw_1 = 0.0
    base_x_2 = -0.60583
    base_y_2 = 0.226593
    base_yaw_2 = 0.0

    # 可选坐标系(frame_id: base_link/odom/map, reference_frame_id: odom/map)
    frame_id = "base_link"
    reference_frame_id = "map"

    # 底盘位姿插值时间(秒),用于生成平滑底盘轨迹
    base_time_s = 15.0

    print("=== Whole-body + base pose (map) ===")
    pose_status = robot.execute_whole_body_target(
        whole_body_joint_1,
        base_x_1,
        base_y_1,
        base_yaw_1,
        frame_id,
        reference_frame_id,
        True,
        0.1,
        base_time_s,
        base_time_s,
    )
    print("execute_whole_body_target (pose) status:", pose_status)

    time.sleep(1)

    pose_status = robot.execute_whole_body_target(
        whole_body_joint_2,
        base_x_2,
        base_y_2,
        base_yaw_2,
        frame_id,
        reference_frame_id,
        True,
        0.1,
        base_time_s,
        base_time_s,
    )
    print("execute_whole_body_target (pose) status:", pose_status)

    robot.request_shutdown()
    robot.wait_for_shutdown()
    robot.destroy()


if __name__ == "__main__":
    main()

One-Click Reset to Zero – Odometry Frame (whole_body_reset_zero_odom)

examples/python/galbot_robot/whole_body_reset_zero_odom_example.py
from galbot_sdk.g1 import GalbotRobot, GalbotMotion
import time


def main():
    robot = GalbotRobot.get_instance()
    motion = GalbotMotion.get_instance()

    if not robot.init():
        print("GalbotRobot init failed.")
        return
    if not motion.init():
        print("GalbotMotion init failed.")
        return

    time.sleep(2)

    # Whole-body joints: leg(5) + head(2) + left_arm(7) + right_arm(7)
    whole_body_joint_1 = [
        0.25, 1.1, 0.85, 0.0, 0.0,       # leg
        0.5, 0.5,                        # head
        2.0, -1.55, -0.55, -1.7, -0.0, -0.8, 0.2,   # left_arm
        -2.0, 1.55, 0.55, 1.7, 0.0, 0.8, 0.2        # right_arm
    ]

    # Base pose command odom(x, y, yaw)
    base_x_1 = 0.2
    base_y_1 = 0.0
    base_yaw_1 = 0.0

    # 可选坐标系(frame_id: base_link/odom/map, reference_frame_id: odom/map)
    frame_id = "base_link"
    reference_frame_id = "odom"

    # 底盘位姿插值时间(秒),用于生成平滑底盘轨迹
    base_time_s = 15.0

    print("=== Whole-body + base pose (odom) ===")
    pose_status = robot.execute_whole_body_target(
        whole_body_joint_1,
        base_x_1,
        base_y_1,
        base_yaw_1,
        frame_id,
        reference_frame_id,
        True,
        0.1,
        base_time_s,
        base_time_s,
    )
    print("execute_whole_body_target (pose) status:", pose_status)

    time.sleep(1)

    # 一键回零
    result = robot.zero_whole_body_and_base(
        frame_id,
        reference_frame_id,
        True,
        0.2,
        15.0,
    )
    print("Zero joint status:", result[0])
    print("Zero base status:", result[1])

    robot.request_shutdown()
    robot.wait_for_shutdown()
    robot.destroy()


if __name__ == "__main__":
    main()

One-Click Reset to Zero – Map Frame (whole_body_reset_zero_map)

examples/python/galbot_robot/whole_body_reset_zero_map_example.py
from galbot_sdk.g1 import GalbotRobot, GalbotMotion
import time


def main():
    robot = GalbotRobot.get_instance()
    motion = GalbotMotion.get_instance()

    if not robot.init():
        print("GalbotRobot init failed.")
        return
    if not motion.init():
        print("GalbotMotion init failed.")
        return

    time.sleep(2)

    # Whole-body joints: leg(5) + head(2) + left_arm(7) + right_arm(7)
    whole_body_joint_1 = [
        0.25, 1.1, 0.85, 0.0, 0.0,       # leg
        0.5, 0.5,                        # head
        2.0, -1.55, -0.55, -1.7, -0.0, -0.8, 0.2,   # left_arm
        -2.0, 1.55, 0.55, 1.7, 0.0, 0.8, 0.2        # right_arm
    ]

    # Base pose command map(x, y, yaw) 注意:需要根据实际环境机器人在map坐标系下的定位进行调整
    base_x_1 = -0.4
    base_y_1 = 0.226593
    base_yaw_1 = 0.0

    # 可选坐标系(frame_id: base_link/odom/map, reference_frame_id: odom/map)
    frame_id = "base_link"
    reference_frame_id = "map"

    # 底盘位姿插值时间(秒),用于生成平滑底盘轨迹
    base_time_s = 15.0

    print("=== Whole-body + base pose (map) ===")
    pose_status = robot.execute_whole_body_target(
        whole_body_joint_1,
        base_x_1,
        base_y_1,
        base_yaw_1,
        frame_id,
        reference_frame_id,
        True,
        0.1,
        base_time_s,
        base_time_s,
    )
    print("execute_whole_body_target (pose) status:", pose_status)

    time.sleep(1)

    # 一键回零
    result = robot.zero_whole_body_and_base(
        frame_id,
        reference_frame_id,
        True,
        0.2,
        15.0,
    )
    print("Zero joint status:", result[0])
    print("Zero base status:", result[1])

    robot.request_shutdown()
    robot.wait_for_shutdown()
    robot.destroy()


if __name__ == "__main__":
    main()

Class: GalbotMotion

Get Instance and Initialize

examples/python/galbot_motion/get_instance.py
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# 获取 GalbotMotion 单例并初始化
motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

if motion.init():
    print("GalbotMotion 初始化成功")
else:
    print("GalbotMotion 初始化失败")

if robot.init():
    print("GalbotRobot 初始化成功")
else:
    print("GalbotRobot 初始化失败")

# 仍然可以通过 GalbotRobot 管理机器人生命周期
robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Forward Kinematics (Using Current State or Specified RobotStates)

examples/python/galbot_motion/fk.py
import time
import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# 获取 GalbotMotion 单例并初始化
motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("执行结果: SUCCESS, 执行成功")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("执行结果: TIMEOUT, 执行超时")
        elif(status == gm.MotionStatus.FAULT):
            print("执行结果: FAULT, 发生故障无法继续执行")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("执行结果: INVALID_INPUT, 输入参数不符合要求")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("执行结果: INIT_FAILED, 内部通讯组件创建失败")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("执行结果: IN_PROGRESS, 正在运动中但未到位")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("执行结果: STOPPED_UNREACHED, 已停止但未到达目标")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("执行结果: DATA_FETCH_FAILED, 数据获取失败")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("执行结果: PUBLISH_FAIL, 数据发送失败")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("执行结果: COMM_DISCONNECTED, 连接失败")

if motion.init():
    print("GalbotMotion 初始化成功")
else:
    print("GalbotMotion 初始化失败")
if robot.init():
    print("GalbotRobot 初始化成功")
else:
    print("GalbotRobot 初始化失败")

# 程序立即启动,稍等数据就绪时间
time.sleep(1)

chain_joints = {
    "leg": [0.4992,1.4991,1.0005,0.0000,-0.0004],
    "head": [0.0000,0.0],
    "left_arm": [1.9999,-1.6000,-0.5999,-1.6999,0.0000,-0.7999,0.0000],
    "right_arm": [-2.0000,1.6001,0.6001,1.7000,0.0000,0.8000,0.0000]
}
chain_pose_baselink = {
    "leg": [0.0596,-0.0000,1.0327,0.5000,0.5003,0.4997,0.5000],
    "head": [0.0599,0.0002,1.4098,-0.7072,0.0037,0.0037,0.7069],
    "left_arm": [0.1267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
    "right_arm": [0.1267,-0.2345,0.7358,-0.0225,0.0126,-0.0343,0.9991]
}
whole_body_joint = [
    num for key in ["leg", "head", "left_arm", "right_arm"] 
    for num in chain_joints[key]
]
base_state = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
custom_param = gm.Parameter()
reference_frame = "base_link"
target_frame = "EndEffector"
target_chain = "left_arm"
one_chain = [target_chain]
chain_with_torso = [target_chain, "torso"]
error_chains = [target_chain, "torso", "head"]
# 场景1:单链逆解
try:
    status, joint_map = motion.inverse_kinematics(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=one_chain,
        target_frame=target_frame,
        reference_frame=reference_frame,
        enable_collision_check=False  # 关闭碰撞检测加速测试
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "逆运动学求解失败"
    print(f"✅ 基础版逆运动学成功: 关节角={joint_map}")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 基础版逆运动学异常: {e}")

# 场景2:手臂链+腰部逆解
try:
    status, joint_map = motion.inverse_kinematics(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=chain_with_torso,
        target_frame=target_frame,
        reference_frame=reference_frame,
        enable_collision_check=False  # 关闭碰撞检测加速测试
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "逆运动学求解失败"
    print(f"✅ 自定义初始关节逆运动学成功: 关节角={joint_map}")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 自定义初始关节逆运动学异常: {e}")

# 场景3:非法链组合
try:
    status, joint_map = motion.inverse_kinematics(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=error_chains,
        target_frame=target_frame,
        reference_frame=reference_frame,
        enable_collision_check=False  # 关闭碰撞检测加速测试
    )
    printStatus(status)
    assert status == gm.MotionStatus.INVALID_INPUT, "逆运动学求解失败"
    print(f"✅ 非法链组合输入检测成功")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 自定义初始关节逆运动学异常: {e}")

# 场景4:使用参考关节
try:
    # initial_joint_positions可指定链关节作为逆解参考值,未指定链关节使用全身关节补全
    status, joint_map = motion.inverse_kinematics(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=one_chain,
        target_frame=target_frame,
        reference_frame=reference_frame,
        initial_joint_positions=chain_joints,
        enable_collision_check=False  # 关闭碰撞检测加速测试
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "逆运动学求解失败"
    print(f"✅ 自定义初始关节逆运动学成功: 关节角={joint_map}")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 自定义初始关节逆运动学异常: {e}")

# 场景5:使用RobotStates
try:
    ref_robot_state = gm.RobotStates()
    ref_robot_state.chain_name = target_chain
    ref_robot_state.whole_body_joint = whole_body_joint
    ref_robot_state.base_state = base_state
    target_frame = "EndEffector"
    reference_frame = "base_link"
    status, joint_map = motion.inverse_kinematics_by_state(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=one_chain,
        target_frame=target_frame,
        reference_frame=reference_frame,
        reference_robot_states=ref_robot_state
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "逆运动学求解失败"
    print(f"✅ 基于RobotStates逆运动学成功: 关节角={joint_map}")
except Exception as e:
    print(f"❌ 基于RobotStates逆运动学异常: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Inverse Kinematics (Basic and RobotStates-based)

examples/python/galbot_motion/ik.py
import time
import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# 获取 GalbotMotion 单例并初始化
motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("执行结果: SUCCESS, 执行成功")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("执行结果: TIMEOUT, 执行超时")
        elif(status == gm.MotionStatus.FAULT):
            print("执行结果: FAULT, 发生故障无法继续执行")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("执行结果: INVALID_INPUT, 输入参数不符合要求")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("执行结果: INIT_FAILED, 内部通讯组件创建失败")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("执行结果: IN_PROGRESS, 正在运动中但未到位")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("执行结果: STOPPED_UNREACHED, 已停止但未到达目标")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("执行结果: DATA_FETCH_FAILED, 数据获取失败")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("执行结果: PUBLISH_FAIL, 数据发送失败")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("执行结果: COMM_DISCONNECTED, 连接失败")

if motion.init():
    print("GalbotMotion 初始化成功")
else:
    print("GalbotMotion 初始化失败")
if robot.init():
    print("GalbotRobot 初始化成功")
else:
    print("GalbotRobot 初始化失败")

# 程序立即启动,稍等数据就绪时间
time.sleep(1)

chain_joints = {
    "leg": [0.4992,1.4991,1.0005,0.0000,-0.0004],
    "head": [0.0000,0.0],
    "left_arm": [1.9999,-1.6000,-0.5999,-1.6999,0.0000,-0.7999,0.0000],
    "right_arm": [-2.0000,1.6001,0.6001,1.7000,0.0000,0.8000,0.0000]
}
chain_pose_baselink = {
    "leg": [0.0596,-0.0000,1.0327,0.5000,0.5003,0.4997,0.5000],
    "head": [0.0599,0.0002,1.4098,-0.7072,0.0037,0.0037,0.7069],
    "left_arm": [0.1267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
    "right_arm": [0.1267,-0.2345,0.7358,-0.0225,0.0126,-0.0343,0.9991]
}
whole_body_joint = [
    num for key in ["leg", "head", "left_arm", "right_arm"] 
    for num in chain_joints[key]
]
base_state = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
custom_param = gm.Parameter()
reference_frame = "base_link"
target_frame = "EndEffector"
target_chain = "left_arm"
one_chain = [target_chain]
chain_with_torso = [target_chain, "torso"]
error_chains = [target_chain, "torso", "head"]
# 场景1:单链逆解
try:
    status, joint_map = motion.inverse_kinematics(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=one_chain,
        target_frame=target_frame,
        reference_frame=reference_frame,
        enable_collision_check=False  # 关闭碰撞检测加速测试
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "逆运动学求解失败"
    print(f"✅ 基础版逆运动学成功: 关节角={joint_map}")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 基础版逆运动学异常: {e}")

# 场景2:手臂链+腰部逆解
try:
    status, joint_map = motion.inverse_kinematics(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=chain_with_torso,
        target_frame=target_frame,
        reference_frame=reference_frame,
        enable_collision_check=False  # 关闭碰撞检测加速测试
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "逆运动学求解失败"
    print(f"✅ 自定义初始关节逆运动学成功: 关节角={joint_map}")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 自定义初始关节逆运动学异常: {e}")

# 场景3:非法链组合
try:
    status, joint_map = motion.inverse_kinematics(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=error_chains,
        target_frame=target_frame,
        reference_frame=reference_frame,
        enable_collision_check=False  # 关闭碰撞检测加速测试
    )
    printStatus(status)
    assert status == gm.MotionStatus.INVALID_INPUT, "逆运动学求解失败"
    print(f"✅ 非法链组合输入检测成功")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 自定义初始关节逆运动学异常: {e}")

# 场景4:使用参考关节
try:
    # initial_joint_positions可指定链关节作为逆解参考值,未指定链关节使用全身关节补全
    status, joint_map = motion.inverse_kinematics(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=one_chain,
        target_frame=target_frame,
        reference_frame=reference_frame,
        initial_joint_positions=chain_joints,
        enable_collision_check=False  # 关闭碰撞检测加速测试
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "逆运动学求解失败"
    print(f"✅ 自定义初始关节逆运动学成功: 关节角={joint_map}")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 自定义初始关节逆运动学异常: {e}")

# 场景5:使用RobotStates
try:
    ref_robot_state = gm.RobotStates()
    ref_robot_state.chain_name = target_chain
    ref_robot_state.whole_body_joint = whole_body_joint
    ref_robot_state.base_state = base_state
    target_frame = "EndEffector"
    reference_frame = "base_link"
    status, joint_map = motion.inverse_kinematics_by_state(
        target_pose=chain_pose_baselink[target_chain],
        chain_names=one_chain,
        target_frame=target_frame,
        reference_frame=reference_frame,
        reference_robot_states=ref_robot_state
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "逆运动学求解失败"
    print(f"✅ 基于RobotStates逆运动学成功: 关节角={joint_map}")
except Exception as e:
    print(f"❌ 基于RobotStates逆运动学异常: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Get and Set End Effector Pose

examples/python/galbot_motion/get_set_end_effector_pose.py
import time
import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# 获取 GalbotMotion 单例并初始化
motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("执行结果: SUCCESS, 执行成功")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("执行结果: TIMEOUT, 执行超时")
        elif(status == gm.MotionStatus.FAULT):
            print("执行结果: FAULT, 发生故障无法继续执行")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("执行结果: INVALID_INPUT, 输入参数不符合要求")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("执行结果: INIT_FAILED, 内部通讯组件创建失败")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("执行结果: IN_PROGRESS, 正在运动中但未到位")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("执行结果: STOPPED_UNREACHED, 已停止但未到达目标")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("执行结果: DATA_FETCH_FAILED, 数据获取失败")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("执行结果: PUBLISH_FAIL, 数据发送失败")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("执行结果: COMM_DISCONNECTED, 连接失败")

if motion.init():
    print("GalbotMotion 初始化成功")
else:
    print("GalbotMotion 初始化失败")
if robot.init():
    print("GalbotRobot 初始化成功")
else:
    print("GalbotRobot 初始化失败")

# 程序立即启动,稍等数据就绪时间
time.sleep(1)

chain_pose_baselink = {
    "leg": [0.0596,-0.0000,1.0327,0.5000,0.5003,0.4997,0.5000],
    "head": [0.0599,0.0002,1.4098,-0.7072,0.0037,0.0037,0.7069],
    "left_arm": [0.1267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
    "right_arm": [0.1267,-0.2345,0.7358,-0.0225,0.0126,-0.0343,0.9991]
}
custom_param = gm.Parameter()
target_frame = "EndEffector"
reference_frame = "base_link"
target_chain = "left_arm"
# 场景1:基础版
try:
    end_ee_link = "left_arm_end_effector_mount_link"
    status, pose = motion.get_end_effector_pose(
        end_effector_frame=end_ee_link,
        reference_frame=reference_frame
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "获取末端位姿失败"
    print(f"✅ 基础版获取末端位姿成功: {pose}")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 基础版获取末端位姿异常: {e}")

# 场景2:指定链名 + 自定义frame
try:
    status, pose = motion.get_end_effector_pose_on_chain(
        chain_name=target_chain,
        frame_id=target_frame,
        reference_frame=reference_frame
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "获取末端位姿失败"
    print(f"✅ 指定链名获取末端位姿成功: {pose}")
    time.sleep(0.8)
except Exception as e:
    print(f"❌ 指定链名获取末端位姿异常: {e}")

end_effector_frame="left_arm"
reference_frame = "base_link"
try:
    status = motion.set_end_effector_pose(
        target_pose=chain_pose_baselink[end_effector_frame],
        end_effector_frame=end_effector_frame,
        reference_frame=reference_frame,
        enable_collision_check=False,
        is_blocking=False,
        timeout=5.0,
        params=custom_param
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "设置末端位姿失败"
    print(f"✅ 设置末端位姿成功: status={status}")
except Exception as e:
    print(f"❌ 设置末端位姿异常: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Single Point Motion Planning (Joint Space and Cartesian Space)

examples/python/galbot_motion/motion_plan.py
import time

import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# NOTE:
# - GalbotMotion currently does NOT provide real-time obstacle perception / automatic environment updates.
# - Motion collision checking uses self-collision + a collision world built from objects you load manually via
#   add_obstacle()/attach_target_object() (including point clouds if you load them explicitly).

motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("Result: SUCCESS")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("Result: TIMEOUT")
        elif(status == gm.MotionStatus.FAULT):
            print("Result: FAULT")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("Result: INVALID_INPUT")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("Result: INIT_FAILED")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("Result: IN_PROGRESS")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("Result: STOPPED_UNREACHED")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("Result: DATA_FETCH_FAILED")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("Result: PUBLISH_FAIL")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("Result: COMM_DISCONNECTED")

if motion.init():
    print("GalbotMotion init OK")
else:
    print("GalbotMotion init FAILED")
if robot.init():
    print("GalbotRobot init OK")
else:
    print("GalbotRobot init FAILED")

# Wait for data to be ready.
time.sleep(1)

chain_joints = {
    "leg": [0.4992,1.4991,1.0005,0.0000,-0.0004],
    "head": [0.0000,0.0],
    "left_arm": [1.9999,-1.6000,-0.5999,-1.6999,0.0000,-0.7999,0.0000],
    "right_arm": [-2.0000,1.6001,0.6001,1.7000,0.0000,0.8000,0.0000]
}
chain_pose_baselink = {
    "leg": [0.0596,-0.0000,1.0327,0.5000,0.5003,0.4997,0.5000],
    "head": [0.0599,0.0002,1.4098,-0.7072,0.0037,0.0037,0.7069],
    "left_arm": [0.1267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
    "right_arm": [0.1267,-0.2345,0.7358,-0.0225,0.0126,-0.0343,0.9991]
}
whole_body_joint = [
    num for key in ["leg", "head", "left_arm", "right_arm"] 
    for num in chain_joints[key]
]
base_state = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
custom_param = gm.Parameter()

# Scenario 1: joint-space planning, target type = joint state
try:
    # 构造目标关节状态

    target_joint = gm.JointStates()
    target_joint.chain_name = "left_arm"
    target_joint.joint_positions = chain_joints[target_joint.chain_name]

    status, traj = motion.motion_plan(
        target=target_joint,
        # When enable_collision_check=True, collision is checked against Motion-side explicitly loaded obstacles
        # (add_obstacle/attach_target_object) and self-collision.
        enable_collision_check=False,
        params=custom_param
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "Planning failed"
    if traj != {}:
        print(f"✅ 关节空间规划 + 目标点类型为关节单链单点规划规划成功: 轨迹点数={len(traj[target_joint.chain_name])}")
        time.sleep(0.8)
    else:
        print(f"⚠️ 返回状态为SUCCESS,轨迹为空,可能已到达,检查目标值与当前状态是否一致或在误差范围内")

except Exception as e:
    print(f"ERROR: joint-space single-point planning exception: {e}")

# Scenario 2: joint-space planning, target type = end-effector pose (Cartesian)
try:
    # 构造目标位姿状态
    target_pose_state = gm.PoseState()
    target_pose_state.chain_name = "left_arm"
    target_pose_state.frame_id = "EndEffector"
    target_pose_state.reference_frame = "base_link"
    target_pose_state.pose = gm.Pose(chain_pose_baselink[target_pose_state.chain_name])
    # target_pose_state.pose.position.x += 0.2

    status, traj = motion.motion_plan(
        target=target_pose_state,
        enable_collision_check=False
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "Planning failed"
    if traj != {}:
        print(f"✅ 关节空间规划 + 目标点类型为末端位姿单链单点规划成功: 轨迹长度={len(traj[target_pose_state.chain_name])}")
        time.sleep(0.8)
    else:
        print(f"⚠️ 返回状态为SUCCESS,轨迹为空,可能已到达,检查目标值与当前状态是否一致或在误差范围内")

except Exception as e:
    print(f"ERROR: Cartesian single-point planning exception: {e}")

# Scenario 3: joint-space planning with an explicit start state
try:
    # 构造目标关节状态

    target_joint = gm.JointStates()
    target_joint.chain_name = "left_arm"
    target_joint.joint_positions = chain_joints[target_joint.chain_name]

    start_joint = gm.JointStates()
    start_joint.chain_name = "left_arm"
    start_joint.joint_positions = [0] * 7

    status, traj = motion.motion_plan(
        target=target_joint,
        start=start_joint,
        enable_collision_check=False,
        params=custom_param
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "Planning failed"
    if traj != {}:
        print(f"✅ 关节空间规划 + 目标点类型为关节单链单点规划规划成功: 轨迹点数={len(traj[target_joint.chain_name])}")
    else:
        print(f"⚠️ 返回状态为SUCCESS,轨迹为空,可能已到达,检查目标值与当前状态是否一致或在误差范围内")

except Exception as e:
    print(f"ERROR: joint-space single-point planning exception: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Multi-Waypoint Trajectory Planning

examples/python/galbot_motion/motion_plan_multi_waypoints.py
import time
import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# 获取 GalbotMotion 单例并初始化
motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("执行结果: SUCCESS, 执行成功")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("执行结果: TIMEOUT, 执行超时")
        elif(status == gm.MotionStatus.FAULT):
            print("执行结果: FAULT, 发生故障无法继续执行")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("执行结果: INVALID_INPUT, 输入参数不符合要求")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("执行结果: INIT_FAILED, 内部通讯组件创建失败")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("执行结果: IN_PROGRESS, 正在运动中但未到位")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("执行结果: STOPPED_UNREACHED, 已停止但未到达目标")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("执行结果: DATA_FETCH_FAILED, 数据获取失败")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("执行结果: PUBLISH_FAIL, 数据发送失败")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("执行结果: COMM_DISCONNECTED, 连接失败")

if motion.init():
    print("GalbotMotion 初始化成功")
else:
    print("GalbotMotion 初始化失败")
if robot.init():
    print("GalbotRobot 初始化成功")
else:
    print("GalbotRobot 初始化失败")

# 程序立即启动,稍等数据就绪时间
time.sleep(2)

chain_joints = {
    "leg": [0.4992,1.4991,1.0005,0.0000,-0.0004],
    "head": [0.0000,0.0],
    "left_arm": [1.9999,-1.6000,-0.5999,-1.6999,0.0000,-0.7999,0.0000],
    "right_arm": [-2.0000,1.6001,0.6001,1.7000,0.0000,0.8000,0.0000]
}
chain_pose_baselink = {
    "leg": [0.0596,-0.0000,1.0327,0.5000,0.5003,0.4997,0.5000],
    "head": [0.0599,0.0002,1.4098,-0.7072,0.0037,0.0037,0.7069],
    "left_arm": [0.1267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
    "right_arm": [0.1267,-0.2345,0.7358,-0.0225,0.0126,-0.0343,0.9991]
}
whole_body_joint = [
    num for key in ["leg", "head", "left_arm", "right_arm"]
    for num in chain_joints[key]
]
base_state = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
custom_param = gm.Parameter()

# 场景1:笛卡尔空间多路点规划(PoseState目标)
try:
    # 构造目标位姿
    target_pose_state = gm.PoseState()
    target_pose_state.chain_name = "left_arm"

    # 构造路点(3个中间位姿)
    waypoint_poses = [
        [0.1267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
        [0.2267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
        [0.3267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
        [0.4267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
    ]

    status, traj = motion.motion_plan_multi_waypoints(
        target=target_pose_state,
        waypoint_poses=waypoint_poses,
        enable_collision_check=False,
        params=custom_param
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "笛卡尔多点单链规划失败"
    if traj != {}:
        print(f"✅ 笛卡尔路点单链规划规划成功: 轨迹点数={len(traj[target_pose_state.chain_name])}")
        time.sleep(0.8)
    else:
        print(f"⚠️ 返回状态为SUCCESS,轨迹为空,可能已到达,检查目标值与当前状态是否一致或在误差范围内")
except Exception as e:
    print(f"❌ 笛卡尔多点运动规划异常: {e}")

# 场景2:关节空间多路点规划(JointStates目标)
try:
    # 构造目标位姿
    target_joint = gm.JointStates()
    target_joint.chain_name = "left_arm"

    # 构造路点(3个中间位姿)
    waypoints = [
        [0.1267,0.2342,0.7356,0.0220,0.0127,0.0343,0.9991],
        [0.2267,0.4342,0.7356,0.0220,0.0127,0.0343,0.9991],
        [0.3267,0.6342,0.7356,0.0220,0.0127,0.0343,0.9991],
        [0.4267,0.8342,0.7356,0.0220,0.0127,0.0343,0.9991]
    ]

    status, traj = motion.motion_plan_multi_waypoints(
        target=target_joint,
        waypoint_poses=waypoints,
        enable_collision_check=False,
        params=custom_param
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "笛卡尔多点单链规划失败"
    if traj != {}:
        print(f"✅ 关节路点单链规划规划成功: 轨迹点数={len(traj[target_pose_state.chain_name])}")
    else:
        print(f"⚠️ 返回状态为SUCCESS,轨迹为空,可能已到达,检查目标值与当前状态是否一致或在误差范围内")
except Exception as e:
    print(f"❌ 关节空间多点运动规划异常: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Collision Detection

examples/python/galbot_motion/check_collision.py
import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot, GalbotNavigation   
import time

# NOTE:
# - GalbotMotion currently does NOT provide real-time obstacle perception / automatic environment updates.
# - Collision checking uses self-collision + the collision world built from objects you load manually via
#   add_obstacle()/attach_target_object() (including point clouds if you load them explicitly).

motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()
nav = GalbotNavigation.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("Result: SUCCESS")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("Result: TIMEOUT")
        elif(status == gm.MotionStatus.FAULT):
            print("Result: FAULT")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("Result: INVALID_INPUT")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("Result: INIT_FAILED")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("Result: IN_PROGRESS")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("Result: STOPPED_UNREACHED")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("Result: DATA_FETCH_FAILED")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("Result: PUBLISH_FAIL")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("Result: COMM_DISCONNECTED")

if motion.init():
    print("GalbotMotion init OK")
else:
    print("GalbotMotion init FAILED")
if robot.init():
    print("GalbotRobot init OK")
else:
    print("GalbotRobot init FAILED")
if nav.init():
    print("GalbotNavigation init OK")
else:
    print("GalbotNavigation init FAILED")

# Wait for data to be ready.
time.sleep(3)

chain_joints = {
    "leg": [0.4992,1.4991,1.0005,0.0000,-0.0004],
    "head": [0.0000,0.0],
    "left_arm": [1.9999,-1.6000,-0.5999,-1.6999,0.0000,-0.7999,0.0000],
    "right_arm": [-2.0000,1.6001,0.6001,1.7000,0.0000,0.8000,0.0000]
}

whole_body_joint = [
    num for key in ["leg", "head", "left_arm", "right_arm"] 
    for num in chain_joints[key]
]
base_state = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
custom_param = gm.Parameter()

try:
    # Build RobotStates list to check.
    check_states = [gm.RobotStates() for _ in range(2)]
    check_states[0].whole_body_joint = whole_body_joint
    check_states[0].base_state = base_state

    bad_left_arm_joint = [1.99995,-1.60004,0.599905,-1.69994,0,-0.799924,0]

    check_left_arm = gm.JointStates()
    check_left_arm.chain_name = "left_arm"
    check_left_arm.joint_positions = bad_left_arm_joint
    check_states[1] = check_left_arm

    status, collision_res = motion.check_collision(
        start=check_states,
        enable_collision_check=True
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "Collision check failed"
    assert len(collision_res) == len(check_states), "Result size mismatch"
    print(f"OK: collision check finished: {collision_res} (False=no collision)")
except Exception as e:
    print(f"ERROR: collision check exception: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Attach Tool

examples/python/galbot_motion/attach_tool.py
import time
import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# 获取 GalbotMotion 单例并初始化
motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("执行结果: SUCCESS, 执行成功")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("执行结果: TIMEOUT, 执行超时")
        elif(status == gm.MotionStatus.FAULT):
            print("执行结果: FAULT, 发生故障无法继续执行")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("执行结果: INVALID_INPUT, 输入参数不符合要求")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("执行结果: INIT_FAILED, 内部通讯组件创建失败")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("执行结果: IN_PROGRESS, 正在运动中但未到位")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("执行结果: STOPPED_UNREACHED, 已停止但未到达目标")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("执行结果: DATA_FETCH_FAILED, 数据获取失败")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("执行结果: PUBLISH_FAIL, 数据发送失败")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("执行结果: COMM_DISCONNECTED, 连接失败")

if motion.init():
    print("GalbotMotion 初始化成功")
else:
    print("GalbotMotion 初始化失败")
if robot.init():
    print("GalbotRobot 初始化成功")
else:
    print("GalbotRobot 初始化失败")

# 程序立即启动,稍等数据就绪时间
time.sleep(2)

try:
    chain_name = "left_arm"
    tool_name = "suction_cup"
    status = motion.attach_tool(
        chain=chain_name,
        tool=tool_name
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "加载工具失败"
    print(f"✅ 加载工具成功")
except Exception as e:
    print(f"❌ 加载工具异常: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Detach Tool

examples/python/galbot_motion/detach_tool.py
import time
import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# 获取 GalbotMotion 单例并初始化
motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("执行结果: SUCCESS, 执行成功")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("执行结果: TIMEOUT, 执行超时")
        elif(status == gm.MotionStatus.FAULT):
            print("执行结果: FAULT, 发生故障无法继续执行")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("执行结果: INVALID_INPUT, 输入参数不符合要求")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("执行结果: INIT_FAILED, 内部通讯组件创建失败")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("执行结果: IN_PROGRESS, 正在运动中但未到位")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("执行结果: STOPPED_UNREACHED, 已停止但未到达目标")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("执行结果: DATA_FETCH_FAILED, 数据获取失败")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("执行结果: PUBLISH_FAIL, 数据发送失败")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("执行结果: COMM_DISCONNECTED, 连接失败")

if motion.init():
    print("GalbotMotion 初始化成功")
else:
    print("GalbotMotion 初始化失败")
if robot.init():
    print("GalbotRobot 初始化成功")
else:
    print("GalbotRobot 初始化失败")

# 程序立即启动,稍等数据就绪时间
time.sleep(2)

# 1. 卸载工具
try:
    chain_name = "left_arm"
    status = motion.detach_tool(
        chain=chain_name
    )
    printStatus(status)
    assert status == gm.MotionStatus.SUCCESS, "卸载工具失败"
    print(f"✅ 卸载工具成功")
except Exception as e:
    print(f"❌ 卸载工具异常: {e}") 

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()
examples/python/galbot_motion/get_link_names.py
import time
import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# 获取 GalbotMotion 单例并初始化
motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

if motion.init():
    print("GalbotMotion 初始化成功")
else:
    print("GalbotMotion 初始化失败")
if robot.init():
    print("GalbotRobot 初始化成功")
else:
    print("GalbotRobot 初始化失败")

# 程序立即启动,稍等数据就绪时间
time.sleep(2)

try:
    # 获取所有连杆名称
    all_link_names = motion.get_link_names(only_end_effector=False)
    print(f"\n所有连杆名称 (共 {len(all_link_names)} 个):")
    for i, link_name in enumerate(all_link_names, 1):
        print(f"  {i}. {link_name}")

    # 只获取末端执行器连杆名称
    ee_link_names = motion.get_link_names(only_end_effector=True)
    print(f"\n末端执行器连杆名称 (共 {len(ee_link_names)} 个):")
    for i, link_name in enumerate(ee_link_names, 1):
        print(f"  {i}. {link_name}")

    # 示例:使用连杆名称进行前向运动学计算
    if ee_link_names:
        print(f"\n使用末端执行器连杆 '{ee_link_names[0]}' 进行前向运动学计算...")
        success, fk_result = motion.forward_kinematics(ee_link_names[0])
        if success == gm.MotionStatus.SUCCESS:
            print(f"前向运动学结果: {fk_result}")
        else:
            print(f"前向运动学计算失败: {success}")
except Exception as e:
    print(f"❌ 获取连杆名称异常: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Add/Remove Environment Collision Objects

examples/python/galbot_motion/add_obstacle.py
import time

import galbot_sdk.g1 as gm
from galbot_sdk.g1 import GalbotMotion, GalbotRobot

# NOTE:
# - GalbotMotion currently does NOT provide real-time obstacle perception / automatic environment updates.
# - If you want Motion collision checking to consider obstacles (including point clouds), you must load them
#   manually via add_obstacle()/attach_target_object().

motion = GalbotMotion.get_instance()
robot = GalbotRobot.get_instance()

def printStatus(status):
        if(status == gm.MotionStatus.SUCCESS):
            print("Result: SUCCESS")
        elif(status == gm.MotionStatus.TIMEOUT):
            print("Result: TIMEOUT")
        elif(status == gm.MotionStatus.FAULT):
            print("Result: FAULT")
        elif(status == gm.MotionStatus.INVALID_INPUT):
            print("Result: INVALID_INPUT")
        elif(status == gm.MotionStatus.INIT_FAILED):
            print("Result: INIT_FAILED")
        elif(status == gm.MotionStatus.IN_PROGRESS):
            print("Result: IN_PROGRESS")
        elif(status == gm.MotionStatus.STOPPED_UNREACHED):
            print("Result: STOPPED_UNREACHED")
        elif(status == gm.MotionStatus.DATA_FETCH_FAILED):
            print("Result: DATA_FETCH_FAILED")
        elif(status == gm.MotionStatus.PUBLISH_FAIL):
            print("Result: PUBLISH_FAIL")
        elif(status == gm.MotionStatus.COMM_DISCONNECTED):
            print("Result: COMM_DISCONNECTED")

if motion.init():
    print("GalbotMotion init OK")
else:
    print("GalbotMotion init FAILED")
if robot.init():
    print("GalbotRobot init OK")
else:
    print("GalbotRobot init FAILED")

# Wait for data to be ready.
time.sleep(2)

# 1) Add a box collision object into Motion environment.
#    This affects Motion-side collision checking (e.g., motion_plan/check_collision).
try:
    obstacle_id = "box_test_1"
    obj_type = "box"
    obj_pose = [1.0, 0.0, 1.0, 0,0,0,1]
    obj_size = [1.0, 1.0, 1.0]
    target_frame = "world"
    status = motion.add_obstacle(
        obstacle_id=obstacle_id,
        obstacle_type=obj_type,
        pose=obj_pose,
        scale=obj_size,
        target_frame=target_frame
    )
    printStatus(status)
    motion.clear_obstacle()
    assert status == gm.MotionStatus.SUCCESS, "Failed to add obstacle"
    print(f"OK: added obstacle: {obstacle_id}")
except Exception as e:
    print(f"ERROR: add obstacle exception: {e}")

# 2) Add a duplicate ID (expected to fail).
try:
    obstacle_id = "box_test_1"
    obj_type = "box"
    obj_pose = [1.0, 0.0, 1.0, 0,0,0,1]
    obj_size = [1.0, 1.0, 1.0]
    target_frame = "world"
    status = motion.add_obstacle(
        obstacle_id=obstacle_id,
        obstacle_type=obj_type,
        pose=obj_pose,
        scale=obj_size,
        target_frame=target_frame
    )
    status = motion.add_obstacle(
        obstacle_id=obstacle_id,
        obstacle_type=obj_type,
        pose=obj_pose,
        scale=obj_size,
        target_frame=target_frame
    )
    printStatus(status)
    motion.clear_obstacle()
    assert status == gm.MotionStatus.FAULT, "Expected duplicate obstacle ID to fail"
    print("OK: duplicate obstacle ID is rejected")
except Exception as e:
    print(f"ERROR: duplicate obstacle exception: {e}")

robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Class: GalbotNavigation

Get Instance / Initialize

examples/python/galbot_navigation/get_instance.py
from galbot_sdk.g1 import GalbotNavigation, GalbotRobot
import numpy as np

# 初始化系统与导航模块
robot = GalbotRobot.get_instance()
robot.init()

nav = GalbotNavigation.get_instance()
nav.init()

print("GalbotNavigation 已初始化:", nav is not None)

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Relocalize / Is Localized / Get Current Pose

examples/python/galbot_navigation/relocalized.py
from galbot_sdk.g1 import GalbotNavigation, GalbotRobot
import numpy as np
import time

nav = GalbotNavigation.get_instance()
nav.init()
robot = GalbotRobot.get_instance()
robot.init()

init_pose = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])

# 尝试确保定位成功
while not nav.is_localized():
    nav.relocalize(init_pose)
    time.sleep(0.5)

print("当前位姿:", nav.get_current_pose())

nav.stop_navigation()
# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Check Path Reachability and Blocking Navigation to Goal

examples/python/galbot_navigation/check_path_reachability.py
from galbot_sdk.g1 import GalbotNavigation, GalbotRobot, ControllerName, ControlStatus
import numpy as np
import sys

nav = GalbotNavigation.get_instance()
nav.init()
robot = GalbotRobot.get_instance()
robot.init()

start = nav.get_current_pose()
goal = np.array([1.0, 1.0, 0.0, 0, 0, 0.4794255, 0.8775826])

res = robot.switch_controller(ControllerName.CHASSIS_POSE_CTRL)
if res != ControlStatus.SUCCESS:
    print("切换控制器失败!")
    sys.exit(1)
else:
    print("切换控制器成功!")

if nav.check_path_reachability(goal, start):
    status = nav.navigate_to_goal(goal, enable_collision_check=True, is_blocking=True, timeout=30)
    print("navigate_to_goal 返回状态:", status)
    print("是否到达:", nav.check_goal_arrival())
else:
    print("路径不可达或不安全")

nav.stop_navigation()
# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Non-blocking Navigation + Polling for Arrival

examples/python/galbot_navigation/navigate_to_goal.py
from galbot_sdk.g1 import GalbotNavigation, GalbotRobot, ControllerName, ControlStatus
import numpy as np
import time
import sys

nav = GalbotNavigation.get_instance()
nav.init()
robot = GalbotRobot.get_instance()
robot.init()

goal = np.array([0.5, 0.0, 0.0, 0, 0, 0.0, 1.0])

res = robot.switch_controller(ControllerName.CHASSIS_POSE_CTRL)
if res != ControlStatus.SUCCESS:
    print("切换控制器失败!")
    sys.exit(1)
else:
    print("切换控制器成功!")

nav.navigate_to_goal(goal, enable_collision_check=True, is_blocking=False, timeout=20)

start_time = time.time()
reached = False

while True:
    if nav.check_goal_arrival():
        reached = True
        break
    if time.time() - start_time > 20:
        print("导航超时,20s 内未到达目标")
        break
    print("正在导航...")
    time.sleep(0.5)

if reached:
    print("已到达目标")
if reached:
    print("已到达目标")

nav.stop_navigation()
# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Move Straight to Goal / Stop Navigation

examples/python/galbot_navigation/move_straight_to.py
from galbot_sdk.g1 import GalbotNavigation, GalbotRobot, ControllerName, ControlStatus
import numpy as np
import time
import sys

nav = GalbotNavigation.get_instance()
nav.init()
robot = GalbotRobot.get_instance()
robot.init()

target = np.array([0.2, 0.0, 0.0, 0, 0, 0.0, 1.0])

res = robot.switch_controller(ControllerName.CHASSIS_POSE_CTRL)
if res != ControlStatus.SUCCESS:
    print("切换控制器失败!")
    sys.exit(1)
else:
    print("切换控制器成功!")

nav.move_straight_to(target, is_blocking=False, timeout=10)
time.sleep(1.0)
nav.stop_navigation()

# 主动发出SIGINT退出信号
robot.request_shutdown()
# 等待进入shutdown状态
robot.wait_for_shutdown()
# 进行SDK资源释放
robot.destroy()
print('资源释放成功')

Get Navigation Status + Polling for SUCCESS/FAILED or Timeout

examples/python/galbot_navigation/get_navigation_status.py
"""
示例:非阻塞导航中轮询 get_navigation_status,根据 SUCCESS/FAILED 或超时及时退出,
避免卡死并走错误逻辑。
"""
from galbot_sdk.g1 import GalbotNavigation, GalbotRobot, NavigationTaskStatus, ControllerName, ControlStatus
import numpy as np
import time
import sys

nav = GalbotNavigation.get_instance()
nav.init()
robot = GalbotRobot.get_instance()
robot.init()

goal = np.array([0.5, 0.0, 0.0, 0, 0, 0.0, 1.0])
timeout_s = 20.0
poll_interval_s = 0.5

res = robot.switch_controller(ControllerName.CHASSIS_POSE_CTRL)
if res != ControlStatus.SUCCESS:
    print("切换控制器失败!")
    sys.exit(1)
else:
    print("切换控制器成功!")
# 非阻塞导航
nav.navigate_to_goal(goal, enable_collision_check=True, is_blocking=False, timeout=timeout_s)
start = time.time()

while True:
    status = nav.get_navigation_status()
    elapsed = time.time() - start

    if status == NavigationTaskStatus.SUCCESS:
        print("已到达目标")
        break
    if status == NavigationTaskStatus.FAILED:
        print("导航失败,及时退出错误逻辑")
        break
    if elapsed >= timeout_s:
        print("导航超时,及时退出")
        break

    if status == NavigationTaskStatus.RUNNING:
        print(f"正在导航... 状态: {status.name}, 已用时: {elapsed:.1f}s")
    else:
        print(f"状态: {status.name}, 已用时: {elapsed:.1f}s")

    time.sleep(poll_interval_s)

nav.stop_navigation()
robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()
print("资源释放成功")

Complete Running Example (Simple Workflow)

examples/python/galbot_navigation/complete_example.py
from galbot_sdk.g1 import GalbotRobot, GalbotNavigation, ControllerName, ControlStatus
import numpy as np
import time
import sys

robot = GalbotRobot.get_instance()
robot.init()
nav = GalbotNavigation.get_instance()
nav.init()

init_pose = np.array([0.0, 0.0, 0.0, 0, 0, 0.0, 1.0])
goal_pose = np.array([1.0, 0.0, 0.0, 0, 0, 0.0, 1.0])

res = robot.switch_controller(ControllerName.CHASSIS_POSE_CTRL)
if res != ControlStatus.SUCCESS:
    print("切换控制器失败!")
    sys.exit(1)
else:
    print("切换控制器成功!")

while not nav.is_localized():
    nav.relocalize(init_pose)
    time.sleep(0.5)

if nav.check_path_reachability(goal_pose, nav.get_current_pose()):
    nav.navigate_to_goal(goal_pose, enable_collision_check=True, is_blocking=True, timeout=30)
    print("是否已到达:", nav.check_goal_arrival())

nav.stop_navigation()
# 关闭系统
robot.request_shutdown()
robot.wait_for_shutdown()
robot.destroy()

Class: GalbotPerception

Foundation Stereo Single Depth Inference (run_once + wait_for_new_result + get_latest_result)

Initialize FOUNDATION_STEREO and LIGHT_STEREO, trigger one inference, wait for a new result, then colorize the depth map (instance_mask) and save it as a PNG.

examples/python/galbot_perception/foundation_stereo_example.py
"""高精度双目深度估计示例:单次 run_once,将深度图伪彩色保存为一张图片"""

import time
import cv2
import numpy as np

from galbot_sdk.g1 import GalbotPerception, GalbotRobot, PerceptionModule

OUTPUT_IMAGE_PATH = "foundation_stereo_depth.png"

def main():
    robot = GalbotRobot.get_instance()
    if not robot.init():
        print("Robot 初始化失败")
        return
    print("Robot 初始化成功")

    perception = GalbotPerception.get_instance()
    if not perception.init({PerceptionModule.FOUNDATION_STEREO, PerceptionModule.LIGHT_STEREO}):
        print("感知模块初始化失败")
        return
    print("感知模块初始化成功")

    time.sleep(12)  # 等待感知模型 load 成功
    print("触发单次推理...")

    if not perception.run_once(PerceptionModule.FOUNDATION_STEREO):
        print("run_once 命令发送失败")
        return

    print("等待推理结果...")
    if not perception.wait_for_new_result(
        PerceptionModule.FOUNDATION_STEREO, timeout_s=6.0
    ):
        print("等待超时,未收到推理结果")
        return

    ok, result = perception.get_latest_result(PerceptionModule.FOUNDATION_STEREO)
    if not ok:
        print("获取结果失败")
        return

    print(result.get_result_info())

    depth_map = result.instance_mask
    if depth_map is None:
        print("未收到深度图 (instance_mask 为空)")
        return

    print(f"深度图 shape: {depth_map.shape}, dtype: {depth_map.dtype}")
    print(f"深度值范围: [{np.nanmin(depth_map)}, {np.nanmax(depth_map)}]")

    depth_f = depth_map.astype(np.float32)
    valid = depth_f[depth_f > 0]
    if valid.size > 0:
        vmin, vmax = np.percentile(valid, [1, 99])
        normalized = np.clip((depth_f - vmin) / (vmax - vmin + 1e-6), 0, 1)
    else:
        normalized = np.zeros_like(depth_f)
    colored = cv2.applyColorMap(
        (normalized * 255).astype(np.uint8), cv2.COLORMAP_TURBO
    )

    if cv2.imwrite(OUTPUT_IMAGE_PATH, colored):
        print(f"已保存: {OUTPUT_IMAGE_PATH}")
    else:
        print(f"保存失败: {OUTPUT_IMAGE_PATH}")


if __name__ == "__main__":
    main()
    GalbotRobot.get_instance().request_shutdown()
    GalbotRobot.get_instance().wait_for_shutdown()
    GalbotRobot.get_instance().destroy()

Class: Parameter

examples/python/galbot_motion/create_parameter.py
from galbot_sdk.g1 import Parameter, create_parameter, JointGroup

# 通过构造函数创建 Parameter 并设置选项
p = Parameter()
p.set_blocking(True)
p.set_check_collision(False)
p.set_timeout(5.0)
p.set_actuate('with_chain_only')
p.set_tool_pose(False)
p.set_reference_frame('base_link')

p.joint_state = {
    JointGroup.LEFT_ARM: [0.0] * 7,
    # 需要的话也可以加上其他:
    # JointGroup.RIGHT_ARM: [0.0] * 7,
    # JointGroup.LEG: [0.0] * 4,
}

print('blocking:', p.get_blocking())
print('collision check:', p.get_check_collision())
print('timeout:', p.get_timeout())

# 或者使用工厂函数快速创建 Parameter
p2 = create_parameter(direct_execute=False, blocking=True, timeout=3.0, actuate='with_chain_only', tool_pose=False, check_collision=True)
print('工厂创建的 timeout:', p2.get_timeout())

Utility Functions

check_motion_status

examples/python/galbot_motion/check_motion_status.py
from galbot_sdk.g1 import MotionStatus, check_motion_status

status_str = check_motion_status(MotionStatus.SUCCESS)
print('MotionStatus 字符串:', status_str)

create_joint_state / create_pose_state

examples/python/galbot_motion/create_pose_state.py
from galbot_sdk.g1 import create_joint_state, create_pose_state, JointStates, PoseState, Pose

# 使用工厂函数创建辅助对象
js = create_joint_state()
ps = create_pose_state()

# 填充示例字段
js.chain_name = 'left_arm'
js.joint_positions = [0.0] * 7

ps.chain_name = 'left_arm'
# 7 维向量: x, y, z, qx, qy, qz, qw
ps.pose = Pose([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])

print(type(js), js.chain_name)
print(type(ps), ps.chain_name)