Intro

This is demonstration of a ROS 2 interface to an Arduino running differential-drive motor control code.

Components

The serial_motor_demo package consists of two nodes, driver.py and gui.py. The idea is that the driver can be run on an onboard PC inside a robot (e.g. a Raspberry Pi), interfacing with the lower-level hardware. The driver exposes motor control through ROS topics (see below), which are to be published by the user’s software.

The GUI provides a simple interface for development and testing of such a system. It publishes and subscribes to the appropriate topics.

Driver configuration & usage

The driver has a few parameters:

  • encoder_cpr - Encoder counts per revolution
  • loop_rate - Execution rate of the Arduino code (see Arduino side documentation for details)
  • serial_port - Serial port to connect to (default /dev/ttyUSB0)
  • baud_rate - Serial baud rate (default 57600)
  • serial_debug - Enables debugging of serial commands (default false)

To run, e.g.

ros2 run serial_motor_demo driver --ros-args -p encoder_cpr:=3440 -p loop_rate:=30 -p serial_port:=/dev/ttyUSB0 -p baud_rate:=57600

It makes use of the following topics

  • motor_command - Subscribes a MotorCommand, in rads/sec for each of the two motors
  • motor_vels - Publishes a MotorVels, motor velocities in rads/sec
  • encoder_vals - Publishes an EncoderVals, raw encoder counts for each motor

GUI Usage

Has two modes, one for raw PWM input (-255 to 255) and one for closed-loop control. In this mode you must first set the limits for the sliders.


driver.py

driver.py
import rclpy
from rclpy.node import Node
from serial_motor_demo_msgs.msg import MotorCommand
from serial_motor_demo_msgs.msg import MotorVels
from serial_motor_demo_msgs.msg import EncoderVals
import time
import math
import serial
from threading import Lock
 
 
 
class MotorDriver(Node):
 
    def __init__(self):
        super().__init__('motor_driver')
 
 
        # Setup parameters
 
        self.declare_parameter('encoder_cpr', value=0)
        if (self.get_parameter('encoder_cpr').value == 0):
            print("WARNING! ENCODER CPR SET TO 0!!")
 
 
        self.declare_parameter('loop_rate', value=0)
        if (self.get_parameter('loop_rate').value == 0):
            print("WARNING! LOOP RATE SET TO 0!!")
 
 
        self.declare_parameter('serial_port', value="/dev/ttyUSB0")
        self.serial_port = self.get_parameter('serial_port').value
 
 
        self.declare_parameter('baud_rate', value=57600)
        self.baud_rate = self.get_parameter('baud_rate').value
 
 
        self.declare_parameter('serial_debug', value=False)
        self.debug_serial_cmds = self.get_parameter('serial_debug').value
        if (self.debug_serial_cmds):
            print("Serial debug enabled")
 
 
 
        # Setup topics & services
 
        self.subscription = self.create_subscription(
            MotorCommand,
            'motor_command',
            self.motor_command_callback,
            10)
 
        self.speed_pub = self.create_publisher(MotorVels, 'motor_vels', 10)
 
        self.encoder_pub = self.create_publisher(EncoderVals, 'encoder_vals', 10)
        
 
        # Member Variables
 
        self.last_enc_read_time = time.time()
        self.last_m1_enc = 0
        self.last_m2_enc = 0
        self.m1_spd = 0.0
        self.m2_spd = 0.0
 
        self.mutex = Lock()
 
 
        # Open serial comms
 
        print(f"Connecting to port {self.serial_port} at {self.baud_rate}.")
        self.conn = serial.Serial(self.serial_port, self.baud_rate, timeout=1.0)
        print(f"Connected to {self.conn}")
        
 
        
 
 
    # Raw serial commands
    
    def send_pwm_motor_command(self, mot_1_pwm, mot_2_pwm):
        self.send_command(f"o {int(mot_1_pwm)} {int(mot_2_pwm)}")
 
    def send_feedback_motor_command(self, mot_1_ct_per_loop, mot_2_ct_per_loop):
        self.send_command(f"m {int(mot_1_ct_per_loop)} {int(mot_2_ct_per_loop)}")
 
    def send_encoder_read_command(self):
        resp = self.send_command(f"e")
        if resp:
            return [int(raw_enc) for raw_enc in resp.split()]
        return []
 
 
    # More user-friendly functions
 
    def motor_command_callback(self, motor_command):
        if (motor_command.is_pwm):
            self.send_pwm_motor_command(motor_command.mot_1_req_rad_sec, motor_command.mot_2_req_rad_sec)
        else:
            # counts per loop = req rads/sec X revs/rad X counts/rev X secs/loop 
            scaler = (1 / (2*math.pi)) * self.get_parameter('encoder_cpr').value * (1 / self.get_parameter('loop_rate').value)
            mot1_ct_per_loop = motor_command.mot_1_req_rad_sec * scaler
            mot2_ct_per_loop = motor_command.mot_2_req_rad_sec * scaler
            self.send_feedback_motor_command(mot1_ct_per_loop, mot2_ct_per_loop)
 
    def check_encoders(self):
        resp = self.send_encoder_read_command()
        if (resp):
 
            new_time = time.time()
            time_diff = new_time - self.last_enc_read_time
            self.last_enc_read_time = new_time
 
            m1_diff = resp[0] - self.last_m1_enc
            self.last_m1_enc = resp[0]
            m2_diff = resp[1] - self.last_m2_enc
            self.last_m2_enc = resp[1]
 
            rads_per_ct = 2*math.pi/self.get_parameter('encoder_cpr').value
            self.m1_spd = m1_diff*rads_per_ct/time_diff
            self.m2_spd = m2_diff*rads_per_ct/time_diff
 
            spd_msg = MotorVels()
            spd_msg.mot_1_rad_sec = self.m1_spd
            spd_msg.mot_2_rad_sec = self.m2_spd
            self.speed_pub.publish(spd_msg)
 
            enc_msg = EncoderVals()
            enc_msg.mot_1_enc_val = self.last_m1_enc
            enc_msg.mot_2_enc_val = self.last_m2_enc
            self.encoder_pub.publish(enc_msg)
 
 
 
    # Utility functions
 
    def send_command(self, cmd_string):
        
        self.mutex.acquire()
        try:
            cmd_string += "\r"
            self.conn.write(cmd_string.encode("utf-8"))
            if (self.debug_serial_cmds):
                print("Sent: " + cmd_string)
 
            ## Adapted from original
            c = ''
            value = ''
            while c != '\r':
                c = self.conn.read(1).decode("utf-8")
                if (c == ''):
                    print("Error: Serial timeout on command: " + cmd_string)
                    return ''
                value += c
 
            value = value.strip('\r')
 
            if (self.debug_serial_cmds):
                print("Received: " + value)
            return value
        finally:
            self.mutex.release()
 
    def close_conn(self):
        self.conn.close()
 
 
 
