Intro
This is demonstration of a ROS 2 interface to an Arduino running differential-drive motor control code.
Components
The serial_motor_demo package consists of two nodes, driver.py and gui.py. The idea is that the driver can be run on an onboard PC inside a robot (e.g. a Raspberry Pi), interfacing with the lower-level hardware. The driver exposes motor control through ROS topics (see below), which are to be published by the user’s software.
The GUI provides a simple interface for development and testing of such a system. It publishes and subscribes to the appropriate topics.
Driver configuration & usage
The driver has a few parameters:
encoder_cpr- Encoder counts per revolutionloop_rate- Execution rate of the Arduino code (see Arduino side documentation for details)serial_port- Serial port to connect to (default/dev/ttyUSB0)baud_rate- Serial baud rate (default57600)serial_debug- Enables debugging of serial commands (defaultfalse)
To run, e.g.
ros2 run serial_motor_demo driver --ros-args -p encoder_cpr:=3440 -p loop_rate:=30 -p serial_port:=/dev/ttyUSB0 -p baud_rate:=57600
It makes use of the following topics
motor_command- Subscribes aMotorCommand, in rads/sec for each of the two motorsmotor_vels- Publishes aMotorVels, motor velocities in rads/secencoder_vals- Publishes anEncoderVals, raw encoder counts for each motor
GUI Usage
Has two modes, one for raw PWM input (-255 to 255) and one for closed-loop control. In this mode you must first set the limits for the sliders.
driver.py
driver.py
import rclpy
from rclpy.node import Node
from serial_motor_demo_msgs.msg import MotorCommand
from serial_motor_demo_msgs.msg import MotorVels
from serial_motor_demo_msgs.msg import EncoderVals
import time
import math
import serial
from threading import Lock
class MotorDriver(Node):
def __init__(self):
super().__init__('motor_driver')
# Setup parameters
self.declare_parameter('encoder_cpr', value=0)
if (self.get_parameter('encoder_cpr').value == 0):
print("WARNING! ENCODER CPR SET TO 0!!")
self.declare_parameter('loop_rate', value=0)
if (self.get_parameter('loop_rate').value == 0):
print("WARNING! LOOP RATE SET TO 0!!")
self.declare_parameter('serial_port', value="/dev/ttyUSB0")
self.serial_port = self.get_parameter('serial_port').value
self.declare_parameter('baud_rate', value=57600)
self.baud_rate = self.get_parameter('baud_rate').value
self.declare_parameter('serial_debug', value=False)
self.debug_serial_cmds = self.get_parameter('serial_debug').value
if (self.debug_serial_cmds):
print("Serial debug enabled")
# Setup topics & services
self.subscription = self.create_subscription(
MotorCommand,
'motor_command',
self.motor_command_callback,
10)
self.speed_pub = self.create_publisher(MotorVels, 'motor_vels', 10)
self.encoder_pub = self.create_publisher(EncoderVals, 'encoder_vals', 10)
# Member Variables
self.last_enc_read_time = time.time()
self.last_m1_enc = 0
self.last_m2_enc = 0
self.m1_spd = 0.0
self.m2_spd = 0.0
self.mutex = Lock()
# Open serial comms
print(f"Connecting to port {self.serial_port} at {self.baud_rate}.")
self.conn = serial.Serial(self.serial_port, self.baud_rate, timeout=1.0)
print(f"Connected to {self.conn}")
# Raw serial commands
def send_pwm_motor_command(self, mot_1_pwm, mot_2_pwm):
self.send_command(f"o {int(mot_1_pwm)} {int(mot_2_pwm)}")
def send_feedback_motor_command(self, mot_1_ct_per_loop, mot_2_ct_per_loop):
self.send_command(f"m {int(mot_1_ct_per_loop)} {int(mot_2_ct_per_loop)}")
def send_encoder_read_command(self):
resp = self.send_command(f"e")
if resp:
return [int(raw_enc) for raw_enc in resp.split()]
return []
# More user-friendly functions
def motor_command_callback(self, motor_command):
if (motor_command.is_pwm):
self.send_pwm_motor_command(motor_command.mot_1_req_rad_sec, motor_command.mot_2_req_rad_sec)
else:
# counts per loop = req rads/sec X revs/rad X counts/rev X secs/loop
scaler = (1 / (2*math.pi)) * self.get_parameter('encoder_cpr').value * (1 / self.get_parameter('loop_rate').value)
mot1_ct_per_loop = motor_command.mot_1_req_rad_sec * scaler
mot2_ct_per_loop = motor_command.mot_2_req_rad_sec * scaler
self.send_feedback_motor_command(mot1_ct_per_loop, mot2_ct_per_loop)
def check_encoders(self):
resp = self.send_encoder_read_command()
if (resp):
new_time = time.time()
time_diff = new_time - self.last_enc_read_time
self.last_enc_read_time = new_time
m1_diff = resp[0] - self.last_m1_enc
self.last_m1_enc = resp[0]
m2_diff = resp[1] - self.last_m2_enc
self.last_m2_enc = resp[1]
rads_per_ct = 2*math.pi/self.get_parameter('encoder_cpr').value
self.m1_spd = m1_diff*rads_per_ct/time_diff
self.m2_spd = m2_diff*rads_per_ct/time_diff
spd_msg = MotorVels()
spd_msg.mot_1_rad_sec = self.m1_spd
spd_msg.mot_2_rad_sec = self.m2_spd
self.speed_pub.publish(spd_msg)
enc_msg = EncoderVals()
enc_msg.mot_1_enc_val = self.last_m1_enc
enc_msg.mot_2_enc_val = self.last_m2_enc
self.encoder_pub.publish(enc_msg)
# Utility functions
def send_command(self, cmd_string):
self.mutex.acquire()
try:
cmd_string += "\r"
self.conn.write(cmd_string.encode("utf-8"))
if (self.debug_serial_cmds):
print("Sent: " + cmd_string)
## Adapted from original
c = ''
value = ''
while c != '\r':
c = self.conn.read(1).decode("utf-8")
if (c == ''):
print("Error: Serial timeout on command: " + cmd_string)
return ''
value += c
value = value.strip('\r')
if (self.debug_serial_cmds):
print("Received: " + value)
return value
finally:
self.mutex.release()
def close_conn(self):
self.conn.close()
def main(args=None):
rclpy.init(args=args)
motor_driver = MotorDriver()
rate = motor_driver.create_rate(2)
while rclpy.ok():
rclpy.spin_once(motor_driver)
motor_driver.check_encoders()
motor_driver.close_conn()
motor_driver.destroy_node()
rclpy.shutdown()
Subscribed Topics:
/motor_command(type:MotorCommand) Receives either PWM values or desired angular velocity for each motor.
Published Topics:
-
/motor_vels(type:MotorVels) Publishes calculated motor velocities (radians/sec) based on encoder deltas. -
/encoder_vals(type:EncoderVals) Publishes raw encoder counts.
gui.py
gui.py
import rclpy
from rclpy.node import Node
import time
from tkinter import *
import math
from serial_motor_demo_msgs.msg import MotorCommand
from serial_motor_demo_msgs.msg import MotorVels
from serial_motor_demo_msgs.msg import EncoderVals
class MotorGui(Node):
def __init__(self):
super().__init__('motor_gui')
self.publisher = self.create_publisher(MotorCommand, 'motor_command', 10)
self.speed_sub = self.create_subscription(
MotorVels,
'motor_vels',
self.motor_vel_callback,
10)
self.encoder_sub = self.create_subscription(
EncoderVals,
'encoder_vals',
self.encoder_val_callback,
10)
self.tk = Tk()
self.tk.title("Serial Motor GUI")
root = Frame(self.tk)
root.pack(fill=BOTH, expand=True)
Label(root, text="Serial Motor GUI").pack()
mode_frame = Frame(root)
mode_frame.pack(fill=X)
self.mode_lbl = Label(mode_frame, text="ZZZZ")
self.mode_lbl.pack(side=LEFT)
self.mode_btn = Button(mode_frame, text="ZZZZ", command=self.switch_mode)
self.mode_btn.pack(expand=True)
slider_max_frame = Frame(root)
slider_max_frame.pack(fill=X)
self.slider_max_label = Label(slider_max_frame, text="Max Rev/sec", state="disabled")
self.slider_max_label.pack(side=LEFT)
self.slider_max_val_box = Entry(slider_max_frame, state="disabled")
self.slider_max_val_box.pack(side=LEFT)
self.max_val_update_btn = Button(slider_max_frame, text='Update', command=self.update_scale_limits, state="disabled")
self.max_val_update_btn.pack(side=LEFT)
m1_frame = Frame(root)
m1_frame.pack(fill=X)
Label(m1_frame, text="Motor 1").pack(side=LEFT)
self.m1 = Scale(m1_frame, from_=-255, to=255, orient=HORIZONTAL)
self.m1.pack(side=LEFT, fill=X, expand=True)
m2_frame = Frame(root)
m2_frame.pack(fill=X)
Label(m2_frame, text="Motor 2").pack(side=LEFT)
self.m2 = Scale(m2_frame, from_=-255, to=255, resolution=1, orient=HORIZONTAL)
self.m2.pack(side=LEFT, fill=X, expand=True)
self.m2.config(to=10)
motor_btns_frame = Frame(root)
motor_btns_frame.pack()
Button(motor_btns_frame, text='Send Once', command=self.send_motor_once).pack(side=LEFT)
Button(motor_btns_frame, text='Send Cont.', command=self.show_values, state="disabled").pack(side=LEFT)
Button(motor_btns_frame, text='Stop Send', command=self.show_values, state="disabled").pack(side=LEFT)
Button(motor_btns_frame, text='Stop Mot', command=self.stop_motors).pack(side=LEFT)
enc_frame = Frame(root)
enc_frame.pack(fill=X)
self.enc_lbl = Label(enc_frame, text="Encoders: ")
self.enc_lbl.pack(side=LEFT)
self.mot_1_enc_lbl = Label(enc_frame, text="XXX")
self.mot_1_enc_lbl.pack(side=LEFT)
self.mot_2_enc_lbl = Label(enc_frame, text="XXX")
self.mot_2_enc_lbl.pack(side=LEFT)
speed_frame = Frame(root)
speed_frame.pack(fill=X)
self.spd_lbl = Label(enc_frame, text="Speed rev/s: ")
self.spd_lbl.pack(side=LEFT)
self.mot_1_spd_lbl = Label(enc_frame, text="XXX")
self.mot_1_spd_lbl.pack(side=LEFT)
self.mot_2_spd_lbl = Label(enc_frame, text="XXX")
self.mot_2_spd_lbl.pack(side=LEFT)
self.set_mode(True)
def show_values(self):
print (self.m1.get(), self.m2.get())
def send_motor_once(self):
msg = MotorCommand()
msg.is_pwm = self.pwm_mode
if (self.pwm_mode):
msg.mot_1_req_rad_sec = float(self.m1.get())
msg.mot_2_req_rad_sec = float(self.m2.get())
else:
msg.mot_1_req_rad_sec = float(self.m1.get()*2*math.pi)
msg.mot_2_req_rad_sec = float(self.m2.get()*2*math.pi)
self.publisher.publish(msg)
def stop_motors(self):
msg = MotorCommand()
msg.is_pwm = self.pwm_mode
msg.mot_1_req_rad_sec = 0.0
msg.mot_2_req_rad_sec = 0.0
self.publisher.publish(msg)
def set_mode(self, new_mode):
self.pwm_mode = new_mode
if (self.pwm_mode):
self.mode_lbl.config(text="Current Mode: PWM")
self.mode_btn.config(text="Switch to Feedback Mode")
self.slider_max_label.config(state="disabled")
self.slider_max_val_box.config(state="disabled")
self.max_val_update_btn.config(state="disabled")
else:
self.mode_lbl.config(text="Current Mode: Feedback")
self.mode_btn.config(text="Switch to PWM Mode")
self.slider_max_label.config(state="normal")
self.slider_max_val_box.config(state="normal")
self.max_val_update_btn.config(state="normal")
self.update_scale_limits()
def motor_vel_callback(self, motor_vels):
mot_1_spd_rev_sec = motor_vels.mot_1_rad_sec / (2*math.pi)
mot_2_spd_rev_sec = motor_vels.mot_2_rad_sec / (2*math.pi)
self.mot_1_spd_lbl.config(text=f"{mot_1_spd_rev_sec:.2f}")
self.mot_2_spd_lbl.config(text=f"{mot_2_spd_rev_sec:.2f}")
def encoder_val_callback(self, encoder_vals):
self.mot_1_enc_lbl.config(text=f"{encoder_vals.mot_1_enc_val}")
self.mot_2_enc_lbl.config(text=f"{encoder_vals.mot_2_enc_val}")
def switch_mode(self):
self.set_mode(not self.pwm_mode)
def update_scale_limits(self):
if (self.pwm_mode):
self.m1.config(from_=-255, to=255, resolution=1)
self.m2.config(from_=-255, to=255, resolution=1)
else:
lim = float(self.slider_max_val_box.get())
self.m1.config(from_=-lim, to=lim, resolution=0.1)
self.m2.config(from_=-lim, to=lim, resolution=0.1)
def update(self):
self.tk.update()
def main(args=None):
rclpy.init(args=args)
motor_gui = MotorGui()
rate = motor_gui.create_rate(20)
while rclpy.ok():
rclpy.spin_once(motor_gui)
motor_gui.update()
motor_gui.destroy_node()
rclpy.shutdown()Subscribed Topics:
-
/motor_vels(type:MotorVels) receives angular speeds of both motors. -
/encoder_vals(type:EncoderVals) receives encoder count feedback.
Published Topics:
/motor_command(type:MotorCommand) sends commands to the motor driver node (in PWM or velocity mode).




