# Trace: Diagnose and Enhance Voltage Error Reporting

Agent: pi | Model: GPT-5.3 Codex | Project: robot-1

---

## User

$ lerobot-setup-motors --robot.type=so101_follower --robot.port=/dev/tty.usbmodem59710814421
Connect the controller board to the 'gripper' motor only and press enter.
Some motors found returned an error status:
{1: '[RxPacketError] Input voltage error!'}


I'm getting this error and I know why it's happening but I want to dig deep and understand the hardware stuff. What's actually happening is I'm using a 12 V barrel jack adapter with a 7.4 V motor and that's what the error is caused by. I want to go see how the library, the LeRobot library, actually checks this and all of that. I want to understand what it's doing so that if I were to write this whole stack myself, how that might look. 

go to ./lerobot/ to find the source code for this command

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "ls -la"
}
```

## Assistant

**Result** (success):
```
total 8
drwxr-xr-x@  5 mukund  staff   160 Mar 18 04:01 .
drwxr-xr-x@ 59 mukund  staff  1888 Mar 18 03:54 ..
drwxr-xr-x@  9 mukund  staff   288 Mar 18 03:59 .venv
drwxr-xr-x@ 29 mukund  staff   928 Mar 18 03:58 lerobot
-rw-r--r--@  1 mukund  staff   271 Mar 18 04:01 README.md
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "rg -n \"setup-motors|setup_motors|lerobot-setup-motors|so101_follower|Input voltage error|RxPacketError\" lerobot"
}
```

## Assistant

**Result** (success):
```
lerobot/pyproject.toml:219:lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main"
lerobot/docs/source/smolvla.mdx:101:  --robot.type=so101_follower \
lerobot/docs/source/integrate_hardware.mdx:21:For a good example of how it can be used, you can have a look at our own [SO101 follower implementation](https://github.com/huggingface/lerobot/blob/main/src/lerobot/robots/so_follower/so101_follower/so101_follower.py)
lerobot/docs/source/lekiwi.mdx:125:lerobot-setup-motors \
lerobot/docs/source/il_robots.mdx:49:    --robot.type=so101_follower \
lerobot/docs/source/il_robots.mdx:178:    --robot.type=so101_follower \
lerobot/docs/source/il_robots.mdx:400:    --robot.type=so101_follower \
lerobot/docs/source/backwardcomp.mdx:120:    --robot.type=so101_follower \
lerobot/docs/source/koch.mdx:95:For a visual reference on how to set the motor ids please refer to [this video](https://huggingface.co/docs/lerobot/en/so101#setup-motors-video) where we follow the process for the SO101 arm.
lerobot/docs/source/koch.mdx:97:<hfoptions id="setup_motors">
lerobot/docs/source/koch.mdx:101:lerobot-setup-motors \
lerobot/docs/source/koch.mdx:118:follower.setup_motors()
lerobot/docs/source/koch.mdx:173:<hfoptions id="setup_motors">
lerobot/docs/source/koch.mdx:177:lerobot-setup-motors \
lerobot/docs/source/koch.mdx:194:leader.setup_motors()
lerobot/docs/source/lerobot-dataset-v3.mdx:34:  --robot.type=so101_follower \
lerobot/docs/source/so100.mdx:90:For a visual reference on how to set the motor ids please refer to [this video](https://huggingface.co/docs/lerobot/en/so101#setup-motors-video) where we follow the process for the SO101 arm.
lerobot/docs/source/so100.mdx:92:<hfoptions id="setup_motors">
lerobot/docs/source/so100.mdx:96:lerobot-setup-motors \
lerobot/docs/source/so100.mdx:113:follower.setup_motors()
lerobot/docs/source/so100.mdx:168:<hfoptions id="setup_motors">
lerobot/docs/source/so100.mdx:171:lerobot-setup-motors \
lerobot/docs/source/so100.mdx:187:leader.setup_motors()
lerobot/docs/source/so101.mdx:117:      src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/lerobot/setup_motors_so101_2.mp4"
lerobot/docs/source/so101.mdx:127:<hfoptions id="setup_motors">
lerobot/docs/source/so101.mdx:131:lerobot-setup-motors \
lerobot/docs/source/so101.mdx:132:    --robot.type=so101_follower \
lerobot/docs/source/so101.mdx:148:follower.setup_motors()
lerobot/docs/source/so101.mdx:203:<hfoptions id="setup_motors">
lerobot/docs/source/so101.mdx:207:lerobot-setup-motors \
lerobot/docs/source/so101.mdx:224:leader.setup_motors()
lerobot/docs/source/so101.mdx:370:    --robot.type=so101_follower \
lerobot/tests/test_cli_peft.py:226:                "--robot.type=so101_follower",
lerobot/src/lerobot/motors/feetech/feetech.py:421:            display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}
lerobot/src/lerobot/motors/motors_bus.py:215:    def getRxPacketError(self, error): ...
lerobot/src/lerobot/motors/motors_bus.py:958:                raise RuntimeError(self.packet_handler.getRxPacketError(error))
lerobot/src/lerobot/motors/motors_bus.py:1046:            raise RuntimeError(f"{err_msg} {self.packet_handler.getRxPacketError(error)}")
lerobot/src/lerobot/motors/motors_bus.py:1107:            raise RuntimeError(f"{err_msg} {self.packet_handler.getRxPacketError(error)}")
lerobot/src/lerobot/robots/openarm_follower/openarm_follower.py:216:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/so_follower/config_so_follower.py:45:@RobotConfig.register_subclass("so101_follower")
lerobot/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py:92:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py:93:        self.left_arm.setup_motors()
lerobot/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py:94:        self.right_arm.setup_motors()
lerobot/src/lerobot/teleoperators/so_leader/so_leader.py:133:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/so_follower/so_follower.py:171:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py:109:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/openarm_mini/openarm_mini.py:258:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/lekiwi/lekiwi.py:201:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/koch_leader/koch_leader.py:156:    def setup_motors(self) -> None:
lerobot/src/lerobot/async_inference/constants.py:29:SUPPORTED_ROBOTS = ["so100_follower", "so101_follower", "bi_so_follower", "omx_follower"]
lerobot/src/lerobot/robots/omx_follower/omx_follower.py:161:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/openarm_leader/openarm_leader.py:180:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py:133:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/omx_leader/omx_leader.py:145:    def setup_motors(self) -> None:
lerobot/src/lerobot/scripts/lerobot_setup_motors.py:21:lerobot-setup-motors \
lerobot/src/lerobot/scripts/lerobot_setup_motors.py:58:    "so101_follower",
lerobot/src/lerobot/scripts/lerobot_setup_motors.py:77:def setup_motors(cfg: SetupConfig):
lerobot/src/lerobot/scripts/lerobot_setup_motors.py:86:    device.setup_motors()
lerobot/src/lerobot/scripts/lerobot_setup_motors.py:90:    setup_motors()
lerobot/src/lerobot/robots/koch_follower/koch_follower.py:178:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/bi_so_follower/bi_so_follower.py:117:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/bi_so_follower/bi_so_follower.py:118:        self.left_arm.setup_motors()
lerobot/src/lerobot/robots/bi_so_follower/bi_so_follower.py:119:        self.right_arm.setup_motors()
lerobot/src/lerobot/robots/hope_jr/hope_jr_hand.py:153:    def setup_motors(self) -> None:
lerobot/src/lerobot/scripts/lerobot_teleoperate.py:22:    --robot.type=so101_follower \
lerobot/src/lerobot/robots/hope_jr/hope_jr_arm.py:123:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/utils.py:39:    elif config.type == "so101_follower":
lerobot/tests/async_inference/test_robot_client.py:247:      error: argument --robot.type: invalid choice: 'so101_follower' (choose from )
lerobot/tests/async_inference/test_robot_client.py:259:        "so101_follower",
lerobot/tests/motors/test_dynamixel.py:164:            RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!")
lerobot/tests/motors/test_dynamixel.py:220:            RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!")
lerobot/tests/motors/test_feetech.py:191:        with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
lerobot/tests/motors/test_feetech.py:251:        with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "path": "lerobot/src/lerobot/scripts/lerobot_setup_motors.py"
}
```

## Assistant

### Tool: read

**Args:**
```json
{
  "path": "lerobot/src/lerobot/robots/so_follower/so_follower.py"
}
```

## Assistant

### Tool: read

**Args:**
```json
{
  "path": "lerobot/src/lerobot/motors/feetech/feetech.py"
}
```

## Assistant

### Tool: read

**Args:**
```json
{
  "path": "lerobot/src/lerobot/motors/motors_bus.py"
}
```

## Assistant

**Result** (success):
```
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Helper to set motor ids and baudrate.

Example:

```shell
lerobot-setup-motors \
    --teleop.type=so100_leader \
    --teleop.port=/dev/tty.usbmodem575E0031751
```
"""

from dataclasses import dataclass

import draccus

from lerobot.robots import (  # noqa: F401
    RobotConfig,
    bi_so_follower,
    koch_follower,
    lekiwi,
    make_robot_from_config,
    omx_follower,
    so_follower,
)
from lerobot.teleoperators import (  # noqa: F401
    TeleoperatorConfig,
    bi_so_leader,
    koch_leader,
    make_teleoperator_from_config,
    omx_leader,
    openarm_mini,
    so_leader,
)

COMPATIBLE_DEVICES = [
    "koch_follower",
    "koch_leader",
    "omx_follower",
    "omx_leader",
    "openarm_mini",
    "so100_follower",
    "so100_leader",
    "so101_follower",
    "so101_leader",
    "lekiwi",
]


@dataclass
class SetupConfig:
    teleop: TeleoperatorConfig | None = None
    robot: RobotConfig | None = None

    def __post_init__(self):
        if bool(self.teleop) == bool(self.robot):
            raise ValueError("Choose either a teleop or a robot.")

        self.device = self.robot if self.robot else self.teleop


@draccus.wrap()
def setup_motors(cfg: SetupConfig):
    if cfg.device.type not in COMPATIBLE_DEVICES:
        raise NotImplementedError

    if isinstance(cfg.device, RobotConfig):
        device = make_robot_from_config(cfg.device)
    else:
        device = make_teleoperator_from_config(cfg.device)

    device.setup_motors()


def main():
    setup_motors()


if __name__ == "__main__":
    main()
```

## Assistant

**Result** (success):
```
#!/usr/bin/env python

# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time
from functools import cached_property

from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.feetech import (
    FeetechMotorsBus,
    OperatingMode,
)
from lerobot.types import RobotAction, RobotObservation
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected

from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_so_follower import SOFollowerRobotConfig

logger = logging.getLogger(__name__)


class SOFollower(Robot):
    """
    Generic SO follower base implementing common functionality for SO-100/101/10X.
    Designed to be subclassed with a per-hardware-model `config_class` and `name`.
    """

    config_class = SOFollowerRobotConfig
    name = "so_follower"

    def __init__(self, config: SOFollowerRobotConfig):
        super().__init__(config)
        self.config = config
        # choose normalization mode depending on config if available
        norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
        self.bus = FeetechMotorsBus(
            port=self.config.port,
            motors={
                "shoulder_pan": Motor(1, "sts3215", norm_mode_body),
                "shoulder_lift": Motor(2, "sts3215", norm_mode_body),
                "elbow_flex": Motor(3, "sts3215", norm_mode_body),
                "wrist_flex": Motor(4, "sts3215", norm_mode_body),
                "wrist_roll": Motor(5, "sts3215", norm_mode_body),
                "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
            },
            calibration=self.calibration,
        )
        self.cameras = make_cameras_from_configs(config.cameras)

    @property
    def _motors_ft(self) -> dict[str, type]:
        return {f"{motor}.pos": float for motor in self.bus.motors}

    @property
    def _cameras_ft(self) -> dict[str, tuple]:
        return {
            cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
        }

    @cached_property
    def observation_features(self) -> dict[str, type | tuple]:
        return {**self._motors_ft, **self._cameras_ft}

    @cached_property
    def action_features(self) -> dict[str, type]:
        return self._motors_ft

    @property
    def is_connected(self) -> bool:
        return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())

    @check_if_already_connected
    def connect(self, calibrate: bool = True) -> None:
        """
        We assume that at connection time, arm is in a rest position,
        and torque can be safely disabled to run calibration.
        """

        self.bus.connect()
        if not self.is_calibrated and calibrate:
            logger.info(
                "Mismatch between calibration values in the motor and the calibration file or no calibration file found"
            )
            self.calibrate()

        for cam in self.cameras.values():
            cam.connect()

        self.configure()
        logger.info(f"{self} connected.")

    @property
    def is_calibrated(self) -> bool:
        return self.bus.is_calibrated

    def calibrate(self) -> None:
        if self.calibration:
            # Calibration file exists, ask user whether to use it or run new calibration
            user_input = input(
                f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
            )
            if user_input.strip().lower() != "c":
                logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
                self.bus.write_calibration(self.calibration)
                return

        logger.info(f"\nRunning calibration of {self}")
        self.bus.disable_torque()
        for motor in self.bus.motors:
            self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)

        input(f"Move {self} to the middle of its range of motion and press ENTER....")
        homing_offsets = self.bus.set_half_turn_homings()

        # Attempt to call record_ranges_of_motion with a reduced motor set when appropriate.
        full_turn_motor = "wrist_roll"
        unknown_range_motors = [motor for motor in self.bus.motors if motor != full_turn_motor]
        print(
            f"Move all joints except '{full_turn_motor}' sequentially through their "
            "entire ranges of motion.\nRecording positions. Press ENTER to stop..."
        )
        range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
        range_mins[full_turn_motor] = 0
        range_maxes[full_turn_motor] = 4095

        self.calibration = {}
        for motor, m in self.bus.motors.items():
            self.calibration[motor] = MotorCalibration(
                id=m.id,
                drive_mode=0,
                homing_offset=homing_offsets[motor],
                range_min=range_mins[motor],
                range_max=range_maxes[motor],
            )

        self.bus.write_calibration(self.calibration)
        self._save_calibration()
        print("Calibration saved to", self.calibration_fpath)

    def configure(self) -> None:
        with self.bus.torque_disabled():
            self.bus.configure_motors()
            for motor in self.bus.motors:
                self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
                # Set P_Coefficient to lower value to avoid shakiness (Default is 32)
                self.bus.write("P_Coefficient", motor, 16)
                # Set I_Coefficient and D_Coefficient to default value 0 and 32
                self.bus.write("I_Coefficient", motor, 0)
                self.bus.write("D_Coefficient", motor, 32)

                if motor == "gripper":
                    self.bus.write("Max_Torque_Limit", motor, 500)  # 50% of max torque to avoid burnout
                    self.bus.write("Protection_Current", motor, 250)  # 50% of max current to avoid burnout
                    self.bus.write("Overload_Torque", motor, 25)  # 25% torque when overloaded

    def setup_motors(self) -> None:
        for motor in reversed(self.bus.motors):
            input(f"Connect the controller board to the '{motor}' motor only and press enter.")
            self.bus.setup_motor(motor)
            print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")

    @check_if_not_connected
    def get_observation(self) -> RobotObservation:
        # Read arm position
        start = time.perf_counter()
        obs_dict = self.bus.sync_read("Present_Position")
        obs_dict = {f"{motor}.pos": val for motor, val in obs_dict.items()}
        dt_ms = (time.perf_counter() - start) * 1e3
        logger.debug(f"{self} read state: {dt_ms:.1f}ms")

        # Capture images from cameras
        for cam_key, cam in self.cameras.items():
            start = time.perf_counter()
            obs_dict[cam_key] = cam.read_latest()
            dt_ms = (time.perf_counter() - start) * 1e3
            logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")

        return obs_dict

    @check_if_not_connected
    def send_action(self, action: RobotAction) -> RobotAction:
        """Command arm to move to a target joint configuration.

        The relative action magnitude may be clipped depending on the configuration parameter
        `max_relative_target`. In this case, the action sent differs from original action.
        Thus, this function always returns the action actually sent.

        Raises:
            RobotDeviceNotConnectedError: if robot is not connected.

        Returns:
            RobotAction: the action sent to the motors, potentially clipped.
        """

        goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}

        # Cap goal position when too far away from present position.
        # /!\ Slower fps expected due to reading from the follower.
        if self.config.max_relative_target is not None:
            present_pos = self.bus.sync_read("Present_Position")
            goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
            goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)

        # Send goal position to the arm
        self.bus.sync_write("Goal_Position", goal_pos)
        return {f"{motor}.pos": val for motor, val in goal_pos.items()}

    @check_if_not_connected
    def disconnect(self):
        self.bus.disconnect(self.config.disable_torque_on_disconnect)
        for cam in self.cameras.values():
            cam.disconnect()

        logger.info(f"{self} disconnected.")


SO100Follower = SOFollower
SO101Follower = SOFollower
```

## Assistant

**Result** (success):
```
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from copy import deepcopy
from enum import Enum
from pprint import pformat

from ..encoding_utils import decode_sign_magnitude, encode_sign_magnitude
from ..motors_bus import Motor, MotorCalibration, NameOrID, SerialMotorsBus, Value, get_address
from .tables import (
    FIRMWARE_MAJOR_VERSION,
    FIRMWARE_MINOR_VERSION,
    MODEL_BAUDRATE_TABLE,
    MODEL_CONTROL_TABLE,
    MODEL_ENCODING_TABLE,
    MODEL_NUMBER,
    MODEL_NUMBER_TABLE,
    MODEL_PROTOCOL,
    MODEL_RESOLUTION,
    SCAN_BAUDRATES,
)

DEFAULT_PROTOCOL_VERSION = 0
DEFAULT_BAUDRATE = 1_000_000
DEFAULT_TIMEOUT_MS = 1000

NORMALIZED_DATA = ["Goal_Position", "Present_Position"]

logger = logging.getLogger(__name__)


class OperatingMode(Enum):
    # position servo mode
    POSITION = 0
    # The motor is in constant speed mode, which is controlled by parameter 0x2e, and the highest bit 15 is
    # the direction bit
    VELOCITY = 1
    # PWM open-loop speed regulation mode, with parameter 0x2c running time parameter control, bit11 as
    # direction bit
    PWM = 2
    # In step servo mode, the number of step progress is represented by parameter 0x2a, and the highest bit 15
    # is the direction bit
    STEP = 3


class DriveMode(Enum):
    NON_INVERTED = 0
    INVERTED = 1


class TorqueMode(Enum):
    ENABLED = 1
    DISABLED = 0


def _split_into_byte_chunks(value: int, length: int) -> list[int]:
    import scservo_sdk as scs

    if length == 1:
        data = [value]
    elif length == 2:
        data = [scs.SCS_LOBYTE(value), scs.SCS_HIBYTE(value)]
    elif length == 4:
        data = [
            scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
            scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
            scs.SCS_LOBYTE(scs.SCS_HIWORD(value)),
            scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
        ]
    return data


def patch_setPacketTimeout(self, packet_length):  # noqa: N802
    """
    HACK: This patches the PortHandler behavior to set the correct packet timeouts.

    It fixes https://gitee.com/ftservo/SCServoSDK/issues/IBY2S6
    The bug is fixed on the official Feetech SDK repo (https://gitee.com/ftservo/FTServo_Python)
    but because that version is not published on PyPI, we rely on the (unofficial) on that is, which needs
    patching.
    """
    self.packet_start_time = self.getCurrentTime()
    self.packet_timeout = (self.tx_time_per_byte * packet_length) + (self.tx_time_per_byte * 3.0) + 50


class FeetechMotorsBus(SerialMotorsBus):
    """
    The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the
    python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk.
    """

    apply_drive_mode = True
    available_baudrates = deepcopy(SCAN_BAUDRATES)
    default_baudrate = DEFAULT_BAUDRATE
    default_timeout = DEFAULT_TIMEOUT_MS
    model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
    model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
    model_encoding_table = deepcopy(MODEL_ENCODING_TABLE)
    model_number_table = deepcopy(MODEL_NUMBER_TABLE)
    model_resolution_table = deepcopy(MODEL_RESOLUTION)
    normalized_data = deepcopy(NORMALIZED_DATA)

    def __init__(
        self,
        port: str,
        motors: dict[str, Motor],
        calibration: dict[str, MotorCalibration] | None = None,
        protocol_version: int = DEFAULT_PROTOCOL_VERSION,
    ):
        super().__init__(port, motors, calibration)
        self.protocol_version = protocol_version
        self._assert_same_protocol()
        import scservo_sdk as scs

        self.port_handler = scs.PortHandler(self.port)
        # HACK: monkeypatch
        self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__(  # type: ignore[method-assign]
            self.port_handler, scs.PortHandler
        )
        self.packet_handler = scs.PacketHandler(protocol_version)
        self.sync_reader = scs.GroupSyncRead(self.port_handler, self.packet_handler, 0, 0)
        self.sync_writer = scs.GroupSyncWrite(self.port_handler, self.packet_handler, 0, 0)
        self._comm_success = scs.COMM_SUCCESS
        self._no_error = 0x00

        if any(MODEL_PROTOCOL[model] != self.protocol_version for model in self.models):
            raise ValueError(f"Some motors are incompatible with protocol_version={self.protocol_version}")

    def _assert_same_protocol(self) -> None:
        if any(MODEL_PROTOCOL[model] != self.protocol_version for model in self.models):
            raise RuntimeError("Some motors use an incompatible protocol.")

    def _assert_protocol_is_compatible(self, instruction_name: str) -> None:
        if instruction_name == "sync_read" and self.protocol_version == 1:
            raise NotImplementedError(
                "'Sync Read' is not available with Feetech motors using Protocol 1. Use 'Read' sequentially instead."
            )
        if instruction_name == "broadcast_ping" and self.protocol_version == 1:
            raise NotImplementedError(
                "'Broadcast Ping' is not available with Feetech motors using Protocol 1. Use 'Ping' sequentially instead."
            )

    def _assert_same_firmware(self) -> None:
        firmware_versions = self._read_firmware_version(self.ids, raise_on_error=True)
        if len(set(firmware_versions.values())) != 1:
            raise RuntimeError(
                "Some Motors use different firmware versions:"
                f"\n{pformat(firmware_versions)}\n"
                "Update their firmware first using Feetech's software. "
                "Visit https://www.feetechrc.com/software."
            )

    def _handshake(self) -> None:
        self._assert_motors_exist()
        self._assert_same_firmware()

    def _find_single_motor(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
        if self.protocol_version == 0:
            return self._find_single_motor_p0(motor, initial_baudrate)
        else:
            return self._find_single_motor_p1(motor, initial_baudrate)

    def _find_single_motor_p0(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
        model = self.motors[motor].model
        search_baudrates = (
            [initial_baudrate] if initial_baudrate is not None else self.model_baudrate_table[model]
        )
        expected_model_nb = self.model_number_table[model]

        for baudrate in search_baudrates:
            self.set_baudrate(baudrate)
            id_model = self.broadcast_ping()
            if id_model:
                found_id, found_model = next(iter(id_model.items()))
                if found_model != expected_model_nb:
                    raise RuntimeError(
                        f"Found one motor on {baudrate=} with id={found_id} but it has a "
                        f"model number '{found_model}' different than the one expected: '{expected_model_nb}'. "
                        f"Make sure you are connected only connected to the '{motor}' motor (model '{model}')."
                    )
                return baudrate, found_id

        raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.")

    def _find_single_motor_p1(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
        import scservo_sdk as scs

        model = self.motors[motor].model
        search_baudrates = (
            [initial_baudrate] if initial_baudrate is not None else self.model_baudrate_table[model]
        )
        expected_model_nb = self.model_number_table[model]

        for baudrate in search_baudrates:
            self.set_baudrate(baudrate)
            for id_ in range(scs.MAX_ID + 1):
                found_model = self.ping(id_)
                if found_model is not None:
                    if found_model != expected_model_nb:
                        raise RuntimeError(
                            f"Found one motor on {baudrate=} with id={id_} but it has a "
                            f"model number '{found_model}' different than the one expected: '{expected_model_nb}'. "
                            f"Make sure you are connected only connected to the '{motor}' motor (model '{model}')."
                        )
                    return baudrate, id_

        raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.")

    def configure_motors(self, return_delay_time=0, maximum_acceleration=254, acceleration=254) -> None:
        for motor in self.motors:
            # By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on
            # the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).
            self.write("Return_Delay_Time", motor, return_delay_time)
            # Set 'Maximum_Acceleration' to 254 to speedup acceleration and deceleration of the motors.
            if self.protocol_version == 0:
                self.write("Maximum_Acceleration", motor, maximum_acceleration)
            self.write("Acceleration", motor, acceleration)

    @property
    def is_calibrated(self) -> bool:
        motors_calibration = self.read_calibration()
        if set(motors_calibration) != set(self.calibration):
            return False

        same_ranges = all(
            self.calibration[motor].range_min == cal.range_min
            and self.calibration[motor].range_max == cal.range_max
            for motor, cal in motors_calibration.items()
        )
        if self.protocol_version == 1:
            return same_ranges

        same_offsets = all(
            self.calibration[motor].homing_offset == cal.homing_offset
            for motor, cal in motors_calibration.items()
        )
        return same_ranges and same_offsets

    def read_calibration(self) -> dict[str, MotorCalibration]:
        offsets, mins, maxes = {}, {}, {}
        for motor in self.motors:
            mins[motor] = self.read("Min_Position_Limit", motor, normalize=False)
            maxes[motor] = self.read("Max_Position_Limit", motor, normalize=False)
            offsets[motor] = (
                self.read("Homing_Offset", motor, normalize=False) if self.protocol_version == 0 else 0
            )

        calibration = {}
        for motor, m in self.motors.items():
            calibration[motor] = MotorCalibration(
                id=m.id,
                drive_mode=0,
                homing_offset=int(offsets[motor]),
                range_min=int(mins[motor]),
                range_max=int(maxes[motor]),
            )

        return calibration

    def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
        for motor, calibration in calibration_dict.items():
            if self.protocol_version == 0:
                self.write("Homing_Offset", motor, calibration.homing_offset)
            self.write("Min_Position_Limit", motor, calibration.range_min)
            self.write("Max_Position_Limit", motor, calibration.range_max)

        if cache:
            self.calibration = calibration_dict

    def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
        """
        On Feetech Motors:
        Present_Position = Actual_Position - Homing_Offset
        """
        half_turn_homings: dict[NameOrID, Value] = {}
        for motor, pos in positions.items():
            model = self._get_motor_model(motor)
            max_res = self.model_resolution_table[model] - 1
            half_turn_homings[motor] = pos - int(max_res / 2)

        return half_turn_homings

    def disable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
        for motor in self._get_motors_list(motors):
            self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
            self.write("Lock", motor, 0, num_retry=num_retry)

    def _disable_torque(self, motor: int, model: str, num_retry: int = 0) -> None:
        addr, length = get_address(self.model_ctrl_table, model, "Torque_Enable")
        self._write(addr, length, motor, TorqueMode.DISABLED.value, num_retry=num_retry)
        addr, length = get_address(self.model_ctrl_table, model, "Lock")
        self._write(addr, length, motor, 0, num_retry=num_retry)

    def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
        for motor in self._get_motors_list(motors):
            self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
            self.write("Lock", motor, 1, num_retry=num_retry)

    def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
        for id_ in ids_values:
            model = self._id_to_model(id_)
            encoding_table = self.model_encoding_table.get(model)
            if encoding_table and data_name in encoding_table:
                sign_bit = encoding_table[data_name]
                ids_values[id_] = encode_sign_magnitude(ids_values[id_], sign_bit)

        return ids_values

    def _decode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
        for id_ in ids_values:
            model = self._id_to_model(id_)
            encoding_table = self.model_encoding_table.get(model)
            if encoding_table and data_name in encoding_table:
                sign_bit = encoding_table[data_name]
                ids_values[id_] = decode_sign_magnitude(ids_values[id_], sign_bit)

        return ids_values

    def _split_into_byte_chunks(self, value: int, length: int) -> list[int]:
        return _split_into_byte_chunks(value, length)

    def _broadcast_ping(self) -> tuple[dict[int, int], int]:
        import scservo_sdk as scs

        data_list: dict[int, int] = {}

        status_length = 6

        rx_length = 0
        wait_length = status_length * scs.MAX_ID

        txpacket = [0] * 6

        tx_time_per_byte = (1000.0 / self.port_handler.getBaudRate()) * 10.0

        txpacket[scs.PKT_ID] = scs.BROADCAST_ID
        txpacket[scs.PKT_LENGTH] = 2
        txpacket[scs.PKT_INSTRUCTION] = scs.INST_PING

        result = self.packet_handler.txPacket(self.port_handler, txpacket)
        if result != scs.COMM_SUCCESS:
            self.port_handler.is_using = False
            return data_list, result

        # set rx timeout
        self.port_handler.setPacketTimeoutMillis((wait_length * tx_time_per_byte) + (3.0 * scs.MAX_ID) + 16.0)

        rxpacket = []
        while not self.port_handler.isPacketTimeout() and rx_length < wait_length:
            rxpacket += self.port_handler.readPort(wait_length - rx_length)
            rx_length = len(rxpacket)

        self.port_handler.is_using = False

        if rx_length == 0:
            return data_list, scs.COMM_RX_TIMEOUT

        while True:
            if rx_length < status_length:
                return data_list, scs.COMM_RX_CORRUPT

            # find packet header
            for idx in range(0, (rx_length - 1)):
                if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF):
                    break

            if idx == 0:  # found at the beginning of the packet
                # calculate checksum
                checksum = 0
                for idx in range(2, status_length - 1):  # except header & checksum
                    checksum += rxpacket[idx]

                checksum = ~checksum & 0xFF
                if rxpacket[status_length - 1] == checksum:
                    result = scs.COMM_SUCCESS
                    data_list[rxpacket[scs.PKT_ID]] = rxpacket[scs.PKT_ERROR]

                    del rxpacket[0:status_length]
                    rx_length = rx_length - status_length

                    if rx_length == 0:
                        return data_list, result
                else:
                    result = scs.COMM_RX_CORRUPT
                    # remove header (0xFF 0xFF)
                    del rxpacket[0:2]
                    rx_length = rx_length - 2
            else:
                # remove unnecessary packets
                del rxpacket[0:idx]
                rx_length = rx_length - idx

    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
        self._assert_protocol_is_compatible("broadcast_ping")
        for n_try in range(1 + num_retry):
            ids_status, comm = self._broadcast_ping()
            if self._is_comm_success(comm):
                break
            logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})")
            logger.debug(self.packet_handler.getTxRxResult(comm))

        if not self._is_comm_success(comm):
            if raise_on_error:
                raise ConnectionError(self.packet_handler.getTxRxResult(comm))
            return None

        ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
        if ids_errors:
            display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}
            logger.error(f"Some motors found returned an error status:\n{pformat(display_dict, indent=4)}")

        return self._read_model_number(list(ids_status), raise_on_error)

    def _read_firmware_version(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, str]:
        firmware_versions = {}
        for id_ in motor_ids:
            firm_ver_major, comm, error = self._read(
                *FIRMWARE_MAJOR_VERSION, id_, raise_on_error=raise_on_error
            )
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            firm_ver_minor, comm, error = self._read(
                *FIRMWARE_MINOR_VERSION, id_, raise_on_error=raise_on_error
            )
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            firmware_versions[id_] = f"{firm_ver_major}.{firm_ver_minor}"

        return firmware_versions

    def _read_model_number(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, int]:
        model_numbers = {}
        for id_ in motor_ids:
            model_nb, comm, error = self._read(*MODEL_NUMBER, id_, raise_on_error=raise_on_error)
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            model_numbers[id_] = model_nb

        return model_numbers
```

## Assistant

**Result** (success):
```
#!/usr/bin/env python

# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# ruff: noqa: N802
# This noqa is for the Protocols classes: PortHandler, PacketHandler GroupSyncRead/Write
# TODO(aliberts): Add block noqa when feature below is available
# https://github.com/astral-sh/ruff/issues/3711

from __future__ import annotations

import abc
import logging
from collections.abc import Sequence
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from pprint import pformat
from typing import Protocol

import serial
from deepdiff import DeepDiff
from tqdm import tqdm

from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from lerobot.utils.utils import enter_pressed, move_cursor_up

type NameOrID = str | int
type Value = int | float

logger = logging.getLogger(__name__)


class MotorsBusBase(abc.ABC):
    """
    Base class for all motor bus implementations.

    This is a minimal interface that all motor buses must implement, regardless of their
    communication protocol (serial, CAN, etc.).
    """

    def __init__(
        self,
        port: str,
        motors: dict[str, Motor],
        calibration: dict[str, MotorCalibration] | None = None,
    ):
        self.port = port
        self.motors = motors
        self.calibration = calibration if calibration else {}

    @abc.abstractmethod
    def connect(self, handshake: bool = True) -> None:
        """Establish connection to the motors."""
        pass

    @abc.abstractmethod
    def disconnect(self, disable_torque: bool = True) -> None:
        """Disconnect from the motors."""
        pass

    @property
    @abc.abstractmethod
    def is_connected(self) -> bool:
        """Check if connected to the motors."""
        pass

    @abc.abstractmethod
    def read(self, data_name: str, motor: str) -> Value:
        """Read a value from a single motor."""
        pass

    @abc.abstractmethod
    def write(self, data_name: str, motor: str, value: Value) -> None:
        """Write a value to a single motor."""
        pass

    @abc.abstractmethod
    def sync_read(self, data_name: str, motors: str | list[str] | None = None) -> dict[str, Value]:
        """Read a value from multiple motors."""
        pass

    @abc.abstractmethod
    def sync_write(self, data_name: str, values: dict[str, Value]) -> None:
        """Write values to multiple motors."""
        pass

    @abc.abstractmethod
    def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
        """Enable torque on selected motors."""
        pass

    @abc.abstractmethod
    def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
        """Disable torque on selected motors."""
        pass

    @abc.abstractmethod
    def read_calibration(self) -> dict[str, MotorCalibration]:
        """Read calibration parameters from the motors."""
        pass

    @abc.abstractmethod
    def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
        """Write calibration parameters to the motors."""
        pass


def get_ctrl_table(model_ctrl_table: dict[str, dict], model: str) -> dict[str, tuple[int, int]]:
    ctrl_table = model_ctrl_table.get(model)
    if ctrl_table is None:
        raise KeyError(f"Control table for {model=} not found.")
    return ctrl_table


def get_address(model_ctrl_table: dict[str, dict], model: str, data_name: str) -> tuple[int, int]:
    ctrl_table = get_ctrl_table(model_ctrl_table, model)
    addr_bytes = ctrl_table.get(data_name)
    if addr_bytes is None:
        raise KeyError(f"Address for '{data_name}' not found in {model} control table.")
    return addr_bytes


def assert_same_address(model_ctrl_table: dict[str, dict], motor_models: list[str], data_name: str) -> None:
    all_addr = []
    all_bytes = []
    for model in motor_models:
        addr, bytes = get_address(model_ctrl_table, model, data_name)
        all_addr.append(addr)
        all_bytes.append(bytes)

    if len(set(all_addr)) != 1:
        raise NotImplementedError(
            f"At least two motor models use a different address for `data_name`='{data_name}'"
            f"({list(zip(motor_models, all_addr, strict=False))})."
        )

    if len(set(all_bytes)) != 1:
        raise NotImplementedError(
            f"At least two motor models use a different bytes representation for `data_name`='{data_name}'"
            f"({list(zip(motor_models, all_bytes, strict=False))})."
        )


class MotorNormMode(str, Enum):
    RANGE_0_100 = "range_0_100"
    RANGE_M100_100 = "range_m100_100"
    DEGREES = "degrees"


@dataclass
class MotorCalibration:
    id: int
    drive_mode: int
    homing_offset: int
    range_min: int
    range_max: int


@dataclass
class Motor:
    id: int
    model: str
    norm_mode: MotorNormMode
    motor_type_str: str | None = None
    recv_id: int | None = None


class PortHandler(Protocol):
    is_open: bool
    baudrate: int
    packet_start_time: float
    packet_timeout: float
    tx_time_per_byte: float
    is_using: bool
    port_name: str
    ser: serial.Serial

    def __init__(self, port_name: str) -> None: ...

    def openPort(self): ...
    def closePort(self): ...
    def clearPort(self): ...
    def setPortName(self, port_name): ...
    def getPortName(self): ...
    def setBaudRate(self, baudrate): ...
    def getBaudRate(self): ...
    def getBytesAvailable(self): ...
    def readPort(self, length): ...
    def writePort(self, packet): ...
    def setPacketTimeout(self, packet_length): ...
    def setPacketTimeoutMillis(self, msec): ...
    def isPacketTimeout(self): ...
    def getCurrentTime(self): ...
    def getTimeSinceStart(self): ...
    def setupPort(self, cflag_baud): ...
    def getCFlagBaud(self, baudrate): ...


class PacketHandler(Protocol):
    def getTxRxResult(self, result): ...
    def getRxPacketError(self, error): ...
    def txPacket(self, port, txpacket): ...
    def rxPacket(self, port): ...
    def txRxPacket(self, port, txpacket): ...
    def ping(self, port, id): ...
    def action(self, port, id): ...
    def readTx(self, port, id, address, length): ...
    def readRx(self, port, id, length): ...
    def readTxRx(self, port, id, address, length): ...
    def read1ByteTx(self, port, id, address): ...
    def read1ByteRx(self, port, id): ...
    def read1ByteTxRx(self, port, id, address): ...
    def read2ByteTx(self, port, id, address): ...
    def read2ByteRx(self, port, id): ...
    def read2ByteTxRx(self, port, id, address): ...
    def read4ByteTx(self, port, id, address): ...
    def read4ByteRx(self, port, id): ...
    def read4ByteTxRx(self, port, id, address): ...
    def writeTxOnly(self, port, id, address, length, data): ...
    def writeTxRx(self, port, id, address, length, data): ...
    def write1ByteTxOnly(self, port, id, address, data): ...
    def write1ByteTxRx(self, port, id, address, data): ...
    def write2ByteTxOnly(self, port, id, address, data): ...
    def write2ByteTxRx(self, port, id, address, data): ...
    def write4ByteTxOnly(self, port, id, address, data): ...
    def write4ByteTxRx(self, port, id, address, data): ...
    def regWriteTxOnly(self, port, id, address, length, data): ...
    def regWriteTxRx(self, port, id, address, length, data): ...
    def syncReadTx(self, port, start_address, data_length, param, param_length): ...
    def syncWriteTxOnly(self, port, start_address, data_length, param, param_length): ...
    def broadcastPing(self, port): ...


class GroupSyncRead(Protocol):
    port: str
    ph: PortHandler
    start_address: int
    data_length: int
    last_result: bool
    is_param_changed: bool
    param: list
    data_dict: dict

    def __init__(
        self, port: PortHandler, ph: PacketHandler, start_address: int, data_length: int
    ) -> None: ...
    def makeParam(self): ...
    def addParam(self, id): ...
    def removeParam(self, id): ...
    def clearParam(self): ...
    def txPacket(self): ...
    def rxPacket(self): ...
    def txRxPacket(self): ...
    def isAvailable(self, id, address, data_length): ...
    def getData(self, id, address, data_length): ...


class GroupSyncWrite(Protocol):
    port: str
    ph: PortHandler
    start_address: int
    data_length: int
    is_param_changed: bool
    param: list
    data_dict: dict

    def __init__(
        self, port: PortHandler, ph: PacketHandler, start_address: int, data_length: int
    ) -> None: ...
    def makeParam(self): ...
    def addParam(self, id, data): ...
    def removeParam(self, id): ...
    def changeParam(self, id, data): ...
    def clearParam(self): ...
    def txPacket(self): ...


class SerialMotorsBus(MotorsBusBase):
    """
    A SerialMotorsBus allows to efficiently read and write to motors connected via serial communication.
    It represents several motors daisy-chained together and connected through a serial port.
    There are currently two implementations of this class:
        - DynamixelMotorsBus
        - FeetechMotorsBus

    This class is specifically for serial-based motor protocols (Dynamixel, Feetech, etc.).

    A MotorsBus subclass instance requires a port (e.g. `FeetechMotorsBus(port="/dev/tty.usbmodem575E0031751"`)).
    To find the port, you can run our utility script:
    ```bash
    lerobot-find-port.py
    >>> Finding all available ports for the MotorsBus.
    >>> ["/dev/tty.usbmodem575E0032081", "/dev/tty.usbmodem575E0031751"]
    >>> Remove the usb cable from your MotorsBus and press Enter when done.
    >>> The port of this MotorsBus is /dev/tty.usbmodem575E0031751.
    >>> Reconnect the usb cable.
    ```

    Example of usage for 1 Feetech sts3215 motor connected to the bus:
    ```python
    bus = FeetechMotorsBus(
        port="/dev/tty.usbmodem575E0031751",
        motors={"my_motor": (1, "sts3215")},
    )
    bus.connect()

    position = bus.read("Present_Position", "my_motor", normalize=False)

    # Move from a few motor steps as an example
    few_steps = 30
    bus.write("Goal_Position", "my_motor", position + few_steps, normalize=False)

    # When done, properly disconnect the port using
    bus.disconnect()
    ```
    """

    apply_drive_mode: bool
    available_baudrates: list[int]
    default_baudrate: int
    default_timeout: int
    model_baudrate_table: dict[str, dict]
    model_ctrl_table: dict[str, dict]
    model_encoding_table: dict[str, dict]
    model_number_table: dict[str, int]
    model_resolution_table: dict[str, int]
    normalized_data: list[str]

    def __init__(
        self,
        port: str,
        motors: dict[str, Motor],
        calibration: dict[str, MotorCalibration] | None = None,
    ):
        super().__init__(port, motors, calibration)

        self.port_handler: PortHandler
        self.packet_handler: PacketHandler
        self.sync_reader: GroupSyncRead
        self.sync_writer: GroupSyncWrite
        self._comm_success: int
        self._no_error: int

        self._id_to_model_dict = {m.id: m.model for m in self.motors.values()}
        self._id_to_name_dict = {m.id: motor for motor, m in self.motors.items()}
        self._model_nb_to_model_dict = {v: k for k, v in self.model_number_table.items()}

        self._validate_motors()

    def __len__(self):
        return len(self.motors)

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(\n"
            f"    Port: '{self.port}',\n"
            f"    Motors: \n{pformat(self.motors, indent=8, sort_dicts=False)},\n"
            ")',\n"
        )

    @cached_property
    def _has_different_ctrl_tables(self) -> bool:
        if len(self.models) < 2:
            return False

        first_table = self.model_ctrl_table[self.models[0]]
        return any(
            DeepDiff(first_table, get_ctrl_table(self.model_ctrl_table, model)) for model in self.models[1:]
        )

    @cached_property
    def models(self) -> list[str]:
        return [m.model for m in self.motors.values()]

    @cached_property
    def ids(self) -> list[int]:
        return [m.id for m in self.motors.values()]

    def _model_nb_to_model(self, motor_nb: int) -> str:
        return self._model_nb_to_model_dict[motor_nb]

    def _id_to_model(self, motor_id: int) -> str:
        return self._id_to_model_dict[motor_id]

    def _id_to_name(self, motor_id: int) -> str:
        return self._id_to_name_dict[motor_id]

    def _get_motor_id(self, motor: NameOrID) -> int:
        if isinstance(motor, str):
            return self.motors[motor].id
        elif isinstance(motor, int):
            return motor
        else:
            raise TypeError(f"'{motor}' should be int, str.")

    def _get_motor_model(self, motor: NameOrID) -> str:
        if isinstance(motor, str):
            return self.motors[motor].model
        elif isinstance(motor, int):
            return self._id_to_model_dict[motor]
        else:
            raise TypeError(f"'{motor}' should be int, str.")

    def _get_motors_list(self, motors: NameOrID | Sequence[NameOrID] | None) -> list[str]:
        if motors is None:
            return list(self.motors)
        elif isinstance(motors, str):
            return [motors]
        elif isinstance(motors, int):
            return [self._id_to_name(motors)]
        elif isinstance(motors, Sequence):
            return [m if isinstance(m, str) else self._id_to_name(m) for m in motors]
        else:
            raise TypeError(motors)

    def _get_ids_values_dict(self, values: Value | dict[str, Value] | None) -> dict[int, Value]:
        if isinstance(values, (int | float)):
            return dict.fromkeys(self.ids, values)
        elif isinstance(values, dict):
            return {self.motors[motor].id: val for motor, val in values.items()}
        else:
            raise TypeError(f"'values' is expected to be a single value or a dict. Got {values}")

    def _validate_motors(self) -> None:
        if len(self.ids) != len(set(self.ids)):
            raise ValueError(f"Some motors have the same id!\n{self}")

        # Ensure ctrl table available for all models
        for model in self.models:
            get_ctrl_table(self.model_ctrl_table, model)

    def _is_comm_success(self, comm: int) -> bool:
        return comm == self._comm_success

    def _is_error(self, error: int) -> bool:
        return error != self._no_error

    def _assert_motors_exist(self) -> None:
        expected_models = {m.id: self.model_number_table[m.model] for m in self.motors.values()}

        found_models = {}
        for id_ in self.ids:
            model_nb = self.ping(id_)
            if model_nb is not None:
                found_models[id_] = model_nb

        missing_ids = [id_ for id_ in self.ids if id_ not in found_models]
        wrong_models = {
            id_: (expected_models[id_], found_models[id_])
            for id_ in found_models
            if expected_models.get(id_) != found_models[id_]
        }

        if missing_ids or wrong_models:
            error_lines = [f"{self.__class__.__name__} motor check failed on port '{self.port}':"]

            if missing_ids:
                error_lines.append("\nMissing motor IDs:")
                error_lines.extend(
                    f"  - {id_} (expected model: {expected_models[id_]})" for id_ in missing_ids
                )

            if wrong_models:
                error_lines.append("\nMotors with incorrect model numbers:")
                error_lines.extend(
                    f"  - {id_} ({self._id_to_name(id_)}): expected {expected}, found {found}"
                    for id_, (expected, found) in wrong_models.items()
                )

            error_lines.append("\nFull expected motor list (id: model_number):")
            error_lines.append(pformat(expected_models, indent=4, sort_dicts=False))
            error_lines.append("\nFull found motor list (id: model_number):")
            error_lines.append(pformat(found_models, indent=4, sort_dicts=False))

            raise RuntimeError("\n".join(error_lines))

    @abc.abstractmethod
    def _assert_protocol_is_compatible(self, instruction_name: str) -> None:
        pass

    @property
    def is_connected(self) -> bool:
        """bool: `True` if the underlying serial port is open."""
        return self.port_handler.is_open

    @check_if_already_connected
    def connect(self, handshake: bool = True) -> None:
        """Open the serial port and initialise communication.

        Args:
            handshake (bool, optional): Pings every expected motor and performs additional
                integrity checks specific to the implementation. Defaults to `True`.

        Raises:
            DeviceAlreadyConnectedError: The port is already open.
            ConnectionError: The underlying SDK failed to open the port or the handshake did not succeed.
        """

        self._connect(handshake)
        self.set_timeout()
        logger.debug(f"{self.__class__.__name__} connected.")

    def _connect(self, handshake: bool = True) -> None:
        try:
            if not self.port_handler.openPort():
                raise OSError(f"Failed to open port '{self.port}'.")
            elif handshake:
                self._handshake()
        except (FileNotFoundError, OSError, serial.SerialException) as e:
            raise ConnectionError(
                f"\nCould not connect on port '{self.port}'. Make sure you are using the correct port."
                "\nTry running `lerobot-find-port`\n"
            ) from e

    @abc.abstractmethod
    def _handshake(self) -> None:
        pass

    @check_if_not_connected
    def disconnect(self, disable_torque: bool = True) -> None:
        """Close the serial port (optionally disabling torque first).

        Args:
            disable_torque (bool, optional): If `True` (default) torque is disabled on every motor before
                closing the port. This can prevent damaging motors if they are left applying resisting torque
                after disconnect.
        """

        if disable_torque:
            self.port_handler.clearPort()
            self.port_handler.is_using = False
            self.disable_torque(num_retry=5)

        self.port_handler.closePort()
        logger.debug(f"{self.__class__.__name__} disconnected.")

    @classmethod
    def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]:
        """Probe *port* at every supported baud-rate and list responding IDs.

        Args:
            port (str): Serial/USB port to scan (e.g. ``"/dev/ttyUSB0"``).
            *args, **kwargs: Forwarded to the subclass constructor.

        Returns:
            dict[int, list[int]]: Mapping *baud-rate → list of motor IDs*
            for every baud-rate that produced at least one response.
        """
        bus = cls(port, {}, *args, **kwargs)
        bus._connect(handshake=False)
        baudrate_ids = {}
        for baudrate in tqdm(bus.available_baudrates, desc="Scanning port"):
            bus.set_baudrate(baudrate)
            ids_models = bus.broadcast_ping()
            if ids_models:
                tqdm.write(f"Motors found for {baudrate=}: {pformat(ids_models, indent=4)}")
                baudrate_ids[baudrate] = list(ids_models)

        bus.port_handler.closePort()
        return baudrate_ids

    def setup_motor(
        self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None
    ) -> None:
        """Assign the correct ID and baud-rate to a single motor.

        This helper temporarily switches to the motor's current settings, disables torque, sets the desired
        ID, and finally programs the bus' default baud-rate.

        Args:
            motor (str): Key of the motor in :pyattr:`motors`.
            initial_baudrate (int | None, optional): Current baud-rate (skips scanning when provided).
                Defaults to None.
            initial_id (int | None, optional): Current ID (skips scanning when provided). Defaults to None.

        Raises:
            RuntimeError: The motor could not be found or its model number
                does not match the expected one.
            ConnectionError: Communication with the motor failed.
        """
        if not self.is_connected:
            self._connect(handshake=False)

        if initial_baudrate is None:
            initial_baudrate, initial_id = self._find_single_motor(motor)

        if initial_id is None:
            _, initial_id = self._find_single_motor(motor, initial_baudrate)

        model = self.motors[motor].model
        target_id = self.motors[motor].id
        self.set_baudrate(initial_baudrate)
        self._disable_torque(initial_id, model)

        # Set ID
        addr, length = get_address(self.model_ctrl_table, model, "ID")
        self._write(addr, length, initial_id, target_id)

        # Set Baudrate
        addr, length = get_address(self.model_ctrl_table, model, "Baud_Rate")
        baudrate_value = self.model_baudrate_table[model][self.default_baudrate]
        self._write(addr, length, target_id, baudrate_value)

        self.set_baudrate(self.default_baudrate)

    @abc.abstractmethod
    def _find_single_motor(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
        pass

    @abc.abstractmethod
    def configure_motors(self) -> None:
        """Write implementation-specific recommended settings to every motor.

        Typical changes include shortening the return delay, increasing
        acceleration limits or disabling safety locks.
        """
        pass

    @abc.abstractmethod
    def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
        """Disable torque on selected motors.

        Disabling Torque allows to write to the motors' permanent memory area (EPROM/EEPROM).

        Args:
            motors ( str | list[str] | None, optional): Target motors.  Accepts a motor name, an ID, a
                list of names or `None` to affect every registered motor.  Defaults to `None`.
            num_retry (int, optional): Number of additional retry attempts on communication failure.
                Defaults to 0.
        """
        pass

    @abc.abstractmethod
    def _disable_torque(self, motor: int, model: str, num_retry: int = 0) -> None:
        pass

    @abc.abstractmethod
    def enable_torque(self, motors: int | str | list[str] | None = None, num_retry: int = 0) -> None:
        """Enable torque on selected motors.

        Args:
            motors (int | str | list[str] | None, optional): Same semantics as :pymeth:`disable_torque`.
                Defaults to `None`.
            num_retry (int, optional): Number of additional retry attempts on communication failure.
                Defaults to 0.
        """
        pass

    @contextmanager
    def torque_disabled(self, motors: str | list[str] | None = None):
        """Context-manager that guarantees torque is re-enabled.

        This helper is useful to temporarily disable torque when configuring motors.

        Examples:
            >>> with bus.torque_disabled():
            ...     # Safe operations here
            ...     pass
        """
        self.disable_torque(motors)
        try:
            yield
        finally:
            self.enable_torque(motors)

    def set_timeout(self, timeout_ms: int | None = None):
        """Change the packet timeout used by the SDK.

        Args:
            timeout_ms (int | None, optional): Timeout in *milliseconds*. If `None` (default) the method falls
                back to :pyattr:`default_timeout`.
        """
        timeout_ms = timeout_ms if timeout_ms is not None else self.default_timeout
        self.port_handler.setPacketTimeoutMillis(timeout_ms)

    def get_baudrate(self) -> int:
        """Return the current baud-rate configured on the port.

        Returns:
            int: Baud-rate in bits / second.
        """
        return self.port_handler.getBaudRate()

    def set_baudrate(self, baudrate: int) -> None:
        """Set a new UART baud-rate on the port.

        Args:
            baudrate (int): Desired baud-rate in bits / second.

        Raises:
            RuntimeError: The SDK failed to apply the change.
        """
        present_bus_baudrate = self.port_handler.getBaudRate()
        if present_bus_baudrate != baudrate:
            logger.info(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.")
            self.port_handler.setBaudRate(baudrate)

            if self.port_handler.getBaudRate() != baudrate:
                raise RuntimeError("Failed to write bus baud rate.")

    @property
    @abc.abstractmethod
    def is_calibrated(self) -> bool:
        """bool: ``True`` if the cached calibration matches the motors."""
        pass

    @abc.abstractmethod
    def read_calibration(self) -> dict[str, MotorCalibration]:
        """Read calibration parameters from the motors.

        Returns:
            dict[str, MotorCalibration]: Mapping *motor name → calibration*.
        """
        pass

    @abc.abstractmethod
    def write_calibration(self, calibration_dict: dict[str, MotorCalibration], cache: bool = True) -> None:
        """Write calibration parameters to the motors and optionally cache them.

        Args:
            calibration_dict (dict[str, MotorCalibration]): Calibration obtained from
                :pymeth:`read_calibration` or crafted by the user.
            cache (bool, optional): Save the calibration to :pyattr:`calibration`. Defaults to True.
        """
        pass

    def reset_calibration(self, motors: NameOrID | Sequence[NameOrID] | None = None) -> None:
        """Restore factory calibration for the selected motors.

        Homing offset is set to ``0`` and min/max position limits are set to the full usable range.
        The in-memory :pyattr:`calibration` is cleared.

        Args:
            motors (NameOrID | Sequence[NameOrID] | None, optional): Selection of motors. `None` (default)
                resets every motor.
        """
        motor_names = self._get_motors_list(motors)

        for motor in motor_names:
            model = self._get_motor_model(motor)
            max_res = self.model_resolution_table[model] - 1
            self.write("Homing_Offset", motor, 0, normalize=False)
            self.write("Min_Position_Limit", motor, 0, normalize=False)
            self.write("Max_Position_Limit", motor, max_res, normalize=False)

        self.calibration = {}

    def set_half_turn_homings(
        self, motors: NameOrID | Sequence[NameOrID] | None = None
    ) -> dict[NameOrID, Value]:
        """Centre each motor range around its current position.

        The function computes and writes a homing offset such that the present position becomes exactly one
        half-turn (e.g. `2047` on a 12-bit encoder).

        Args:
            motors (NameOrID | list[NameOrID] | None, optional): Motors to adjust. Defaults to all motors (`None`).

        Returns:
            dict[str, Value]: Mapping *motor name → written homing offset*.
        """
        motor_names = self._get_motors_list(motors)

        self.reset_calibration(motor_names)
        actual_positions = self.sync_read("Present_Position", motor_names, normalize=False)
        homing_offsets = self._get_half_turn_homings(actual_positions)
        for motor, offset in homing_offsets.items():
            self.write("Homing_Offset", motor, offset)

        return homing_offsets

    @abc.abstractmethod
    def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
        pass

    def record_ranges_of_motion(
        self, motors: NameOrID | Sequence[NameOrID] | None = None, display_values: bool = True
    ) -> tuple[dict[str, Value], dict[str, Value]]:
        """Interactively record the min/max encoder values of each motor.

        Move the joints by hand (with torque disabled) while the method streams live positions. Press
        :kbd:`Enter` to finish.

        Args:
            motors (NameOrID | list[NameOrID] | None, optional): Motors to record.
                Defaults to every motor (`None`).
            display_values (bool, optional): When `True` (default) a live table is printed to the console.

        Returns:
            tuple[dict[str, Value], dict[str, Value]]: Two dictionaries *mins* and *maxes* with the
                extreme values observed for each motor.
        """
        motor_names = self._get_motors_list(motors)

        start_positions = self.sync_read("Present_Position", motor_names, normalize=False)
        mins = start_positions.copy()
        maxes = start_positions.copy()

        user_pressed_enter = False
        while not user_pressed_enter:
            positions = self.sync_read("Present_Position", motor_names, normalize=False)
            mins = {motor: min(positions[motor], min_) for motor, min_ in mins.items()}
            maxes = {motor: max(positions[motor], max_) for motor, max_ in maxes.items()}

            if display_values:
                print("\n-------------------------------------------")
                print(f"{'NAME':<15} | {'MIN':>6} | {'POS':>6} | {'MAX':>6}")
                for motor in motor_names:
                    print(f"{motor:<15} | {mins[motor]:>6} | {positions[motor]:>6} | {maxes[motor]:>6}")

            if enter_pressed():
                user_pressed_enter = True

            if display_values and not user_pressed_enter:
                # Move cursor up to overwrite the previous output
                move_cursor_up(len(motor_names) + 3)

        same_min_max = [motor for motor in motor_names if mins[motor] == maxes[motor]]
        if same_min_max:
            raise ValueError(f"Some motors have the same min and max values:\n{pformat(same_min_max)}")

        return mins, maxes

    def _normalize(self, ids_values: dict[int, int]) -> dict[int, float]:
        if not self.calibration:
            raise RuntimeError(f"{self} has no calibration registered.")

        normalized_values = {}
        for id_, val in ids_values.items():
            motor = self._id_to_name(id_)
            min_ = self.calibration[motor].range_min
            max_ = self.calibration[motor].range_max
            drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode
            if max_ == min_:
                raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")

            bounded_val = min(max_, max(min_, val))
            if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
                norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
                normalized_values[id_] = -norm if drive_mode else norm
            elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
                norm = ((bounded_val - min_) / (max_ - min_)) * 100
                normalized_values[id_] = 100 - norm if drive_mode else norm
            elif self.motors[motor].norm_mode is MotorNormMode.DEGREES:
                mid = (min_ + max_) / 2
                max_res = self.model_resolution_table[self._id_to_model(id_)] - 1
                normalized_values[id_] = (val - mid) * 360 / max_res
            else:
                raise NotImplementedError

        return normalized_values

    def _unnormalize(self, ids_values: dict[int, float]) -> dict[int, int]:
        if not self.calibration:
            raise RuntimeError(f"{self} has no calibration registered.")

        unnormalized_values = {}
        for id_, val in ids_values.items():
            motor = self._id_to_name(id_)
            min_ = self.calibration[motor].range_min
            max_ = self.calibration[motor].range_max
            drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode
            if max_ == min_:
                raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")

            if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
                val = -val if drive_mode else val
                bounded_val = min(100.0, max(-100.0, val))
                unnormalized_values[id_] = int(((bounded_val + 100) / 200) * (max_ - min_) + min_)
            elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
                val = 100 - val if drive_mode else val
                bounded_val = min(100.0, max(0.0, val))
                unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_)
            elif self.motors[motor].norm_mode is MotorNormMode.DEGREES:
                mid = (min_ + max_) / 2
                max_res = self.model_resolution_table[self._id_to_model(id_)] - 1
                unnormalized_values[id_] = int((val * max_res / 360) + mid)
            else:
                raise NotImplementedError

        return unnormalized_values

    @abc.abstractmethod
    def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
        pass

    @abc.abstractmethod
    def _decode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
        pass

    def _serialize_data(self, value: int, length: int) -> list[int]:
        """
        Converts an unsigned integer value into a list of byte-sized integers to be sent via a communication
        protocol. Depending on the protocol, split values can be in big-endian or little-endian order.

        Supported data length for both Feetech and Dynamixel:
            - 1 (for values 0 to 255)
            - 2 (for values 0 to 65,535)
            - 4 (for values 0 to 4,294,967,295)
        """
        if value < 0:
            raise ValueError(f"Negative values are not allowed: {value}")

        max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(length)
        if max_value is None:
            raise NotImplementedError(f"Unsupported byte size: {length}. Expected [1, 2, 4].")

        if value > max_value:
            raise ValueError(f"Value {value} exceeds the maximum for {length} bytes ({max_value}).")

        return self._split_into_byte_chunks(value, length)

    @abc.abstractmethod
    def _split_into_byte_chunks(self, value: int, length: int) -> list[int]:
        """Convert an integer into a list of byte-sized integers."""
        pass

    def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
        """Ping a single motor and return its model number.

        Args:
            motor (NameOrID): Target motor (name or ID).
            num_retry (int, optional): Extra attempts before giving up. Defaults to `0`.
            raise_on_error (bool, optional): If `True` communication errors raise exceptions instead of
                returning `None`. Defaults to `False`.

        Returns:
            int | None: Motor model number or `None` on failure.
        """
        id_ = self._get_motor_id(motor)
        for n_try in range(1 + num_retry):
            model_number, comm, error = self.packet_handler.ping(self.port_handler, id_)
            if self._is_comm_success(comm):
                break
            logger.debug(f"ping failed for {id_=}: {n_try=} got {comm=} {error=}")

        if not self._is_comm_success(comm):
            if raise_on_error:
                raise ConnectionError(self.packet_handler.getTxRxResult(comm))
            else:
                return None
        if self._is_error(error):
            if raise_on_error:
                raise RuntimeError(self.packet_handler.getRxPacketError(error))
            else:
                return None

        return model_number

    @abc.abstractmethod
    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
        """Ping every ID on the bus using the broadcast address.

        Args:
            num_retry (int, optional): Retry attempts.  Defaults to `0`.
            raise_on_error (bool, optional): When `True` failures raise an exception instead of returning
                `None`. Defaults to `False`.

        Returns:
            dict[int, int] | None: Mapping *id → model number* or `None` if the call failed.
        """
        pass

    @check_if_not_connected
    def read(
        self,
        data_name: str,
        motor: str,
        *,
        normalize: bool = True,
        num_retry: int = 0,
    ) -> Value:
        """Read a register from a motor.

        Args:
            data_name (str): Control-table key (e.g. `"Present_Position"`).
            motor (str): Motor name.
            normalize (bool, optional): When `True` (default) scale the value to a user-friendly range as
                defined by the calibration.
            num_retry (int, optional): Retry attempts.  Defaults to `0`.

        Returns:
            Value: Raw or normalised value depending on *normalize*.
        """

        id_ = self.motors[motor].id
        model = self.motors[motor].model
        addr, length = get_address(self.model_ctrl_table, model, data_name)

        err_msg = f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries."
        value, _, _ = self._read(addr, length, id_, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)

        decoded = self._decode_sign(data_name, {id_: value})

        if normalize and data_name in self.normalized_data:
            normalized = self._normalize(decoded)
            return normalized[id_]

        return decoded[id_]

    def _read(
        self,
        address: int,
        length: int,
        motor_id: int,
        *,
        num_retry: int = 0,
        raise_on_error: bool = True,
        err_msg: str = "",
    ) -> tuple[int, int, int]:
        if length == 1:
            read_fn = self.packet_handler.read1ByteTxRx
        elif length == 2:
            read_fn = self.packet_handler.read2ByteTxRx
        elif length == 4:
            read_fn = self.packet_handler.read4ByteTxRx
        else:
            raise ValueError(length)

        for n_try in range(1 + num_retry):
            value, comm, error = read_fn(self.port_handler, motor_id, address)
            if self._is_comm_success(comm):
                break
            logger.debug(
                f"Failed to read @{address=} ({length=}) on {motor_id=} ({n_try=}): "
                + self.packet_handler.getTxRxResult(comm)
            )

        if not self._is_comm_success(comm) and raise_on_error:
            raise ConnectionError(f"{err_msg} {self.packet_handler.getTxRxResult(comm)}")
        elif self._is_error(error) and raise_on_error:
            raise RuntimeError(f"{err_msg} {self.packet_handler.getRxPacketError(error)}")

        return value, comm, error

    @check_if_not_connected
    def write(
        self, data_name: str, motor: str, value: Value, *, normalize: bool = True, num_retry: int = 0
    ) -> None:
        """Write a value to a single motor's register.

        Contrary to :pymeth:`sync_write`, this expects a response status packet emitted by the motor, which
        provides a guarantee that the value was written to the register successfully. In consequence, it is
        slower than :pymeth:`sync_write` but it is more reliable. It should typically be used when configuring
        motors.

        Args:
            data_name (str): Register name.
            motor (str): Motor name.
            value (Value): Value to write.  If *normalize* is `True` the value is first converted to raw
                units.
            normalize (bool, optional): Enable or disable normalisation. Defaults to `True`.
            num_retry (int, optional): Retry attempts.  Defaults to `0`.
        """

        id_ = self.motors[motor].id
        model = self.motors[motor].model
        addr, length = get_address(self.model_ctrl_table, model, data_name)

        int_value = int(value)
        if normalize and data_name in self.normalized_data:
            int_value = self._unnormalize({id_: value})[id_]

        int_value = self._encode_sign(data_name, {id_: int_value})[id_]

        err_msg = f"Failed to write '{data_name}' on {id_=} with '{int_value}' after {num_retry + 1} tries."
        self._write(addr, length, id_, int_value, num_retry=num_retry, raise_on_error=True, err_msg=err_msg)

    def _write(
        self,
        addr: int,
        length: int,
        motor_id: int,
        value: int,
        *,
        num_retry: int = 0,
        raise_on_error: bool = True,
        err_msg: str = "",
    ) -> tuple[int, int]:
        data = self._serialize_data(value, length)
        for n_try in range(1 + num_retry):
            comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, length, data)
            if self._is_comm_success(comm):
                break
            logger.debug(
                f"Failed to sync write @{addr=} ({length=}) on id={motor_id} with {value=} ({n_try=}): "
                + self.packet_handler.getTxRxResult(comm)
            )

        if not self._is_comm_success(comm) and raise_on_error:
            raise ConnectionError(f"{err_msg} {self.packet_handler.getTxRxResult(comm)}")
        elif self._is_error(error) and raise_on_error:
            raise RuntimeError(f"{err_msg} {self.packet_handler.getRxPacketError(error)}")

        return comm, error

    @check_if_not_connected
    def sync_read(
        self,
        data_name: str,
        motors: NameOrID | Sequence[NameOrID] | None = None,
        *,
        normalize: bool = True,
        num_retry: int = 0,
    ) -> dict[str, Value]:
        """Read the same register from several motors at once.

        Args:
            data_name (str): Register name.
            motors (NameOrID | Sequence[NameOrID] | None, optional): Motors to query. `None` (default) reads every motor.
            normalize (bool, optional): Normalisation flag.  Defaults to `True`.
            num_retry (int, optional): Retry attempts.  Defaults to `0`.

        Returns:
            dict[str, Value]: Mapping *motor name → value*.
        """

        self._assert_protocol_is_compatible("sync_read")

        names = self._get_motors_list(motors)
        ids = [self.motors[motor].id for motor in names]
        models = [self.motors[motor].model for motor in names]

        if self._has_different_ctrl_tables:
            assert_same_address(self.model_ctrl_table, models, data_name)

        model = next(iter(models))
        addr, length = get_address(self.model_ctrl_table, model, data_name)

        err_msg = f"Failed to sync read '{data_name}' on {ids=} after {num_retry + 1} tries."
        raw_ids_values, _ = self._sync_read(
            addr, length, ids, num_retry=num_retry, raise_on_error=True, err_msg=err_msg
        )

        decoded = self._decode_sign(data_name, raw_ids_values)

        if normalize and data_name in self.normalized_data:
            normalized = self._normalize(decoded)
            return {self._id_to_name(id_): value for id_, value in normalized.items()}

        return {self._id_to_name(id_): value for id_, value in decoded.items()}

    def _sync_read(
        self,
        addr: int,
        length: int,
        motor_ids: list[int],
        *,
        num_retry: int = 0,
        raise_on_error: bool = True,
        err_msg: str = "",
    ) -> tuple[dict[int, int], int]:
        self._setup_sync_reader(motor_ids, addr, length)
        for n_try in range(1 + num_retry):
            comm = self.sync_reader.txRxPacket()
            if self._is_comm_success(comm):
                break
            logger.debug(
                f"Failed to sync read @{addr=} ({length=}) on {motor_ids=} ({n_try=}): "
                + self.packet_handler.getTxRxResult(comm)
            )

        if not self._is_comm_success(comm) and raise_on_error:
            raise ConnectionError(f"{err_msg} {self.packet_handler.getTxRxResult(comm)}")

        values = {id_: self.sync_reader.getData(id_, addr, length) for id_ in motor_ids}
        return values, comm

    def _setup_sync_reader(self, motor_ids: list[int], addr: int, length: int) -> None:
        self.sync_reader.clearParam()
        self.sync_reader.start_address = addr
        self.sync_reader.data_length = length
        for id_ in motor_ids:
            self.sync_reader.addParam(id_)

    # TODO(aliberts, pkooij): Implementing something like this could get even much faster read times if need be.
    # Would have to handle the logic of checking if a packet has been sent previously though but doable.
    # This could be at the cost of increase latency between the moment the data is produced by the motors and
    # the moment it is used by a policy.
    # def _async_read(self, motor_ids: list[int], address: int, length: int):
    #     if self.sync_reader.start_address != address or self.sync_reader.data_length != length or ...:
    #         self._setup_sync_reader(motor_ids, address, length)
    #     else:
    #         self.sync_reader.rxPacket()
    #         self.sync_reader.txPacket()

    #     for id_ in motor_ids:
    #         value = self.sync_reader.getData(id_, address, length)

    @check_if_not_connected
    def sync_write(
        self,
        data_name: str,
        values: Value | dict[str, Value],
        *,
        normalize: bool = True,
        num_retry: int = 0,
    ) -> None:
        """Write the same register on multiple motors.

        Contrary to :pymeth:`write`, this *does not* expects a response status packet emitted by the motor, which
        can allow for lost packets. It is faster than :pymeth:`write` and should typically be used when
        frequency matters and losing some packets is acceptable (e.g. teleoperation loops).

        Args:
            data_name (str): Register name.
            values (Value | dict[str, Value]): Either a single value (applied to every motor) or a mapping
                *motor name → value*.
            normalize (bool, optional): If `True` (default) convert values from the user range to raw units.
            num_retry (int, optional): Retry attempts.  Defaults to `0`.
        """

        raw_ids_values = self._get_ids_values_dict(values)
        models = [self._id_to_model(id_) for id_ in raw_ids_values]
        if self._has_different_ctrl_tables:
            assert_same_address(self.model_ctrl_table, models, data_name)

        model = next(iter(models))
        addr, length = get_address(self.model_ctrl_table, model, data_name)

        int_ids_values = {id_: int(val) for id_, val in raw_ids_values.items()}
        if normalize and data_name in self.normalized_data:
            int_ids_values = self._unnormalize(raw_ids_values)

        int_ids_values = self._encode_sign(data_name, int_ids_values)

        err_msg = f"Failed to sync write '{data_name}' with ids_values={int_ids_values} after {num_retry + 1} tries."
        self._sync_write(
            addr, length, int_ids_values, num_retry=num_retry, raise_on_error=True, err_msg=err_msg
        )

    def _sync_write(
        self,
        addr: int,
        length: int,
        ids_values: dict[int, int],
        num_retry: int = 0,
        raise_on_error: bool = True,
        err_msg: str = "",
    ) -> int:
        self._setup_sync_writer(ids_values, addr, length)
        for n_try in range(1 + num_retry):
            comm = self.sync_writer.txPacket()
            if self._is_comm_success(comm):
                break
            logger.debug(
                f"Failed to sync write @{addr=} ({length=}) with {ids_values=} ({n_try=}): "
                + self.packet_handler.getTxRxResult(comm)
            )

        if not self._is_comm_success(comm) and raise_on_error:
            raise ConnectionError(f"{err_msg} {self.packet_handler.getTxRxResult(comm)}")

        return comm

    def _setup_sync_writer(self, ids_values: dict[int, int], addr: int, length: int) -> None:
        self.sync_writer.clearParam()
        self.sync_writer.start_address = addr
        self.sync_writer.data_length = length
        for id_, value in ids_values.items():
            data = self._serialize_data(value, length)
            self.sync_writer.addParam(id_, data)


# Backward compatibility alias
MotorsBus = SerialMotorsBus
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "path": "lerobot/src/lerobot/motors/feetech/tables.py"
}
```

## Assistant

**Result** (success):
```
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FIRMWARE_MAJOR_VERSION = (0, 1)
FIRMWARE_MINOR_VERSION = (1, 1)
MODEL_NUMBER = (3, 2)

# TODO(Steven): Consider doing the following:
# from enum import Enum
# class MyControlTableKey(Enum):
#   ID = "ID"
#   GOAL_SPEED = "Goal_Speed"
#   ...
#
# MY_CONTROL_TABLE ={
#   MyControlTableKey.ID.value: (5,1)
#   MyControlTableKey.GOAL_SPEED.value: (46, 2)
#   ...
# }
# This allows me do to:
# bus.write(MyControlTableKey.GOAL_SPEED, ...)
# Instead of:
# bus.write("Goal_Speed", ...)
# This is important for two reasons:
# 1. The linter will tell me if I'm trying to use an invalid key, instead of me realizing when I get the RunTimeError
# 2. We can change the value of the MyControlTableKey enums without impacting the client code

# data_name: (address, size_byte)
# http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0
STS_SMS_SERIES_CONTROL_TABLE = {
    # EPROM
    "Firmware_Major_Version": FIRMWARE_MAJOR_VERSION,  # read-only
    "Firmware_Minor_Version": FIRMWARE_MINOR_VERSION,  # read-only
    "Model_Number": MODEL_NUMBER,  # read-only
    "ID": (5, 1),
    "Baud_Rate": (6, 1),
    "Return_Delay_Time": (7, 1),
    "Response_Status_Level": (8, 1),
    "Min_Position_Limit": (9, 2),
    "Max_Position_Limit": (11, 2),
    "Max_Temperature_Limit": (13, 1),
    "Max_Voltage_Limit": (14, 1),
    "Min_Voltage_Limit": (15, 1),
    "Max_Torque_Limit": (16, 2),
    "Phase": (18, 1),
    "Unloading_Condition": (19, 1),
    "LED_Alarm_Condition": (20, 1),
    "P_Coefficient": (21, 1),
    "D_Coefficient": (22, 1),
    "I_Coefficient": (23, 1),
    "Minimum_Startup_Force": (24, 2),
    "CW_Dead_Zone": (26, 1),
    "CCW_Dead_Zone": (27, 1),
    "Protection_Current": (28, 2),
    "Angular_Resolution": (30, 1),
    "Homing_Offset": (31, 2),
    "Operating_Mode": (33, 1),
    "Protective_Torque": (34, 1),
    "Protection_Time": (35, 1),
    "Overload_Torque": (36, 1),
    "Velocity_closed_loop_P_proportional_coefficient": (37, 1),
    "Over_Current_Protection_Time": (38, 1),
    "Velocity_closed_loop_I_integral_coefficient": (39, 1),
    # SRAM
    "Torque_Enable": (40, 1),
    "Acceleration": (41, 1),
    "Goal_Position": (42, 2),
    "Goal_Time": (44, 2),
    "Goal_Velocity": (46, 2),
    "Torque_Limit": (48, 2),
    "Lock": (55, 1),
    "Present_Position": (56, 2),  # read-only
    "Present_Velocity": (58, 2),  # read-only
    "Present_Load": (60, 2),  # read-only
    "Present_Voltage": (62, 1),  # read-only
    "Present_Temperature": (63, 1),  # read-only
    "Status": (65, 1),  # read-only
    "Moving": (66, 1),  # read-only
    "Present_Current": (69, 2),  # read-only
    "Goal_Position_2": (71, 2),  # read-only
    # Factory
    "Moving_Velocity": (80, 1),
    "Moving_Velocity_Threshold": (80, 1),
    "DTs": (81, 1),  # (ms)
    "Velocity_Unit_factor": (82, 1),
    "Hts": (83, 1),  # (ns) valid for firmware >= 2.54, other versions keep 0
    "Maximum_Velocity_Limit": (84, 1),
    "Maximum_Acceleration": (85, 1),
    "Acceleration_Multiplier ": (86, 1),  # Acceleration multiplier in effect when acceleration is 0
}

# http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SCSCL-emanual-cbcc8ab2e3384282a01d4bf3
SCS_SERIES_CONTROL_TABLE = {
    # EPROM
    "Firmware_Major_Version": FIRMWARE_MAJOR_VERSION,  # read-only
    "Firmware_Minor_Version": FIRMWARE_MINOR_VERSION,  # read-only
    "Model_Number": MODEL_NUMBER,  # read-only
    "ID": (5, 1),
    "Baud_Rate": (6, 1),
    "Return_Delay_Time": (7, 1),
    "Response_Status_Level": (8, 1),
    "Min_Position_Limit": (9, 2),
    "Max_Position_Limit": (11, 2),
    "Max_Temperature_Limit": (13, 1),
    "Max_Voltage_Limit": (14, 1),
    "Min_Voltage_Limit": (15, 1),
    "Max_Torque_Limit": (16, 2),
    "Phase": (18, 1),
    "Unloading_Condition": (19, 1),
    "LED_Alarm_Condition": (20, 1),
    "P_Coefficient": (21, 1),
    "D_Coefficient": (22, 1),
    "I_Coefficient": (23, 1),
    "Minimum_Startup_Force": (24, 2),
    "CW_Dead_Zone": (26, 1),
    "CCW_Dead_Zone": (27, 1),
    "Protective_Torque": (37, 1),
    "Protection_Time": (38, 1),
    # SRAM
    "Torque_Enable": (40, 1),
    "Acceleration": (41, 1),
    "Goal_Position": (42, 2),
    "Running_Time": (44, 2),
    "Goal_Velocity": (46, 2),
    "Lock": (48, 1),
    "Present_Position": (56, 2),  # read-only
    "Present_Velocity": (58, 2),  # read-only
    "Present_Load": (60, 2),  # read-only
    "Present_Voltage": (62, 1),  # read-only
    "Present_Temperature": (63, 1),  # read-only
    "Sync_Write_Flag": (64, 1),  # read-only
    "Status": (65, 1),  # read-only
    "Moving": (66, 1),  # read-only
    # Factory
    "PWM_Maximum_Step": (78, 1),
    "Moving_Velocity_Threshold*50": (79, 1),
    "DTs": (80, 1),  # (ms)
    "Minimum_Velocity_Limit*50": (81, 1),
    "Maximum_Velocity_Limit*50": (82, 1),
    "Acceleration_2": (83, 1),  # don't know what that is
}

STS_SMS_SERIES_BAUDRATE_TABLE = {
    1_000_000: 0,
    500_000: 1,
    250_000: 2,
    128_000: 3,
    115_200: 4,
    57_600: 5,
    38_400: 6,
    19_200: 7,
}

SCS_SERIES_BAUDRATE_TABLE = {
    1_000_000: 0,
    500_000: 1,
    250_000: 2,
    128_000: 3,
    115_200: 4,
    57_600: 5,
    38_400: 6,
    19_200: 7,
}

MODEL_CONTROL_TABLE = {
    "sts_series": STS_SMS_SERIES_CONTROL_TABLE,
    "scs_series": SCS_SERIES_CONTROL_TABLE,
    "sms_series": STS_SMS_SERIES_CONTROL_TABLE,
    "sts3215": STS_SMS_SERIES_CONTROL_TABLE,
    "sts3250": STS_SMS_SERIES_CONTROL_TABLE,
    "scs0009": SCS_SERIES_CONTROL_TABLE,
    "sm8512bl": STS_SMS_SERIES_CONTROL_TABLE,
}

MODEL_RESOLUTION = {
    "sts_series": 4096,
    "sms_series": 4096,
    "scs_series": 1024,
    "sts3215": 4096,
    "sts3250": 4096,
    "sm8512bl": 4096,
    "scs0009": 1024,
}

MODEL_BAUDRATE_TABLE = {
    "sts_series": STS_SMS_SERIES_BAUDRATE_TABLE,
    "sms_series": STS_SMS_SERIES_BAUDRATE_TABLE,
    "scs_series": SCS_SERIES_BAUDRATE_TABLE,
    "sm8512bl": STS_SMS_SERIES_BAUDRATE_TABLE,
    "sts3215": STS_SMS_SERIES_BAUDRATE_TABLE,
    "sts3250": STS_SMS_SERIES_BAUDRATE_TABLE,
    "scs0009": SCS_SERIES_BAUDRATE_TABLE,
}

# Sign-Magnitude encoding bits
STS_SMS_SERIES_ENCODINGS_TABLE = {
    "Present_Load": 10,
    "Homing_Offset": 11,
    "Goal_Position": 15,
    "Goal_Velocity": 15,
    "Goal_Speed": 15,
    "Present_Position": 15,
    "Present_Velocity": 15,
    "Present_Speed": 15,
}

MODEL_ENCODING_TABLE = {
    "sts_series": STS_SMS_SERIES_ENCODINGS_TABLE,
    "sms_series": STS_SMS_SERIES_ENCODINGS_TABLE,
    "scs_series": {},
    "sts3215": STS_SMS_SERIES_ENCODINGS_TABLE,
    "sts3250": STS_SMS_SERIES_ENCODINGS_TABLE,
    "sm8512bl": STS_SMS_SERIES_ENCODINGS_TABLE,
    "scs0009": {},
}

SCAN_BAUDRATES = [
    4_800,
    9_600,
    14_400,
    19_200,
    38_400,
    57_600,
    115_200,
    128_000,
    250_000,
    500_000,
    1_000_000,
]

MODEL_NUMBER_TABLE = {
    "sts3215": 777,
    "sts3250": 2825,
    "sm8512bl": 11272,
    "scs0009": 1284,
}

MODEL_PROTOCOL = {
    "sts_series": 0,
    "sms_series": 0,
    "scs_series": 1,
    "sts3215": 0,
    "sts3250": 0,
    "sm8512bl": 0,
    "scs0009": 1,
}
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "path": "lerobot/tests/motors/test_feetech.py"
}
```

## Assistant

**Result** (success):
```
#!/usr/bin/env python

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import sys
from collections.abc import Generator
from unittest.mock import MagicMock, patch

import pytest

from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.encoding_utils import encode_sign_magnitude
from lerobot.motors.feetech import MODEL_NUMBER, MODEL_NUMBER_TABLE, FeetechMotorsBus
from lerobot.motors.feetech.tables import STS_SMS_SERIES_CONTROL_TABLE

try:
    import scservo_sdk as scs

    from tests.mocks.mock_feetech import MockMotors, MockPortHandler
except (ImportError, ModuleNotFoundError):
    pytest.skip("scservo_sdk not available", allow_module_level=True)


@pytest.fixture(autouse=True)
def patch_port_handler():
    if sys.platform == "darwin":
        with patch.object(scs, "PortHandler", MockPortHandler):
            yield
    else:
        yield


@pytest.fixture
def mock_motors() -> Generator[MockMotors, None, None]:
    motors = MockMotors()
    motors.open()
    yield motors
    motors.close()


@pytest.fixture
def dummy_motors() -> dict[str, Motor]:
    return {
        "dummy_1": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
        "dummy_2": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
        "dummy_3": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
    }


@pytest.fixture
def dummy_calibration(dummy_motors) -> dict[str, MotorCalibration]:
    homings = [-709, -2006, 1624]
    mins = [43, 27, 145]
    maxes = [1335, 3608, 3999]
    calibration = {}
    for motor, m in dummy_motors.items():
        calibration[motor] = MotorCalibration(
            id=m.id,
            drive_mode=0,
            homing_offset=homings[m.id - 1],
            range_min=mins[m.id - 1],
            range_max=maxes[m.id - 1],
        )
    return calibration


@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
def test_autouse_patch():
    """Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler."""
    assert scs.PortHandler is MockPortHandler


@pytest.mark.parametrize(
    "protocol, value, length, expected",
    [
        (0, 0x12,       1, [0x12]),
        (1, 0x12,       1, [0x12]),
        (0, 0x1234,     2, [0x34, 0x12]),
        (1, 0x1234,     2, [0x12, 0x34]),
        (0, 0x12345678, 4, [0x78, 0x56, 0x34, 0x12]),
        (1, 0x12345678, 4, [0x56, 0x78, 0x12, 0x34]),
    ],
    ids=[
        "P0: 1 byte",
        "P1: 1 byte",
        "P0: 2 bytes",
        "P1: 2 bytes",
        "P0: 4 bytes",
        "P1: 4 bytes",
    ],
)  # fmt: skip
def test__split_into_byte_chunks(protocol, value, length, expected):
    bus = FeetechMotorsBus("", {}, protocol_version=protocol)
    assert bus._split_into_byte_chunks(value, length) == expected


def test_abc_implementation(dummy_motors):
    """Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
    FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors)


