ros1-development

SKILL.md

ROS1 Development Skill

When to Use This Skill

  • Building or maintaining ROS1 packages and nodes
  • Writing launch files, message types, or services
  • Debugging ROS1 communication (topics, services, actions)
  • Configuring catkin workspaces and build systems
  • Working with tf/tf2 transforms, URDF, or robot models
  • Using actionlib for long-running tasks
  • Optimizing nodelets for zero-copy transport
  • Planning ROS1 → ROS2 migration

Core Architecture Principles

1. Node Design

Single Responsibility Nodes: Each node should do ONE thing well. Resist the temptation to build monolithic "do-everything" nodes.

# BAD: Monolithic node
class RobotNode:
    def __init__(self):
        self.sub_camera = rospy.Subscriber('/camera/image', Image, self.camera_cb)
        self.sub_lidar = rospy.Subscriber('/lidar/points', PointCloud2, self.lidar_cb)
        self.pub_cmd = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        self.pub_map = rospy.Publisher('/map', OccupancyGrid, queue_size=1)
        # This node does perception, planning, AND control

# GOOD: Decomposed nodes
class PerceptionNode:    # Fuses sensor data → publishes /obstacles
class PlannerNode:       # Subscribes /obstacles → publishes /path
class ControllerNode:    # Subscribes /path → publishes /cmd_vel

Node Initialization Pattern:

#!/usr/bin/env python
import rospy
from std_msgs.msg import String

class MyNode:
    def __init__(self):
        rospy.init_node('my_node', anonymous=False)

        # 1. Load parameters FIRST
        self.rate = rospy.get_param('~rate', 10.0)
        self.frame_id = rospy.get_param('~frame_id', 'base_link')

        # 2. Set up publishers BEFORE subscribers
        #    (prevents callbacks firing before publisher is ready)
        self.pub = rospy.Publisher('~output', String, queue_size=10)

        # 3. Set up subscribers LAST
        self.sub = rospy.Subscriber('~input', String, self.callback)

        rospy.loginfo(f"[{rospy.get_name()}] Initialized with rate={self.rate}")

    def callback(self, msg):
        # Process and republish
        result = String(data=msg.data.upper())
        self.pub.publish(result)

    def run(self):
        rate = rospy.Rate(self.rate)
        while not rospy.is_shutdown():
            # Periodic work here
            rate.sleep()

if __name__ == '__main__':
    try:
        node = MyNode()
        node.run()
    except rospy.ROSInterruptException:
        pass

2. Topic Design

Naming Conventions:

/robot_name/sensor_type/data_type

# Examples:
/ur5/joint_states              # Robot joint states
/realsense/color/image_raw     # Camera color image
/realsense/depth/points        # Depth point cloud
/mobile_base/cmd_vel           # Velocity commands
/gripper/command               # Gripper commands

Queue Sizes Matter:

# For sensor data (high frequency, OK to drop old messages):
rospy.Subscriber('/camera/image', Image, self.cb, queue_size=1)

# For commands (don't want to miss any):
rospy.Publisher('/cmd_vel', Twist, queue_size=10)

# For large data (point clouds, images) - use small queues to prevent memory bloat:
rospy.Subscriber('/lidar/points', PointCloud2, self.cb, queue_size=1)

# NEVER use queue_size=0 (infinite) for high-frequency topics
# This WILL cause memory leaks under load

Latched Topics for data that changes infrequently:

# Robot description, static maps, calibration data
pub = rospy.Publisher('/robot_description', String, queue_size=1, latch=True)

3. Launch File Best Practices

<launch>
  <!-- ALWAYS use args for configurability -->
  <arg name="robot_name" default="ur5"/>
  <arg name="sim" default="false"/>
  <arg name="debug" default="false"/>

  <!-- Group by subsystem with namespaces -->
  <group ns="$(arg robot_name)">

    <!-- Conditional loading based on sim vs real -->
    <group if="$(arg sim)">
      <include file="$(find my_pkg)/launch/sim_drivers.launch"/>
    </group>
    <group unless="$(arg sim)">
      <include file="$(find my_pkg)/launch/real_drivers.launch"/>
    </group>

    <!-- Node with proper remapping -->
    <node pkg="my_pkg" type="perception_node.py" name="perception"
          output="screen" respawn="true" respawn_delay="5">
      <param name="rate" value="30.0"/>
      <param name="frame_id" value="$(arg robot_name)_base_link"/>
      <remap from="~input_image" to="/$(arg robot_name)/camera/image_raw"/>
      <remap from="~output_detections" to="detections"/>
      <!-- Load a YAML param file -->
      <rosparam file="$(find my_pkg)/config/perception.yaml" command="load"/>
    </node>

  </group>

  <!-- Debug tools (conditionally loaded) -->
  <group if="$(arg debug)">
    <node pkg="rviz" type="rviz" name="rviz"
          args="-d $(find my_pkg)/rviz/debug.rviz"/>
    <node pkg="rqt_graph" type="rqt_graph" name="rqt_graph"/>
  </group>
