Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extensions/rcs_fr3/src/rcs_fr3/creators.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def create_env(self, cfg: FR3HardwareEnvCreatorConfig) -> gym.Env:
logger.info("CameraSet started")
env = CameraSetWrapper(env, camera_set, cfg.wrapper_cfg.include_depth)

if cfg.relative_to != RelativeTo.NONE:
if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to)
return CoverWrapper(env)

Expand Down
2 changes: 1 addition & 1 deletion extensions/rcs_panda/src/rcs_panda/creators.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def create_env(self, cfg: PandaHardwareEnvCreatorConfig) -> gym.Env:
logger.info("CameraSet started")
env = CameraSetWrapper(env, camera_set)

if cfg.relative_to != RelativeTo.NONE:
if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to)
return CoverWrapper(env)

Expand Down
2 changes: 1 addition & 1 deletion extensions/rcs_so101/src/rcs_so101/creators.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def create_env(self, cfg: SO101HardwareEnvCreatorConfig) -> gym.Env:
logger.info("CameraSet started")
env = CameraSetWrapper(env, camera_set, include_depth=True)

if cfg.relative_to != RelativeTo.NONE:
if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to)
return CoverWrapper(env)

Expand Down
2 changes: 1 addition & 1 deletion extensions/rcs_ur5e/src/rcs_ur5e/creators.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def create_env(self, cfg: UR5eHardwareEnvCreatorConfig) -> gym.Env:
logger.info("CameraSet started")
env = CameraSetWrapper(env, camera_set, include_depth=True)

if cfg.relative_to != RelativeTo.NONE:
if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to)
return CoverWrapper(env)

Expand Down
2 changes: 1 addition & 1 deletion extensions/rcs_xarm7/src/rcs_xarm7/creators.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def create_env(self, cfg: XArm7HardwareEnvCreatorConfig) -> gym.Env:
hand = TilburgHand(cfg=cfg.hand_cfg, verbose=True)
env = HandWrapper(env, hand, cfg.wrapper_cfg.binary_gripper)

if cfg.relative_to != RelativeTo.NONE:
if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE:
env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to)
return CoverWrapper(env)

Expand Down
150 changes: 89 additions & 61 deletions python/rcs/envs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,9 @@ def __init__(
)
self.max_mov: float | tuple[float, float] = max_mov