def main(args=None):
    
    rclpy.init(args=args)
 
    motor_driver = MotorDriver()
 
    rate = motor_driver.create_rate(2)
    while rclpy.ok():
        rclpy.spin_once(motor_driver)
        motor_driver.check_encoders()
 
 
    motor_driver.close_conn()
    motor_driver.destroy_node()
    rclpy.shutdown()
 
 

Subscribed Topics:

  • /motor_command (type: MotorCommand) Receives either PWM values or desired angular velocity for each motor.

Published Topics:

  • /motor_vels (type: MotorVels) Publishes calculated motor velocities (radians/sec) based on encoder deltas.

  • /encoder_vals (type: EncoderVals) Publishes raw encoder counts.

gui.py

gui.py
import rclpy
from rclpy.node import Node
import time
from tkinter import *
import math
 
from serial_motor_demo_msgs.msg import MotorCommand
from serial_motor_demo_msgs.msg import MotorVels
from serial_motor_demo_msgs.msg import EncoderVals
 
 
class MotorGui(Node):
 
    def __init__(self):
        super().__init__('motor_gui')
 
        self.publisher = self.create_publisher(MotorCommand, 'motor_command', 10)
 
        self.speed_sub = self.create_subscription(
            MotorVels,
            'motor_vels',
            self.motor_vel_callback,
            10)
 
        self.encoder_sub = self.create_subscription(
            EncoderVals,
            'encoder_vals',
            self.encoder_val_callback,
            10)
 
        self.tk = Tk()
        self.tk.title("Serial Motor GUI")
        root = Frame(self.tk)
        root.pack(fill=BOTH, expand=True)
 
        
 
        Label(root, text="Serial Motor GUI").pack()
 
        mode_frame = Frame(root)
        mode_frame.pack(fill=X)
 
 
        self.mode_lbl = Label(mode_frame, text="ZZZZ")
        self.mode_lbl.pack(side=LEFT)
        self.mode_btn = Button(mode_frame, text="ZZZZ", command=self.switch_mode)
        self.mode_btn.pack(expand=True)
 
 
        slider_max_frame = Frame(root)
        slider_max_frame.pack(fill=X)
        self.slider_max_label = Label(slider_max_frame, text="Max Rev/sec", state="disabled")
        self.slider_max_label.pack(side=LEFT)
        self.slider_max_val_box = Entry(slider_max_frame, state="disabled")
        self.slider_max_val_box.pack(side=LEFT)
        self.max_val_update_btn = Button(slider_max_frame, text='Update', command=self.update_scale_limits, state="disabled")
        self.max_val_update_btn.pack(side=LEFT)
 
 
        m1_frame = Frame(root)
        m1_frame.pack(fill=X)
        Label(m1_frame, text="Motor 1").pack(side=LEFT)
        self.m1 = Scale(m1_frame, from_=-255, to=255, orient=HORIZONTAL)
        self.m1.pack(side=LEFT, fill=X, expand=True)
 
        m2_frame = Frame(root)
        m2_frame.pack(fill=X)
        Label(m2_frame, text="Motor 2").pack(side=LEFT)
        self.m2 = Scale(m2_frame, from_=-255, to=255, resolution=1, orient=HORIZONTAL)
        self.m2.pack(side=LEFT, fill=X, expand=True)
 
        self.m2.config(to=10)
 
        motor_btns_frame = Frame(root)
        motor_btns_frame.pack()
        Button(motor_btns_frame, text='Send Once', command=self.send_motor_once).pack(side=LEFT)
        Button(motor_btns_frame, text='Send Cont.', command=self.show_values, state="disabled").pack(side=LEFT)
        Button(motor_btns_frame, text='Stop Send', command=self.show_values, state="disabled").pack(side=LEFT)
        Button(motor_btns_frame, text='Stop Mot', command=self.stop_motors).pack(side=LEFT)
        
 
        enc_frame = Frame(root)
        enc_frame.pack(fill=X)
 
        self.enc_lbl = Label(enc_frame, text="Encoders: ")
        self.enc_lbl.pack(side=LEFT)
        self.mot_1_enc_lbl = Label(enc_frame, text="XXX")
        self.mot_1_enc_lbl.pack(side=LEFT)
        self.mot_2_enc_lbl = Label(enc_frame, text="XXX")
        self.mot_2_enc_lbl.pack(side=LEFT)
 
        speed_frame = Frame(root)
        speed_frame.pack(fill=X)
 
        self.spd_lbl = Label(enc_frame, text="Speed rev/s: ")
        self.spd_lbl.pack(side=LEFT)
        self.mot_1_spd_lbl = Label(enc_frame, text="XXX")
        self.mot_1_spd_lbl.pack(side=LEFT)
        self.mot_2_spd_lbl = Label(enc_frame, text="XXX")
        self.mot_2_spd_lbl.pack(side=LEFT)
 
 
        self.set_mode(True)
 
 
    def show_values(self):
        print (self.m1.get(), self.m2.get())
 
    def send_motor_once(self):
        msg = MotorCommand()
        msg.is_pwm = self.pwm_mode
        if (self.pwm_mode):
            msg.mot_1_req_rad_sec = float(self.m1.get())
            msg.mot_2_req_rad_sec = float(self.m2.get())
        else:
            msg.mot_1_req_rad_sec = float(self.m1.get()*2*math.pi)
            msg.mot_2_req_rad_sec = float(self.m2.get()*2*math.pi)
 
        self.publisher.publish(msg)
 
    def stop_motors(self):
        msg = MotorCommand()
        msg.is_pwm = self.pwm_mode
        msg.mot_1_req_rad_sec = 0.0
        msg.mot_2_req_rad_sec = 0.0
        self.publisher.publish(msg)
 
    def set_mode(self, new_mode):
        self.pwm_mode = new_mode
        if (self.pwm_mode):
            self.mode_lbl.config(text="Current Mode: PWM")
            self.mode_btn.config(text="Switch to Feedback Mode")
            self.slider_max_label.config(state="disabled")
            self.slider_max_val_box.config(state="disabled")
            self.max_val_update_btn.config(state="disabled")
        else:
            self.mode_lbl.config(text="Current Mode: Feedback")
            self.mode_btn.config(text="Switch to PWM Mode")
            self.slider_max_label.config(state="normal")
            self.slider_max_val_box.config(state="normal")
            self.max_val_update_btn.config(state="normal")
 
        self.update_scale_limits()
 
    def motor_vel_callback(self, motor_vels):
        mot_1_spd_rev_sec = motor_vels.mot_1_rad_sec / (2*math.pi)
        mot_2_spd_rev_sec = motor_vels.mot_2_rad_sec / (2*math.pi)
        self.mot_1_spd_lbl.config(text=f"{mot_1_spd_rev_sec:.2f}")
        self.mot_2_spd_lbl.config(text=f"{mot_2_spd_rev_sec:.2f}")
 
    def encoder_val_callback(self, encoder_vals):
        self.mot_1_enc_lbl.config(text=f"{encoder_vals.mot_1_enc_val}")
        self.mot_2_enc_lbl.config(text=f"{encoder_vals.mot_2_enc_val}")
 
 
 
    def switch_mode(self):
        self.set_mode(not self.pwm_mode)
 
    def update_scale_limits(self):
        if (self.pwm_mode):
            self.m1.config(from_=-255, to=255, resolution=1)
            self.m2.config(from_=-255, to=255, resolution=1)
        else:
            lim = float(self.slider_max_val_box.get())
            self.m1.config(from_=-lim, to=lim, resolution=0.1)
            self.m2.config(from_=-lim, to=lim, resolution=0.1)
 
 
 
    def update(self):
        self.tk.update()
 
 
 
 
 
def main(args=None):
    
    rclpy.init(args=args)
 
    motor_gui = MotorGui()
 
    rate = motor_gui.create_rate(20)    
    while rclpy.ok():
        rclpy.spin_once(motor_gui)
        motor_gui.update()
 
 
    motor_gui.destroy_node()
    rclpy.shutdown()

Subscribed Topics:

  • /motor_vels (type: MotorVels) receives angular speeds of both motors.

  • /encoder_vals (type: EncoderVals) receives encoder count feedback.

Published Topics:

  • /motor_command (type: MotorCommand) sends commands to the motor driver node (in PWM or velocity mode).