</launch>

4. TF Transform Tree

Rules:

  • Every frame has EXACTLY one parent (tree, not graph)
  • Static transforms use static_transform_publisher
  • Dynamic transforms publish at consistent rates
  • ALWAYS set timestamps correctly
import tf2_ros

# Publishing transforms
br = tf2_ros.TransformBroadcaster()
t = TransformStamped()
t.header.stamp = rospy.Time.now()  # CRITICAL: Use current time
t.header.frame_id = "odom"
t.child_frame_id = "base_link"
t.transform.translation.x = x
t.transform.translation.y = y
t.transform.rotation = quaternion_from_euler(0, 0, theta)
br.sendTransform(t)

# Listening for transforms (with timeout and exception handling)
tf_buffer = tf2_ros.Buffer()
listener = tf2_ros.TransformListener(tf_buffer)

try:
    trans = tf_buffer.lookup_transform(
        'map', 'base_link',
        rospy.Time(0),          # Get latest available
        rospy.Duration(1.0)     # Wait up to 1 second
    )
except (tf2_ros.LookupException,
        tf2_ros.ConnectivityException,
        tf2_ros.ExtrapolationException) as e:
    rospy.logwarn(f"TF lookup failed: {e}")

5. Actionlib for Long-Running Tasks

import actionlib
from my_msgs.msg import PickPlaceAction, PickPlaceGoal, PickPlaceResult

# Server
class PickPlaceServer:
    def __init__(self):
        self.server = actionlib.SimpleActionServer(
            'pick_place',
            PickPlaceAction,
            execute_cb=self.execute,
            auto_start=False  # ALWAYS set auto_start=False
        )
        self.server.start()

    def execute(self, goal):
        feedback = PickPlaceFeedback()

        # Check for preemption INSIDE your loop
        for step in self.plan_steps(goal):
            if self.server.is_preempt_requested():
                self.server.set_preempted()
                return
            self.execute_step(step)
            feedback.progress = step.progress
            self.server.publish_feedback(feedback)

        result = PickPlaceResult(success=True)
        self.server.set_succeeded(result)

Common Pitfalls & Failure Modes

Time Synchronization

# BAD: Comparing timestamps from different clocks
if camera_msg.header.stamp == lidar_msg.header.stamp:  # Almost never true

# GOOD: Use message_filters for approximate time sync
import message_filters
sub_cam = message_filters.Subscriber('/camera/image', Image)
sub_lidar = message_filters.Subscriber('/lidar/points', PointCloud2)
sync = message_filters.ApproximateTimeSynchronizer(
    [sub_cam, sub_lidar], queue_size=10, slop=0.05  # 50ms tolerance
)
sync.registerCallback(self.synced_callback)

Callback Threading

# ROS1 uses a single-threaded spinner by default.
# Long-running callbacks BLOCK all other callbacks.

# BAD:
def callback(self, msg):
    result = self.expensive_computation(msg)  # Blocks for 2 seconds!
    self.pub.publish(result)

# GOOD: Use a MultiThreadedSpinner or process in a separate thread
rospy.init_node('my_node')
# ... setup ...
spinner = rospy.MultiThreadedSpinner(num_threads=4)
spinner.spin()

# Or use a processing thread:
import threading, queue
class MyNode:
    def __init__(self):
        self.work_queue = queue.Queue(maxsize=1)
        self.worker = threading.Thread(target=self._process_loop, daemon=True)
        self.worker.start()

    def callback(self, msg):
        try:
            self.work_queue.put_nowait(msg)  # Non-blocking
        except queue.Full:
            pass  # Drop old data

    def _process_loop(self):
        while not rospy.is_shutdown():
            msg = self.work_queue.get()
            result = self.expensive_computation(msg)
            self.pub.publish(result)