if self.get_wrapper_attr("get_control_mode")() == ControlMode.CARTESIAN_TRPY:
if self.relative_to == RelativeTo.NONE:
pass
elif self.get_wrapper_attr("get_control_mode")() == ControlMode.CARTESIAN_TRPY:
assert isinstance(self.max_mov, tuple)
self.action_space.spaces.update(
get_space(
Expand Down Expand Up @@ -700,6 +702,7 @@ def reset(self, **kwargs) -> tuple[dict, dict[str, Any]]:
self.initial_obs = obs
self.set_origin_to_current()
self._last_action = None
self._absolute_action = None
return obs, info

def action(self, action: dict[str, Any]) -> dict[str, Any]:
Expand All @@ -709,105 +712,130 @@ def action(self, action: dict[str, Any]) -> dict[str, Any]:
self.set_origin_to_current()
action = copy.deepcopy(action)
if self.get_wrapper_attr("get_control_mode")() == ControlMode.JOINTS and self.joints_key in action:
assert isinstance(self._origin, np.ndarray), "Invalid origin type give the control mode."
assert isinstance(self.max_mov, float)
low, high = self._robot.get_config().joint_limits
# TODO: should we also clip euqally for all joints?
if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None:
limited_joints = np.clip(action[self.joints_key], -self.max_mov, self.max_mov)
self._last_action = limited_joints
if self.relative_to == RelativeTo.NONE:
reference_joints = self._robot.get_joint_position() if self._last_action is None else self._last_action
assert isinstance(reference_joints, np.ndarray), "Invalid reference type given the control mode."
limited_joints_diff = np.clip(action[self.joints_key] - reference_joints, -self.max_mov, self.max_mov)
clipped_joints = np.clip(reference_joints + limited_joints_diff, low, high)
self._last_action = clipped_joints
else:
joints_diff = action[self.joints_key] - self._last_action
limited_joints_diff = np.clip(joints_diff, -self.max_mov, self.max_mov)
limited_joints = limited_joints_diff + self._last_action
self._last_action = limited_joints
clipped_joints = np.clip(self._origin + limited_joints, low, high)
assert isinstance(self._origin, np.ndarray), "Invalid origin type give the control mode."
# TODO: should we also clip euqally for all joints?
if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None:
limited_joints = np.clip(action[self.joints_key], -self.max_mov, self.max_mov)
self._last_action = limited_joints
else:
joints_diff = action[self.joints_key] - self._last_action
limited_joints_diff = np.clip(joints_diff, -self.max_mov, self.max_mov)
limited_joints = limited_joints_diff + self._last_action
self._last_action = limited_joints
clipped_joints = np.clip(self._origin + limited_joints, low, high)
action.update(JointsDictType(joints=clipped_joints))
self._absolute_action = clipped_joints

elif self.get_wrapper_attr("get_control_mode")() == ControlMode.CARTESIAN_TRPY and self.trpy_key in action:
assert isinstance(self._origin, common.Pose), "Invalid origin type given the control mode."
assert isinstance(self.max_mov, tuple)
pose_space = cast(gym.spaces.Box, get_space(TRPYDictType).spaces[self.trpy_key])
target_pose = common.Pose(
translation=action[self.trpy_key][:3],
rpy_vector=action[self.trpy_key][3:],
)

if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None:
clipped_pose_offset = (
common.Pose(
translation=action[self.trpy_key][:3],
rpy_vector=action[self.trpy_key][3:],
)
.limit_translation_length(self.max_mov[0])
.limit_rotation_angle(self.max_mov[1])
)
self._last_action = clipped_pose_offset
else:
assert isinstance(self._last_action, common.Pose)
pose_diff = (
common.Pose(
translation=action[self.trpy_key][:3],
rpy_vector=action[self.trpy_key][3:],
)
* self._last_action.inverse()
if self.relative_to == RelativeTo.NONE:
reference_pose = (
self._robot.get_cartesian_position() if self._last_action is None else self._last_action
)
assert isinstance(reference_pose, common.Pose), "Invalid reference type given the control mode."
pose_diff = target_pose * reference_pose.inverse()
clipped_pose_diff = pose_diff.limit_translation_length(self.max_mov[0]).limit_rotation_angle(
self.max_mov[1]
)
clipped_pose_offset = clipped_pose_diff * self._last_action
self._last_action = clipped_pose_offset
unclipped_pose = common.Pose(
translation=reference_pose.translation() + clipped_pose_diff.translation(), # type: ignore
rpy_vector=(clipped_pose_diff * reference_pose).rotation_rpy().as_vector(),
)
else:
assert isinstance(self._origin, common.Pose), "Invalid origin type given the control mode."
if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None:
clipped_pose_offset = target_pose.limit_translation_length(self.max_mov[0]).limit_rotation_angle(
self.max_mov[1]
)
self._last_action = clipped_pose_offset
else:
assert isinstance(self._last_action, common.Pose)
pose_diff = target_pose * self._last_action.inverse()
clipped_pose_diff = pose_diff.limit_translation_length(self.max_mov[0]).limit_rotation_angle(
self.max_mov[1]
)
clipped_pose_offset = clipped_pose_diff * self._last_action
self._last_action = clipped_pose_offset

unclipped_pose = common.Pose(
translation=self._origin.translation() + clipped_pose_offset.translation(), # type: ignore
rpy_vector=(clipped_pose_offset * self._origin).rotation_rpy().as_vector(),
)
unclipped_pose = common.Pose(
translation=self._origin.translation() + clipped_pose_offset.translation(), # type: ignore
rpy_vector=(clipped_pose_offset * self._origin).rotation_rpy().as_vector(),
)
clipped_pose = np.concatenate( # type: ignore
[
np.clip(unclipped_pose.translation(), pose_space.low[:3], pose_space.high[:3]),
unclipped_pose.rotation_rpy().as_vector(),
],
)
if self.relative_to == RelativeTo.NONE:
self._last_action = common.Pose(translation=clipped_pose[:3], rpy_vector=clipped_pose[3:])
action.update(TRPYDictType(xyzrpy=clipped_pose))
self._absolute_action = clipped_pose

elif self.get_wrapper_attr("get_control_mode")() == ControlMode.CARTESIAN_TQuat and self.tquat_key in action:
assert isinstance(self._origin, common.Pose), "Invalid origin type given the control mode."
assert isinstance(self.max_mov, tuple)
pose_space = cast(gym.spaces.Box, get_space(TQuatDictType).spaces[self.tquat_key])
target_pose = common.Pose(
translation=action[self.tquat_key][:3],
quaternion=action[self.tquat_key][3:],
)

if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None:
clipped_pose_offset = (
common.Pose(
translation=action[self.tquat_key][:3],
quaternion=action[self.tquat_key][3:],
)
.limit_translation_length(self.max_mov[0])
.limit_rotation_angle(self.max_mov[1])
)
self._last_action = clipped_pose_offset
else:
assert isinstance(self._last_action, common.Pose)
pose_diff = (
common.Pose(
translation=action[self.tquat_key][:3],
quaternion=action[self.tquat_key][3:],
)
* self._last_action.inverse()
if self.relative_to == RelativeTo.NONE:
reference_pose = (
self._robot.get_cartesian_position() if self._last_action is None else self._last_action
)
assert isinstance(reference_pose, common.Pose), "Invalid reference type given the control mode."
pose_diff = target_pose * reference_pose.inverse()
clipped_pose_diff = pose_diff.limit_translation_length(self.max_mov[0]).limit_rotation_angle(
self.max_mov[1]
)
clipped_pose_offset = clipped_pose_diff * self._last_action
self._last_action = clipped_pose_offset
unclipped_pose = common.Pose(
translation=reference_pose.translation() + clipped_pose_diff.translation(), # type: ignore
quaternion=(clipped_pose_diff * reference_pose).rotation_q(),
)
else:
assert isinstance(self._origin, common.Pose), "Invalid origin type given the control mode."
if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None:
clipped_pose_offset = target_pose.limit_translation_length(self.max_mov[0]).limit_rotation_angle(
self.max_mov[1]
)
self._last_action = clipped_pose_offset
else:
assert isinstance(self._last_action, common.Pose)
pose_diff = target_pose * self._last_action.inverse()
clipped_pose_diff = pose_diff.limit_translation_length(self.max_mov[0]).limit_rotation_angle(
self.max_mov[1]
)
clipped_pose_offset = clipped_pose_diff * self._last_action
self._last_action = clipped_pose_offset

unclipped_pose = common.Pose(
translation=self._origin.translation() + clipped_pose_offset.translation(), # type: ignore
quaternion=(clipped_pose_offset * self._origin).rotation_q(),
)
unclipped_pose = common.Pose(
translation=self._origin.translation() + clipped_pose_offset.translation(), # type: ignore
quaternion=(clipped_pose_offset * self._origin).rotation_q(),
)
clipped_pose = np.concatenate( # type: ignore
[
np.clip(unclipped_pose.translation(), pose_space.low[:3], pose_space.high[:3]),
unclipped_pose.rotation_q(),
],
)
if self.relative_to == RelativeTo.NONE:
self._last_action = common.Pose(translation=clipped_pose[:3], quaternion=clipped_pose[3:])
action.update(TQuatDictType(tquat=clipped_pose)) # type: ignore
self._absolute_action = clipped_pose

Expand Down
2 changes: 1 addition & 1 deletion python/rcs/envs/scenes.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def create_env_from_model(self, cfg: SimEnvCreatorConfig, mjmodel: MjModel) -> g
if prefixed_cfg.gripper_cfgs is not None:
env = self.add_gripper_env(prefixed_cfg, robot_name, simulation, env)

if prefixed_cfg.relative_to != RelativeTo.NONE:
if prefixed_cfg.max_relative_movement is not None or prefixed_cfg.relative_to != RelativeTo.NONE:
env = RelativeActionSpace(
env, max_mov=prefixed_cfg.max_relative_movement, relative_to=prefixed_cfg.relative_to
)
Expand Down
88 changes: 88 additions & 0 deletions python/tests/test_sim_envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,36 @@ def test_relative_non_zero_action(self):
obs, _, _, _, info = env.step(non_zero_action)
self.assert_no_pose_change(info, obs_initial, expected_obs)

def test_absolute_action_delta_limit_trpy(self):
max_translation = 0.05
max_rotation = np.deg2rad(10)
env = build_single_robot_env(
ControlMode.CARTESIAN_TRPY,
with_gripper=False,
with_camera=False,
max_relative_movement=None,
)
env = RelativeActionSpace(env, max_mov=(max_translation, max_rotation), relative_to=RelativeTo.NONE)
obs_initial, _ = env.reset()

assert env.action_space.spaces["xyzrpy"].high[0] > max_translation

target = obs_initial["xyzrpy"].copy()
target[0] += 0.2
target[5] += np.deg2rad(45)
_, _, _, _, info = env.step(TRPYDictType(xyzrpy=target))
absolute_action = info[RelativeActionSpace.ABSOLUTE_ACTION_KEY]

initial_pose = rcs.common.Pose(
translation=np.array(obs_initial["xyzrpy"][:3]), rpy_vector=np.array(obs_initial["xyzrpy"][3:])
)
absolute_pose = rcs.common.Pose(
translation=np.array(absolute_action[:3]), rpy_vector=np.array(absolute_action[3:])
)
pose_diff = absolute_pose * initial_pose.inverse()
assert np.linalg.norm(absolute_action[:3] - obs_initial["xyzrpy"][:3]) <= max_translation + 1e-6
assert pose_diff.total_angle() <= max_rotation + 1e-6

def test_collision_trpy(self):
env = build_single_robot_env(
ControlMode.CARTESIAN_TRPY,
Expand Down Expand Up @@ -327,3 +357,61 @@ def test_relative_zero_action_joints(self):
act.update(GripperDictType(gripper=np.array([1.0])))
obs, _, _, _, info = env.step(act)
self.assert_no_pose_change(info, obs_initial, obs)

def test_absolute_action_delta_limit_joints(self):
max_joint_delta = 0.05
env = build_single_robot_env(
ControlMode.JOINTS,
with_gripper=False,
with_camera=False,
max_relative_movement=None,
)
env = RelativeActionSpace(env, max_mov=max_joint_delta, relative_to=RelativeTo.NONE)
obs_initial, _ = env.reset()

assert np.max(env.action_space.spaces["joints"].high) > max_joint_delta

first_target = obs_initial["joints"].copy()
first_target[0] += 0.2
_, _, _, _, first_info = env.step(JointsDictType(joints=first_target))
first_action = first_info[RelativeActionSpace.ABSOLUTE_ACTION_KEY]
np.testing.assert_allclose(first_action[0], obs_initial["joints"][0] + max_joint_delta, atol=1e-6, rtol=0)

second_target = obs_initial["joints"].copy()
second_target[0] -= 0.2
_, _, _, _, second_info = env.step(JointsDictType(joints=second_target))
second_action = second_info[RelativeActionSpace.ABSOLUTE_ACTION_KEY]
assert np.max(np.abs(second_action - first_action)) <= max_joint_delta + 1e-6


class TestAbsoluteActionSceneCreator:
def test_scene_wraps_absolute_actions_when_relative_to_none(self):
max_joint_delta = 0.05
scene = EmptyWorldFR3()
cfg = copy.deepcopy(scene.config())
cfg.control_mode = ControlMode.JOINTS
cfg.headless = True
cfg.sim_cfg.realtime = False
cfg.sim_cfg.async_control = False
cfg.gripper_cfgs = None
cfg.gripper_offsets = None
cfg.camera_cfgs = None
cfg.camera_adds = None
cfg.max_relative_movement = max_joint_delta
cfg.relative_to = RelativeTo.NONE

env = scene.create_env(cfg)
obs_initial, _ = env.reset()
robot_name = next(iter(obs_initial))
target = obs_initial[robot_name]["joints"].copy()
target[0] += 0.2
_, _, _, _, info = env.step({robot_name: JointsDictType(joints=target)})

assert RelativeActionSpace.ABSOLUTE_ACTION_KEY in info[robot_name]
absolute_action = info[robot_name][RelativeActionSpace.ABSOLUTE_ACTION_KEY]
np.testing.assert_allclose(
absolute_action[0],
obs_initial[robot_name]["joints"][0] + max_joint_delta,
atol=1e-6,
rtol=0,
)
Loading