@pytest.mark.parametrize("id_", [1, 2, 3])
def test_ping(id_, mock_motors, dummy_motors):
    expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model]
    addr, length = MODEL_NUMBER
    ping_stub = mock_motors.build_ping_stub(id_)
    mobel_nb_stub = mock_motors.build_read_stub(addr, length, id_, expected_model_nb)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    ping_model_nb = bus.ping(id_)

    assert ping_model_nb == expected_model_nb
    assert mock_motors.stubs[ping_stub].called
    assert mock_motors.stubs[mobel_nb_stub].called


def test_broadcast_ping(mock_motors, dummy_motors):
    models = {m.id: m.model for m in dummy_motors.values()}
    addr, length = MODEL_NUMBER
    ping_stub = mock_motors.build_broadcast_ping_stub(list(models))
    mobel_nb_stubs = []
    expected_model_nbs = {}
    for id_, model in models.items():
        model_nb = MODEL_NUMBER_TABLE[model]
        stub = mock_motors.build_read_stub(addr, length, id_, model_nb)
        expected_model_nbs[id_] = model_nb
        mobel_nb_stubs.append(stub)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    ping_model_nbs = bus.broadcast_ping()

    assert ping_model_nbs == expected_model_nbs
    assert mock_motors.stubs[ping_stub].called
    assert all(mock_motors.stubs[stub].called for stub in mobel_nb_stubs)