Parameter Server Anti-Patterns

# BAD: Hardcoded values
self.threshold = 0.5

# BAD: Global params without namespace
self.threshold = rospy.get_param('threshold', 0.5)  # Collides across nodes

# GOOD: Private params with defaults
self.threshold = rospy.get_param('~threshold', 0.5)

# GOOD: Dynamic reconfigure for runtime tuning
from dynamic_reconfigure.server import Server
from my_pkg.cfg import MyNodeConfig
self.dyn_server = Server(MyNodeConfig, self.dyn_callback)

Nodelets for Zero-Copy Transport

When nodes exchange large data (images, point clouds) within the same process, nodelets eliminate serialization overhead:

// my_nodelet.h
#include <nodelet/nodelet.h>
#include <pluginlib/class_list_macros.h>

class MyNodelet : public nodelet::Nodelet {
  virtual void onInit() {
    ros::NodeHandle& nh = getNodeHandle();
    ros::NodeHandle& pnh = getPrivateNodeHandle();
    // Use shared_ptr for zero-copy: pass pointers, not copies
    pub_ = nh.advertise<sensor_msgs::Image>("output", 1);
    sub_ = nh.subscribe("input", 1, &MyNodelet::callback, this);
  }
};
PLUGINLIB_EXPORT_CLASS(MyNodelet, nodelet::Nodelet)

Package Structure

my_robot_pkg/
├── CMakeLists.txt
├── package.xml
├── setup.py                    # For Python packages
├── config/
│   ├── robot_params.yaml       # Default parameters
│   └── dynamic_reconfigure/    # .cfg files
├── launch/
│   ├── robot.launch            # Top-level launcher
│   ├── drivers.launch          # Hardware drivers
│   └── perception.launch       # Perception pipeline
├── msg/                        # Custom message definitions
│   └── Detection.msg
├── srv/                        # Service definitions
│   └── GetPose.srv
├── action/                     # Action definitions
│   └── PickPlace.action
├── src/                        # C++ source
│   └── my_node.cpp
├── scripts/                    # Python nodes (executable)
│   └── perception_node.py
├── include/my_robot_pkg/       # C++ headers
│   └── my_node.h
├── rviz/                       # RViz configs
│   └── debug.rviz
├── urdf/                       # Robot model
│   └── robot.urdf.xacro
└── test/                       # Unit and integration tests
    ├── test_perception.py
    └── test_perception.test    # rostest launch file

Debugging Toolkit

# Essential diagnostic commands
rostopic list                     # See all active topics
rostopic hz /camera/image_raw     # Check publish rate
rostopic bw /lidar/points         # Check bandwidth
rostopic echo /joint_states -n 1  # Inspect one message

rosnode list                      # Active nodes
rosnode info /perception          # Connections and subscriptions

roswtf                            # Automated diagnostics

rqt_graph                         # Visual node/topic graph
rqt_console                       # Log viewer with filtering

# TF debugging
rosrun tf tf_monitor              # Monitor TF tree health
rosrun tf view_frames             # Generate TF tree PDF
rosrun tf tf_echo map base_link   # Print transform continuously

# Bag file operations
rosbag record -a                  # Record everything (careful with disk!)
rosbag record /camera/image /tf   # Record specific topics
rosbag info recording.bag         # Inspect bag contents
rosbag play recording.bag --clock # Playback with simulated time

ROS1 → ROS2 Migration Checklist

When planning a migration, note these key differences:

  • rospyrclpy, roscpprclcpp
  • catkin_makecolcon build
  • roslaunch XML → ROS2 Python launch files
  • Global parameter server → Per-node parameters
  • rospy.Ratenode.create_timer()
  • Single roscore → DDS discovery (no central master)
  • message_filters works in both, but API differs
  • Custom messages: same .msg format, different build system
  • Nodelets → ROS2 Components (intra-process communication)
  • dynamic_reconfigure → ROS2 parameters with callbacks

Start migration from leaf nodes (sensors, actuators) and work inward. Use the ros1_bridge package to run both stacks simultaneously during transition.

Weekly Installs
8
GitHub Stars
149
First Seen
Mar 3, 2026
Installed on
codex7
github-copilot6
kimi-cli6
gemini-cli6
amp6
cline6