diff --git a/extensions/rcs_fr3/src/rcs_fr3/creators.py b/extensions/rcs_fr3/src/rcs_fr3/creators.py index e28401e4..b95d8fbc 100644 --- a/extensions/rcs_fr3/src/rcs_fr3/creators.py +++ b/extensions/rcs_fr3/src/rcs_fr3/creators.py @@ -205,7 +205,7 @@ def create_env(self, cfg: FR3HardwareEnvCreatorConfig) -> gym.Env: logger.info("CameraSet started") env = CameraSetWrapper(env, camera_set, cfg.wrapper_cfg.include_depth) - if cfg.relative_to != RelativeTo.NONE: + if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE: env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to) return CoverWrapper(env) diff --git a/extensions/rcs_panda/src/rcs_panda/creators.py b/extensions/rcs_panda/src/rcs_panda/creators.py index e85e6565..213c8188 100644 --- a/extensions/rcs_panda/src/rcs_panda/creators.py +++ b/extensions/rcs_panda/src/rcs_panda/creators.py @@ -164,7 +164,7 @@ def create_env(self, cfg: PandaHardwareEnvCreatorConfig) -> gym.Env: logger.info("CameraSet started") env = CameraSetWrapper(env, camera_set) - if cfg.relative_to != RelativeTo.NONE: + if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE: env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to) return CoverWrapper(env) diff --git a/extensions/rcs_so101/src/rcs_so101/creators.py b/extensions/rcs_so101/src/rcs_so101/creators.py index f92969c7..392677ac 100644 --- a/extensions/rcs_so101/src/rcs_so101/creators.py +++ b/extensions/rcs_so101/src/rcs_so101/creators.py @@ -109,7 +109,7 @@ def create_env(self, cfg: SO101HardwareEnvCreatorConfig) -> gym.Env: logger.info("CameraSet started") env = CameraSetWrapper(env, camera_set, include_depth=True) - if cfg.relative_to != RelativeTo.NONE: + if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE: env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to) return CoverWrapper(env) diff --git a/extensions/rcs_ur5e/src/rcs_ur5e/creators.py b/extensions/rcs_ur5e/src/rcs_ur5e/creators.py index decf4fd5..e79f4d7a 100644 --- a/extensions/rcs_ur5e/src/rcs_ur5e/creators.py +++ b/extensions/rcs_ur5e/src/rcs_ur5e/creators.py @@ -128,7 +128,7 @@ def create_env(self, cfg: UR5eHardwareEnvCreatorConfig) -> gym.Env: logger.info("CameraSet started") env = CameraSetWrapper(env, camera_set, include_depth=True) - if cfg.relative_to != RelativeTo.NONE: + if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE: env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to) return CoverWrapper(env) diff --git a/extensions/rcs_xarm7/src/rcs_xarm7/creators.py b/extensions/rcs_xarm7/src/rcs_xarm7/creators.py index d364f2a7..f67d4a62 100644 --- a/extensions/rcs_xarm7/src/rcs_xarm7/creators.py +++ b/extensions/rcs_xarm7/src/rcs_xarm7/creators.py @@ -118,7 +118,7 @@ def create_env(self, cfg: XArm7HardwareEnvCreatorConfig) -> gym.Env: hand = TilburgHand(cfg=cfg.hand_cfg, verbose=True) env = HandWrapper(env, hand, cfg.wrapper_cfg.binary_gripper) - if cfg.relative_to != RelativeTo.NONE: + if cfg.max_relative_movement is not None or cfg.relative_to != RelativeTo.NONE: env = RelativeActionSpace(env, max_mov=cfg.max_relative_movement, relative_to=cfg.relative_to) return CoverWrapper(env) diff --git a/python/rcs/envs/base.py b/python/rcs/envs/base.py index cfe392f6..d85ed6af 100644 --- a/python/rcs/envs/base.py +++ b/python/rcs/envs/base.py @@ -643,7 +643,9 @@ def __init__( ) self.max_mov: float | tuple[float, float] = max_mov - if self.get_wrapper_attr("get_control_mode")() == ControlMode.CARTESIAN_TRPY: + if self.relative_to == RelativeTo.NONE: + pass + elif self.get_wrapper_attr("get_control_mode")() == ControlMode.CARTESIAN_TRPY: assert isinstance(self.max_mov, tuple) self.action_space.spaces.update( get_space( @@ -700,6 +702,7 @@ def reset(self, **kwargs) -> tuple[dict, dict[str, Any]]: self.initial_obs = obs self.set_origin_to_current() self._last_action = None + self._absolute_action = None return obs, info def action(self, action: dict[str, Any]) -> dict[str, Any]: @@ -709,105 +712,130 @@ def action(self, action: dict[str, Any]) -> dict[str, Any]: self.set_origin_to_current() action = copy.deepcopy(action) if self.get_wrapper_attr("get_control_mode")() == ControlMode.JOINTS and self.joints_key in action: - assert isinstance(self._origin, np.ndarray), "Invalid origin type give the control mode." assert isinstance(self.max_mov, float) low, high = self._robot.get_config().joint_limits - # TODO: should we also clip euqally for all joints? - if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None: - limited_joints = np.clip(action[self.joints_key], -self.max_mov, self.max_mov) - self._last_action = limited_joints + if self.relative_to == RelativeTo.NONE: + reference_joints = self._robot.get_joint_position() if self._last_action is None else self._last_action + assert isinstance(reference_joints, np.ndarray), "Invalid reference type given the control mode." + limited_joints_diff = np.clip(action[self.joints_key] - reference_joints, -self.max_mov, self.max_mov) + clipped_joints = np.clip(reference_joints + limited_joints_diff, low, high) + self._last_action = clipped_joints else: - joints_diff = action[self.joints_key] - self._last_action - limited_joints_diff = np.clip(joints_diff, -self.max_mov, self.max_mov) - limited_joints = limited_joints_diff + self._last_action - self._last_action = limited_joints - clipped_joints = np.clip(self._origin + limited_joints, low, high) + assert isinstance(self._origin, np.ndarray), "Invalid origin type give the control mode." + # TODO: should we also clip euqally for all joints? + if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None: + limited_joints = np.clip(action[self.joints_key], -self.max_mov, self.max_mov) + self._last_action = limited_joints + else: + joints_diff = action[self.joints_key] - self._last_action + limited_joints_diff = np.clip(joints_diff, -self.max_mov, self.max_mov) + limited_joints = limited_joints_diff + self._last_action + self._last_action = limited_joints + clipped_joints = np.clip(self._origin + limited_joints, low, high) action.update(JointsDictType(joints=clipped_joints)) self._absolute_action = clipped_joints elif self.get_wrapper_attr("get_control_mode")() == ControlMode.CARTESIAN_TRPY and self.trpy_key in action: - assert isinstance(self._origin, common.Pose), "Invalid origin type given the control mode." assert isinstance(self.max_mov, tuple) pose_space = cast(gym.spaces.Box, get_space(TRPYDictType).spaces[self.trpy_key]) + target_pose = common.Pose( + translation=action[self.trpy_key][:3], + rpy_vector=action[self.trpy_key][3:], + ) - if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None: - clipped_pose_offset = ( - common.Pose( - translation=action[self.trpy_key][:3], - rpy_vector=action[self.trpy_key][3:], - ) - .limit_translation_length(self.max_mov[0]) - .limit_rotation_angle(self.max_mov[1]) - ) - self._last_action = clipped_pose_offset - else: - assert isinstance(self._last_action, common.Pose) - pose_diff = ( - common.Pose( - translation=action[self.trpy_key][:3], - rpy_vector=action[self.trpy_key][3:], - ) - * self._last_action.inverse() + if self.relative_to == RelativeTo.NONE: + reference_pose = ( + self._robot.get_cartesian_position() if self._last_action is None else self._last_action ) + assert isinstance(reference_pose, common.Pose), "Invalid reference type given the control mode." + pose_diff = target_pose * reference_pose.inverse() clipped_pose_diff = pose_diff.limit_translation_length(self.max_mov[0]).limit_rotation_angle( self.max_mov[1] ) - clipped_pose_offset = clipped_pose_diff * self._last_action - self._last_action = clipped_pose_offset + unclipped_pose = common.Pose( + translation=reference_pose.translation() + clipped_pose_diff.translation(), # type: ignore + rpy_vector=(clipped_pose_diff * reference_pose).rotation_rpy().as_vector(), + ) + else: + assert isinstance(self._origin, common.Pose), "Invalid origin type given the control mode." + if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None: + clipped_pose_offset = target_pose.limit_translation_length(self.max_mov[0]).limit_rotation_angle( + self.max_mov[1] + ) + self._last_action = clipped_pose_offset + else: + assert isinstance(self._last_action, common.Pose) + pose_diff = target_pose * self._last_action.inverse() + clipped_pose_diff = pose_diff.limit_translation_length(self.max_mov[0]).limit_rotation_angle( + self.max_mov[1] + ) + clipped_pose_offset = clipped_pose_diff * self._last_action + self._last_action = clipped_pose_offset - unclipped_pose = common.Pose( - translation=self._origin.translation() + clipped_pose_offset.translation(), # type: ignore - rpy_vector=(clipped_pose_offset * self._origin).rotation_rpy().as_vector(), - ) + unclipped_pose = common.Pose( + translation=self._origin.translation() + clipped_pose_offset.translation(), # type: ignore + rpy_vector=(clipped_pose_offset * self._origin).rotation_rpy().as_vector(), + ) clipped_pose = np.concatenate( # type: ignore [ np.clip(unclipped_pose.translation(), pose_space.low[:3], pose_space.high[:3]), unclipped_pose.rotation_rpy().as_vector(), ], ) + if self.relative_to == RelativeTo.NONE: + self._last_action = common.Pose(translation=clipped_pose[:3], rpy_vector=clipped_pose[3:]) action.update(TRPYDictType(xyzrpy=clipped_pose)) self._absolute_action = clipped_pose elif self.get_wrapper_attr("get_control_mode")() == ControlMode.CARTESIAN_TQuat and self.tquat_key in action: - assert isinstance(self._origin, common.Pose), "Invalid origin type given the control mode." assert isinstance(self.max_mov, tuple) pose_space = cast(gym.spaces.Box, get_space(TQuatDictType).spaces[self.tquat_key]) + target_pose = common.Pose( + translation=action[self.tquat_key][:3], + quaternion=action[self.tquat_key][3:], + ) - if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None: - clipped_pose_offset = ( - common.Pose( - translation=action[self.tquat_key][:3], - quaternion=action[self.tquat_key][3:], - ) - .limit_translation_length(self.max_mov[0]) - .limit_rotation_angle(self.max_mov[1]) - ) - self._last_action = clipped_pose_offset - else: - assert isinstance(self._last_action, common.Pose) - pose_diff = ( - common.Pose( - translation=action[self.tquat_key][:3], - quaternion=action[self.tquat_key][3:], - ) - * self._last_action.inverse() + if self.relative_to == RelativeTo.NONE: + reference_pose = ( + self._robot.get_cartesian_position() if self._last_action is None else self._last_action ) + assert isinstance(reference_pose, common.Pose), "Invalid reference type given the control mode." + pose_diff = target_pose * reference_pose.inverse() clipped_pose_diff = pose_diff.limit_translation_length(self.max_mov[0]).limit_rotation_angle( self.max_mov[1] ) - clipped_pose_offset = clipped_pose_diff * self._last_action - self._last_action = clipped_pose_offset + unclipped_pose = common.Pose( + translation=reference_pose.translation() + clipped_pose_diff.translation(), # type: ignore + quaternion=(clipped_pose_diff * reference_pose).rotation_q(), + ) + else: + assert isinstance(self._origin, common.Pose), "Invalid origin type given the control mode." + if self.relative_to == RelativeTo.LAST_STEP or self._last_action is None: + clipped_pose_offset = target_pose.limit_translation_length(self.max_mov[0]).limit_rotation_angle( + self.max_mov[1] + ) + self._last_action = clipped_pose_offset + else: + assert isinstance(self._last_action, common.Pose) + pose_diff = target_pose * self._last_action.inverse() + clipped_pose_diff = pose_diff.limit_translation_length(self.max_mov[0]).limit_rotation_angle( + self.max_mov[1] + ) + clipped_pose_offset = clipped_pose_diff * self._last_action + self._last_action = clipped_pose_offset - unclipped_pose = common.Pose( - translation=self._origin.translation() + clipped_pose_offset.translation(), # type: ignore - quaternion=(clipped_pose_offset * self._origin).rotation_q(), - ) + unclipped_pose = common.Pose( + translation=self._origin.translation() + clipped_pose_offset.translation(), # type: ignore + quaternion=(clipped_pose_offset * self._origin).rotation_q(), + ) clipped_pose = np.concatenate( # type: ignore [ np.clip(unclipped_pose.translation(), pose_space.low[:3], pose_space.high[:3]), unclipped_pose.rotation_q(), ], ) + if self.relative_to == RelativeTo.NONE: + self._last_action = common.Pose(translation=clipped_pose[:3], quaternion=clipped_pose[3:]) action.update(TQuatDictType(tquat=clipped_pose)) # type: ignore self._absolute_action = clipped_pose diff --git a/python/rcs/envs/scenes.py b/python/rcs/envs/scenes.py index ef281673..5d70843d 100644 --- a/python/rcs/envs/scenes.py +++ b/python/rcs/envs/scenes.py @@ -359,7 +359,7 @@ def create_env_from_model(self, cfg: SimEnvCreatorConfig, mjmodel: MjModel) -> g if prefixed_cfg.gripper_cfgs is not None: env = self.add_gripper_env(prefixed_cfg, robot_name, simulation, env) - if prefixed_cfg.relative_to != RelativeTo.NONE: + if prefixed_cfg.max_relative_movement is not None or prefixed_cfg.relative_to != RelativeTo.NONE: env = RelativeActionSpace( env, max_mov=prefixed_cfg.max_relative_movement, relative_to=prefixed_cfg.relative_to ) diff --git a/python/tests/test_sim_envs.py b/python/tests/test_sim_envs.py index 325d1ef4..73b3564d 100644 --- a/python/tests/test_sim_envs.py +++ b/python/tests/test_sim_envs.py @@ -174,6 +174,36 @@ def test_relative_non_zero_action(self): obs, _, _, _, info = env.step(non_zero_action) self.assert_no_pose_change(info, obs_initial, expected_obs) + def test_absolute_action_delta_limit_trpy(self): + max_translation = 0.05 + max_rotation = np.deg2rad(10) + env = build_single_robot_env( + ControlMode.CARTESIAN_TRPY, + with_gripper=False, + with_camera=False, + max_relative_movement=None, + ) + env = RelativeActionSpace(env, max_mov=(max_translation, max_rotation), relative_to=RelativeTo.NONE) + obs_initial, _ = env.reset() + + assert env.action_space.spaces["xyzrpy"].high[0] > max_translation + + target = obs_initial["xyzrpy"].copy() + target[0] += 0.2 + target[5] += np.deg2rad(45) + _, _, _, _, info = env.step(TRPYDictType(xyzrpy=target)) + absolute_action = info[RelativeActionSpace.ABSOLUTE_ACTION_KEY] + + initial_pose = rcs.common.Pose( + translation=np.array(obs_initial["xyzrpy"][:3]), rpy_vector=np.array(obs_initial["xyzrpy"][3:]) + ) + absolute_pose = rcs.common.Pose( + translation=np.array(absolute_action[:3]), rpy_vector=np.array(absolute_action[3:]) + ) + pose_diff = absolute_pose * initial_pose.inverse() + assert np.linalg.norm(absolute_action[:3] - obs_initial["xyzrpy"][:3]) <= max_translation + 1e-6 + assert pose_diff.total_angle() <= max_rotation + 1e-6 + def test_collision_trpy(self): env = build_single_robot_env( ControlMode.CARTESIAN_TRPY, @@ -327,3 +357,61 @@ def test_relative_zero_action_joints(self): act.update(GripperDictType(gripper=np.array([1.0]))) obs, _, _, _, info = env.step(act) self.assert_no_pose_change(info, obs_initial, obs) + + def test_absolute_action_delta_limit_joints(self): + max_joint_delta = 0.05 + env = build_single_robot_env( + ControlMode.JOINTS, + with_gripper=False, + with_camera=False, + max_relative_movement=None, + ) + env = RelativeActionSpace(env, max_mov=max_joint_delta, relative_to=RelativeTo.NONE) + obs_initial, _ = env.reset() + + assert np.max(env.action_space.spaces["joints"].high) > max_joint_delta + + first_target = obs_initial["joints"].copy() + first_target[0] += 0.2 + _, _, _, _, first_info = env.step(JointsDictType(joints=first_target)) + first_action = first_info[RelativeActionSpace.ABSOLUTE_ACTION_KEY] + np.testing.assert_allclose(first_action[0], obs_initial["joints"][0] + max_joint_delta, atol=1e-6, rtol=0) + + second_target = obs_initial["joints"].copy() + second_target[0] -= 0.2 + _, _, _, _, second_info = env.step(JointsDictType(joints=second_target)) + second_action = second_info[RelativeActionSpace.ABSOLUTE_ACTION_KEY] + assert np.max(np.abs(second_action - first_action)) <= max_joint_delta + 1e-6 + + +class TestAbsoluteActionSceneCreator: + def test_scene_wraps_absolute_actions_when_relative_to_none(self): + max_joint_delta = 0.05 + scene = EmptyWorldFR3() + cfg = copy.deepcopy(scene.config()) + cfg.control_mode = ControlMode.JOINTS + cfg.headless = True + cfg.sim_cfg.realtime = False + cfg.sim_cfg.async_control = False + cfg.gripper_cfgs = None + cfg.gripper_offsets = None + cfg.camera_cfgs = None + cfg.camera_adds = None + cfg.max_relative_movement = max_joint_delta + cfg.relative_to = RelativeTo.NONE + + env = scene.create_env(cfg) + obs_initial, _ = env.reset() + robot_name = next(iter(obs_initial)) + target = obs_initial[robot_name]["joints"].copy() + target[0] += 0.2 + _, _, _, _, info = env.step({robot_name: JointsDictType(joints=target)}) + + assert RelativeActionSpace.ABSOLUTE_ACTION_KEY in info[robot_name] + absolute_action = info[robot_name][RelativeActionSpace.ABSOLUTE_ACTION_KEY] + np.testing.assert_allclose( + absolute_action[0], + obs_initial[robot_name]["joints"][0] + max_joint_delta, + atol=1e-6, + rtol=0, + )