@pytest.mark.parametrize(
    "addr, length, id_, value",
    [
        (0, 1, 1, 2),
        (10, 2, 2, 999),
        (42, 4, 3, 1337),
    ],
)
def test__read(addr, length, id_, value, mock_motors, dummy_motors):
    stub = mock_motors.build_read_stub(addr, length, id_, value)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    read_value, _, _ = bus._read(addr, length, id_)

    assert mock_motors.stubs[stub].called
    assert read_value == value


@pytest.mark.parametrize("raise_on_error", (True, False))
def test__read_error(raise_on_error, mock_motors, dummy_motors):
    addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE)
    stub = mock_motors.build_read_stub(addr, length, id_, value, error=error)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    if raise_on_error:
        with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
            bus._read(addr, length, id_, raise_on_error=raise_on_error)
    else:
        _, _, read_error = bus._read(addr, length, id_, raise_on_error=raise_on_error)
        assert read_error == error

    assert mock_motors.stubs[stub].called


@pytest.mark.parametrize("raise_on_error", (True, False))
def test__read_comm(raise_on_error, mock_motors, dummy_motors):
    addr, length, id_, value = (10, 4, 1, 1337)
    stub = mock_motors.build_read_stub(addr, length, id_, value, reply=False)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    if raise_on_error:
        with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
            bus._read(addr, length, id_, raise_on_error=raise_on_error)
    else:
        _, read_comm, _ = bus._read(addr, length, id_, raise_on_error=raise_on_error)
        assert read_comm == scs.COMM_RX_TIMEOUT

    assert mock_motors.stubs[stub].called


