This post is all about project design.
But before getting into UML diagrams and other modeling tools, let us have a quick rewind:
A few months back I went on a [recently paused] quest to build my own autonomous drone. This quest was quite pretentious, I know.
"I have a TelloDrone, and the INTERNET so,
Hell, how hard could it be?"
After all, at work I write programs for autonomous drones all the time.
Yet I soon faced some reality checks:
- Autonomous drones are one of the hottest topics in the market right now; there are startups and millions of dollars around this subject. So maybe doing such a thing alone is not that easy.
- At work, we are a team of 7 brilliant, experienced developers (all holding a Master's degree), and me, a young padawan :)
The tools I thought I would find online were not up to the task:
- I wanted to use my good old Tello (via the unofficial SDK)
- I wanted to use ROS2
- I wanted to take my CPP skills up a notch.
Sadly, there is no ROS2-CPP-Tello-driver out there that supports the unofficial SDK (the one that provides a localization solution).
- Even better, there is no CPP-Tello-driver at all that supports the unofficial SDK.
I realized I had to build one on my own :)
Building such a driver from scratch is not an easy task.
I took this opportunity to reinforce some software development principles I think every software engineer should work with:
- The driver design follows the S.O.L.I.D principles.
- I stick to design patterns wherever I can.
- The library must be friendly, reusable, and well documented.
Using this CPP TelloDriver should be as simple as:
#include <chrono>
#include <thread>

#include "TelloDriver/TelloDriver.hpp"

int main()
{
    TelloDriver tello;
    tello.Connect();

    if (!tello.WaitForConnection(10)) // secs
    {
        tello.GetLogger()->error("Connection error. exiting!");
        return 1;
    }

    tello.GetLogger()->info("Sending Takeoff command!");
    tello.Takeoff();
    std::this_thread::sleep_for(std::chrono::seconds(10));

    tello.GetLogger()->info("Sending Land command!");
    tello.Land();
    std::this_thread::sleep_for(std::chrono::seconds(5));

    tello.GetLogger()->info("Scenario over!");
    return 0;
}
Part1 - Driver Design
To be honest, all I ever wanted from this Tello was its localization solution, i.e.:
# Pseudo
current_location[x] = tello.get_localization[0]
current_location[y] = tello.get_localization[1]
current_location[z] = tello.get_localization[2]
But the (brilliant) TelloPy driver provides much more data. For example, this is its FlightData:
class FlightData(object):
    def __init__(self, data):
        self.battery_low = 0
        self.battery_lower = 0
        self.battery_percentage = 0
        self.battery_state = 0
        self.camera_state = 0
        self.down_visual_state = 0
        self.drone_battery_left = 0
        self.drone_fly_time_left = 0
        self.drone_hover = 0
        self.em_open = 0
        self.em_sky = 0
        self.em_ground = 0
        self.east_speed = 0
        self.electrical_machinery_state = 0
        self.factory_mode = 0
        self.fly_mode = 0
        self.fly_speed = 0
        self.fly_time = 0
        self.front_in = 0
        self.front_lsc = 0
        self.front_out = 0
        self.gravity_state = 0
        self.ground_speed = 0
        self.height = 0
        self.imu_calibration_state = 0
        self.imu_state = 0
        self.light_strength = 0
        self.north_speed = 0
        self.outage_recording = 0
        self.power_state = 0
        self.pressure_state = 0
        self.smart_video_exit_mode = 0
        self.temperature_height = 0
        self.throw_fly_timer = 0
        self.wifi_disturb = 0
        self.wifi_strength = 0
        self.wind_state = 0
So, to be more accurate, my CPP TelloDriver needs to provide the following information:
- IMU
- Attitude
- Gyro
- Localization solution (aka position)
- FlightData (TOF, battery levels, built-in tests, drone mode, etc.)
Note: This data is crucial for decision making, path-planning, and anti-collision systems.
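To make that list a bit more concrete, here is a minimal sketch of a single snapshot object a driver could hand to its users. Everything in it is an assumption made for illustration: the `DroneState` name, the field names, and the choice of plain tuples are mine, not part of TelloPy or of the final driver.

```python
from dataclasses import dataclass
from typing import Tuple

Vec3 = Tuple[float, float, float]


@dataclass
class DroneState:
    """Hypothetical snapshot of the data the driver should expose."""

    imu_acceleration: Vec3 = (0.0, 0.0, 0.0)  # IMU
    attitude: Vec3 = (0.0, 0.0, 0.0)          # roll, pitch, yaw
    gyro: Vec3 = (0.0, 0.0, 0.0)              # angular rates
    position: Vec3 = (0.0, 0.0, 0.0)          # localization solution
    battery_percentage: int = 0               # one FlightData field
    height: float = 0.0                       # another FlightData field


# A consumer (path planner, anti-collision, ...) only reads what it needs.
state = DroneState(position=(1.0, 2.0, 0.5), battery_percentage=87)
x, y, z = state.position
```

A planner would only ever touch `state.position`, while a battery watchdog would only look at `state.battery_percentage`.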
To get all this data, I need to expand Example 1:
This is how TelloPy processes raw_data from the drone:
# Pseudo
def socket_thread(self):
    while(keep_running):
        raw_data = socket.receive(drone_ip)
        self.process_data(raw_data)

def process_data(self, data):
    """ ***************
    *
    * Some initializations
    *
    *************** """
    cmd = uint16(data[5], data[6])
    if cmd == LOG_HEADER_MSG:
        id = uint16(data[9], data[10])
        log.info("recv: log_header: id=%04x, '%s'" % (id, str(data[28:54])))
        log.debug("recv: log_header: %s" % byte_to_hexstring(data[9:]))
        self.__send_ack_log(id)
        self.__publish(event=self.EVENT_LOG_HEADER, data=data[9:])
        if self.log_data_file and not self.log_data_header_recorded:
            self.log_data_file.write(data[12:-2])
            self.log_data_header_recorded = True
    elif cmd == LOG_DATA_MSG:
        log.debug("recv: log_data: length=%d, %s" % (len(data[9:]), byte_to_hexstring(data[9:])))
        self.__publish(event=self.EVENT_LOG_RAWDATA, data=data[9:])
        try:
            self.log_data.update(data[10:])
            if self.log_data_file:
                self.log_data_file.write(data[10:-2])
        except Exception as ex:
            log.error('%s' % str(ex))
        self.__publish(event=self.EVENT_LOG_DATA, data=self.log_data)
    """ ***************
    *
    * Many more elif's come here.
    *
    *************** """
Can you see it?
TelloPy's developer has broken a S.O.L.I.D principle (OMG!!):
The function process_data(self, data) is supposed to process data. Yet inside the if cmd == LOG_HEADER_MSG condition, the function also sends commands back to the drone:
if cmd == LOG_HEADER_MSG:
    id = uint16(data[9], data[10])
    log.info("recv: log_header: id=%04x, '%s'" % (id, str(data[28:54])))
    log.debug("recv: log_header: %s" % byte_to_hexstring(data[9:]))
    self.__send_ack_log(id) #<------------------Breaking Single responsibility (The S of the S.O.L.I.D )
    self.__publish(event=self.EVENT_LOG_HEADER, data=data[9:])
    if self.log_data_file and not self.log_data_header_recorded:
        self.log_data_file.write(data[12:-2])
        self.log_data_header_recorded = True
The ACK itself is needed, though. In order to start a logging session with the drone, the driver must send back an ACK_LOG message that contains the same ID it received from the drone, within a time limit.
This basically tells the drone to switch from the 1.3 SDK to the unofficial SDK.
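To see what the handshake has to extract from a packet, here is a small sketch of the parsing step alone. It assumes the two-byte fields are little-endian (low byte first), and the `LOG_HEADER_MSG` value is a placeholder picked for the example, so check the real constant in TelloPy's protocol module before relying on it. `parse_log_header` is a hypothetical helper name.

```python
from typing import Optional

LOG_HEADER_MSG = 0x1050  # placeholder value, assumed for this example


def uint16(low: int, high: int) -> int:
    """Combine two bytes into an unsigned 16-bit value, low byte first."""
    return (low & 0xFF) | ((high & 0xFF) << 8)


def parse_log_header(data: bytes) -> Optional[int]:
    """Return the log ID if `data` is a log-header packet, else None."""
    if len(data) < 11:
        return None  # too short to hold the command and ID fields
    if uint16(data[5], data[6]) != LOG_HEADER_MSG:
        return None
    return uint16(data[9], data[10])


# Build a fake packet: command in bytes 5-6, log ID in bytes 9-10.
packet = bytearray(12)
packet[5], packet[6] = 0x50, 0x10
packet[9], packet[10] = 0x34, 0x12
log_id = parse_log_header(bytes(packet))
```

The ID returned here is what the driver has to echo back in its ACK_LOG message.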
Given this behavior, it sounds like we need some sort of Observer that observes new data and, if the message is a LOG_HEADER_MSG, sends back an ACK_LOG (with the given ID).
That's the only responsibility this Observer has!
Something like this:
# Pseudo
class SendAckLogObserver():
    def __init__(self, sender_socket):
        self._raw_msg = None
        self._commands_sender = sender_socket

    def update(self, data):
        cmd = uint16(data[5], data[6])
        if cmd == LOG_HEADER_MSG:
            id = uint16(data[9], data[10])
            # Send ACK_LOG carrying the same ID back to the drone.
            self._commands_sender(id)


class TelloExample():
    def __init__(self, observers):
        self._attached_observer = observers

    def socket_thread(self):
        while(keep_running):
            raw_data = socket.receive(drone_ip)
            # process_data(raw_data) #<-------------- This is old implementation
            self.notify(raw_data)

    def notify(self, data):
        for observer in self._attached_observer:
            observer.update(data)
That's a classic GoF example for the Observer design pattern.
Using this design we can break the spaghetti process_data(raw_data) function up into many attached observers.
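For readers who want to run the idea rather than read pseudocode, here is a self-contained sketch of the same Observer wiring with fake packets instead of a socket. The names (`PacketDispatcher`, `AckLogObserver`, `LogDataCounter`, `make_packet`) are hypothetical, the command constants are placeholders, and the byte layout simply mirrors the offsets used in the pseudocode.

```python
from typing import Callable, Iterable, List, Protocol

LOG_HEADER_MSG = 0x1050  # placeholder values, assumed for this example
LOG_DATA_MSG = 0x1051


def command_of(data: bytes) -> int:
    """Read the command field from bytes 5-6 (low byte first, assumed)."""
    return data[5] | (data[6] << 8)


class PacketObserver(Protocol):
    def update(self, data: bytes) -> None: ...


class AckLogObserver:
    """Only job: answer a log header with an ACK carrying the same ID."""

    def __init__(self, send_ack: Callable[[int], None]) -> None:
        self._send_ack = send_ack

    def update(self, data: bytes) -> None:
        if command_of(data) == LOG_HEADER_MSG:
            self._send_ack(data[9] | (data[10] << 8))


class LogDataCounter:
    """Only job: count log-data packets."""

    def __init__(self) -> None:
        self.count = 0

    def update(self, data: bytes) -> None:
        if command_of(data) == LOG_DATA_MSG:
            self.count += 1


class PacketDispatcher:
    """Forwards every raw packet to all attached observers."""

    def __init__(self, observers: Iterable[PacketObserver]) -> None:
        self._observers: List[PacketObserver] = list(observers)

    def notify(self, data: bytes) -> None:
        for observer in self._observers:
            observer.update(data)


def make_packet(cmd: int, log_id: int = 0) -> bytes:
    """Build a fake 12-byte packet for the demo."""
    packet = bytearray(12)
    packet[5], packet[6] = cmd & 0xFF, cmd >> 8
    packet[9], packet[10] = log_id & 0xFF, log_id >> 8
    return bytes(packet)


sent_acks: List[int] = []
counter = LogDataCounter()
dispatcher = PacketDispatcher([AckLogObserver(sent_acks.append), counter])
for packet in (make_packet(LOG_HEADER_MSG, 7),
               make_packet(LOG_DATA_MSG),
               make_packet(LOG_DATA_MSG)):
    dispatcher.notify(packet)
```

Each observer ignores packets it does not care about, so adding a new message type means adding a new class instead of another elif branch.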
Another design improvement can be made:
We saw in Example 2 that all the data is saved in self.log_data. But when using an observer, this data will only be available to that specific observer, i.e.:
# Pseudo
class LogDataObserver():
    def __init__(self):
        self.log_data = SomeLogData()

    def update(self, data):
        cmd = uint16(data[5], data[6])
        if cmd == LOG_DATA_MSG:
            try:
                self.log_data.update(data[10:])
                if self.log_data_file:
                    self.log_data_file.write(data[10:-2])
            except Exception as ex:
                log.error('%s' % str(ex))
Now we can add this observer to the observers list:
if __name__ == '__main__':
    some_socket = socket('192.168.10.1', 8889)
    observers = []
    observers.append(SendAckLogObserver(some_socket))
    observers.append(LogDataObserver())
    drone = TelloExample(observers)
    drone.connect()
The TelloPy driver implements a publish mechanism that notifies the user about new data. In Example 2 we can see this line:
self.__publish(event=self.EVENT_LOG_DATA, data=self.log_data)
Publishing data to the user is nice, but it is tightly coupled to the driver itself. So implementing a LogDataObserver will break this coupling, and the user will no longer receive log_data.
So by now, we have something like this:
In order to supply LogData to the user, we could add a getter to LogDataObserver(). But that would break SOLID! This is a LogDataObserver, not a LogDataProvider.
The suggested new design makes attaching new observers quick, and keeps the driver's code clean. But:
All the observers keep their data to themselves!
This problem yet again screams for a design pattern solution. Luckily, there's the Facade design pattern. Using Facade, I implemented a shared DataManager that exposes many interfaces, one for each observer. And, more importantly, it also supplies data to the user!
# Pseudo
class InserNewLogDataMsgInterface:
    def set_log_data(self, new_log_data):
        """Sets New log data msg."""
        pass


class DataManager(InserNewLogDataMsgInterface):
    def __init__(self):
        self._log_data = None

    def set_log_data(self, new_log_data):
        self._log_data = new_log_data
This DataManager is injected via an implemented interface to every observer that needs it:
# Pseudo
# Minor change to LogDataObserver:
class LogDataObserver():
    def __init__(self, shared_data_mgr):
        if not isinstance(shared_data_mgr, InserNewLogDataMsgInterface):
            raise TypeError("LogDataObserver does not support this shared_data_mgr")
        self._shared_data_mgr = shared_data_mgr
        self.log_data = SomeLogData()

    def update(self, data):
        cmd = uint16(data[5], data[6])
        if cmd == LOG_DATA_MSG:
            try:
                self.log_data.update(data[10:])
                """
                That's the only function exposed from DataManager to this LogDataObserver.
                That's the Facade design.
                """
                self._shared_data_mgr.set_log_data(self.log_data)
                if self.log_data_file:
                    self.log_data_file.write(data[10:-2])
            except Exception as ex:
                log.error('%s' % str(ex))


# Minor change to TelloExample:
class TelloExample():
    def __init__(self, observers, data_mahager):
        # Store shared data manager for later use.
        self._data_mahager = data_mahager
        # Observers are expected to share the same data_mahager instance.
        # Each observer accesses it via a different interface.
        self._attached_observer = observers

    def socket_thread(self):
        while(keep_running):
            raw_data = socket.receive(drone_ip)
            # process_data(raw_data) #<-------------- This is old implementation
            self.notify(raw_data)

    def notify(self, data):
        for observer in self._attached_observer:
            observer.update(data)
Now our main will look like:
if __name__ == '__main__':
    some_socket = socket('192.168.10.1', 8889)
    data_mgr = DataManager()
    observers = []
    observers.append(SendAckLogObserver(some_socket))
    observers.append(LogDataObserver(data_mgr)) #<------- Dependency injection
    drone = TelloExample(observers, data_mgr)
    drone.connect()
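The injection step can also be written with Python's `abc` module, so that the narrow interface is enforced by the language instead of by convention. This is only a sketch under my own naming: `LogDataSink` stands in for the insert-log-data interface, `latest_log_data` is a hypothetical attribute, and the observer stores the raw payload instead of a parsed log object to keep the example short.

```python
from abc import ABC, abstractmethod


class LogDataSink(ABC):
    """The only view of the data manager a log-data observer gets."""

    @abstractmethod
    def set_log_data(self, new_log_data: bytes) -> None:
        """Store the newest log data."""


class DataManager(LogDataSink):
    def __init__(self) -> None:
        self.latest_log_data = None

    def set_log_data(self, new_log_data: bytes) -> None:
        self.latest_log_data = new_log_data


class LogDataObserver:
    def __init__(self, sink: LogDataSink) -> None:
        # Reject anything that does not implement the narrow interface.
        if not isinstance(sink, LogDataSink):
            raise TypeError("LogDataObserver needs a LogDataSink")
        self._sink = sink

    def update(self, data: bytes) -> None:
        # Payload offset 10 mirrors the pseudocode; parsing is skipped here.
        self._sink.set_log_data(data[10:])


manager = DataManager()
observer = LogDataObserver(manager)
observer.update(bytes(range(16)))
```

A class that inherits from `LogDataSink` but forgets to implement `set_log_data` cannot be instantiated at all, which catches wiring mistakes early.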
Now TelloExample supports many observers that watch for new data, parse it, and then store it inside a shared data manager via a provided interface. Well, something like this:
You are probably asking:
But still, how can the user get the data?
Well, that's where the magic happens.
By now I have shown an Observer and a Facade.
It is time to combine them!
I want TelloExample to be able to notify attached users about newly arrived data, just like TelloPy's developer did:
self.__publish(event=self.EVENT_LOG_DATA, data=self.log_data)
But decoupled from TelloExample itself. In fact, TelloExample will act only as a bootstrapper between the user-implemented observer and the DataManager:
Behold, the Observable Facade (drum roll...)
# Pseudo
class IObservableDataManager:
    def attach(self, OBSERVE_DATA_TYPE, observable):
        """Attach the user observable."""
        pass


class InserNewLogDataMsgInterface:
    def set_log_data(self, new_log_data):
        """Sets New log data msg."""
        pass


class InserNewWifiDataMsgInterface:
    def set_wifi_data(self, new_wifi_data):
        """Sets New wifi data msg."""
        pass


class DataManager(IObservableDataManager, InserNewLogDataMsgInterface, InserNewWifiDataMsgInterface):
    def __init__(self):
        self._log_data = None
        self._wifi_data = None
        # One list of attached observers per data type.
        self._observers = {LOG_DATA: [], WIFI_DATA: []}

    def attach(self, OBSERVE_DATA_TYPE, observable):
        self._observers[OBSERVE_DATA_TYPE].append(observable)

    def set_log_data(self, new_log_data):
        self._log_data = new_log_data
        self.notify(LOG_DATA)

    def set_wifi_data(self, new_wifi_data):
        self._wifi_data = new_wifi_data
        self.notify(WIFI_DATA)

    def notify(self, OBSERVE_DATA_TYPE):
        if OBSERVE_DATA_TYPE == WIFI_DATA:
            for observer in self._observers[WIFI_DATA]:
                observer.update(self._wifi_data)
        if OBSERVE_DATA_TYPE == LOG_DATA:
            for observer in self._observers[LOG_DATA]:
                observer.update(self._log_data)
Now that we have an observable facade, let us see how it is used:
# Pseudo
# Minor changes to TelloExample
class TelloExample():
    def __init__(self, observers, data_mahager):
        # Store shared data manager for later use.
        self._data_mahager = data_mahager
        # Observers are expected to share the same data_mahager instance.
        # Each observer accesses it via a different interface.
        self._attached_observer = observers

    def socket_thread(self):
        while(keep_running):
            raw_data = socket.receive(drone_ip)
            # process_data(raw_data) #<-------------- This is old implementation
            self.notify(raw_data)

    def notify(self, data):
        for observer in self._attached_observer:
            observer.update(data)

    # Add attach capabilities to the user
    def attach(self, OBSERVE_DATA_TYPE, observable):
        # Using the IObservableDataManager provided interface.
        self._data_mahager.attach(OBSERVE_DATA_TYPE, observable)


class My_wifi_ovserver():
    def __init__(self):
        self._wifi_strength = None

    def print_wifi_data(self):
        if self._wifi_strength is not None:
            print(self._wifi_strength)

    def update(self, new_wifi_strength):
        self._wifi_strength = new_wifi_strength


if __name__ == '__main__':
    my_observer = My_wifi_ovserver()
    some_socket = socket('192.168.10.1', 8889)
    data_mgr = DataManager()
    observers = []
    observers.append(SendAckLogObserver(some_socket))
    observers.append(LogDataObserver(data_mgr)) #<------- Dependency injection
    drone = TelloExample(observers, data_mgr)
    drone.attach(WIFI_DATA, my_observer)
    drone.connect()
    while(True):
        my_observer.print_wifi_data()
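Here is a compact, runnable take on the same idea, with no drone required. It is a sketch, not the driver's actual code: `ObservableDataManager`, `DataType`, and `WifiHistory` are hypothetical names, and I swapped the if-chain inside notify for a dictionary lookup keyed by data type.

```python
from collections import defaultdict
from enum import Enum, auto
from typing import Any, DefaultDict, Dict, List


class DataType(Enum):
    LOG_DATA = auto()
    WIFI_DATA = auto()


class ObservableDataManager:
    """Stores the newest value per data type and tells subscribers about it."""

    def __init__(self) -> None:
        self._latest: Dict[DataType, Any] = {}
        self._observers: DefaultDict[DataType, List[Any]] = defaultdict(list)

    def attach(self, data_type: DataType, observer: Any) -> None:
        self._observers[data_type].append(observer)

    def _store_and_notify(self, data_type: DataType, value: Any) -> None:
        self._latest[data_type] = value
        # Only observers attached to this data type are called.
        for observer in self._observers[data_type]:
            observer.update(value)

    def set_log_data(self, new_log_data: Any) -> None:
        self._store_and_notify(DataType.LOG_DATA, new_log_data)

    def set_wifi_data(self, new_wifi_data: Any) -> None:
        self._store_and_notify(DataType.WIFI_DATA, new_wifi_data)


class WifiHistory:
    """User-side observer that remembers every wifi reading it was given."""

    def __init__(self) -> None:
        self.readings: List[int] = []

    def update(self, new_wifi_strength: int) -> None:
        self.readings.append(new_wifi_strength)


manager = ObservableDataManager()
wifi = WifiHistory()
manager.attach(DataType.WIFI_DATA, wifi)

manager.set_wifi_data(90)
manager.set_log_data(b"not delivered to the wifi observer")
manager.set_wifi_data(75)
```

With the lookup in place, supporting a new data type means adding an enum member and a setter, and the notification code stays untouched.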
So now this TelloExample can provide data back to the user:
That's great and all, but what about sending commands to the drone?
That's an excellent question. I've shown before that Example 2 violates the Single Responsibility principle. Hence, TelloExample needs a Commander, and only this Commander will be able to send commands to the drone!
Something like this:
# Pseudo
class Commander():
    def __init__(self, sender_socket):
        self._raw_msg = None
        self._commands_sender = sender_socket

    def send_takeoff(self):
        self._commands_sender(takeoff_pkt)

    def send_land(self):
        self._commands_sender(land_pkt)


# Minor changes to TelloExample
class TelloExample():
    def __init__(self, observers, data_mahager, commander):
        # Store shared data manager for later use.
        self._data_mahager = data_mahager
        self._commander = commander
        # Observers are expected to share the same data_mahager instance.
        # Each observer accesses it via a different interface.
        self._attached_observer = observers

    def socket_thread(self):
        while(keep_running):
            raw_data = socket.receive(drone_ip)
            # process_data(raw_data) #<-------------- This is old implementation
            self.notify(raw_data)

    def notify(self, data):
        for observer in self._attached_observer:
            observer.update(data)

    # Add attach capabilities to the user
    def attach(self, OBSERVE_DATA_TYPE, observable):
        # Using the IObservableDataManager provided interface.
        self._data_mahager.attach(OBSERVE_DATA_TYPE, observable)

    def takeoff(self):
        self._commander.send_takeoff()
        while(self._data_mahager.get_flight_data().height < TAKEOFF_THRESH): #<------------------- This get_flight_data method should be exposed via IDataManager.
            print("taking off")

    def land(self):
        self._commander.send_land()


if __name__ == '__main__':
    my_observer = My_wifi_ovserver()
    some_socket = socket('192.168.10.1', 8889)
    data_mgr = DataManager()
    commander = Commander(some_socket)
    observers = []
    observers.append(SendAckLogObserver(some_socket))
    observers.append(LogDataObserver(data_mgr)) #<------- Dependency injection
    drone = TelloExample(observers, data_mgr, commander)
    drone.attach(WIFI_DATA, my_observer)
    drone.connect()
    drone.takeoff()
    while(True):
        my_observer.print_wifi_data()
This design should be enough.
I did add some abstraction layers for the socket itself. But for the scope of this post, that'll do.
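One detail worth sketching is the takeoff wait. A bare loop on the height would spin forever if the drone never climbs, so a bounded wait is safer. The snippet below is an illustration under assumptions: `wait_until` and `FakeDrone` are hypothetical helpers, the byte strings are placeholder payloads and not real Tello packets, and the threshold is an arbitrary number of centimeters.

```python
import time
from typing import Callable, List


class Commander:
    """The only object allowed to send commands to the drone."""

    def __init__(self, send: Callable[[bytes], None]) -> None:
        self._send = send

    def send_takeoff(self) -> None:
        self._send(b"takeoff")  # placeholder payload

    def send_land(self) -> None:
        self._send(b"land")  # placeholder payload


def wait_until(condition: Callable[[], bool],
               timeout_s: float,
               poll_s: float = 0.01) -> bool:
    """Poll `condition` until it holds or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poll_s)
    return condition()


class FakeDrone:
    """Stand-in for the real link: climbs 20 cm per height read after takeoff."""

    def __init__(self) -> None:
        self.sent: List[bytes] = []
        self.height_cm = 0

    def send(self, packet: bytes) -> None:
        self.sent.append(packet)

    def read_height_cm(self) -> int:
        if b"takeoff" in self.sent:
            self.height_cm += 20
        return self.height_cm


TAKEOFF_THRESH_CM = 100  # arbitrary threshold for the demo

drone = FakeDrone()
commander = Commander(drone.send)
commander.send_takeoff()
reached = wait_until(lambda: drone.read_height_cm() >= TAKEOFF_THRESH_CM,
                     timeout_s=2.0)
commander.send_land()
```

If the threshold is never reached, `wait_until` returns False and the caller can decide whether to retry or land, instead of hanging in the loop.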
My final designs for the new CPP TelloDriver
High level overview
User <-> Driver interaction
Overall class relation
Wrapping up
Don't get me wrong. The TelloPy driver is amazing and has many, many users. I love it, and I used it myself back when I made the HalloPy project.
Writing code can become a mess very fast. A design-first, write-later approach goes a long way toward clean, efficient, and elegant code. That's the engineer's way.
Cheers,
Gal.