@pytest.mark.parametrize(
    "addr, length, id_, value",
    [
        (0, 1, 1, 2),
        (10, 2, 2, 999),
        (42, 4, 3, 1337),
    ],
)
def test__write(addr, length, id_, value, mock_motors, dummy_motors):
    stub = mock_motors.build_write_stub(addr, length, id_, value)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    comm, error = bus._write(addr, length, id_, value)

    assert mock_motors.stubs[stub].wait_called()
    assert comm == scs.COMM_SUCCESS
    assert error == 0


@pytest.mark.parametrize("raise_on_error", (True, False))
def test__write_error(raise_on_error, mock_motors, dummy_motors):
    addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE)
    stub = mock_motors.build_write_stub(addr, length, id_, value, error=error)
    bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
    bus.connect(handshake=False)

    if raise_on_error:
        with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
            bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
    else:
        _, write_error = bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
        assert write_error == error

    assert mock_motors.stubs[stub].called


@pytest.mark.parametrize("raise_on_error", (True, False))
def test__write_comm(raise_on_error, mock_motors, dummy_motors):
    addr, length, id_, value = (10, 4, 1, 1337)
    stub = mock_motors.build_write_stub(addr, length, id_, value, reply=False)
    bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
    bus.connect(handshake=False)

    if raise_on_error:
        with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
            bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
    else:
        write_comm, _ = bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
        assert write_comm == scs.COMM_RX_TIMEOUT

    assert mock_motors.stubs[stub].called


@pytest.mark.parametrize(
    "addr, length, ids_values",
    [
        (0, 1, {1: 4}),
        (10, 2, {1: 1337, 2: 42}),
        (42, 4, {1: 1337, 2: 42, 3: 4016}),
    ],
    ids=["1 motor", "2 motors", "3 motors"],
)
def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors):
    stub = mock_motors.build_sync_read_stub(addr, length, ids_values)
    bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
    bus.connect(handshake=False)

    read_values, _ = bus._sync_read(addr, length, list(ids_values))

    assert mock_motors.stubs[stub].called
    assert read_values == ids_values


@pytest.mark.parametrize("raise_on_error", (True, False))
def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors):
    addr, length, ids_values = (10, 4, {1: 1337})
    stub = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False)
    bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
    bus.connect(handshake=False)

    if raise_on_error:
        with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
            bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error)
    else:
        _, read_comm = bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error)
        assert read_comm == scs.COMM_RX_TIMEOUT

    assert mock_motors.stubs[stub].called


@pytest.mark.parametrize(
    "addr, length, ids_values",
    [
        (0, 1, {1: 4}),
        (10, 2, {1: 1337, 2: 42}),
        (42, 4, {1: 1337, 2: 42, 3: 4016}),
    ],
    ids=["1 motor", "2 motors", "3 motors"],
)
def test__sync_write(addr, length, ids_values, mock_motors, dummy_motors):
    stub = mock_motors.build_sync_write_stub(addr, length, ids_values)
    bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
    bus.connect(handshake=False)

    comm = bus._sync_write(addr, length, ids_values)

    assert mock_motors.stubs[stub].wait_called()
    assert comm == scs.COMM_SUCCESS


def test_is_calibrated(mock_motors, dummy_motors, dummy_calibration):
    mins_stubs, maxes_stubs, homings_stubs = [], [], []
    for cal in dummy_calibration.values():
        mins_stubs.append(
            mock_motors.build_read_stub(
                *STS_SMS_SERIES_CONTROL_TABLE["Min_Position_Limit"], cal.id, cal.range_min
            )
        )
        maxes_stubs.append(
            mock_motors.build_read_stub(
                *STS_SMS_SERIES_CONTROL_TABLE["Max_Position_Limit"], cal.id, cal.range_max
            )
        )
        homings_stubs.append(
            mock_motors.build_read_stub(
                *STS_SMS_SERIES_CONTROL_TABLE["Homing_Offset"],
                cal.id,
                encode_sign_magnitude(cal.homing_offset, 11),
            )
        )

    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
        calibration=dummy_calibration,
    )
    bus.connect(handshake=False)

    is_calibrated = bus.is_calibrated

    assert is_calibrated
    assert all(mock_motors.stubs[stub].called for stub in mins_stubs)
    assert all(mock_motors.stubs[stub].called for stub in maxes_stubs)
    assert all(mock_motors.stubs[stub].called for stub in homings_stubs)


def test_reset_calibration(mock_motors, dummy_motors):
    write_homing_stubs = []
    write_mins_stubs = []
    write_maxes_stubs = []
    for motor in dummy_motors.values():
        write_homing_stubs.append(
            mock_motors.build_write_stub(*STS_SMS_SERIES_CONTROL_TABLE["Homing_Offset"], motor.id, 0)
        )
        write_mins_stubs.append(
            mock_motors.build_write_stub(*STS_SMS_SERIES_CONTROL_TABLE["Min_Position_Limit"], motor.id, 0)
        )
        write_maxes_stubs.append(
            mock_motors.build_write_stub(*STS_SMS_SERIES_CONTROL_TABLE["Max_Position_Limit"], motor.id, 4095)
        )

    bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
    bus.connect(handshake=False)

    bus.reset_calibration()

    assert all(mock_motors.stubs[stub].wait_called() for stub in write_homing_stubs)
    assert all(mock_motors.stubs[stub].wait_called() for stub in write_mins_stubs)
    assert all(mock_motors.stubs[stub].wait_called() for stub in write_maxes_stubs)


def test_set_half_turn_homings(mock_motors, dummy_motors):
    """
    For this test, we assume that the homing offsets are already 0 such that
    Present_Position == Actual_Position
    """
    current_positions = {
        1: 1337,
        2: 42,
        3: 3672,
    }
    expected_homings = {
        1: -710,  # 1337 - 2047
        2: -2005,  # 42 - 2047
        3: 1625,  # 3672 - 2047
    }
    read_pos_stub = mock_motors.build_sync_read_stub(
        *STS_SMS_SERIES_CONTROL_TABLE["Present_Position"], current_positions
    )
    write_homing_stubs = []
    for id_, homing in expected_homings.items():
        encoded_homing = encode_sign_magnitude(homing, 11)
        stub = mock_motors.build_write_stub(
            *STS_SMS_SERIES_CONTROL_TABLE["Homing_Offset"], id_, encoded_homing
        )
        write_homing_stubs.append(stub)

    bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
    bus.connect(handshake=False)
    bus.reset_calibration = MagicMock()

    bus.set_half_turn_homings()

    bus.reset_calibration.assert_called_once()
    assert mock_motors.stubs[read_pos_stub].called
    assert all(mock_motors.stubs[stub].wait_called() for stub in write_homing_stubs)


def test_record_ranges_of_motion(mock_motors, dummy_motors):
    positions = {
        1: [351, 42, 1337],
        2: [28, 3600, 2444],
        3: [4002, 2999, 146],
    }
    expected_mins = {
        "dummy_1": 42,
        "dummy_2": 28,
        "dummy_3": 146,
    }
    expected_maxes = {
        "dummy_1": 1337,
        "dummy_2": 3600,
        "dummy_3": 4002,
    }
    stub = mock_motors.build_sequential_sync_read_stub(
        *STS_SMS_SERIES_CONTROL_TABLE["Present_Position"], positions
    )
    with patch("lerobot.motors.motors_bus.enter_pressed", side_effect=[False, True]):
        bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
        bus.connect(handshake=False)

        mins, maxes = bus.record_ranges_of_motion(display_values=False)

    assert mock_motors.stubs[stub].calls == 3
    assert mins == expected_mins
    assert maxes == expected_maxes
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "rg -n \"Max_Voltage_Limit|Min_Voltage_Limit|Present_Voltage|Input voltage error|ERRBIT_VOLTAGE|Unloading_Condition|LED_Alarm_Condition\" lerobot/src lerobot/tests"
}
```

## Assistant

**Result** (success):
```
lerobot/tests/motors/test_feetech.py:182:    addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE)
lerobot/tests/motors/test_feetech.py:191:        with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
lerobot/tests/motors/test_feetech.py:245:    addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE)
lerobot/tests/motors/test_feetech.py:251:        with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
lerobot/src/lerobot/motors/feetech/tables.py:53:    "Max_Voltage_Limit": (14, 1),
lerobot/src/lerobot/motors/feetech/tables.py:54:    "Min_Voltage_Limit": (15, 1),
lerobot/src/lerobot/motors/feetech/tables.py:57:    "Unloading_Condition": (19, 1),
lerobot/src/lerobot/motors/feetech/tables.py:58:    "LED_Alarm_Condition": (20, 1),
lerobot/src/lerobot/motors/feetech/tables.py:86:    "Present_Voltage": (62, 1),  # read-only
lerobot/src/lerobot/motors/feetech/tables.py:116:    "Max_Voltage_Limit": (14, 1),
lerobot/src/lerobot/motors/feetech/tables.py:117:    "Min_Voltage_Limit": (15, 1),
lerobot/src/lerobot/motors/feetech/tables.py:120:    "Unloading_Condition": (19, 1),
lerobot/src/lerobot/motors/feetech/tables.py:121:    "LED_Alarm_Condition": (20, 1),
lerobot/src/lerobot/motors/feetech/tables.py:140:    "Present_Voltage": (62, 1),  # read-only
lerobot/src/lerobot/motors/dynamixel/tables.py:52:    "Max_Voltage_Limit": (32, 2),
lerobot/src/lerobot/motors/dynamixel/tables.py:53:    "Min_Voltage_Limit": (34, 2),
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "path": "lerobot/tests/mocks/mock_feetech.py"
}
```

## Assistant

**Result** (success):
```
#!/usr/bin/env python

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
from collections.abc import Callable

import scservo_sdk as scs
import serial
from mock_serial import MockSerial

from lerobot.motors.feetech.feetech import _split_into_byte_chunks, patch_setPacketTimeout

from .mock_serial_patch import WaitableStub


class MockFeetechPacket(abc.ABC):
    @classmethod
    def build(cls, scs_id: int, params: list[int], length: int, *args, **kwargs) -> bytes:
        packet = cls._build(scs_id, params, length, *args, **kwargs)
        packet = cls._add_checksum(packet)
        return bytes(packet)

    @abc.abstractclassmethod
    def _build(cls, scs_id: int, params: list[int], length: int, *args, **kwargs) -> list[int]:
        pass

    @staticmethod
    def _add_checksum(packet: list[int]) -> list[int]:
        checksum = 0
        for id_ in range(2, len(packet) - 1):  # except header & checksum
            checksum += packet[id_]

        packet[-1] = ~checksum & 0xFF

        return packet


class MockInstructionPacket(MockFeetechPacket):
    """
    Helper class to build valid Feetech Instruction Packets.

    Instruction Packet structure
    (from https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf)

    | Header    | Packet ID | Length | Instruction | Params            | Checksum |
    | --------- | --------- | ------ | ----------- | ----------------- | -------- |
    | 0xFF 0xFF | ID        | Len    | Instr       | Param 1 … Param N | Sum      |

    """

    @classmethod
    def _build(cls, scs_id: int, params: list[int], length: int, instruction: int) -> list[int]:
        return [
            0xFF, 0xFF,   # header
            scs_id,       # servo id
            length,       # length
            instruction,  # instruction type
            *params,      # data bytes
            0x00,         # placeholder for checksum
        ]  # fmt: skip

    @classmethod
    def ping(
        cls,
        scs_id: int,
    ) -> bytes:
        """
        Builds a "Ping" broadcast instruction.

        No parameters required.
        """
        return cls.build(scs_id=scs_id, params=[], length=2, instruction=scs.INST_PING)

    @classmethod
    def read(
        cls,
        scs_id: int,
        start_address: int,
        data_length: int,
    ) -> bytes:
        """
        Builds a "Read" instruction.

        The parameters for Read are:
            param[0]   = start_address
            param[1]   = data_length

        And 'length' = 4, where:
            +1 is for instruction byte,
            +1 is for the address byte,
            +1 is for the length bytes,
            +1 is for the checksum at the end.
        """
        params = [start_address, data_length]
        length = 4
        return cls.build(scs_id=scs_id, params=params, length=length, instruction=scs.INST_READ)

    @classmethod
    def write(
        cls,
        scs_id: int,
        value: int,
        start_address: int,
        data_length: int,
    ) -> bytes:
        """
        Builds a "Write" instruction.

        The parameters for Write are:
            param[0]   = start_address L
            param[1]   = start_address H
            param[2]   = 1st Byte
            param[3]   = 2nd Byte
            ...
            param[1+X] = X-th Byte

        And 'length' = data_length + 3, where:
            +1 is for instruction byte,
            +1 is for the length bytes,
            +1 is for the checksum at the end.
        """
        data = _split_into_byte_chunks(value, data_length)
        params = [start_address, *data]
        length = data_length + 3
        return cls.build(scs_id=scs_id, params=params, length=length, instruction=scs.INST_WRITE)

    @classmethod
    def sync_read(
        cls,
        scs_ids: list[int],
        start_address: int,
        data_length: int,
    ) -> bytes:
        """
        Builds a "Sync_Read" broadcast instruction.

        The parameters for Sync Read are:
            param[0]   = start_address
            param[1]   = data_length
            param[2+]  = motor IDs to read from

        And 'length' = (number_of_params + 4), where:
            +1 is for instruction byte,
            +1 is for the address byte,
            +1 is for the length bytes,
            +1 is for the checksum at the end.
        """
        params = [start_address, data_length, *scs_ids]
        length = len(scs_ids) + 4
        return cls.build(
            scs_id=scs.BROADCAST_ID, params=params, length=length, instruction=scs.INST_SYNC_READ
        )

    @classmethod
    def sync_write(
        cls,
        ids_values: dict[int, int],
        start_address: int,
        data_length: int,
    ) -> bytes:
        """
        Builds a "Sync_Write" broadcast instruction.

        The parameters for Sync_Write are:
            param[0]   = start_address
            param[1]   = data_length
            param[2]   = [1st motor] ID
            param[2+1] = [1st motor] 1st Byte
            param[2+2] = [1st motor] 2nd Byte
            ...
            param[5+X] = [1st motor] X-th Byte
            param[6]   = [2nd motor] ID
            param[6+1] = [2nd motor] 1st Byte
            param[6+2] = [2nd motor] 2nd Byte
            ...
            param[6+X] = [2nd motor] X-th Byte

        And 'length' = ((number_of_params * 1 + data_length) + 4), where:
            +1 is for instruction byte,
            +1 is for the address byte,
            +1 is for the length bytes,
            +1 is for the checksum at the end.
        """
        data = []
        for id_, value in ids_values.items():
            split_value = _split_into_byte_chunks(value, data_length)
            data += [id_, *split_value]
        params = [start_address, data_length, *data]
        length = len(ids_values) * (1 + data_length) + 4
        return cls.build(
            scs_id=scs.BROADCAST_ID, params=params, length=length, instruction=scs.INST_SYNC_WRITE
        )


class MockStatusPacket(MockFeetechPacket):
    """
    Helper class to build valid Feetech Status Packets.

    Status Packet structure
    (from https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf)

    | Header    | Packet ID | Length | Error | Params            | Checksum |
    | --------- | --------- | ------ | ----- | ----------------- | -------- |
    | 0xFF 0xFF | ID        | Len    | Err   | Param 1 … Param N | Sum      |

    """

    @classmethod
    def _build(cls, scs_id: int, params: list[int], length: int, error: int = 0) -> list[int]:
        return [
            0xFF, 0xFF,  # header
            scs_id,      # servo id
            length,      # length
            error,       # status
            *params,     # data bytes
            0x00,        # placeholder for checksum
        ]  # fmt: skip

    @classmethod
    def ping(cls, scs_id: int, error: int = 0) -> bytes:
        """Builds a 'Ping' status packet.

        Args:
            scs_id (int): ID of the servo responding.
            error (int, optional): Error to be returned. Defaults to 0 (success).

        Returns:
            bytes: The raw 'Ping' status packet ready to be sent through serial.
        """
        return cls.build(scs_id, params=[], length=2, error=error)

    @classmethod
    def read(cls, scs_id: int, value: int, param_length: int, error: int = 0) -> bytes:
        """Builds a 'Read' status packet.

        Args:
            scs_id (int): ID of the servo responding.
            value (int): Desired value to be returned in the packet.
            param_length (int): The address length as reported in the control table.

        Returns:
            bytes: The raw 'Sync Read' status packet ready to be sent through serial.
        """
        params = _split_into_byte_chunks(value, param_length)
        length = param_length + 2
        return cls.build(scs_id, params=params, length=length, error=error)


class MockPortHandler(scs.PortHandler):
    """
    This class overwrite the 'setupPort' method of the Feetech PortHandler because it can specify
    baudrates that are not supported with a serial port on MacOS.
    """

    def setupPort(self, cflag_baud):  # noqa: N802
        if self.is_open:
            self.closePort()

        self.ser = serial.Serial(
            port=self.port_name,
            # baudrate=self.baudrate,  <- This will fail on MacOS
            # parity = serial.PARITY_ODD,
            # stopbits = serial.STOPBITS_TWO,
            bytesize=serial.EIGHTBITS,
            timeout=0,
        )
        self.is_open = True
        self.ser.reset_input_buffer()
        self.tx_time_per_byte = (1000.0 / self.baudrate) * 10.0

        return True

    def setPacketTimeout(self, packet_length):  # noqa: N802
        return patch_setPacketTimeout(self, packet_length)


class MockMotors(MockSerial):
    """
    This class will simulate physical motors by responding with valid status packets upon receiving some
    instruction packets. It is meant to test MotorsBus classes.
    """

    def __init__(self):
        super().__init__()

    @property
    def stubs(self) -> dict[str, WaitableStub]:
        return super().stubs

    def stub(self, *, name=None, **kwargs):
        new_stub = WaitableStub(**kwargs)
        self._MockSerial__stubs[name or new_stub.receive_bytes] = new_stub
        return new_stub

    def build_broadcast_ping_stub(self, ids: list[int] | None = None, num_invalid_try: int = 0) -> str:
        ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
        return_packets = b"".join(MockStatusPacket.ping(id_) for id_ in ids)
        ping_response = self._build_send_fn(return_packets, num_invalid_try)
        stub_name = "Ping_" + "_".join([str(id_) for id_ in ids])
        self.stub(
            name=stub_name,
            receive_bytes=ping_request,
            send_fn=ping_response,
        )
        return stub_name

    def build_ping_stub(self, scs_id: int, num_invalid_try: int = 0, error: int = 0) -> str:
        ping_request = MockInstructionPacket.ping(scs_id)
        return_packet = MockStatusPacket.ping(scs_id, error)
        ping_response = self._build_send_fn(return_packet, num_invalid_try)
        stub_name = f"Ping_{scs_id}_{error}"
        self.stub(
            name=stub_name,
            receive_bytes=ping_request,
            send_fn=ping_response,
        )
        return stub_name

    def build_read_stub(
        self,
        address: int,
        length: int,
        scs_id: int,
        value: int,
        reply: bool = True,
        error: int = 0,
        num_invalid_try: int = 0,
    ) -> str:
        read_request = MockInstructionPacket.read(scs_id, address, length)
        return_packet = MockStatusPacket.read(scs_id, value, length, error) if reply else b""
        read_response = self._build_send_fn(return_packet, num_invalid_try)
        stub_name = f"Read_{address}_{length}_{scs_id}_{value}_{error}"
        self.stub(
            name=stub_name,
            receive_bytes=read_request,
            send_fn=read_response,
        )
        return stub_name

    def build_write_stub(
        self,
        address: int,
        length: int,
        scs_id: int,
        value: int,
        reply: bool = True,
        error: int = 0,
        num_invalid_try: int = 0,
    ) -> str:
        sync_read_request = MockInstructionPacket.write(scs_id, value, address, length)
        return_packet = MockStatusPacket.build(scs_id, params=[], length=2, error=error) if reply else b""
        stub_name = f"Write_{address}_{length}_{scs_id}"
        self.stub(
            name=stub_name,
            receive_bytes=sync_read_request,
            send_fn=self._build_send_fn(return_packet, num_invalid_try),
        )
        return stub_name

    def build_sync_read_stub(
        self,
        address: int,
        length: int,
        ids_values: dict[int, int],
        reply: bool = True,
        num_invalid_try: int = 0,
    ) -> str:
        sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
        return_packets = (
            b"".join(MockStatusPacket.read(id_, pos, length) for id_, pos in ids_values.items())
            if reply
            else b""
        )
        sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
        stub_name = f"Sync_Read_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values])
        self.stub(
            name=stub_name,
            receive_bytes=sync_read_request,
            send_fn=sync_read_response,
        )
        return stub_name

    def build_sequential_sync_read_stub(
        self, address: int, length: int, ids_values: dict[int, list[int]] | None = None
    ) -> str:
        sequence_length = len(next(iter(ids_values.values())))
        assert all(len(positions) == sequence_length for positions in ids_values.values())
        sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
        sequential_packets = []
        for count in range(sequence_length):
            return_packets = b"".join(
                MockStatusPacket.read(id_, positions[count], length) for id_, positions in ids_values.items()
            )
            sequential_packets.append(return_packets)

        sync_read_response = self._build_sequential_send_fn(sequential_packets)
        stub_name = f"Seq_Sync_Read_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values])
        self.stub(
            name=stub_name,
            receive_bytes=sync_read_request,
            send_fn=sync_read_response,
        )
        return stub_name

    def build_sync_write_stub(
        self, address: int, length: int, ids_values: dict[int, int], num_invalid_try: int = 0
    ) -> str:
        sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length)
        stub_name = f"Sync_Write_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values])
        self.stub(
            name=stub_name,
            receive_bytes=sync_read_request,
            send_fn=self._build_send_fn(b"", num_invalid_try),
        )
        return stub_name

    @staticmethod
    def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]:
        def send_fn(_call_count: int) -> bytes:
            if num_invalid_try >= _call_count:
                return b""
            return packet

        return send_fn

    @staticmethod
    def _build_sequential_send_fn(packets: list[bytes]) -> Callable[[int], bytes]:
        def send_fn(_call_count: int) -> bytes:
            return packets[_call_count - 1]

        return send_fn
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "path": "lerobot/src/lerobot/robots/so_follower/config_so_follower.py"
}
```

## Assistant

**Result** (success):
```
#!/usr/bin/env python

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass, field

from lerobot.cameras import CameraConfig

from ..config import RobotConfig


@dataclass
class SOFollowerConfig:
    """Base configuration class for SO Follower robots."""

    # Port to connect to the arm
    port: str

    disable_torque_on_disconnect: bool = True

    # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
    # Set this to a positive scalar to have the same value for all motors, or a dictionary that maps motor
    # names to the max_relative_target value for that motor.
    max_relative_target: float | dict[str, float] | None = None

    # cameras
    cameras: dict[str, CameraConfig] = field(default_factory=dict)

    # Set to `True` for backward compatibility with previous policies/dataset
    use_degrees: bool = True


@RobotConfig.register_subclass("so101_follower")
@RobotConfig.register_subclass("so100_follower")
@dataclass
class SOFollowerRobotConfig(RobotConfig, SOFollowerConfig):
    pass


SO100FollowerConfig = SOFollowerRobotConfig
SO101FollowerConfig = SOFollowerRobotConfig
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "python - <<'PY'\nimport scservo_sdk, inspect, os\nprint(scservo_sdk.__file__)\nprint(os.path.dirname(scservo_sdk.__file__))\nPY"
}
```

## Assistant

**Result** (success):
```
/Users/mukund/dev/robot-1/.venv/lib/python3.12/site-packages/scservo_sdk/__init__.py
/Users/mukund/dev/robot-1/.venv/lib/python3.12/site-packages/scservo_sdk
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "rg -n \"def getRxPacketError|ERRBIT_VOLTAGE|Input voltage error|ERRBIT\" .venv/lib/python3.12/site-packages/scservo_sdk"
}
```

## Assistant

**Result** (success):
```
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:18:ERRBIT_VOLTAGE = 1
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:19:ERRBIT_ANGLE = 2
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:20:ERRBIT_OVERHEAT = 4
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:21:ERRBIT_OVERELE = 8
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:22:ERRBIT_OVERLOAD = 32
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:51:    def getRxPacketError(self, error):
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:52:        if error & ERRBIT_VOLTAGE:
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:53:            return "[RxPacketError] Input voltage error!"
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:55:        if error & ERRBIT_ANGLE:
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:58:        if error & ERRBIT_OVERHEAT:
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:61:        if error & ERRBIT_OVERELE:
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:64:        if error & ERRBIT_OVERLOAD:
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "path": ".venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py"
}
```

## Assistant

**Result** (success):
```
#!/usr/bin/env python

from .scservo_def import *

TXPACKET_MAX_LEN = 250
RXPACKET_MAX_LEN = 250

# for Protocol Packet
PKT_HEADER0 = 0
PKT_HEADER1 = 1
PKT_ID = 2
PKT_LENGTH = 3
PKT_INSTRUCTION = 4
PKT_ERROR = 4
PKT_PARAMETER0 = 5

# Protocol Error bit
ERRBIT_VOLTAGE = 1
ERRBIT_ANGLE = 2
ERRBIT_OVERHEAT = 4
ERRBIT_OVERELE = 8
ERRBIT_OVERLOAD = 32


class protocol_packet_handler(object):
    def getProtocolVersion(self):
        return 1.0

    def getTxRxResult(self, result):
        if result == COMM_SUCCESS:
            return "[TxRxResult] Communication success!"
        elif result == COMM_PORT_BUSY:
            return "[TxRxResult] Port is in use!"
        elif result == COMM_TX_FAIL:
            return "[TxRxResult] Failed transmit instruction packet!"
        elif result == COMM_RX_FAIL:
            return "[TxRxResult] Failed get status packet from device!"
        elif result == COMM_TX_ERROR:
            return "[TxRxResult] Incorrect instruction packet!"
        elif result == COMM_RX_WAITING:
            return "[TxRxResult] Now receiving status packet!"
        elif result == COMM_RX_TIMEOUT:
            return "[TxRxResult] There is no status packet!"
        elif result == COMM_RX_CORRUPT:
            return "[TxRxResult] Incorrect status packet!"
        elif result == COMM_NOT_AVAILABLE:
            return "[TxRxResult] Protocol does not support this function!"
        else:
            return ""

    def getRxPacketError(self, error):
        if error & ERRBIT_VOLTAGE:
            return "[RxPacketError] Input voltage error!"

        if error & ERRBIT_ANGLE:
            return "[RxPacketError] Angle sen error!"

        if error & ERRBIT_OVERHEAT:
            return "[RxPacketError] Overheat error!"

        if error & ERRBIT_OVERELE:
            return "[RxPacketError] OverEle error!"
        
        if error & ERRBIT_OVERLOAD:
            return "[RxPacketError] Overload error!"

        return ""

    def txPacket(self, port, txpacket):
        checksum = 0
        total_packet_length = txpacket[PKT_LENGTH] + 4  # 4: HEADER0 HEADER1 ID LENGTH

        if port.is_using:
            return COMM_PORT_BUSY
        port.is_using = True

        # check max packet length
        if total_packet_length > TXPACKET_MAX_LEN:
            port.is_using = False
            return COMM_TX_ERROR

        # make packet header
        txpacket[PKT_HEADER0] = 0xFF
        txpacket[PKT_HEADER1] = 0xFF

        # add a checksum to the packet
        for idx in range(2, total_packet_length - 1):  # except header, checksum
            checksum += txpacket[idx]

        txpacket[total_packet_length - 1] = ~checksum & 0xFF

        #print "[TxPacket] %r" % txpacket

        # tx packet
        port.clearPort()
        written_packet_length = port.writePort(txpacket)
        if total_packet_length != written_packet_length:
            port.is_using = False
            return COMM_TX_FAIL

        return COMM_SUCCESS

    def rxPacket(self, port):
        rxpacket = []

        result = COMM_TX_FAIL
        checksum = 0
        rx_length = 0
        wait_length = 6  # minimum length (HEADER0 HEADER1 ID LENGTH ERROR CHKSUM)

        while True:
            rxpacket.extend(port.readPort(wait_length - rx_length))
            rx_length = len(rxpacket)
            if rx_length >= wait_length:
                # find packet header
                for idx in range(0, (rx_length - 1)):
                    if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF):
                        break

                if idx == 0:  # found at the beginning of the packet
                    if (rxpacket[PKT_ID] > 0xFD) or (rxpacket[PKT_LENGTH] > RXPACKET_MAX_LEN) or (
                            rxpacket[PKT_ERROR] > 0x7F):
                        # unavailable ID or unavailable Length or unavailable Error
                        # remove the first byte in the packet
                        del rxpacket[0]
                        rx_length -= 1
                        continue

                    # re-calculate the exact length of the rx packet
                    if wait_length != (rxpacket[PKT_LENGTH] + PKT_LENGTH + 1):
                        wait_length = rxpacket[PKT_LENGTH] + PKT_LENGTH + 1
                        continue

                    if rx_length < wait_length:
                        # check timeout
                        if port.isPacketTimeout():
                            if rx_length == 0:
                                result = COMM_RX_TIMEOUT
                            else:
                                result = COMM_RX_CORRUPT
                            break
                        else:
                            continue

                    # calculate checksum
                    for i in range(2, wait_length - 1):  # except header, checksum
                        checksum += rxpacket[i]
                    checksum = ~checksum & 0xFF

                    # verify checksum
                    if rxpacket[wait_length - 1] == checksum:
                        result = COMM_SUCCESS
                    else:
                        result = COMM_RX_CORRUPT
                    break

                else:
                    # remove unnecessary packets
                    del rxpacket[0: idx]
                    rx_length -= idx

            else:
                # check timeout
                if port.isPacketTimeout():
                    if rx_length == 0:
                        result = COMM_RX_TIMEOUT
                    else:
                        result = COMM_RX_CORRUPT
                    break

        port.is_using = False

        #print "[RxPacket] %r" % rxpacket

        return rxpacket, result

    def txRxPacket(self, port, txpacket):
        rxpacket = None
        error = 0

        # tx packet
        result = self.txPacket(port, txpacket)
        if result != COMM_SUCCESS:
            return rxpacket, result, error

        # (ID == Broadcast ID) == no need to wait for status packet or not available
        if (txpacket[PKT_ID] == BROADCAST_ID):
            port.is_using = False
            return rxpacket, result, error

        # set packet timeout
        if txpacket[PKT_INSTRUCTION] == INST_READ:
            port.setPacketTimeout(txpacket[PKT_PARAMETER0 + 1] + 6)
        else:
            port.setPacketTimeout(6)  # HEADER0 HEADER1 ID LENGTH ERROR CHECKSUM

        # rx packet
        while True:
            rxpacket, result = self.rxPacket(port)
            if result != COMM_SUCCESS or txpacket[PKT_ID] == rxpacket[PKT_ID]:
                break

        if result == COMM_SUCCESS and txpacket[PKT_ID] == rxpacket[PKT_ID]:
            error = rxpacket[PKT_ERROR]

        return rxpacket, result, error

    def ping(self, port, scs_id):
        model_number = 0
        error = 0

        txpacket = [0] * 6

        if scs_id >= BROADCAST_ID:
            return model_number, COMM_NOT_AVAILABLE, error

        txpacket[PKT_ID] = scs_id
        txpacket[PKT_LENGTH] = 2
        txpacket[PKT_INSTRUCTION] = INST_PING

        rxpacket, result, error = self.txRxPacket(port, txpacket)

        if result == COMM_SUCCESS:
            data_read, result, error = self.readTxRx(port, scs_id, 3, 2)  # Address 3 : Model Number
            if result == COMM_SUCCESS:
                model_number = SCS_MAKEWORD(data_read[0], data_read[1])

        return model_number, result, error

    def action(self, port, scs_id):
        txpacket = [0] * 6

        txpacket[PKT_ID] = scs_id
        txpacket[PKT_LENGTH] = 2
        txpacket[PKT_INSTRUCTION] = INST_ACTION

        _, result, _ = self.txRxPacket(port, txpacket)

        return result

    def readTx(self, port, scs_id, address, length):

        txpacket = [0] * 8

        if scs_id >= BROADCAST_ID:
            return COMM_NOT_AVAILABLE

        txpacket[PKT_ID] = scs_id
        txpacket[PKT_LENGTH] = 4
        txpacket[PKT_INSTRUCTION] = INST_READ
        txpacket[PKT_PARAMETER0 + 0] = address
        txpacket[PKT_PARAMETER0 + 1] = length

        result = self.txPacket(port, txpacket)

        # set packet timeout
        if result == COMM_SUCCESS:
            port.setPacketTimeout(length + 6)

        return result

    def readRx(self, port, scs_id, length):
        result = COMM_TX_FAIL
        error = 0

        rxpacket = None
        data = []

        while True:
            rxpacket, result = self.rxPacket(port)

            if result != COMM_SUCCESS or rxpacket[PKT_ID] == scs_id:
                break

        if result == COMM_SUCCESS and rxpacket[PKT_ID] == scs_id:
            error = rxpacket[PKT_ERROR]

            data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])

        return data, result, error

    def readTxRx(self, port, scs_id, address, length):
        txpacket = [0] * 8
        data = []

        if scs_id >= BROADCAST_ID:
            return data, COMM_NOT_AVAILABLE, 0

        txpacket[PKT_ID] = scs_id
        txpacket[PKT_LENGTH] = 4
        txpacket[PKT_INSTRUCTION] = INST_READ
        txpacket[PKT_PARAMETER0 + 0] = address
        txpacket[PKT_PARAMETER0 + 1] = length

        rxpacket, result, error = self.txRxPacket(port, txpacket)
        if result == COMM_SUCCESS:
            error = rxpacket[PKT_ERROR]

            data.extend(rxpacket[PKT_PARAMETER0: PKT_PARAMETER0 + length])

        return data, result, error

    def read1ByteTx(self, port, scs_id, address):
        return self.readTx(port, scs_id, address, 1)

    def read1ByteRx(self, port, scs_id):
        data, result, error = self.readRx(port, scs_id, 1)
        data_read = data[0] if (result == COMM_SUCCESS) else 0
        return data_read, result, error

    def read1ByteTxRx(self, port, scs_id, address):
        data, result, error = self.readTxRx(port, scs_id, address, 1)
        data_read = data[0] if (result == COMM_SUCCESS) else 0
        return data_read, result, error

    def read2ByteTx(self, port, scs_id, address):
        return self.readTx(port, scs_id, address, 2)

    def read2ByteRx(self, port, scs_id):
        data, result, error = self.readRx(port, scs_id, 2)
        data_read = SCS_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
        return data_read, result, error

    def read2ByteTxRx(self, port, scs_id, address):
        data, result, error = self.readTxRx(port, scs_id, address, 2)
        data_read = SCS_MAKEWORD(data[0], data[1]) if (result == COMM_SUCCESS) else 0
        return data_read, result, error

    def read4ByteTx(self, port, scs_id, address):
        return self.readTx(port, scs_id, address, 4)

    def read4ByteRx(self, port, scs_id):
        data, result, error = self.readRx(port, scs_id, 4)
        data_read = SCS_MAKEDWORD(SCS_MAKEWORD(data[0], data[1]),
                                  SCS_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
        return data_read, result, error

    def read4ByteTxRx(self, port, scs_id, address):
        data, result, error = self.readTxRx(port, scs_id, address, 4)
        data_read = SCS_MAKEDWORD(SCS_MAKEWORD(data[0], data[1]),
                                  SCS_MAKEWORD(data[2], data[3])) if (result == COMM_SUCCESS) else 0
        return data_read, result, error

    def writeTxOnly(self, port, scs_id, address, length, data):
        txpacket = [0] * (length + 7)

        txpacket[PKT_ID] = scs_id
        txpacket[PKT_LENGTH] = length + 3
        txpacket[PKT_INSTRUCTION] = INST_WRITE
        txpacket[PKT_PARAMETER0] = address

        txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]

        result = self.txPacket(port, txpacket)
        port.is_using = False

        return result

    def writeTxRx(self, port, scs_id, address, length, data):
        txpacket = [0] * (length + 7)

        txpacket[PKT_ID] = scs_id
        txpacket[PKT_LENGTH] = length + 3
        txpacket[PKT_INSTRUCTION] = INST_WRITE
        txpacket[PKT_PARAMETER0] = address

        txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]
        rxpacket, result, error = self.txRxPacket(port, txpacket)

        return result, error

    def write1ByteTxOnly(self, port, scs_id, address, data):
        data_write = [data]
        return self.writeTxOnly(port, scs_id, address, 1, data_write)

    def write1ByteTxRx(self, port, scs_id, address, data):
        data_write = [data]
        return self.writeTxRx(port, scs_id, address, 1, data_write)

    def write2ByteTxOnly(self, port, scs_id, address, data):
        data_write = [SCS_LOBYTE(data), SCS_HIBYTE(data)]
        return self.writeTxOnly(port, scs_id, address, 2, data_write)

    def write2ByteTxRx(self, port, scs_id, address, data):
        data_write = [SCS_LOBYTE(data), SCS_HIBYTE(data)]
        return self.writeTxRx(port, scs_id, address, 2, data_write)

    def write4ByteTxOnly(self, port, scs_id, address, data):
        data_write = [SCS_LOBYTE(SCS_LOWORD(data)),
                      SCS_HIBYTE(SCS_LOWORD(data)),
                      SCS_LOBYTE(SCS_HIWORD(data)),
                      SCS_HIBYTE(SCS_HIWORD(data))]
        return self.writeTxOnly(port, scs_id, address, 4, data_write)

    def write4ByteTxRx(self, port, scs_id, address, data):
        data_write = [SCS_LOBYTE(SCS_LOWORD(data)),
                      SCS_HIBYTE(SCS_LOWORD(data)),
                      SCS_LOBYTE(SCS_HIWORD(data)),
                      SCS_HIBYTE(SCS_HIWORD(data))]
        return self.writeTxRx(port, scs_id, address, 4, data_write)

    def regWriteTxOnly(self, port, scs_id, address, length, data):
        txpacket = [0] * (length + 7)

        txpacket[PKT_ID] = scs_id
        txpacket[PKT_LENGTH] = length + 3
        txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
        txpacket[PKT_PARAMETER0] = address

        txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]

        result = self.txPacket(port, txpacket)
        port.is_using = False

        return result

    def regWriteTxRx(self, port, scs_id, address, length, data):
        txpacket = [0] * (length + 7)

        txpacket[PKT_ID] = scs_id
        txpacket[PKT_LENGTH] = length + 3
        txpacket[PKT_INSTRUCTION] = INST_REG_WRITE
        txpacket[PKT_PARAMETER0] = address

        txpacket[PKT_PARAMETER0 + 1: PKT_PARAMETER0 + 1 + length] = data[0: length]

        _, result, error = self.txRxPacket(port, txpacket)

        return result, error

    def syncReadTx(self, port, start_address, data_length, param, param_length):
        txpacket = [0] * (param_length + 8)
        # 8: HEADER0 HEADER1 ID LEN INST START_ADDR DATA_LEN CHKSUM

        txpacket[PKT_ID] = BROADCAST_ID
        txpacket[PKT_LENGTH] = param_length + 4  # 7: INST START_ADDR DATA_LEN CHKSUM
        txpacket[PKT_INSTRUCTION] = INST_SYNC_READ
        txpacket[PKT_PARAMETER0 + 0] = start_address
        txpacket[PKT_PARAMETER0 + 1] = data_length

        txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + param_length] = param[0: param_length]

        result = self.txPacket(port, txpacket)
        if result == COMM_SUCCESS:
            port.setPacketTimeout((6 + data_length) * param_length)

        return result


    def syncWriteTxOnly(self, port, start_address, data_length, param, param_length):
        txpacket = [0] * (param_length + 8)
        # 8: HEADER0 HEADER1 ID LEN INST START_ADDR DATA_LEN ... CHKSUM

        txpacket[PKT_ID] = BROADCAST_ID
        txpacket[PKT_LENGTH] = param_length + 4  # 4: INST START_ADDR DATA_LEN ... CHKSUM
        txpacket[PKT_INSTRUCTION] = INST_SYNC_WRITE
        txpacket[PKT_PARAMETER0 + 0] = start_address
        txpacket[PKT_PARAMETER0 + 1] = data_length

        txpacket[PKT_PARAMETER0 + 2: PKT_PARAMETER0 + 2 + param_length] = param[0: param_length]

        _, result, _ = self.txRxPacket(port, txpacket)

        return result
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "rg -n \"def setup_motors\\(|def setup_motor\\(|def _find_single_motor_p0\\(|def broadcast_ping\\(|getRxPacketError|ERRBIT_VOLTAGE|def _read\\(|def _write\\(|class SOFollower|COMPATIBLE_DEVICES\" lerobot/src .venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py"
}
```

## Assistant

**Result** (success):
```
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:18:ERRBIT_VOLTAGE = 1
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:51:    def getRxPacketError(self, error):
.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py:52:        if error & ERRBIT_VOLTAGE:
lerobot/src/lerobot/scripts/lerobot_setup_motors.py:50:COMPATIBLE_DEVICES = [
lerobot/src/lerobot/scripts/lerobot_setup_motors.py:77:def setup_motors(cfg: SetupConfig):
lerobot/src/lerobot/scripts/lerobot_setup_motors.py:78:    if cfg.device.type not in COMPATIBLE_DEVICES:
lerobot/src/lerobot/robots/so_follower/config_so_follower.py:25:class SOFollowerConfig:
lerobot/src/lerobot/robots/so_follower/config_so_follower.py:48:class SOFollowerRobotConfig(RobotConfig, SOFollowerConfig):
lerobot/src/lerobot/robots/so_follower/so_follower.py:37:class SOFollower(Robot):
lerobot/src/lerobot/robots/so_follower/so_follower.py:171:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/openarm_follower/openarm_follower.py:216:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/omx_follower/omx_follower.py:161:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/lekiwi/lekiwi.py:201:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/homunculus/homunculus_glove.py:265:    def _read(
lerobot/src/lerobot/robots/koch_follower/koch_follower.py:178:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/bi_openarm_follower/bi_openarm_follower.py:133:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/homunculus/homunculus_arm.py:233:    def _read(
lerobot/src/lerobot/teleoperators/openarm_leader/openarm_leader.py:180:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/koch_leader/koch_leader.py:156:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/bi_openarm_leader/bi_openarm_leader.py:109:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/bi_so_leader/bi_so_leader.py:92:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/bi_so_follower/bi_so_follower.py:117:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/so_leader/so_leader.py:133:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/hope_jr/hope_jr_hand.py:153:    def setup_motors(self) -> None:
lerobot/src/lerobot/robots/hope_jr/hope_jr_arm.py:123:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/openarm_mini/openarm_mini.py:258:    def setup_motors(self) -> None:
lerobot/src/lerobot/teleoperators/omx_leader/omx_leader.py:145:    def setup_motors(self) -> None:
lerobot/src/lerobot/motors/feetech/feetech.py:175:    def _find_single_motor_p0(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
lerobot/src/lerobot/motors/feetech/feetech.py:405:    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
lerobot/src/lerobot/motors/feetech/feetech.py:421:            display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}
lerobot/src/lerobot/motors/motors_bus.py:215:    def getRxPacketError(self, error): ...
lerobot/src/lerobot/motors/motors_bus.py:576:    def setup_motor(
lerobot/src/lerobot/motors/motors_bus.py:958:                raise RuntimeError(self.packet_handler.getRxPacketError(error))
lerobot/src/lerobot/motors/motors_bus.py:965:    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
lerobot/src/lerobot/motors/motors_bus.py:1015:    def _read(
lerobot/src/lerobot/motors/motors_bus.py:1046:            raise RuntimeError(f"{err_msg} {self.packet_handler.getRxPacketError(error)}")
lerobot/src/lerobot/motors/motors_bus.py:1083:    def _write(
lerobot/src/lerobot/motors/motors_bus.py:1107:            raise RuntimeError(f"{err_msg} {self.packet_handler.getRxPacketError(error)}")
lerobot/src/lerobot/motors/dynamixel/dynamixel.py:249:    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "rg -n \"Max_Voltage_Limit|Min_Voltage_Limit|Present_Voltage|MODEL_PROTOCOL|sts3215\" lerobot/src/lerobot/motors/feetech/tables.py"
}
```

## Assistant

**Result** (success):
```
53:    "Max_Voltage_Limit": (14, 1),
54:    "Min_Voltage_Limit": (15, 1),
86:    "Present_Voltage": (62, 1),  # read-only
116:    "Max_Voltage_Limit": (14, 1),
117:    "Min_Voltage_Limit": (15, 1),
140:    "Present_Voltage": (62, 1),  # read-only
180:    "sts3215": STS_SMS_SERIES_CONTROL_TABLE,
190:    "sts3215": 4096,
201:    "sts3215": STS_SMS_SERIES_BAUDRATE_TABLE,
222:    "sts3215": STS_SMS_SERIES_ENCODINGS_TABLE,
243:    "sts3215": 777,
249:MODEL_PROTOCOL = {
253:    "sts3215": 0,
```

## Assistant

## Assistant

Great question — I traced the source in `./lerobot/`, and the key thing is:

**LeRobot is not hardcoding “7.4V motor with 12V supply” checks itself.**  
It relies on the **motor firmware status error bits** returned in Feetech status packets.

### Where this comes from in code

- CLI entrypoint:
  - `lerobot/pyproject.toml` → `lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main"`
  - `lerobot/src/lerobot/scripts/lerobot_setup_motors.py`
- Robot setup flow:
  - `lerobot/src/lerobot/robots/so_follower/so_follower.py` → `SOFollower.setup_motors()`
  - Prompts motor-by-motor, then calls `self.bus.setup_motor(motor)`
- Bus setup flow:
  - `lerobot/src/lerobot/motors/motors_bus.py` → `SerialMotorsBus.setup_motor()`
  - Calls `_find_single_motor(...)`
- Feetech scan/ping logic:
  - `lerobot/src/lerobot/motors/feetech/feetech.py`
  - `_find_single_motor_p0()` → `broadcast_ping()`
  - `broadcast_ping()` collects per-motor status/error and logs:
    - `Some motors found returned an error status: ...`
    - exactly where your message comes from

### Why you see `[RxPacketError] Input voltage error!`

In SDK (not LeRobot core logic):

- `.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py`
  - `ERRBIT_VOLTAGE = 1`
  - `getRxPacketError()` maps that bit to:
    - `"[RxPacketError] Input voltage error!"`

So physically, the motor firmware detects out-of-range voltage, sets error bit in status byte, and LeRobot surfaces it.

---

## If you wrote this stack yourself, it would look like:

1. Send ping/read/write packet over UART
2. Parse status packet `{id, error_byte, payload}`
3. Decode error flags via bitmask (`error & 0x01` => voltage)
4. Decide policy:
   - warn + continue
   - fail hard
   - auto read `Present_Voltage`, `Min_Voltage_Limit`, `Max_Voltage_Limit`

LeRobot currently mostly does:
- log warning on broadcast ping errors
- throw runtime errors for read/write when error bit set (depending path)

---

If you want, next I can help you add a **debug patch** in LeRobot so `setup-motors` prints:
- raw error byte,
- decoded flags,
- live `Present_Voltage`,
- configured min/max voltage limits  
for each detected motor during setup. That’d give you exactly the hardware-level visibility you want.

## User

Okay so you can see the port that it's actually connected to. Can you write a small script and check with me before running it, one that shows me the exact voltage error we're getting and directly reads the Fetec Motor and investigates a bit? I don't want to override anything. I just want to see: is there more information being given to us? That the robot library is justifiably hiding for UX reasons

## Assistant

## Assistant

### Tool: write

**Args:**
```json
{
  "content": "#!/usr/bin/env python3\n\"\"\"\nRead-only Feetech/STS motor inspector for voltage-related diagnostics.\n\nThis script DOES NOT write anything to motors.\nIt only sends ping/read instructions and prints raw + decoded status.\n\nExample:\n  python scripts/inspect_feetech_voltage_readonly.py \\\n    --port /dev/tty.usbmodem59710814421 \\\n    --baudrate 1000000 \\\n    --protocol 0\n\nIf you already know the motor ID:\n  python scripts/inspect_feetech_voltage_readonly.py \\\n    --port /dev/tty.usbmodem59710814421 \\\n    --id 1\n\"\"\"\n\nfrom __future__ import annotations\n\nimport argparse\nfrom dataclasses import dataclass\n\nimport scservo_sdk as scs\n\n# Reuse LeRobot's timeout patch and control table definitions\nfrom lerobot.motors.feetech.feetech import patch_setPacketTimeout\nfrom lerobot.motors.feetech.tables import STS_SMS_SERIES_CONTROL_TABLE\n\n\n@dataclass\nclass ReadResult:\n    value: int\n    comm: int\n    error: int\n\n\ndef decode_error_bits(error: int) -> list[str]:\n    # SDK constants live in scservo_sdk.protocol_packet_handler.py\n    bits = [\n        (getattr(scs, \"ERRBIT_VOLTAGE\", 1), \"Input voltage\"),\n        (getattr(scs, \"ERRBIT_ANGLE\", 2), \"Angle\"),\n        (getattr(scs, \"ERRBIT_OVERHEAT\", 4), \"Overheat\"),\n        (getattr(scs, \"ERRBIT_OVERELE\", 8), \"Over-current/over-ele\"),\n        (getattr(scs, \"ERRBIT_OVERLOAD\", 32), \"Overload\"),\n    ]\n    return [name for bit, name in bits if error & bit]\n\n\ndef read_reg(packet_handler, port_handler, motor_id: int, addr: int, length: int) -> ReadResult:\n    if length == 1:\n        value, comm, error = packet_handler.read1ByteTxRx(port_handler, motor_id, addr)\n    elif length == 2:\n        value, comm, error = packet_handler.read2ByteTxRx(port_handler, motor_id, addr)\n    elif length == 4:\n        value, comm, error = packet_handler.read4ByteTxRx(port_handler, motor_id, addr)\n    else:\n        raise ValueError(f\"Unsupported register size: {length}\")\n    return ReadResult(value=value, comm=comm, error=error)\n\n\ndef fmt_comm(packet_handler, comm: int) -> str:\n    return packet_handler.getTxRxResult(comm)\n\n\ndef fmt_err(packet_handler, error: int) -> str:\n    if error == 0:\n        return \"none\"\n    primary = packet_handler.getRxPacketError(error)\n    all_flags = decode_error_bits(error)\n    return f\"{primary} | all_flags={all_flags} | raw=0x{error:02X}\"\n\n\ndef scan_ids(packet_handler, port_handler, max_id: int) -> list[tuple[int, int, int, int]]:\n    found = []\n    for motor_id in range(max_id + 1):\n        model, comm, error = packet_handler.ping(port_handler, motor_id)\n        if comm == scs.COMM_SUCCESS:\n            found.append((motor_id, model, comm, error))\n    return found\n\n\ndef main() -> None:\n    parser = argparse.ArgumentParser(description=\"Read-only Feetech voltage/error inspector\")\n    parser.add_argument(\"--port\", required=True, help=\"Serial port (e.g. /dev/tty.usbmodemXXXX)\")\n    parser.add_argument(\"--baudrate\", type=int, default=1_000_000)\n    parser.add_argument(\"--protocol\", type=int, default=0, choices=[0, 1])\n    parser.add_argument(\"--timeout-ms\", type=int, default=1000)\n    parser.add_argument(\"--id\", type=int, default=None, help=\"Specific motor ID to inspect\")\n    parser.add_argument(\"--scan-max-id\", type=int, default=20, help=\"ID scan upper bound when --id not given\")\n    args = parser.parse_args()\n\n    port_handler = scs.PortHandler(args.port)\n    # Same monkeypatch LeRobot applies, keeps timeout behavior sane for this SDK build\n    port_handler.setPacketTimeout = patch_setPacketTimeout.__get__(port_handler, scs.PortHandler)\n    packet_handler = scs.PacketHandler(args.protocol)\n\n    if not port_handler.openPort():\n        raise SystemExit(f\"Failed to open port: {args.port}\")\n\n    try:\n        if not port_handler.setBaudRate(args.baudrate):\n            raise SystemExit(f\"Failed to set baudrate: {args.baudrate}\")\n\n        port_handler.setPacketTimeoutMillis(args.timeout_ms)\n\n        print(\"=== Bus ===\")\n        print(f\"port       : {args.port}\")\n        print(f\"baudrate   : {port_handler.getBaudRate()}\")\n        print(f\"protocol   : {args.protocol}\")\n        print(f\"timeout_ms : {args.timeout_ms}\")\n\n        if args.id is None:\n            print(f\"\\n=== Scanning IDs 0..{args.scan_max_id} (read-only ping) ===\")\n            found = scan_ids(packet_handler, port_handler, args.scan_max_id)\n            if not found:\n                print(\"No responding IDs found in scan range.\")\n                return\n\n            for motor_id, model, comm, error in found:\n                print(\n                    f\"ID {motor_id:3d} | model={model:5d} | comm={fmt_comm(packet_handler, comm)} \"\n                    f\"| ping_error={fmt_err(packet_handler, error)}\"\n                )\n            target_ids = [m[0] for m in found]\n        else:\n            model, comm, error = packet_handler.ping(port_handler, args.id)\n            print(\"\\n=== Single-ID ping ===\")\n            print(\n                f\"ID {args.id:3d} | model={model:5d} | comm={fmt_comm(packet_handler, comm)} \"\n                f\"| ping_error={fmt_err(packet_handler, error)}\"\n            )\n            if comm != scs.COMM_SUCCESS:\n                return\n            target_ids = [args.id]\n\n        regs = [\n            (\"Status\", *STS_SMS_SERIES_CONTROL_TABLE[\"Status\"]),\n            (\"Present_Voltage\", *STS_SMS_SERIES_CONTROL_TABLE[\"Present_Voltage\"]),\n            (\"Min_Voltage_Limit\", *STS_SMS_SERIES_CONTROL_TABLE[\"Min_Voltage_Limit\"]),\n            (\"Max_Voltage_Limit\", *STS_SMS_SERIES_CONTROL_TABLE[\"Max_Voltage_Limit\"]),\n            (\"Present_Temperature\", *STS_SMS_SERIES_CONTROL_TABLE[\"Present_Temperature\"]),\n            (\"Present_Current\", *STS_SMS_SERIES_CONTROL_TABLE[\"Present_Current\"]),\n            (\"Unloading_Condition\", *STS_SMS_SERIES_CONTROL_TABLE[\"Unloading_Condition\"]),\n            (\"LED_Alarm_Condition\", *STS_SMS_SERIES_CONTROL_TABLE[\"LED_Alarm_Condition\"]),\n        ]\n\n        print(\"\\n=== Register reads (read-only) ===\")\n        for motor_id in target_ids:\n            print(f\"\\n-- ID {motor_id} --\")\n            for name, addr, length in regs:\n                rr = read_reg(packet_handler, port_handler, motor_id, addr, length)\n\n                # Common unit hints from Feetech docs:\n                # - Present_Voltage / Min/Max_Voltage_Limit are typically in 0.1V units.\n                unit_hint = \"\"\n                if \"Voltage\" in name:\n                    unit_hint = f\" (~{rr.value / 10:.1f}V if 0.1V units)\"\n                elif \"Temperature\" in name:\n                    unit_hint = f\" (~{rr.value}°C)\"\n\n                print(\n                    f\"{name:22s} addr={addr:>3} len={length} value={rr.value:>5}{unit_hint} \"\n                    f\"| comm={fmt_comm(packet_handler, rr.comm)} \"\n                    f\"| error={fmt_err(packet_handler, rr.error)}\"\n                )\n\n    finally:\n        port_handler.closePort()\n        print(\"\\nPort closed.\")\n\n\nif __name__ == \"__main__\":\n    main()\n",
  "path": "scripts/inspect_feetech_voltage_readonly.py"
}
```

## Assistant

**Result** (success):
```
Successfully wrote 6891 bytes to scripts/inspect_feetech_voltage_readonly.py
```

## Assistant

## Assistant

Perfect — I wrote a **read-only diagnostic script** here:

- `scripts/inspect_feetech_voltage_readonly.py`

It does **not write** any registers. It only:
- opens the serial port,
- pings motor IDs,
- reads voltage/error-related registers,
- prints raw SDK comm status + decoded error bits.

It should help answer exactly your question: *is there extra info beyond the UX-level message?*  
(especially raw error byte and multiple flags, not just the first mapped string).

### Suggested command (scan mode)
```bash
python scripts/inspect_feetech_voltage_readonly.py \
  --port /dev/tty.usbmodem59710814421 \
  --baudrate 1000000 \
  --protocol 0
```

### If you already know the ID
```bash
python scripts/inspect_feetech_voltage_readonly.py \
  --port /dev/tty.usbmodem59710814421 \
  --id 1
```

If you want, I can run one of these now — but I’ll wait for your go-ahead as requested.

## User

Yeah we're on the scan mode.

## User

Yeah run  the scan mode.

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "python scripts/inspect_feetech_voltage_readonly.py --port /dev/tty.usbmodem59710814421 --baudrate 1000000 --protocol 0",
  "timeout": 120
}
```

## Assistant

**Result** (success):
```
=== Bus ===
port       : /dev/tty.usbmodem59710814421
baudrate   : 1000000
protocol   : 0
timeout_ms : 1000

=== Scanning IDs 0..20 (read-only ping) ===
ID   1 | model=  777 | comm=[TxRxResult] Communication success! | ping_error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01

=== Register reads (read-only) ===

-- ID 1 --
Status                 addr= 65 len=1 value=    1 | comm=[TxRxResult] Communication success! | error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01
Present_Voltage        addr= 62 len=1 value=  122 (~12.2V if 0.1V units) | comm=[TxRxResult] Communication success! | error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01
Min_Voltage_Limit      addr= 15 len=1 value=   40 (~4.0V if 0.1V units) | comm=[TxRxResult] Communication success! | error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01
Max_Voltage_Limit      addr= 14 len=1 value=   80 (~8.0V if 0.1V units) | comm=[TxRxResult] Communication success! | error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01
Present_Temperature    addr= 63 len=1 value=   24 (~24°C) | comm=[TxRxResult] Communication success! | error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01
Present_Current        addr= 69 len=2 value=    0 | comm=[TxRxResult] Communication success! | error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01
Unloading_Condition    addr= 19 len=1 value=   44 | comm=[TxRxResult] Communication success! | error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01
LED_Alarm_Condition    addr= 20 len=1 value=   47 | comm=[TxRxResult] Communication success! | error=[RxPacketError] Input voltage error! | all_flags=['Input voltage'] | raw=0x01

Port closed.
```

## Assistant

## Assistant

Done — scan mode ran successfully.

### What we found (key points)

- Found motor: **ID 1**, model **777** (`sts3215`)
- Error byte everywhere: **`0x01`** = **Input voltage error**
- `Present_Voltage`: **122** → ~**12.2V**
- `Max_Voltage_Limit`: **80** → ~**8.0V**
- `Min_Voltage_Limit`: **40** → ~**4.0V**

So yes, there **is more information** than the UX message:
- LeRobot surfaces the high-level error string,
- but raw register reads confirm the exact mismatch:
  **bus voltage (~12.2V) is above motor max limit (~8.0V).**

Also confirmed: this script performed only **ping/read** operations (no writes).

If you want, next I can extend the script to decode bitfields like `Unloading_Condition` and `LED_Alarm_Condition` into human-readable flags too.

## User

Can you write a PR against the robot library updating the error? When it gives this voltage mismatch error, actually fetch this extra information and pass that back into the library and then also do it nicely in a way that:
1. First get the error; let the robot library surface that error.
2. If it is a voltage error, say, "Hey we're gonna go check for this data." Actually, is it? Do we get all this information in one round trip? If in that case then just display it as a secondary message but if we don't get it then you should first finish displaying the first round before fetching the second round to get the extra min max present because that gives the end user a lot more information.

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "limit": 180,
  "offset": 340,
  "path": "lerobot/src/lerobot/motors/feetech/feetech.py"
}
```

## Assistant

**Result** (success):
```
rx_length = 0
        wait_length = status_length * scs.MAX_ID

        txpacket = [0] * 6

        tx_time_per_byte = (1000.0 / self.port_handler.getBaudRate()) * 10.0

        txpacket[scs.PKT_ID] = scs.BROADCAST_ID
        txpacket[scs.PKT_LENGTH] = 2
        txpacket[scs.PKT_INSTRUCTION] = scs.INST_PING

        result = self.packet_handler.txPacket(self.port_handler, txpacket)
        if result != scs.COMM_SUCCESS:
            self.port_handler.is_using = False
            return data_list, result

        # set rx timeout
        self.port_handler.setPacketTimeoutMillis((wait_length * tx_time_per_byte) + (3.0 * scs.MAX_ID) + 16.0)

        rxpacket = []
        while not self.port_handler.isPacketTimeout() and rx_length < wait_length:
            rxpacket += self.port_handler.readPort(wait_length - rx_length)
            rx_length = len(rxpacket)

        self.port_handler.is_using = False

        if rx_length == 0:
            return data_list, scs.COMM_RX_TIMEOUT

        while True:
            if rx_length < status_length:
                return data_list, scs.COMM_RX_CORRUPT

            # find packet header
            for idx in range(0, (rx_length - 1)):
                if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF):
                    break

            if idx == 0:  # found at the beginning of the packet
                # calculate checksum
                checksum = 0
                for idx in range(2, status_length - 1):  # except header & checksum
                    checksum += rxpacket[idx]

                checksum = ~checksum & 0xFF
                if rxpacket[status_length - 1] == checksum:
                    result = scs.COMM_SUCCESS
                    data_list[rxpacket[scs.PKT_ID]] = rxpacket[scs.PKT_ERROR]

                    del rxpacket[0:status_length]
                    rx_length = rx_length - status_length

                    if rx_length == 0:
                        return data_list, result
                else:
                    result = scs.COMM_RX_CORRUPT
                    # remove header (0xFF 0xFF)
                    del rxpacket[0:2]
                    rx_length = rx_length - 2
            else:
                # remove unnecessary packets
                del rxpacket[0:idx]
                rx_length = rx_length - idx

    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
        self._assert_protocol_is_compatible("broadcast_ping")
        for n_try in range(1 + num_retry):
            ids_status, comm = self._broadcast_ping()
            if self._is_comm_success(comm):
                break
            logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})")
            logger.debug(self.packet_handler.getTxRxResult(comm))

        if not self._is_comm_success(comm):
            if raise_on_error:
                raise ConnectionError(self.packet_handler.getTxRxResult(comm))
            return None

        ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
        if ids_errors:
            display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}
            logger.error(f"Some motors found returned an error status:\n{pformat(display_dict, indent=4)}")

        return self._read_model_number(list(ids_status), raise_on_error)

    def _read_firmware_version(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, str]:
        firmware_versions = {}
        for id_ in motor_ids:
            firm_ver_major, comm, error = self._read(
                *FIRMWARE_MAJOR_VERSION, id_, raise_on_error=raise_on_error
            )
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            firm_ver_minor, comm, error = self._read(
                *FIRMWARE_MINOR_VERSION, id_, raise_on_error=raise_on_error
            )
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            firmware_versions[id_] = f"{firm_ver_major}.{firm_ver_minor}"

        return firmware_versions

    def _read_model_number(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, int]:
        model_numbers = {}
        for id_ in motor_ids:
            model_nb, comm, error = self._read(*MODEL_NUMBER, id_, raise_on_error=raise_on_error)
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            model_numbers[id_] = model_nb

        return model_numbers
```

## Assistant

## Assistant

### Tool: edit

**Args:**
```json
{
  "newText": "NORMALIZED_DATA = [\"Goal_Position\", \"Present_Position\"]\n\n# Register addresses used for second-pass voltage diagnostics when a status packet\n# reports a voltage-related error bit.\nVOLTAGE_DIAGNOSTIC_REGISTERS = {\n    \"Status\": (65, 1),\n    \"Present_Voltage\": (62, 1),\n    \"Min_Voltage_Limit\": (15, 1),\n    \"Max_Voltage_Limit\": (14, 1),\n}\n\nlogger = logging.getLogger(__name__)\n",
  "oldText": "NORMALIZED_DATA = [\"Goal_Position\", \"Present_Position\"]\n\nlogger = logging.getLogger(__name__)\n",
  "path": "lerobot/src/lerobot/motors/feetech/feetech.py"
}
```

## Assistant

**Result** (success):
```
Successfully replaced text in lerobot/src/lerobot/motors/feetech/feetech.py.
```

## Assistant

## Assistant

### Tool: edit

**Args:**
```json
{
  "newText": "    def _has_voltage_error(self, error: int) -> bool:\n        import scservo_sdk as scs\n\n        return bool(error & scs.ERRBIT_VOLTAGE)\n\n    def _read_voltage_diagnostics(self, motor_ids: list[int]) -> dict[int, dict[str, dict[str, int | float | str]]]:\n        \"\"\"Best-effort, read-only diagnostics for voltage-related status errors.\"\"\"\n        diagnostics = {}\n        for id_ in motor_ids:\n            diagnostics[id_] = {}\n            for reg_name, (addr, length) in VOLTAGE_DIAGNOSTIC_REGISTERS.items():\n                value, comm, error = self._read(addr, length, id_, raise_on_error=False)\n                entry: dict[str, int | float | str] = {\n                    \"value\": int(value),\n                    \"comm\": self.packet_handler.getTxRxResult(comm),\n                    \"error\": self.packet_handler.getRxPacketError(error),\n                    \"error_raw\": f\"0x{error:02X}\",\n                }\n                if \"Voltage\" in reg_name:\n                    entry[\"volts\"] = round(value / 10.0, 2)\n                diagnostics[id_][reg_name] = entry\n\n        return diagnostics\n\n    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:\n        self._assert_protocol_is_compatible(\"broadcast_ping\")\n        for n_try in range(1 + num_retry):\n            ids_status, comm = self._broadcast_ping()\n            if self._is_comm_success(comm):\n                break\n            logger.debug(f\"Broadcast ping failed on port '{self.port}' ({n_try=})\")\n            logger.debug(self.packet_handler.getTxRxResult(comm))\n\n        if not self._is_comm_success(comm):\n            if raise_on_error:\n                raise ConnectionError(self.packet_handler.getTxRxResult(comm))\n            return None\n\n        ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}\n        if ids_errors:\n            display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}\n            logger.error(f\"Some motors found returned an error status:\\n{pformat(display_dict, indent=4)}\")\n\n            voltage_error_ids = [id_ for id_, err in ids_errors.items() if self._has_voltage_error(err)]\n            if voltage_error_ids:\n                logger.error(\n                    \"Voltage-related motor error(s) detected on ids=%s. \"\n                    \"Collecting additional read-only diagnostics...\",\n                    voltage_error_ids,\n                )\n                diagnostics = self._read_voltage_diagnostics(voltage_error_ids)\n                logger.error(\"Additional voltage diagnostics:\\n%s\", pformat(diagnostics, indent=4))\n\n        return self._read_model_number(list(ids_status), raise_on_error)\n",
  "oldText": "    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:\n        self._assert_protocol_is_compatible(\"broadcast_ping\")\n        for n_try in range(1 + num_retry):\n            ids_status, comm = self._broadcast_ping()\n            if self._is_comm_success(comm):\n                break\n            logger.debug(f\"Broadcast ping failed on port '{self.port}' ({n_try=})\")\n            logger.debug(self.packet_handler.getTxRxResult(comm))\n\n        if not self._is_comm_success(comm):\n            if raise_on_error:\n                raise ConnectionError(self.packet_handler.getTxRxResult(comm))\n            return None\n\n        ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}\n        if ids_errors:\n            display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}\n            logger.error(f\"Some motors found returned an error status:\\n{pformat(display_dict, indent=4)}\")\n\n        return self._read_model_number(list(ids_status), raise_on_error)\n",
  "path": "lerobot/src/lerobot/motors/feetech/feetech.py"
}
```

## Assistant

**Result** (success):
```
Successfully replaced text in lerobot/src/lerobot/motors/feetech/feetech.py.
```

## Assistant

## Assistant

### Tool: edit

**Args:**
```json
{
  "newText": "def test_broadcast_ping(mock_motors, dummy_motors):\n    models = {m.id: m.model for m in dummy_motors.values()}\n    addr, length = MODEL_NUMBER\n    ping_stub = mock_motors.build_broadcast_ping_stub(list(models))\n    mobel_nb_stubs = []\n    expected_model_nbs = {}\n    for id_, model in models.items():\n        model_nb = MODEL_NUMBER_TABLE[model]\n        stub = mock_motors.build_read_stub(addr, length, id_, model_nb)\n        expected_model_nbs[id_] = model_nb\n        mobel_nb_stubs.append(stub)\n    bus = FeetechMotorsBus(\n        port=mock_motors.port,\n        motors=dummy_motors,\n    )\n    bus.connect(handshake=False)\n\n    ping_model_nbs = bus.broadcast_ping()\n\n    assert ping_model_nbs == expected_model_nbs\n    assert mock_motors.stubs[ping_stub].called\n    assert all(mock_motors.stubs[stub].called for stub in mobel_nb_stubs)\n\n\ndef test_broadcast_ping_voltage_error_logs_extra_diagnostics(caplog, mock_motors, dummy_motors):\n    id_ = 1\n    ping_stub = mock_motors.build_broadcast_ping_stub([id_])\n\n    # First model-number read pass done by broadcast_ping; keep error bit set to mimic undervoltage/overvoltage.\n    model_addr, model_len = MODEL_NUMBER\n    model_nb = MODEL_NUMBER_TABLE[dummy_motors[f\"dummy_{id_}\"].model]\n    model_read_stub = mock_motors.build_read_stub(model_addr, model_len, id_, model_nb, error=scs.ERRBIT_VOLTAGE)\n\n    status_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Status\"], id_, 1)\n    pv_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Present_Voltage\"], id_, 122)\n    min_v_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Min_Voltage_Limit\"], id_, 40)\n    max_v_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Max_Voltage_Limit\"], id_, 80)\n\n    bus = FeetechMotorsBus(\n        port=mock_motors.port,\n        motors=dummy_motors,\n    )\n    bus.connect(handshake=False)\n\n    with caplog.at_level(\"ERROR\"):\n        bus.broadcast_ping()\n\n    assert mock_motors.stubs[ping_stub].called\n    assert mock_motors.stubs[model_read_stub].called\n    assert mock_motors.stubs[status_read_stub].called\n    assert mock_motors.stubs[pv_read_stub].called\n    assert mock_motors.stubs[min_v_read_stub].called\n    assert mock_motors.stubs[max_v_read_stub].called\n\n    assert \"Some motors found returned an error status\" in caplog.text\n    assert \"Collecting additional read-only diagnostics\" in caplog.text\n    assert \"Additional voltage diagnostics\" in caplog.text\n    assert \"Present_Voltage\" in caplog.text\n\n\n@pytest.mark.parametrize(\n",
  "oldText": "def test_broadcast_ping(mock_motors, dummy_motors):\n    models = {m.id: m.model for m in dummy_motors.values()}\n    addr, length = MODEL_NUMBER\n    ping_stub = mock_motors.build_broadcast_ping_stub(list(models))\n    mobel_nb_stubs = []\n    expected_model_nbs = {}\n    for id_, model in models.items():\n        model_nb = MODEL_NUMBER_TABLE[model]\n        stub = mock_motors.build_read_stub(addr, length, id_, model_nb)\n        expected_model_nbs[id_] = model_nb\n        mobel_nb_stubs.append(stub)\n    bus = FeetechMotorsBus(\n        port=mock_motors.port,\n        motors=dummy_motors,\n    )\n    bus.connect(handshake=False)\n\n    ping_model_nbs = bus.broadcast_ping()\n\n    assert ping_model_nbs == expected_model_nbs\n    assert mock_motors.stubs[ping_stub].called\n    assert all(mock_motors.stubs[stub].called for stub in mobel_nb_stubs)\n\n\n@pytest.mark.parametrize(\n",
  "path": "lerobot/tests/motors/test_feetech.py"
}
```

## Assistant

**Result** (success):
```
Successfully replaced text in lerobot/tests/motors/test_feetech.py.
```

## Assistant

## Assistant

### Tool: edit

**Args:**
```json
{
  "newText": "def test_broadcast_ping_voltage_error_logs_extra_diagnostics(caplog, mock_motors, dummy_motors):\n    id_ = 1\n\n    # First model-number read pass done by broadcast_ping; keep error bit set to mimic undervoltage/overvoltage.\n    model_addr, model_len = MODEL_NUMBER\n    model_nb = MODEL_NUMBER_TABLE[dummy_motors[f\"dummy_{id_}\"].model]\n    model_read_stub = mock_motors.build_read_stub(model_addr, model_len, id_, model_nb, error=scs.ERRBIT_VOLTAGE)\n\n    status_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Status\"], id_, 1)\n    pv_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Present_Voltage\"], id_, 122)\n    min_v_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Min_Voltage_Limit\"], id_, 40)\n    max_v_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Max_Voltage_Limit\"], id_, 80)\n\n    bus = FeetechMotorsBus(\n        port=mock_motors.port,\n        motors=dummy_motors,\n    )\n    bus.connect(handshake=False)\n    bus._broadcast_ping = MagicMock(return_value=({id_: scs.ERRBIT_VOLTAGE}, scs.COMM_SUCCESS))\n\n    with caplog.at_level(\"ERROR\"):\n        bus.broadcast_ping()\n\n    assert mock_motors.stubs[model_read_stub].called\n    assert mock_motors.stubs[status_read_stub].called\n    assert mock_motors.stubs[pv_read_stub].called\n    assert mock_motors.stubs[min_v_read_stub].called\n    assert mock_motors.stubs[max_v_read_stub].called\n\n    assert \"Some motors found returned an error status\" in caplog.text\n    assert \"Collecting additional read-only diagnostics\" in caplog.text\n    assert \"Additional voltage diagnostics\" in caplog.text\n    assert \"Present_Voltage\" in caplog.text\n",
  "oldText": "def test_broadcast_ping_voltage_error_logs_extra_diagnostics(caplog, mock_motors, dummy_motors):\n    id_ = 1\n    ping_stub = mock_motors.build_broadcast_ping_stub([id_])\n\n    # First model-number read pass done by broadcast_ping; keep error bit set to mimic undervoltage/overvoltage.\n    model_addr, model_len = MODEL_NUMBER\n    model_nb = MODEL_NUMBER_TABLE[dummy_motors[f\"dummy_{id_}\"].model]\n    model_read_stub = mock_motors.build_read_stub(model_addr, model_len, id_, model_nb, error=scs.ERRBIT_VOLTAGE)\n\n    status_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Status\"], id_, 1)\n    pv_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Present_Voltage\"], id_, 122)\n    min_v_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Min_Voltage_Limit\"], id_, 40)\n    max_v_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE[\"Max_Voltage_Limit\"], id_, 80)\n\n    bus = FeetechMotorsBus(\n        port=mock_motors.port,\n        motors=dummy_motors,\n    )\n    bus.connect(handshake=False)\n\n    with caplog.at_level(\"ERROR\"):\n        bus.broadcast_ping()\n\n    assert mock_motors.stubs[ping_stub].called\n    assert mock_motors.stubs[model_read_stub].called\n    assert mock_motors.stubs[status_read_stub].called\n    assert mock_motors.stubs[pv_read_stub].called\n    assert mock_motors.stubs[min_v_read_stub].called\n    assert mock_motors.stubs[max_v_read_stub].called\n\n    assert \"Some motors found returned an error status\" in caplog.text\n    assert \"Collecting additional read-only diagnostics\" in caplog.text\n    assert \"Additional voltage diagnostics\" in caplog.text\n    assert \"Present_Voltage\" in caplog.text\n",
  "path": "lerobot/tests/motors/test_feetech.py"
}
```

## Assistant

**Result** (success):
```
Successfully replaced text in lerobot/tests/motors/test_feetech.py.
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "cd lerobot && pytest -q tests/motors/test_feetech.py -q",
  "timeout": 120
}
```

## Assistant

**Result** (error):
```
ImportError while loading conftest '/Users/mukund/dev/robot-1/lerobot/tests/conftest.py'.
tests/conftest.py:20: in <module>
    from serial import SerialException
E   ModuleNotFoundError: No module named 'serial'


Command exited with code 4
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "./.venv/bin/python -m pytest -q lerobot/tests/motors/test_feetech.py -q",
  "timeout": 120
}
```

## Assistant

**Result** (error):
```
/Users/mukund/dev/robot-1/.venv/bin/python: No module named pytest


Command exited with code 1
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "./.venv/bin/python -m py_compile lerobot/src/lerobot/motors/feetech/feetech.py lerobot/tests/motors/test_feetech.py"
}
```

## Assistant

**Result** (success):
```
(no output)
```

## Assistant

## Assistant

### Tool: bash

**Args:**
```json
{
  "command": "git -C lerobot status --short"
}
```

## Assistant

**Result** (success):
```
M src/lerobot/motors/feetech/feetech.py
 M tests/motors/test_feetech.py
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "limit": 140,
  "offset": 1,
  "path": "lerobot/src/lerobot/motors/feetech/feetech.py"
}
```

## Assistant

**Result** (success):
```
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from copy import deepcopy
from enum import Enum
from pprint import pformat

from ..encoding_utils import decode_sign_magnitude, encode_sign_magnitude
from ..motors_bus import Motor, MotorCalibration, NameOrID, SerialMotorsBus, Value, get_address
from .tables import (
    FIRMWARE_MAJOR_VERSION,
    FIRMWARE_MINOR_VERSION,
    MODEL_BAUDRATE_TABLE,
    MODEL_CONTROL_TABLE,
    MODEL_ENCODING_TABLE,
    MODEL_NUMBER,
    MODEL_NUMBER_TABLE,
    MODEL_PROTOCOL,
    MODEL_RESOLUTION,
    SCAN_BAUDRATES,
)

DEFAULT_PROTOCOL_VERSION = 0
DEFAULT_BAUDRATE = 1_000_000
DEFAULT_TIMEOUT_MS = 1000

NORMALIZED_DATA = ["Goal_Position", "Present_Position"]

# Register addresses used for second-pass voltage diagnostics when a status packet
# reports a voltage-related error bit.
VOLTAGE_DIAGNOSTIC_REGISTERS = {
    "Status": (65, 1),
    "Present_Voltage": (62, 1),
    "Min_Voltage_Limit": (15, 1),
    "Max_Voltage_Limit": (14, 1),
}

logger = logging.getLogger(__name__)


class OperatingMode(Enum):
    # position servo mode
    POSITION = 0
    # The motor is in constant speed mode, which is controlled by parameter 0x2e, and the highest bit 15 is
    # the direction bit
    VELOCITY = 1
    # PWM open-loop speed regulation mode, with parameter 0x2c running time parameter control, bit11 as
    # direction bit
    PWM = 2
    # In step servo mode, the number of step progress is represented by parameter 0x2a, and the highest bit 15
    # is the direction bit
    STEP = 3


class DriveMode(Enum):
    NON_INVERTED = 0
    INVERTED = 1


class TorqueMode(Enum):
    ENABLED = 1
    DISABLED = 0


def _split_into_byte_chunks(value: int, length: int) -> list[int]:
    import scservo_sdk as scs

    if length == 1:
        data = [value]
    elif length == 2:
        data = [scs.SCS_LOBYTE(value), scs.SCS_HIBYTE(value)]
    elif length == 4:
        data = [
            scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
            scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
            scs.SCS_LOBYTE(scs.SCS_HIWORD(value)),
            scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
        ]
    return data


def patch_setPacketTimeout(self, packet_length):  # noqa: N802
    """
    HACK: This patches the PortHandler behavior to set the correct packet timeouts.

    It fixes https://gitee.com/ftservo/SCServoSDK/issues/IBY2S6
    The bug is fixed on the official Feetech SDK repo (https://gitee.com/ftservo/FTServo_Python)
    but because that version is not published on PyPI, we rely on the (unofficial) on that is, which needs
    patching.
    """
    self.packet_start_time = self.getCurrentTime()
    self.packet_timeout = (self.tx_time_per_byte * packet_length) + (self.tx_time_per_byte * 3.0) + 50


class FeetechMotorsBus(SerialMotorsBus):
    """
    The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the
    python feetech sdk to communicate with the motors, which is itself based on the dynamixel sdk.
    """

    apply_drive_mode = True
    available_baudrates = deepcopy(SCAN_BAUDRATES)
    default_baudrate = DEFAULT_BAUDRATE
    default_timeout = DEFAULT_TIMEOUT_MS
    model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
    model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
    model_encoding_table = deepcopy(MODEL_ENCODING_TABLE)
    model_number_table = deepcopy(MODEL_NUMBER_TABLE)
    model_resolution_table = deepcopy(MODEL_RESOLUTION)
    normalized_data = deepcopy(NORMALIZED_DATA)

    def __init__(
        self,
        port: str,
        motors: dict[str, Motor],
        calibration: dict[str, MotorCalibration] | None = None,
        protocol_version: int = DEFAULT_PROTOCOL_VERSION,
    ):
        super().__init__(port, motors, calibration)
        self.protocol_version = protocol_version
        self._assert_same_protocol()
        import scservo_sdk as scs

        self.port_handler = scs.PortHandler(self.port)
        # HACK: monkeypatch
        self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__(  # type: ignore[method-assign]
            self.port_handler, scs.PortHandler
        )

[358 more lines in file. Use offset=141 to continue.]
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "limit": 170,
  "offset": 380,
  "path": "lerobot/src/lerobot/motors/feetech/feetech.py"
}
```

## Assistant

**Result** (success):
```
if rx_length < status_length:
                return data_list, scs.COMM_RX_CORRUPT

            # find packet header
            for idx in range(0, (rx_length - 1)):
                if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF):
                    break

            if idx == 0:  # found at the beginning of the packet
                # calculate checksum
                checksum = 0
                for idx in range(2, status_length - 1):  # except header & checksum
                    checksum += rxpacket[idx]

                checksum = ~checksum & 0xFF
                if rxpacket[status_length - 1] == checksum:
                    result = scs.COMM_SUCCESS
                    data_list[rxpacket[scs.PKT_ID]] = rxpacket[scs.PKT_ERROR]

                    del rxpacket[0:status_length]
                    rx_length = rx_length - status_length

                    if rx_length == 0:
                        return data_list, result
                else:
                    result = scs.COMM_RX_CORRUPT
                    # remove header (0xFF 0xFF)
                    del rxpacket[0:2]
                    rx_length = rx_length - 2
            else:
                # remove unnecessary packets
                del rxpacket[0:idx]
                rx_length = rx_length - idx

    def _has_voltage_error(self, error: int) -> bool:
        import scservo_sdk as scs

        return bool(error & scs.ERRBIT_VOLTAGE)

    def _read_voltage_diagnostics(self, motor_ids: list[int]) -> dict[int, dict[str, dict[str, int | float | str]]]:
        """Best-effort, read-only diagnostics for voltage-related status errors."""
        diagnostics = {}
        for id_ in motor_ids:
            diagnostics[id_] = {}
            for reg_name, (addr, length) in VOLTAGE_DIAGNOSTIC_REGISTERS.items():
                value, comm, error = self._read(addr, length, id_, raise_on_error=False)
                entry: dict[str, int | float | str] = {
                    "value": int(value),
                    "comm": self.packet_handler.getTxRxResult(comm),
                    "error": self.packet_handler.getRxPacketError(error),
                    "error_raw": f"0x{error:02X}",
                }
                if "Voltage" in reg_name:
                    entry["volts"] = round(value / 10.0, 2)
                diagnostics[id_][reg_name] = entry

        return diagnostics

    def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
        self._assert_protocol_is_compatible("broadcast_ping")
        for n_try in range(1 + num_retry):
            ids_status, comm = self._broadcast_ping()
            if self._is_comm_success(comm):
                break
            logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})")
            logger.debug(self.packet_handler.getTxRxResult(comm))

        if not self._is_comm_success(comm):
            if raise_on_error:
                raise ConnectionError(self.packet_handler.getTxRxResult(comm))
            return None

        ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
        if ids_errors:
            display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}
            logger.error(f"Some motors found returned an error status:\n{pformat(display_dict, indent=4)}")

            voltage_error_ids = [id_ for id_, err in ids_errors.items() if self._has_voltage_error(err)]
            if voltage_error_ids:
                logger.error(
                    "Voltage-related motor error(s) detected on ids=%s. "
                    "Collecting additional read-only diagnostics...",
                    voltage_error_ids,
                )
                diagnostics = self._read_voltage_diagnostics(voltage_error_ids)
                logger.error("Additional voltage diagnostics:\n%s", pformat(diagnostics, indent=4))

        return self._read_model_number(list(ids_status), raise_on_error)

    def _read_firmware_version(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, str]:
        firmware_versions = {}
        for id_ in motor_ids:
            firm_ver_major, comm, error = self._read(
                *FIRMWARE_MAJOR_VERSION, id_, raise_on_error=raise_on_error
            )
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            firm_ver_minor, comm, error = self._read(
                *FIRMWARE_MINOR_VERSION, id_, raise_on_error=raise_on_error
            )
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            firmware_versions[id_] = f"{firm_ver_major}.{firm_ver_minor}"

        return firmware_versions

    def _read_model_number(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, int]:
        model_numbers = {}
        for id_ in motor_ids:
            model_nb, comm, error = self._read(*MODEL_NUMBER, id_, raise_on_error=raise_on_error)
            if not self._is_comm_success(comm) or self._is_error(error):
                continue

            model_numbers[id_] = model_nb

        return model_numbers
```

## Assistant

## Assistant

### Tool: read

**Args:**
```json
{
  "limit": 120,
  "offset": 110,
  "path": "lerobot/tests/motors/test_feetech.py"
}
```

## Assistant

**Result** (success):
```
def test_abc_implementation(dummy_motors):
    """Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
    FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors)


@pytest.mark.parametrize("id_", [1, 2, 3])
def test_ping(id_, mock_motors, dummy_motors):
    expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model]
    addr, length = MODEL_NUMBER
    ping_stub = mock_motors.build_ping_stub(id_)
    mobel_nb_stub = mock_motors.build_read_stub(addr, length, id_, expected_model_nb)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    ping_model_nb = bus.ping(id_)

    assert ping_model_nb == expected_model_nb
    assert mock_motors.stubs[ping_stub].called
    assert mock_motors.stubs[mobel_nb_stub].called


def test_broadcast_ping(mock_motors, dummy_motors):
    models = {m.id: m.model for m in dummy_motors.values()}
    addr, length = MODEL_NUMBER
    ping_stub = mock_motors.build_broadcast_ping_stub(list(models))
    mobel_nb_stubs = []
    expected_model_nbs = {}
    for id_, model in models.items():
        model_nb = MODEL_NUMBER_TABLE[model]
        stub = mock_motors.build_read_stub(addr, length, id_, model_nb)
        expected_model_nbs[id_] = model_nb
        mobel_nb_stubs.append(stub)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    ping_model_nbs = bus.broadcast_ping()

    assert ping_model_nbs == expected_model_nbs
    assert mock_motors.stubs[ping_stub].called
    assert all(mock_motors.stubs[stub].called for stub in mobel_nb_stubs)


def test_broadcast_ping_voltage_error_logs_extra_diagnostics(caplog, mock_motors, dummy_motors):
    id_ = 1

    # First model-number read pass done by broadcast_ping; keep error bit set to mimic undervoltage/overvoltage.
    model_addr, model_len = MODEL_NUMBER
    model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model]
    model_read_stub = mock_motors.build_read_stub(model_addr, model_len, id_, model_nb, error=scs.ERRBIT_VOLTAGE)

    status_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Status"], id_, 1)
    pv_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Present_Voltage"], id_, 122)
    min_v_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Min_Voltage_Limit"], id_, 40)
    max_v_read_stub = mock_motors.build_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Max_Voltage_Limit"], id_, 80)

    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)
    bus._broadcast_ping = MagicMock(return_value=({id_: scs.ERRBIT_VOLTAGE}, scs.COMM_SUCCESS))

    with caplog.at_level("ERROR"):
        bus.broadcast_ping()

    assert mock_motors.stubs[model_read_stub].called
    assert mock_motors.stubs[status_read_stub].called
    assert mock_motors.stubs[pv_read_stub].called
    assert mock_motors.stubs[min_v_read_stub].called
    assert mock_motors.stubs[max_v_read_stub].called

    assert "Some motors found returned an error status" in caplog.text
    assert "Collecting additional read-only diagnostics" in caplog.text
    assert "Additional voltage diagnostics" in caplog.text
    assert "Present_Voltage" in caplog.text


@pytest.mark.parametrize(
    "addr, length, id_, value",
    [
        (0, 1, 1, 2),
        (10, 2, 2, 999),
        (42, 4, 3, 1337),
    ],
)
def test__read(addr, length, id_, value, mock_motors, dummy_motors):
    stub = mock_motors.build_read_stub(addr, length, id_, value)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    read_value, _, _ = bus._read(addr, length, id_)

    assert mock_motors.stubs[stub].called
    assert read_value == value


@pytest.mark.parametrize("raise_on_error", (True, False))
def test__read_error(raise_on_error, mock_motors, dummy_motors):
    addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE)
    stub = mock_motors.build_read_stub(addr, length, id_, value, error=error)
    bus = FeetechMotorsBus(
        port=mock_motors.port,
        motors=dummy_motors,
    )
    bus.connect(handshake=False)

    if raise_on_error:
        with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
            bus._read(addr, length, id_, raise_on_error=raise_on_error)
    else:
        _, _, read_error = bus._read(addr, length, id_, raise_on_error=raise_on_error)

[266 more lines in file. Use offset=230 to continue.]
```

## Assistant

## Assistant

Implemented ✅ — I added a PR-ready change in LeRobot to do exactly what you asked:

## What changed

### 1) Keep current error UX first
In `broadcast_ping()`, it still logs the original message first:

- `"Some motors found returned an error status: ..."`

### 2) Then run a second pass only for voltage errors
If any motor has `ERRBIT_VOLTAGE`, it now logs:

- that it detected voltage-related errors
- that it is collecting additional **read-only** diagnostics

Then it performs extra register reads (second round trip) and logs:

- `Status`
- `Present_Voltage`
- `Min_Voltage_Limit`
- `Max_Voltage_Limit`

with raw comm/error fields and derived volts.

So this is exactly the staged behavior you requested:
1. surface first error
2. then fetch & display richer info

---

## Files updated

- `lerobot/src/lerobot/motors/feetech/feetech.py`
  - Added `VOLTAGE_DIAGNOSTIC_REGISTERS`
  - Added `_has_voltage_error(...)`
  - Added `_read_voltage_diagnostics(...)`
  - Updated `broadcast_ping(...)` to do two-stage reporting

- `lerobot/tests/motors/test_feetech.py`
  - Added `test_broadcast_ping_voltage_error_logs_extra_diagnostics(...)`

---

## Note on round trips

You were exactly right to ask this:  
The initial broadcast ping status packet does **not** include min/max/present voltage register values.  
So we must do a **second read pass** to fetch those details.

---

## Validation

I compiled modified files successfully (`py_compile`), but couldn’t run full pytest locally because this environment is missing test tooling/deps (`pytest`, `serial` in that test env).

---

If you want, I can also draft the exact PR title + description text for you to paste into GitHub.

## User

And how did you come up with these values and all this information and shit? How did you find all of this that you told me about? Can you make me just a really quick bullet point of sources?

## Assistant

Yep — quick source list:

- **CLI entrypoint**
  - `lerobot/pyproject.toml`  
    - `lerobot-setup-motors="lerobot.scripts.lerobot_setup_motors:main"`

- **Command flow**
  - `lerobot/src/lerobot/scripts/lerobot_setup_motors.py`
  - `lerobot/src/lerobot/robots/so_follower/so_follower.py` (`setup_motors`)
  - `lerobot/src/lerobot/motors/motors_bus.py` (`setup_motor`)

- **Where your exact log comes from**
  - `lerobot/src/lerobot/motors/feetech/feetech.py` (`broadcast_ping`)
  - It logs `Some motors found returned an error status: ...`

- **Why message string is exactly that**
  - `.venv/lib/python3.12/site-packages/scservo_sdk/protocol_packet_handler.py`
    - `ERRBIT_VOLTAGE = 1`
    - `getRxPacketError()` → `"[RxPacketError] Input voltage error!"`

- **Where voltage registers/addresses come from**
  - `lerobot/src/lerobot/motors/feetech/tables.py`
    - `Present_Voltage` = `(62, 1)`
    - `Min_Voltage_Limit` = `(15, 1)`
    - `Max_Voltage_Limit` = `(14, 1)`
    - `Status` = `(65, 1)`

- **Evidence in tests that voltage error is expected behavior**
  - `lerobot/tests/motors/test_feetech.py`
    - asserts on `"[RxPacketError] Input voltage error!"` for `ERRBIT_VOLTAGE`

- **Hardware confirmation from your actual device**
  - Run output from `scripts/inspect_feetech_voltage_readonly.py` on your port:
    - `Present_Voltage = 122` (~12.2V)
    - `Max_Voltage_Limit = 80` (~8.0V)
    - error raw `0x01` (voltage bit)
