"""
Vehicle classes for AUV simulation with swarm coordination.
Implements abstract and concrete vehicle classes for autonomous underwater
vehicle simulation, including 6-DOF dynamics, modular GNC architecture, sensor
integration, and multi-agent communication capabilities.
Classes
-------
Vehicle
Abstract base class for vehicle hierarchy.
AUV
Abstract AUV class with sensor management and GNC interfaces.
Remus100s
Concrete Remus 100 AUV implementation with full dynamics.
Model
Lightweight vehicle state model for swarm coordination.
Functions
---------
buildGroup(num, gid, hasLeader, vehType, kwargs)
Create a list of vehicle instances for multi-agent simulation.
Notes
-----
Based on Fossen's marine vehicle dynamics formulation and Python Vehicle
Simulator.
References
----------
[1] Fossen, T.I. Python Vehicle Simulator. GitHub repository.
https://github.com/cybergalactic/PythonVehicleSimulator
[2] Fossen, T.I. (2021). Handbook of Marine Craft Hydrodynamics and Motion
Control. 2nd Edition, Wiley. https://www.fossen.biz/wiley
"""
from typing import Any, Dict, List, Optional, Tuple, Union
from numpy.typing import NDArray
from abc import ABC, abstractmethod
import numpy as np
import os
import inspect
import math
from munetauvsim import communication as comm
from munetauvsim import navigation as nav
from munetauvsim import environment as env
from munetauvsim import guidance as guid
from munetauvsim import control as ctrl
from munetauvsim import aquanet_lib as aqnet
from munetauvsim import gnc
from munetauvsim import logger
###############################################################################
# Type Aliases
NPFltArr = NDArray[np.float64]
# Global Variables
log = logger.addLog('veh')
###############################################################################
[docs]class Vehicle(ABC):
"""Abstract base class for structural hierarchy of vehicle classes."""
[docs] @abstractmethod
def __init__(self)->None:
"""Define and set vehicle attributes."""
###############################################################################
[docs]class Model(Vehicle):
"""
Lightweight data model representing another vehicle's state in multi-agent
simulations.
The Model class stores data about another vehicle for use in swarm
coordination where vehicles maintain internal models of their neighbors'
states based on received communication messages. Includes historical logging
of position and velocity reports with automatic capacity expansion.
Parameters
----------
vehObj : Vehicle
The source vehicle to create a model from. Copies the following
attributes into the model: callSign, id, groupId, isLeader, eta,
velocity, nextEta, nextVel.
Attributes
----------
**Identity and Status:**
callsign : str
Vehicle's unique identifier
id : int
Vehicle ID number
groupId : str
Swarm group identifier
isLeader : bool
True if vehicle is a group leader
**State Vectors:**
eta : ndarray, shape (3,)
Vehicle position [x,y,z] in END frame (m)
velocity : ndarray, shape (3,)
Vehicle velocity [vx, vy, vz] in END frame (m/s)
nextEta : ndarray
Projected next position (if available)
nextVel : ndarray
Projected next velocity (if available)
**Communication:**
nodeAddr : int
Simulation comms address.
timeLastMsg : float
Timestamp the last received message was sent from this vehicle (s)
delayLastMsg : float
Time elapsed since last received message (s)
rprtRecv : bool
Flag indicating last report successfully received
**Data Logs:**
etaLog : ndarray, shape (logCap, 4)
Log of received position reports, [x, y, z, time]
velLog : ndarray, shape (logCap, 5)
Log of received velocity reports, [vx, vy, vz, speed, time]
logSize : int
Current number of filled log entries, (0 to logCap)
logCap : int
Current capacity of preallocated log arrays, starts at 512 and doubles
when full
Notes
-----
- Uses __slots__ to minimize memory overhead. New attributes cannot be
dynamically added, they must be defined in the class and added to the
__slots__ declaration.
- When logs reach capacity, both are automatically doubled in size by the
communication.writeEtaVelLogs() function.
"""
## Define slots
__slots__ = ('callSign','id','groupId','nodeAddr','isLeader',
'eta','velocity','nextEta','nextVel',
'timeLastMsg','delayLastMsg','etaLog','velLog','rprtRecv',
'_logSize','_logCap')
## Constructor
[docs] def __init__(self,vehObj:Vehicle)->None:
"""
Create vehicle model from existing vehicle object.
Parameters
----------
vehObj : Vehicle
Source vehicle to copy attributes from (callSign, id, groupId,
nodeAddr, isLeader, eta, velocity, nextEta, nextVel).
Notes
-----
Initializes communication tracking (timeLastMsg, delayLastMsg) and
preallocates log arrays with 512-row capacity.
"""
# Copy Existing Attributes
self.callSign = vehObj.callSign
self.id = vehObj.id
self.groupId = vehObj.groupId
self.nodeAddr = vehObj.nodeAddr
self.isLeader = vehObj.isLeader
self.eta = np.copy(vehObj.eta[:3])
self.velocity = np.copy(vehObj.velocity[:3])
self.nextEta = np.copy(vehObj.nextEta)
self.nextVel = np.copy(vehObj.nextVel)
# New Attributes for Modelling
self._logSize = 0
self._logCap = 512
self.timeLastMsg = 0.0
self.delayLastMsg = 0.0
self.etaLog = np.zeros((self._logCap, 4))
self.velLog = np.zeros((self._logCap, 5))
self.rprtRecv = False
## Special Methods
[docs] def __repr__(self)->str:
"""Return concise string representation of Model object."""
return f"<{self.__class__.__name__} {self.id} at {hex(id(self))}>"
###############################################################################
[docs]class AUV(Vehicle):
"""
Abstract base class for autonomous underwater vehicles.
Provides sensor management, GNC interfaces, and common vehicle attributes.
Cannot be instantiated directly; must be subclassed by concrete vehicle
types.
Attributes
----------
id : int
Unique vehicle identifier.
groupId : str
Swarm group identifier.
isLeader : bool
Leadership status in swarm.
callSign : str
Custom unique vehicle identifier.
eta : ndarray, shape (6,)
Position and attitude [x, y, z, phi, theta, psi].
nu : ndarray, shape (6,)
Body-frame velocities [u, v, w, p, q, r].
sensors : dict
Installed sensor objects keyed by name.
GuidSystem : callable
Guidance system function (assigned via load methods).
CommNetwork : object
Communication network manager.
Methods
-------
addSensor(name, sensor)
Install a sensor on the vehicle.
readSensor(name, kwargs)
Read data from a specific sensor.
collectSensorData(i, ocean)
Update vehicle state from sensors (abstract method).
"""
## Constructor ===========================================================#
[docs] @abstractmethod
def __init__(self)->None:
"""
Initialize base AUV attributes.
Sets up identity attributes, state vectors, sensor dictionary, and
GNC function handle placeholders. All assigned method attributes
(GuidSystem, CommNetwork, etc.) are initialized to None.
Notes
-----
This is an abstract class constructor called by subclass __init__.
Concrete vehicles (e.g., Remus100s) must call super().__init__().
"""
#---------------------------------------------------------------------#
# Identity #
#---------------------------------------------------------------------#
self.info = {} # instance data dictionary
self.id = None # id number
self.groupId = None # swarm group ID
self.isLeader = False # leader status
self.callSign = None # identification string
#---------------------------------------------------------------------#
# Assigned Methods #
#---------------------------------------------------------------------#
## Communication
self.CommNetwork = None # Comms Network Manager
self.CommSched = None # Communications Scheduler
## Guidance
self.GuidSystem = None # Guidance System
self.GuidLaw = None # Guidance Law
## Navigation
self.DepthObs = None # Depth Observer
self.HeadingObs = None # Heading Observer
self.Attitude = nav.attitudeEuler # Attitude Propagation
## Control
self.DepthAP = None # Depth Auto Pilot
self.HeadingAP = None # Heading Auto Pilot
self.PropCmd = None # Propeller Command
#---------------------------------------------------------------------#
# Communication #
#---------------------------------------------------------------------#
## Network
self.txAddr = None # Transmission Address
self.nodeAddr = None # Local Node Address
self.aqBaseDir = None # Aquanet Files Base Directory
self.monitorThread = None # Incoming Message Monitor (thread)
## Message Schedules
self.epsdDur = None # Episode Cycle Duration (s)
self.bcrqDur = None # BCST/RQST Duration (s)
self.rspnRqst = None # RSPN REQUEST Received (Follower)(T/F)
self.rspnSched = None # RSPN Schedule
self.rspnFrame = None # RSPN Frame Start Time (s)
self.rspnSlot = None # RSPN Slot Start Time (s)
self.rspnDur = None # RSPN Slot Duration (s)
## Data Integrity
self.dataRepair = {} # Corrupt data handling strategies
#---------------------------------------------------------------------#
# Guidance #
#---------------------------------------------------------------------#
## Waypoints
self.wpt = guid.Waypoint(0,0,0) # waypoint database
self.wpt_k = 0 # index to previous active waypoint
self.R_switch = 10.0 # waypoint acceptance radius (m)
## Heading
self.beta_c_hat = 0 # estimate of the crab angle
self.Delta = 10.0 # positive look-ahead distance (m)
self.gamma = 0.0006 # positive adaptive gain constant
## Swarm
self.target = None # target vehicle for tracking
self.group = None # list of swarm followers
self.nextEta = None # next position
self.nextVel = None # next velocity
self.r_safe = 10.0 # minimum safe vehicle distance (m)
self.r_avoid = 40.0 # avoidance radius (m)
self.r_inner = 80.0 # inner radius of neutral zone(m)
self.r_follow = 100.0 # following distance (m)
## Environment
self.z_safe = 5 # safety distance from ocean floor (m)
#---------------------------------------------------------------------#
# Navigation #
#---------------------------------------------------------------------#
## State
"""
eta=[e,n,d,phi,theta,psi]: x, y, z, roll, pitch, yaw
nu=[u,v,w,p,q,r]: linear & angular velocities: BODY frame
velocity=[x_dot,y_dot,z_dot] linear velocities: NED frame
"""
self.eta = np.zeros(6) # position & attitude vector
self.nu = np.zeros(6) # BODY velocity vector
self.velocity = np.zeros(3) # NED velocity vector
self.clock = 0 # simulation time
self.immobilized = False # mobility status flag
self.sensors = {} # installed sensors
## State Estimators
self.sampleTime = 0.02 # simulation iteration time step
self.wn_d_z = 1/self.sampleTime #1/20# desired natural frequency, depth
"""wn_d_z also gets recalculated by the @sampleTime.setter"""
self.z_d = 0.0 # desired depth command
self.theta_d = 0.0 # desired pitch angle command
self.psi_d = 0.0 # desired heading angle command
self.r_d = 0.0 # desired yaw rate (rad/s)
## Environment
self.drift_c = 0 # sensed ocean current speed (m/s)
self.set_c = 0 # sensed ocean current direction (rad)
self.seabed_z = 6000 # sensed ocean floor depth (m)
#---------------------------------------------------------------------#
# Physics #
#---------------------------------------------------------------------#
## Constants
self.D2R = math.pi / 180 # deg2rad
self.rho = 1026 # density of water (kg/m^3)
self.g = 9.81 # acceleration of gravity (m/s^2)
## Environment
self.V_c = 0 # ocean current speed (m/s)
self.beta_V_c = 0 # ocean current direction (rad)
self.z_floor = 6000 # ocean floor depth (m)
## Properties ============================================================#
@property
def id(self)->int:
"""Unique vehicle ID number"""
return self._id
@id.setter
def id(self, uid:int)->None:
"""Set unique vehicle ID number. Can only be set once."""
if (('_id' not in self.__dict__) or
((self._id is None) and (uid is not None))):
self._id = uid
#--------------------------------------------------------------------------
@property
def groupId(self)->str:
"""The vehicle group ID"""
return self._groupId
@groupId.setter
def groupId(self, gid:str)->None:
"""Set vehicle group ID and clear cached default callsign"""
self._defaultCallSign = None
self._groupId = gid
#--------------------------------------------------------------------------
@property
def isLeader(self)->bool:
"""The vehicle Leader status"""
return self._isLeader
@isLeader.setter
def isLeader(self, val:bool)->None:
"""Set vehicle Leader status and clear cached default callsign"""
self._defaultCallSign = None
self._isLeader = val
#--------------------------------------------------------------------------
@property
def callSign(self)->str:
"""Unique vehicle identifier"""
if (self._callSign is None):
if (('_defaultCallSign' not in self.__dict__) or
(self._defaultCallSign is None)):
idGroup = str(self.groupId).upper() if (self.groupId) else ''
idNum = f'{self.id:02}'
isLeader = '-LEADER'if (self.isLeader) else ''
self._defaultCallSign = idGroup+idNum+isLeader
return self._defaultCallSign
return self._callSign
@callSign.setter
def callSign(self, identifier:str)->None:
"""Set the vehicle identifier"""
self._callSign = identifier
if (identifier is not None):
self.info.update([("Callsign", f"{identifier}")])
#--------------------------------------------------------------------------
@property
def sampleTime(self)->float:
"""The iteration loop time step (seconds), default: 0.02"""
return self._sampleTime
@sampleTime.setter
def sampleTime(self, n:float)->None:
"""Set the time step for simulation iteration loop (seconds)"""
self._sampleTime = n
self.wn_d_z = 1/n #NOTE: This may be unique to Remus100s
#--------------------------------------------------------------------------
@property
def Delta(self):
"""Positive look-ahead distance used by ALOS (m), typically 5-20 m"""
return self._Delta
@Delta.setter
def Delta(self, delta:float)->None:
"""Set the positive look-ahead distance used by ALOS (m)"""
if (delta <= 0):
msg = "Delta must be greater than zero to avoid division error"
log.critical(msg)
raise ValueError(msg)
self._Delta = delta
#--------------------------------------------------------------------------
@property
def target(self)->Optional[Union[Model,Vehicle]]:
"""The vehicle target for tracking"""
return self._target
@target.setter
def target(self, v:Optional[Union[Model,Vehicle]])->None:
"""Set the vehicle target for tracking"""
if ((self.CommNetwork is not None) and
(not isinstance(v, Model))):
self._target = Model(v)
else:
self._target = v
#--------------------------------------------------------------------------
@property
def CommNetwork(self)->Optional[Union[aqnet.AquaNetManager, comm.MuNode]]:
"""The Communication Network Manager"""
return self._CommNetwork
@CommNetwork.setter
def CommNetwork(self, comnet:Any)->None:
"""Set the Communication Network Manager"""
self._CommNetwork = comnet
# Re-set target to ensure properly using Model as required
if (("_target" in self.__dict__) and
(self._target is not None)):
self.target = self._target
#--------------------------------------------------------------------------
@property
def fullDetails(self)->str:
"""The full details of the all the AUV attributes"""
out = []
attrs = vars(self)
for key,value in sorted(attrs.items()):
out.append(f"{key} = {value!r}")
return f"{self.__class__.__name__}(\n " + ",\n ".join(out) + "\n)"
#--------------------------------------------------------------------------
@property
def immobilized(self)->bool:
"""Mobility status of AUV"""
return self._immobilized
@immobilized.setter
def immobilized(self, status:bool)->None:
"""Set the mobility status of the AUV"""
if (('_immobilized' in self.__dict__) and
(not self._immobilized) and status):
log.info('%s: **IMMOBILIZED! Dynamics inoperable...', self.callSign)
elif (('_immobilized' in self.__dict__) and
self._immobilized and (not status)):
log.info('%s: **MOBILIZED! Dynamics operable...', self.callSign)
self._immobilized = status
## Special Methods =======================================================#
[docs] def __repr__(self)->str:
"""Return concise string representation of AUV object."""
return f"<{self.__class__.__name__} {self.id} at {hex(id(self))}>"
#--------------------------------------------------------------------------
[docs] def __str__(self)->str:
"""Return user-friendly string representation with vehicle info."""
out = []
# Identity
inGroup = f' - Group {self.groupId}' if (self.groupId) else ''
isLeader = ' Leader' if (self.isLeader) else ''
out.append(f'AUV {self.id:02}'+inGroup+isLeader)
# Add info from dictionary
if (not self.info):
return out[0]
# Field column width
cw = max(len(k) for k in self.info) + 1
# Field: value data
for key,value in self.info.items():
k = f"{key}:"
out.append(f"{k:{cw}} {value}")
# Total width
tw = max([len(o) for o in out])
# Add line decorators
line = '-' * tw
out = [out[0],line,*out[1:],line]
return '\n'+'\n'.join(out)
## Methods ===============================================================#
[docs] def syncEnvironmentState(self, i:int, ocean:env.Ocean)->None:
"""
Inject true environmental state for the current simulation step.
Called by the Simulator each iteration to inject true environmental
conditions directly into the vehicle for dynamics calculations. These
values represent the physical reality of the environment and are used
exclusively by the vehicle dynamics method for accurate force and
collision computations.
Parameters
----------
i : int
Current simulation iteration counter. Used to index time-varying
environmental arrays (e.g., current speed and direction).
ocean : env.Ocean
Ocean environment object containing:
- current : env.Current1D or None
Time-varying current model with speed and angle arrays.
- floor : env.Floor or None
Bathymetric floor model with callable floor(x, y).
Notes
-----
**Side Effects:**
Updates the following vehicle attributes:
- V_c : float
True ocean current speed (m/s).
- beta_V_c : float
True ocean current direction (rad).
- z_floor : float
True ocean floor depth at vehicle position (m).
These attributes feed directly into dynamics() and are not intended
for use by guidance or navigation algorithms. For sensed estimates of
environmental conditions, see collectSensorData().
**Separation from Sensor Pathway:**
This method and collectSensorData() represent two distinct pathways
for environmental data on the vehicle:
- syncEnvironmentState() -- true environment state injection from the
simulator (e.g z_floor, V_c, beta_V_c) for vehicle dynamics
calculations.
- collectSensorData() -- sensor-based observations (e.g. seabed_z,
drift_c, set_c) for use by guidance and navigation algorithms or
environmental data collection.
See Also
--------
collectSensorData : Sensor-based environmental observation pathway.
dynamics : Utilizes V_c, beta_V_c, z_floor for physics evaluations.
"""
if (ocean is not None):
# Current
if (ocean.current is not None):
self.V_c = ocean.current.speed[i]
self.beta_V_c = ocean.current.angle[i]
# Depth
if (ocean.floor is not None):
self.z_floor = ocean.floor(self.eta[0], self.eta[1])
#--------------------------------------------------------------------------
[docs] def addSensor(self, name:str, sensor:nav.Sensor)->None:
"""
Install or replace a sensor in the vehicle's sensor suite.
Adds a sensor object to the vehicles internal sensor dictionary, making
it available for data collection during simulation. If a sensor with the
specified name already exists, it is replaced. The vehicle's info
dictionary is automatically updated to reflect the current sensor
configuration.
Parameters
----------
name : str
Unique identifier for the sensor. This name is used as the
dictionary key and for subsequent sensor operations (read, remove).
Names are case-sensitive and should be descriptive.
Common conventions:
- 'current': Ocean currentr sensor
- 'depth': Ocean floor depth sensor
sensor : nav.Sensor
Sensor object to install on the vehicle. The sensor's collectData
method will be called during simulation with relevant parameters
(ocean, eta, etc.) passed as keyword arguments.
Notes
-----
**Side Effects:**
- Calls _updateSensorInfo() to update self.info['Installed Sensors']
**Sensor Interface Requirements:**
All sensors must inherit from nav.Sensor and implement collectData()
method with the signature:
``def collectData(self, **kwargs)->Any``
The collectData method should:
- Extract only needed parameters from kwargs
- Return sensor measurement(s) in appropriate format
- Handle missing parameters gracefully (return None or raise ValueError)
**Data Collection Timing:**
Sensors are automatically read during simulation via the vehicle's
collectSensorData() method, which is called once per simulation
iteration. The simulator provides ocean state and other context via
kwargs.
**Sensor Data Flow:**
1. Simulation calls vehicle.collectSensorData(i, ocean)
2. Vehicle calls vehicle.readAllSensors() or readSensor(name)
3. Sensor.collectData() extracts parameters and returns measurement
4. Vehicle updates internal state (e.g., seabed_z, etc.)
See Also
--------
addSensors : Add multiple sensors at once from dictionary
removeSensor : Remove sensor by name
removeSelectedSensors : Remove multiple sensors by name list
removeAllSensors : Remove all sensors at once
readSensor : Read data from specific sensor
readSelectedSensors : Read multiple sensors by name list
readAllSensors : Read data from all installed sensors
collectSensorData : Main sensor data collection method called by
simulator
updateSensorInfo : Helper that updates info dictionary
Examples
--------
### Install default ocean sensors:
>>> auv = Remus100s()
>>> auv.addSensor('current', nav.OceanCurrentSensor())
>>> auv.addSensor('depth', nav.OceanDepthSensor())
>>> print(auv.info['Installed Sensors'])
current, depth
### Install a custom depth sensor:
>>> class CustomDepthSensor(nav.Sensor):
... def collectData(self, **kwargs):
... ocean = kwargs.get('ocean')
... eta = kwargs.get('eta')
... # custom sensor logic
... return customData
>>> auv.addSensor('depth', CustomDepthSensor())
### Read sensor during simulation:
>>> speed, angle = auv.readSensor('current', ocean, i)
>>> print(f"Current: {speed:.2f} m/s at {np.degrees(angle):.1f} deg")
"""
self.sensors[name] = sensor
self._updateSensorInfo()
#--------------------------------------------------------------------------
[docs] def addSensors(self, sensors:Dict[str,nav.Sensor])->None:
"""
Install or replace multiple sensors on the vehicle simultaneously.
Batch operation for adding several sensors at once from a dictionary.
The vehicle's info dictionary is updated once after all sensors are
processed.
Parameters
----------
sensors : dict {str : nav.Sensor}
Dictionary mapping sensor names to sensor objects. Keys are unique
identifier strings (same conventions as addSensor), values must be
nav.Sensor instances.
Notes
-----
**Side Effects:**
- Calls _updateSensorInfo() to update self.info['Installed Sensors']
- Invalid entries (not a nav.Sensor instance) are reported and skipped
**Validation:**
Each sensor value is checked for being an instance of nav.Sensor.
Invalid objects generate an error log message and are not installed.
This allows partial installation of valid sensors even if some entries
are invalid.
See Also
--------
addSensor : Add single sensor
removeSensor : Remove sensor by name
removeSelectedSensors : Remove multiple sensors by name list
removeAllSensors : Remove all sensors at once
readSensor : Read data from specific sensor
readSelectedSensors : Read multiple sensors by name list
readAllSensors : Read data from all installed sensors
collectSensorData : Main sensor data collection method called by
simulator
updateSensorInfo : Helper that updates info dictionary
Examples
--------
### Install complete sensor kit all at once:
>>> auv = Remus100s()
>>> sensors = {
... 'current': nav.OceanCurrentSensor(),
... 'depth': nav.OceanCurrentSensor(),
... 'custom': someCustomSensor(),
... }
>>> auv.addSensors(sensors)
>>> print(auv.info['Installed Sensors'])
current, depth, custom
"""
for k,v in sensors.items():
if (isinstance(v, nav.Sensor)):
self.sensors[k] = v
else:
log.error("Invalid sensor for '%s'", k)
self._updateSensorInfo()
#--------------------------------------------------------------------------
[docs] def removeSensor(self, name:str)->None:
"""
Remove a sensor from the vehicle's sensor suite by name.
Deletes the specified sensor from the vehicle's internal sensor
dictionary and updates the info display.
Parameters
----------
name : str
Unique identifier of the sensor to remove. Must match a key in the
self.sensors dictionary exactly (case-sensitive).
Notes
-----
**Side Effects:**
- Calls _updateSensorInfo() to update self.info['Installed Sensors']
**Safe Operation:**
Attempting to remove a non-existent sensor does not raise an exception.
Instead, a warning is reported and operation continues. This allows
safely calling removeSensor without checking existence first.
See Also
--------
addSensor : Add single sensor
addSensors : Add multiple sensors at once from dictionary
removeSelectedSensors : Remove multiple sensors by name list
removeAllSensors : Remove all sensors at once
readSensor : Read data from specific sensor
readSelectedSensors : Read multiple sensors by name list
readAllSensors : Read data from all installed sensors
collectSensorData : Main sensor data collection method called by
simulator
updateSensorInfo : Helper that updates info dictionary
Examples
--------
### Remove a single sensor:
>>> auv = Remus100s()
>>> auv.addSensor('current', nav.OceanCurrentSensor())
>>> auv.addSensor('depth', nav.OceanDepthSensor())
>>> print(auv.info['Installed Sensors'])
current, depth
>>> auv.removeSensor('current')
>>> print(auv.info['Installed Sensors'])
depth
"""
if (name in self.sensors):
del self.sensors[name]
self._updateSensorInfo()
else:
log.warning("Sensor '%s' not installed on vehicle", name)
#--------------------------------------------------------------------------
[docs] def removeSelectedSensors(self, names:List[str])->None:
"""
Remove multiple sensors from the vehicle by providing a list of names.
Batch removal operation that processes a list of sensor names. The
vehicle's info dictionary is updated once after all sensors are
processed.
Parameters
----------
names : list of str
List of sensor identifier strings to remove. Each string should
match a key in self.sensors dictionary (case-sensitive).
Notes
-----
**Side Effects:**
- Calls _updateSensorInfo() to update self.info['Installed Sensors']
**Partial Removal:**
If some sensor names in the list don't exist they are reported and
skipped, while valid names are still removed.
See Also
--------
addSensor : Add single sensor
addSensors : Add multiple sensors at once from dictionary
removeSensor : Remove sensor by name
removeAllSensors : Remove all sensors at once
readSensor : Read data from specific sensor
readSelectedSensors : Read multiple sensors by name list
readAllSensors : Read data from all installed sensors
collectSensorData : Main sensor data collection method called by
simulator
updateSensorInfo : Helper that updates info dictionary
Examples
--------
### Remove multiple sensors at once:
>>> auv = Remus100s()
>>> auv.addSensors({
... 'current': nav.OceanCurrentSensor(),
... 'depth': nav.OceanDepthSensor(),
... 'custom': someCustomSensor(),
... })
>>> auv.removeSelectedSensors(['current', 'custom'])
>>> print(auv.info['Installed Sensors'])
depth
"""
removedAny = False
for name in names:
if (name in self.sensors):
del self.sensors[name]
removedAny = True
else:
log.warning("Sensor '%s' not installed on vehicle", name)
if (removedAny):
self._updateSensorInfo()
#--------------------------------------------------------------------------
[docs] def removeAllSensors(self)->None:
"""
Remove all installed sensors from the vehicle's sensor suite.
Clears the entire sensor dictionary and updates the info display.
Notes
-----
Calls _updateSensorInfo() to remove sensor list from self.info
See Also
--------
addSensor : Add single sensor
addSensors : Add multiple sensors at once from dictionary
removeSensor : Remove sensor by name
removeSelectedSensors : Remove multiple sensors by name list
readSensor : Read data from specific sensor
readSelectedSensors : Read multiple sensors by name list
readAllSensors : Read data from all installed sensors
collectSensorData : Main sensor data collection method called by
simulator
updateSensorInfo : Helper that updates info dictionary
Examples
--------
### Reconfigure sensor suite:
>>> auv.removeAllSensors()
>>> auv.addSensors({
... 'custom_current': CustomCurrentSensor(),
... 'custom_depth': CustomDepthSensor(),
... })
"""
if (self.sensors):
self.sensors.clear()
self._updateSensorInfo()
#--------------------------------------------------------------------------
[docs] def readSensor(self, name:str, *args, **kwargs)->Optional[Any]:
"""
Read data from a single sensor by name with provided parameters.
Calls the collectData method of the specified sensor, passing along any
provided arguments. Returns the sensor's data output or None if the
sensor is not found or if data collection fails.
Parameters
----------
name : str
Unique identifier of the sensor to read from. Must match a key in
self.sensors dictionary exactly (case-sensitive).
*args : tuple, optional
Positional arguments to pass to the sesnor's collectData method.
Since this reads a single known sensor, positional arguments are
supported for convenience.
**kwargs : dict, optional
Keyword arguments to pass to the sensor's collectData method. The
sensor extracts only the parameters it needs.
Common kwargs include:
- ocean : env.Ocean
Ocean environment object for current and depth data
- eta : ndarray
Vehicle position for location-dependent sensing
- i : int
Simulation iteration counter for time-dependent sensing
Returns
-------
data: Any or None
Sensor data defined by the sensor's collectData method.
Common return types:
- tuple : (value1, value2, ...) for multi-valued sensors
- float : Single scalar measurement
- ndarray : Vector or array measurement
- None : Sensor not found or data collection failed
Notes
-----
**Side Effects:**
Calls sensor.collectData() for named sensor
**Argument Passing:**
Supports positional arguments ``(*args)`` for convenience since the method
only calls a single sensor.
**Error Handling:**
Returns None and reports issue if a sensor is not found or there is an
error during data collection, allowing the simulation to continue.
**Return Type Variability:**
Return type depends on specific sensor implementation. Check sensor
documentation for expected return format.
See Also
--------
addSensor : Add single sensor
addSensors : Add multiple sensors at once from dictionary
removeSensor : Remove sensor by name
removeSelectedSensors : Remove multiple sensors by name list
removeAllSensors : Remove all sensors at
readSelectedSensors : Read multiple sensors by name list
readAllSensors : Read data from all installed sensors
collectSensorData : Main sensor data collection method called by
simulator
updateSensorInfo : Helper that updates info dictionary
Examples
--------
### Read ocean current sensor with keyword arguments
>>> auv = Remus100s()
>>> ocean = env.Ocean(spd=0.5, ang=np.pi/4)
>>> speed, angle = auv.readSensor('current', ocean=ocean, i=100)
>>> print(f"Current: {speed:.2f} m/s at {np.degrees(angle):.1f} deg")
Current: 0.50 m/s at 45.0 deg
### Read ocean depth sensor with positional arguments
>>> depth = auv.readSensor('depth', ocean, auv.eta)
>>> print(f"Ocean floor at {depth:.1f} m")
Ocean floor at 132.9 m
"""
if (name in self.sensors):
try:
return self.sensors[name].collectData(*args, **kwargs)
except ValueError as e:
log.error("Error reading sensor '%s': %s", name, e)
return None
log.error("Sensor '%s' not installed on vehicle", name)
return None
#--------------------------------------------------------------------------
[docs] def readSelectedSensors(self, names:List[str], **kwargs)->Dict[str,Any]:
"""
Read data from multiple sensors by providing a list of sensor names.
Batch operation that reads several sensors at once, collecting their
data into a dictionary. Keyword arguments are passed to all sensors,
with each sensor extracting only the parameters it needs. Missing
sensors are logged as errors.
Parameters
----------
names : list of str
List of sensor identifier strings to read from. Each string should
match a key in self.sensors dictionary. Names are case-sensitive.
**kwargs : dict
Keyword arguments passed to each sensor's collectData method. All
sensors receive the same kwargs dictionary but extract only needed
parameters.
Common kwargs:
- ocean : env.Ocean
Ocean environment object for current and depth data
- eta : ndarray
Vehicle position for location-dependent sensing
- i : int
Simulation iteration counter for time-dependent sensing
Returns
-------
data : dict of {str : Any}
Dictionary mapping sensor names to their collected data. Keys are
sensor names from the input list (only for successfully read
sensors). Values are dependent on what each sensor's collectData
method returns. Sensors that are not found are omitted from the
returned dictionary.
Notes
-----
**Side Effects:**
Calls collectData() on each sensor
**Keyword Argument Sharing:**
All sensors receive the same kwargs dictionary. Each sensor must be
careful to extract only what it needs via kwargs.get() and not
unintentionally modify shared kwargs.
**Partial Success:**
If some sensors in the list do not exist, data is still collected from
valid sensors. The returned dictionary contains only successfully read
sensor entries.
**No Positional Arguments:**
Unlike readSensor(), this method calls multiple sensors so does not
support positional arguments because there are no requirements that each
sensor takes the same set of arguments or takes them in the same order.
See Also
--------
addSensor : Add single sensor
addSensors : Add multiple sensors at once from dictionary
removeSensor : Remove sensor by name
removeSelectedSensors : Remove multiple sensors by name list
removeAllSensors : Remove all sensors at
readSensor : Read data from specific sensor
readAllSensors : Read data from all installed sensors
collectSensorData : Main sensor data collection method called by
simulator
updateSensorInfo : Helper that updates info dictionary
Examples
--------
### Read multiple sensors
>>> auv.Remus100s()
>>> ocean = env.Ocean(spd=0.5, ang=np.pi/4)
>>> data = auv.readSelectedSensors(
... ['current','depth'],
... ocean=ocean, eta=auv.eta, i=100,
... )
>>> print(f"Ocean Current Speed: {data['current'][0]:.2f} m/s")
Ocean Current Speed: 0.50 m/s
"""
data = {}
for name in names:
if (name in self.sensors):
data[name] = self.sensors[name].collectData(**kwargs)
else:
log.error("Sensor '%s' not installed on vehicle", name)
return data
#--------------------------------------------------------------------------
[docs] def readAllSensors(self, **kwargs)->Dict[str,Any]:
"""
Read data from all installed sensors simultaneously.
Batch operation that collects data from every installed sensor in the
vehicle's sensor suite, returning a dictionary mapping sensor names to
their data outputs. Keyword arguments are passed to all sensors, with
each extracting only the parameters it needs.
Parameters
----------
**kwargs : dict
Keyword arguments passed to each sensor's collectData method. All
sensors receive the same kwargs dictionary.
Common kwargs:
- i : int
Simulation iteration counter for time-dependent sensing
- ocean : env.Ocean
Ocean environment object for current and depth data
- eta : ndarray
Vehicle position for location-dependent sensing
Returns
-------
data : dict of {str : Any}
Dictionary mapping sensor names to their collected data. Keys are
sensor names from the input list. Values are dependent on what each
sensor's collectData method returns. If no sensors are installed,
returns empty dictionary {}.
Notes
-----
**Side Effects:**
Calls collectData on each installed sensor
**Use in Simulation:**
This method is typically called by collectSensorData(), which is called
once per simulation iteration by the Simulator.
**Error Handling:**
Individual sensor errors are not caught here. Method assumes each sensor
is responsible for handling own exceptions, otherwise it propagates them
up. If more robust error handling is needed, use readSelectedSensors or
readSensor which catch errors.
See Also
--------
addSensor : Add single sensor
addSensors : Add multiple sensors at once from dictionary
removeSensor : Remove sensor by name
removeSelectedSensors : Remove multiple sensors by name list
removeAllSensors : Remove all sensors at
readSensor : Read data from specific sensor
readSelectedSensors : Read multiple sensors by name list
collectSensorData : Main sensor data collection method called by simulator
updateSensorInfo : Helper that updates info dictionary
Examples
--------
### Read all sensors
>>> auv.Remus100s()
>>> ocean = env.Ocean(spd=0.5, ang=np.pi/4)
>>> data = auv.readAllSensors(ocean=ocean, eta=auv.eta, i=100)
>>> for sensor,value in data.items():
... print(f"{sensor}: {value}")
current: [0.50, 0.79]
depth: 132.9
"""
return {name: sensor.collectData(**kwargs)
for name,sensor in self.sensors.items()}
#--------------------------------------------------------------------------
[docs] @abstractmethod
def collectSensorData(self)->None:
"""
Collect data from installed sensors and update the vehicle state.
Notes
-----
This method should be implemented by each vehicle class to collect data
from and update the vehicle state accordingly.
"""
## Helper Methods ========================================================#
[docs] def _updateSensorInfo(self)->None:
"""Update sensor information in the vehicle info dictionary."""
if (self.sensors):
names = sorted(self.sensors.keys())
sensors = (f"{', '.join(names)}") if len(names) > 1 else names[0]
self.info["Installed Sensors"] = sensors
else:
self.info.pop("Installed Sensors", None)
###############################################################################
[docs]class Remus100s(AUV):
"""
Remus 100 cylinder-shaped AUV model for multi-agent swarm simulation
research.
This class provides a physically-based Remus 100 model with complete 6-DOF
nonlinear dynamics, customized to support full swarm operation, modular GNC
architecture, networking communication, swarm coordination capabilities, and
sensor integration.
Parameters
----------
**kwargs : dict, optional
Keyword arguments for vehicle customization. Common options include:
groupId : str
Swarm group identifier, single character (e.g., "A")
isLeader : bool
True if vehicle is a swarm group leader
callSign : str
Custom vehicle identification string
Any vehicle attribute can be overridden via kwargs.
Attributes
----------
**State Vectors:**
eta : ndarray, shape (6,)
Position and attitude [x, y, z, roll, pitch, yaw]
nu : ndarray, shape (6,)
Body-frame velocities [u, v, w, p, q, r]
velocity : ndarray, shape (3,)
END-frame linear velocities [x_dot, y_dot, z_dot]
u_actual : ndarray, shape (3,)
Actual control inputs [rudder, stern, propeller]
**Control Gains (PID Controllers):**
Kp_z, Ki_z : float
Depth control proportional and integral gains
Kp_theta, Ki_theta, Kd_theta : float
Pitch control PID gains
Kp_psi, Ki_psi, Kd_psi : float
Heading control PID gains
K_f : float
Observer gain for desired yaw angle
**Swarm Coordination:**
target : Vehicle or Model
Target vehicle for tracking
group : list of Vehicle or Model
List of swarm group member vehicles
r_safe : float
Minimum safe vehicle separation distance, 10.0 m
r_avoid : float
Avoidance radius for APF repulsion, 40.0 m
r_inner : float
Inner radius of neutral APF around following distance, 80.0 m
r_follow : float
Preferred following distance, 100.0 m
**Waypoint Navigation:**
wpt : guidance.Waypoint
Waypoint database for path following
wpt_k : int
Index to previous active waypoint
R_switch : float
Waypoint acceptance radius, 10.0 m
Delta : float
Look-ahead distance for ALOS, 10.0 m
gamma : float
Adaptive gain constant for ALOS, 0.0006
Assigned Methods (Function Handles)
------------------------------------
These attributes are designed to hold function references that are assigned
via a custom load* method or manual assignment, enabling modular GNC
architecture:
CommNetwork : object
Communication network manager (AquaNet or MuNet)
CommSched : callable
Communication scheduling function
GuidSystem : callable
Main guidance coordinator that integrates components of the guidance
system at a high-level. Orchestrates observers and autopilots to
generate control commands based on specified guidance law.
GuidLaw : callable
Core guidance algorithm implementing specific mathematical
approaches for path generation. Returns either desired heading
angles (e.g., ALOS) or velocity vectors (e.g. APF) based on current
state and mission objectives.
DepthObs : callable
Depth command filter and state estimator. Processes desired depth
through smoothing algorithm to generate feasible depth command while
respecting vehicle and environmental constraints.
HeadingObs : callable
Heading state estimator and reference generator. Filters
guidance-generated heading commands and estimates desired yaw
rates.
Attitude : callable
Attitude propagation function (default: attitudeEuler)
DepthAP : callable
Depth autopilot implementing the control architecture, such as
PI/PID, for pitch or stern plane deflection commands.
HeadingAP : callable
Heading autopilot implementing the control arichitecture, such as
PI/PID, for rudder deflection commands.
PropCmd : callable
Propeller command generator for thrust management. Converts speed
requirements to RPM commands, supporting both constant thrust
profiles and dynamic speed control based on guidance demands
Methods
-------
dynamics(u_control):
Integrate 6-DOF equations of motion using Euler's method.
xferN2U(rpm):
Convert propeller RPM to vehicle speed (m/s).
xferU2N(speed):
Convert vehicle speed (m/s) to propeller RPM.
collectSensorData(i, ocean)
Read and update environmental sensor data.
loadPathFollowing():
Assign GNC for path following guidance system. Default is ALOS.
loadTargetTracking(target, law, att, rep)
Assign GNC for target tracking guidance system. Default is APF.
loadConstantProp(n_setpt):
Set constant propeller RPM command
loadAquaNetTdmaLF(epDur, bcDur, rpDur):
Configure TDMA communication via AquaNet for leader-follower systems.
loadMuNetLF(network, kwargs)
Configure communication via MuNet with FDMA or TDMA access modes.
Notes
-----
**Remus 100 Physical Properties:**
Controlled by a tail rudder, stern planes, and a propeller.
- Length: 1.6 m
- Diameter: 19 cm
- Mass: 31.9 kg
- Max Speed: 2.5 m/s @ 1525 rpm
- Max Fin Angle: 30 deg
- Max Depth: 100 m
**Vehicle Model:**
This implementation follows the physical and hydrodynamic parameters from
the Remus 100 AUV specifications. The dynamics model includes:
- Rigid body and added mass effects
- Coriolis and centripetal forces
- Linear and quadratic damping
- Propeller thrust
- Control surface lift and drag forces
- Environmental coupling (ocean currents, depth limits)
- Actuator dynamics with first-order time constants
- Saturation limits on all control surfaces
The modular GNC architecture allows arbitrary function assignment to the
Assigned Methods attributes, enabling flexible experimentation with
different guidance, navigation, and control algorithms.
References
----------
[1] Fossen, T.I. Python Vehicle Simulator. GitHub repository.
https://github.com/cybergalactic/PythonVehicleSimulator
[2] Fossen, T.I. (2021). Handbook of Marine Craft Hydrodynamics and Motion
Control. 2nd Edition, Wiley. ISBN: 978-1-119-57505-4
Examples
--------
### Create a basic AUV:
>>> auv = Remus100s()
>>> print(f"Max speed: {auv.u_max} m/s, Max depth: {auv.z_max} m")
Max speed: 2.5 m/s, Max depth: 100 m
### Assign custom function to guidance system:
>>> def custom_guidance(vehicle):
... # ... Custom guidance logic ...
... return np.array([rudder, stern, prop])
>>> auv.GuidSystem = custom_guidance
>>> delta_r, delta_s, n = auv.GuidSystem(auv)
### Create swarm leader with waypoint path following:
>>> leader = Remus100s(groupId="X", isLeader=True)
>>> leader.wpt = guid.Waypoint([0, 100, 200], [0, 100, 0], [10, 20, 30])
>>> leader.loadPathFollowing()
>>> leader.loadConstantProp(n_setpt=1200)
### Create swarm follower with APF target tracking:
>>> follower = Remus100s(groupId="X")
>>> follower.loadTargetTracking(leader, law="APF", att="linearCBZ",
... rep="varExp")
>>> print(follower)
AUV 03 - Group X
----------------
Guidance System: Target Tracking, APF
Target: X02-LEADER
"""
## Class Attributes ======================================================#
__num = 0 # number of Remus100s's created
## Constructor ===========================================================#
[docs] def __init__(self, **kwargs: Any)->None:
"""
Initialize Remus100s AUV with complete state, physics, and swarm
parameters.
Sets up all vehicle attributes including physical properties,
hydrodynamic coefficients, control system gains, sensor suite,
navigation parameters, swarm coordination settings, and communication
interfaces. All systems are initialized to safe default values based on
the Remus 100 specifications.
Parameters
----------
**kwargs : dict, optional
Arbitrary keyword arguments to override default attributes. Any
attribute defined in the class can be set via kwargs. Common
customizations:
- groupId : str
Swarm group identifier, single character (e.g., "A")
- isLeader : bool
Set True to designate as group leader (default: False)
- callSign : str
Custom vehicle identifier for display and logging (default:
id+groupId)
Any other vehicle attribute can be overridden, but use caution:
modifying physical parameters affects dynamics accuracy.
Notes
-----
**Attributes Initialized:**
The __init__ method initializes the following attribute categories:
- **Identity & Classification:**
- Increments class instance count variable `num`
- Inherits from parent AUV class
- Sets modelName, modelType, controls description
- Assigns unique id based on class instance count
- **Physical & Hydrodynamic Properties:**
- Dimensions: L (length), diam (diameter)
- Mass properties: rigid body mass, moments of inertia
- Hydrodynamic coefficients: added mass, damping, drag
- Propeller: thrust/torque coefficients (Wageningen B-series)
- Control surfaces: rudder and stern plane parameters
- See Fossen (2021) Section 8.4.2 for derivations
- **Control System Gains:**
- Depth control: Kp_z, Ki_z (outer PI loop)
- Pitch control: Kp_theta, Ki_theta, Kd_theta (inner PID loop)
- Heading control: Kp_psi, Ki_psi, Kd_psi (PID)
- Observer gains: K_f for heading estimation
- Propeller: n_setpt, n_rate for thrust management
- **State Vectors:**
- eta: position/attitude [x, y, z, phi, theta, psi]
- nu: body velocities [u, v, w, p, q, r]
- velocity: END velocities [x_dot, y_dot, z_dot]
- u_actual: control inputs [rudder, stern, propeller]
- **Sensor Suite:**
- OceanDepthSensor: measures ocean floor depth
- Additional sensors can be added via addSensor() method
- **Navigation Parameters:**
- z_max: maximum operating depth (100 m)
- z_safe: safety distance from sensed ocean floor (5 m)
- wn_d_z: desired natural frequency for depth (1/s)
- **Swarm Coordination:**
- r_safe: minimum safe separation (10 m)
- r_avoid: avoidance radius for APF (40 m)
- r_inner: inner radius of neutral follow zone (80 m)
- r_follow: preferred following distance (100 m)
- target: reference to leader vehicle (initially None)
- group: list of swarm neighbors, excludes self (initially None)
- **Waypoint Navigation:**
- wpt: waypoint database (initialized empty at origin)
- wpt_k: active waypoint index (0)
- R_switch: waypoint acceptance radius (10 m)
- Delta: look-ahead distance for ALOS (10 m)
- gamma: adaptive gain for ALOS (0.0006)
- betachat: crab angle estimate (0)
- **Communication:**
- All communication attributes initially None
- Configured via custom loading methods (e.g. loadMuNetLF())
- **GNC Function Handles:**
- All assigned method attributes initially None
- Configured via custom loading methods (e.g. loadPathFollowing())
**Side Effects:**
- Increments Remus100s.num class variable
- Calls parent AUV.__init__() to set up inherited attributes
- Installs default sensors (current and depth)
- Updates vehicle info dictionary with model details
**Initialization Order:**
1. Update class instance counter
2. Call parent class __init__
3. Set identity and model information
4. Initialize navigation parameters and sensors
5. Initialize control state vector (u_actual)
6. Set control gains and propeller parameters
7. Initialize physics and hydrodynamics (comprehensive)
8. Apply user-specified kwargs overrides
**Default Configuration:**
The vehicle is initialized in a neutral, safe state:
- Position at origin (0, 0, 0)
- Zero velocity and angular rates
- No active guidance, navigation, or control systems
- Default sensor suite installed
- Ready for configuration via manual assignment or custom loading
methods
**Kwargs Override Behavior:**
Any attribute can be overridden via kwargs, but this occurs AFTER all
default initialization, so computed attributes (like mass matrix) are
not automatically recalculated if you override component parameters. For
major physics changes, consider subclassing rather than using kwargs.
"""
#---------------------------------------------------------------------#
# General #
#---------------------------------------------------------------------#
self.__class__.__num += 1 # Update Vehicle Class Counts
super().__init__() # Inherit Attributes
#---------------------------------------------------------------------#
# Identity #
#---------------------------------------------------------------------#
self.modelName = "Remus 100s - Experimental Swarm Series"
self.modelType = "Cylinder-Shaped AUV"
self.controls = ["Tail rudder (deg)",
"Stern plane (deg)",
"Propeller revolution (rpm)"]
self.id = self.__class__.__num # ID number
#---------------------------------------------------------------------#
# Communication #
#---------------------------------------------------------------------#
## Data Integrity
self.dataRepair = {
'type': ['predict'],
'group_id': ['predict', 'previous'],
'reporter_id': ['predict'],
'reporter_pos': ['predict', 'previous'],
'reporter_vel': ['predict', 'previous', 'neutral'],
'time_stamp': ['predict', 'previous', 'neutral'],
'leader_pos': ['predict', 'previous'],
'leader_vel': ['predict', 'previous', 'neutral'],
'leader_next_pos': ['previous'],
'leader_next_vel': ['previous', 'neutral'],
'num_followers': ['skip'],
'followers_pos': ['predict', 'previous'],
'followers_vel': ['predict', 'previous', 'neutral'],
'followers_rspn': ['predict', 'previous', 'neutral'],
'rspn_sched': ['predict', 'previous', 'neutral'],
'rspn_start': ['predict', 'previous', 'neutral'],
} # Corrupted message data recovery strategies
#---------------------------------------------------------------------#
# Navigation #
#---------------------------------------------------------------------#
## State Estimators
self.z_max = 100 # max vehicle operating depth (m)
## Sensors
self.addSensor('depth', nav.OceanDepthSensor())
#---------------------------------------------------------------------#
# Control #
#---------------------------------------------------------------------#
## State
"""
u_actual=[rudder,stern,propeller]: radians, radians, rpm
"""
self.u_actual = np.array([0, 0, 0], float) # control input vector
## Propeller / Thrust
self.n_setpt = 0 # propeller rpm set-point
self.n_rate = 0 # propeller rpm rate (rpm/s)
self.u_max = 2.5 # maximum vehicle speed (m/s)
## Stern / Depth
self.z_int = 0 # depth position integral state
self.Kp_z = 0.054 # depth proportional gain (outer loop)
self.Ki_z = 0.00216 # depth integral gain (outer loop)
self.theta_int = 0 # pitch angle integral state
self.Kp_theta = -0.36 # pitch proportional gain
self.Ki_theta = -0.21 # pitch integral gain
self.Kd_theta = -0.13 # pitch derivative gain
## Rudder / Heading
self.psi_int = 0 # yaw angle integral state
self.K_f = 0.5 # observer gain for desired yaw angle
self.Kp_psi = -7.5 # yaw proportional gain
self.Ki_psi = -0.75 # yaw integral gain
self.Kd_psi = -15.0 # yaw derivative gain
#---------------------------------------------------------------------#
# Physics #
#---------------------------------------------------------------------#
## Dimensions
self.dimU = len(self.controls)
self.L = 1.6 # length (m)
self.diam = 0.19 # cylinder diameter (m)
## Actuator Dynamics
self.deltaMax_r = 30 * self.D2R # max rudder angle (rad)
self.deltaMax_s = 30 * self.D2R # max stern plane angle (rad)
self.nMax = 1525 # max propeller revolution (rpm)
self.T_delta = 1.0 # rudder/stern plane time constant (s)
self.T_n = 1.0 # propeller time constant (s)
## Hydrodynamics
"""(Fossen 2021, Section 8.4.2)"""
self.S = 0.7 * self.L * self.diam # S = 70% of rectangle L * diam
a = self.L/2 # semi-axes
b = self.diam/2
self.r_bg = np.array([0, 0, 0.02], float) # CG w.r.t. to the CO
self.r_bb = np.array([0, 0, 0], float) # CB w.r.t. to the CO
## Parasitic Drag Coefficient
"""CD_0, i.e. zero lift and alpha = 0
F_drag = 0.5 * rho * Cd * (pi * b^2)
F_drag = 0.5 * rho * CD_0 * S"""
Cd = 0.42 # Allen et al. (2000)
self.CD_0 = Cd * math.pi * b**2 / self.S
## Rigid-Body Mass Matrix: Expressed in CO
m = 4/3 * math.pi * self.rho * a * b**2 # mass of spheriod
Ix = (2/5) * m * b**2 # moment of inertia
Iy = (1/5) * m * (a**2 + b**2)
Iz = Iy
MRB_CG = np.diag([m, m, m, Ix, Iy, Iz]) # MRB expressed in the CG
H_rg = gnc.Hmtrx(self.r_bg)
self.MRB = H_rg.T @ MRB_CG @ H_rg # MRB expressed in the CO
## Weight and Buoyancy
self.W = m * self.g
self.B = self.W
## Added Moment of Inertia in Roll
"""A44 = r44 * Ix"""
r44 = 0.3
MA_44 = r44 * Ix
## Lamb's K-Factors
e = math.sqrt(1-(b/a)**2)
alpha_0 = (2 * (1-e**2)/pow(e,3)) * (0.5 * math.log((1+e)/(1-e)) - e)
beta_0 = 1/(e**2) - (1-e**2) / (2*pow(e,3)) * math.log((1+e)/(1-e))
k1 = alpha_0 / (2 - alpha_0)
k2 = beta_0 / (2 - beta_0)
k_prime = pow(e,4) * (beta_0-alpha_0) / (
(2-e**2) * (2*e**2 - (2-e**2) * (beta_0-alpha_0)))
## Added Mass System Matrix: Expressed in CO
self.MA = np.diag([m*k1, m*k2, m*k2, MA_44, k_prime*Iy, k_prime*Iy])
## Mass Matrix, Including Added Mass
self.M = self.MRB + self.MA
self.Minv = np.linalg.inv(self.M)
## Natural Frequencies in Roll and Pitch
self.w_roll = math.sqrt(self.W * (self.r_bg[2]-self.r_bb[2]) /
self.M[3][3])
self.w_pitch = math.sqrt(self.W * (self.r_bg[2]-self.r_bb[2]) /
self.M[4][4])
## Tail Rudder Parameters (single)
self.CL_delta_r = 0.5 # rudder lift coefficient
self.A_r = 2 * 0.10 * 0.05 # rudder area (m2)
self.x_r = -a # rudder x-position (m)
## Stern-Plane Paramaters (double)
self.CL_delta_s = 0.7 # stern-plane lift coefficient
self.A_s = 2 * 0.10 * 0.05 # stern-plane area (m2)
self.x_s = -a # stern-plane z-position (m)
## Propeller Parameters
self.D_prop = 0.14 # propeller diameter corresponding to 5.5 inches
self.t_prop = 0.1 # thrust deduction number
self.Ja_max = 0.6632 # from: 0.944 * 2.5 / (0.14 * 1525/60) = 0.6632
"""Single-screw propeller with 3 blades and blade-area ratio = 0.718.
Coefficients are computed using the Matlab MSS toolbox:
>> [KT_0, KQ_0] = wageningen(0,1,0.718,3)
>> [KT_max, KQ_max] = wageningen(0.6632,1,0.718,3)"""
self.KT_0 = 0.4566
self.KQ_0 = 0.0700
self.KT_max = 0.1798
self.KQ_max = 0.0312
## Low-Speed Linear Damping Matrix Parameters
self.T_surge = 20 # time constant in surge (s)
self.T_sway = 20 # time constant in sway (s)
self.T_heave = self.T_sway # equal for for a cylinder-shaped AUV
self.zeta_roll = 0.3 # relative damping ratio in roll
self.zeta_pitch = 0.8 # relative damping ratio in pitch
self.T_yaw = 5 # time constant in yaw (s)
#---------------------------------------------------------------------#
# User Specified #
#---------------------------------------------------------------------#
## User Keyword Attributes
for key,value in kwargs.items():
setattr(self, key, value)
## Methods ===============================================================#
[docs] def dynamics(self, u_control:NPFltArr)->Tuple[NPFltArr,NPFltArr]:
"""
Integrate 6-DOF nonlinear AUV equations of motion forward one time step.
Computes vehicle acceleration and control actuator responses under
current state, control commands, and environmental conditions.
Implements complete Fossen-based marine vehicle dynamics with rigid
body, added mass, hydrodynamic, and environmental coupling effects. Uses
Euler forward integration for computational efficiency.
Parameters
----------
u_control : ndarray, shape (3,)
Commanded control inputs [delta_r, delta_s, n]:
u_control[0] : float, delta_r
Commanded rudder angle in radians. Positive deflection creates
starboard turn (right when viewed from above). Limited to
deltaMax_r.
u_control[1] : float, delta_s
Commanded stern plane angle in radians. Positive deflection
creates bow-down pitch moment. Limited to deltaMax_s.
u_control[2] : float, n
Commanded propeller revolution rate in rpm. Positive values
produce forward thrust. Limited to nMax.
Returns
-------
nu : ndarray, shape (6,)
Updated vehicle velocity vector in body-fixed frame after the
integration step [u, v, w, p, q, r]:
nu[0] : float, u
Surge velocity (forward/aft) in m/s
nu[1] : float, v
Sway velocity (port/starboard) in m/s
nu[2] : float, w
Heave velocity (up/down) in m/s
nu[3] : float, p
Roll rate about x-axis in rad/s
nu[4] : float, q
Pitch rate about y-axis in rad/s
nu[5] : float, r
Yaw rate about z-axis in rad/s
u_actual : ndarray, shape (3,)
Actual control surface positions and propeller rate after actuator
dynamics, time constants, and saturation limits [delta_r, delta_s, n]:
u_actual[0] : float, delta_r
Actual rudder angle in radians
u_actual[1] : float, delta_s
Actual stern plane angle in radians
u_actual[2] : float, n
Actual propeller rpm
Notes
-----
**Equations of Motion:**
The dynamics follow Fossen's 6-DOF formulation:
M * d/dt nu_r + C(nu_r) * nu_r + D(nu_r) * nu_r + g(eta) = tau
where:
- M: Mass matrix, M_RB + M_A (rigid body plus added mass)
- d/dt nu_r: Relative acceleration
- C(nu_r): Coriolis and centripetal matrix (function of relative
velocity)
- D(nu_r): Damping matrix (linear + quadratic drag)
- g(eta): Restoring forces and moments (weight, buoyancy)
- tau: Generalized force vector (propulsion + control surfaces)
- nu_r: Relative velocity, nu - nu_c (accounting for ocean current)
**Force and Moment Components:**
1. **Propleller Thrust and Torque:**
- Wageningen B-series approximation
- Thrust depends on advance number J_a = V_a/(nD_prop)
- Coefficients: K_T0, K_Q0, K_Tmax, K_Qmax
- Thrust deduction factor t_prop accounts for hull interaction
2. **Control Surface Forces:**
- Rudder: Generates sway force Y_r and yaw moment N_r
- Stern planes: Generate heave force Z_s and pitch moment M_s
- Lift coefficient times dynamic pressure times surface area
- Includes drag penalty (parasitic drag)
3. **Lift and Drag:**
- Angle of attack alpha = atan2(w_r, u_r)
- Lift and drag coefficients from Beard & McLain (2012)
- Cross-flow drag for lateral motion
4. **Ocean Current Effects:**
- Ocean current velocity nu_c added in END frame
- Relative velocity nu_r = nu - R^(T(psi) * nu_c) used for
hydrodynamic forces
- Derivative of current (Coriolis-like term) included
**Actuator Dynamics:**
First-order lag models with time constants:
d/dt delta_r = (delta_r,cmd - delta_r) / T_delta
d/dt delta_s = (delta_s,cmd - delta_s) / T_delta
d/dt n = (n_cmd - n) / T_n
where:
- T_delta: rudder / stern plane time constant
- T_n: propeller time constant
**Safety and Collision Detection:**
- Surface breach (z < 0): Buoyancy set to zero, continues dynamics
- Floor collision (z > z_floor): Vehicle immobilized, returns zero
velocities
- Immobilization flag set to True on floor collision, persists for
simulation duration
**Integration Method:**
Forward Euler integration with step size h = sampleTime:
nu(k+1) = nu(k) + h * d/dt nu(k)
u_actual(k+1) = u_actual(k) + h * d/dt u_actual(k)
This is computationally efficient but requires sufficiently small time
steps, typically 0.02 s or smaller, for numerical stability.
**Coordinate Frames:**
- BODY frame: Fixed to vehicle, origin at center of origin (CO)
- END frame: East-North-Down inertial reference frame
- All forces / moments computed in BODY frame
- Velocity transformation via rotation matrix R(psi)
See Also
--------
gnc.m2c : Mass matrix to Coriolis matrix conversion
gnc.forceLiftDrag : Lift and drag force calculation
gnc.crossFlowDrag : Cross-flow drag calculation
gnc.gvect : Restoring force vector calculation
navigation.attitudeEuler : Attitude integration for position update
References
----------
[1] Fossen, T.I. (2021). Handbook of Marine Craft Hydrodynamics and Motion
Control. 2nd Edition, Wiley.
[2] Beard, R.W. and McLain, T.W. (2012). Small Unmanned Aircraft: Theory
and Practice. Princeton University Press.
Examples
--------
### Simulate one dynamics step with control commands:
>>> auv = Remus100s()
>>> auv.eta = np.array([0, 0, 10, 0, 0, 0]) # 10m depth
>>> auv.nu = np.array([2.0, 0, 0, 0, 0, 0]) # 2 m/s forward
>>> u_cmd = np.array([0.1, -0.05, 1200]) # rudder, stern, prop
>>> nu_new, u_act = auv.dynamics(u_cmd)
>>> print(f"New surge velocity: {nu_new[0]:.3f} m/s")
>>> print(f"Actual rudder angle: {u_act[0]:.3f} rad")
"""
# Check Status
if (self.immobilized):
return self.nu*0, self.u_actual*0
# Input Parameters Loaded on Vehicle
h = self.sampleTime
eta = self.eta
nu = self.nu
u_actual = self.u_actual
B = self.B
# Check Environment
if (eta[2] < 0): # surface breach
B = 0
if (eta[2] > self.z_floor): # floor collision
log.warning('**COLLISION with ocean floor! Vehicle: ' +
'%s at [%.1f,%.1f,%.1f] %.2f s',
self.callSign, eta[0], eta[1], eta[2], self.clock)
self.immobilized = True
return nu*0, u_actual*0
# Ocean Current Velocity
u_c = self.V_c * math.cos(self.beta_V_c - eta[5]) # surge
v_c = self.V_c * math.sin(self.beta_V_c - eta[5]) # sway
nu_c = np.array([u_c, v_c, 0, 0, 0, 0], float) # velocity
Dnu_c = np.array([nu[5]*v_c, -nu[5]*u_c, 0, 0, 0, 0],float) # d/dt
# Vehicle Velocity
nu_r = nu - nu_c # relative vehicle vel.
alpha = math.atan2(nu_r[2], nu_r[0]) # angle of attack
U = math.sqrt(nu[0]**2 + nu[1]**2 + nu[2]**2) # vehicle speed
U_r = math.sqrt(nu_r[0]**2 + nu_r[1]**2 + nu_r[2]**2) # relative speed
# Commands and Actual Control Signals
delta_r_c = u_control[0] # commanded tail rudder (rad)
delta_s_c = u_control[1] # commanded stern plane (rad)
n_c = u_control[2] # commanded propeller revolution (rpm)
delta_r = u_actual[0] # actual tail rudder (rad)
delta_s = u_actual[1] # actual stern plane (rad)
n = u_actual[2] # actual propeller revolution (rpm)
# Propeller Coefficients
"""KT and KQ are computed as a function of advance no.
Ja = Va/(n*D_prop) where
Va = (1-w)*U = 0.944 * U
Allen et al. (2000)"""
n_rps = n / 60 # propeller revolution (rps)
Va = U * 0.944 # advance speed (m/s)
# Propeller Thrust and Propeller-Induced Roll Moment
"""Linear approximations for positive Ja values
KT ~= KT_0 + (KT_max-KT_0)/Ja_max * Ja
KQ ~= KQ_0 + (KQ_max-KQ_0)/Ja_max * Ja """
## Forward Thrust
if (n_rps > 0):
X_prop = (self.rho * pow(self.D_prop,4) *
(self.KT_0 * abs(n_rps) * n_rps +
(self.KT_max-self.KT_0)/self.Ja_max * (Va/self.D_prop) *
abs(n_rps)))
K_prop = (self.rho * pow(self.D_prop,5) *
(self.KQ_0 * abs(n_rps) * n_rps +
(self.KQ_max-self.KQ_0)/self.Ja_max * (Va/self.D_prop) *
abs(n_rps)))
## Reverse Thrust
else:
X_prop = (self.rho * pow(self.D_prop,4) * self.KT_0 * abs(n_rps) *
n_rps)
K_prop = (self.rho * pow(self.D_prop,5) * self.KQ_0 * abs(n_rps) *
n_rps)
# Rigid-Body/Added Mass Coriolis/Centripetal Matrices: Expressed in CO
CRB = gnc.m2c(self.MRB, nu_r)
CA = gnc.m2c(self.MA, nu_r)
"""Nonlinear quadratic velocity terms in pitch and yaw (Munk moments)
are set to zero since only linear damping is used"""
CA[4][0] = 0
CA[4][3] = 0
CA[5][0] = 0
CA[5][1] = 0
C = CRB + CA
# Dissipative Forces and Moments
D = np.diag([
self.M[0][0] / self.T_surge,
self.M[1][1] / self.T_sway,
self.M[2][2] / self.T_heave,
self.M[3][3] * 2 * self.zeta_roll * self.w_roll,
self.M[4][4] * 2 * self.zeta_pitch * self.w_pitch,
self.M[5][5] / self.T_yaw])
"""For DOF 1,2,6 the D elements go to zero at higher speeds, i.e.
drag and lift/drag dominate"""
D[0][0] = D[0][0] * math.exp(-3*U_r)
D[1][1] = D[1][1] * math.exp(-3*U_r)
D[5][5] = D[5][5] * math.exp(-3*U_r)
tau_liftdrag = gnc.forceLiftDrag(self.diam,self.S,self.CD_0,alpha,U_r)
tau_crossflow = gnc.crossFlowDrag(self.L,self.diam,self.diam,nu_r)
# Restoring Forces and Moments
g = gnc.gvect(self.W,B,eta[4],eta[3],self.r_bg,self.r_bb)
# Horizontal and Vertical-Plane Relative Speed
U_rh = math.sqrt(nu_r[0]**2 + nu_r[1]**2)
U_rv = math.sqrt(nu_r[0]**2 + nu_r[2]**2)
# Rudder and Stern-Plane Drag
X_r = -0.5*self.rho * U_rh**2*self.A_r * self.CL_delta_r*delta_r**2
X_s = -0.5*self.rho * U_rv**2*self.A_s * self.CL_delta_s*delta_s**2
# Rudder sway force
Y_r = -0.5*self.rho * U_rh**2*self.A_r * self.CL_delta_r*delta_r
# Stern-plane heave force
Z_s = -0.5*self.rho * U_rv**2*self.A_s * self.CL_delta_s*delta_s
# Generalized Force Vector
tau = np.array([
(1-self.t_prop) * X_prop + X_r + X_s,
Y_r,
Z_s,
K_prop,
self.x_s * Z_s,
self.x_r * Y_r], float)
# AUV Dynamics
tau_sum = tau + tau_liftdrag + tau_crossflow - np.matmul(C+D,nu_r) - g
nu_dot = Dnu_c + np.matmul(self.Minv, tau_sum)
# Actuator dynamics
delta_r_dot = (delta_r_c - delta_r) / self.T_delta
delta_s_dot = (delta_s_c - delta_s) / self.T_delta
n_dot = (n_c - n) / self.T_n
# Forward Euler Integration: [k+1]
nu += h * nu_dot
delta_r += h * delta_r_dot
delta_s += h * delta_s_dot
n += h * n_dot
# Actual Control Response
"""Bounds of control signals are enforced by amplitude saturation"""
if (abs(delta_r) >= self.deltaMax_r):
delta_r = np.sign(delta_r) * self.deltaMax_r
if (abs(delta_s) >= self.deltaMax_s):
delta_s = np.sign(delta_s) * self.deltaMax_s
if (abs(n) >= self.nMax):
n = np.sign(n) * self.nMax
u_actual = np.array([delta_r, delta_s, n], float)
return nu, u_actual
#--------------------------------------------------------------------------
[docs] def xferN2U(self, rpm:float)->float:
"""
Transfer function from propller RPM to vehicle Speed.
Parameters
----------
rpm:
Number of propeller Rotations Per Minute.
Returns
-------
speed:
Vehicle speen (m/s).
Notes
-----
Based on a simple linear speed test in a zero ocean current
environment. The function domain is split into three regions with a
linear approximation for each.
"""
# Domain Divisions (Upper Boundary)
low = 100 # low: 0-100
mid = 400 # mid: 100-400
# high: 400-1525
# Y-intercept and Slope
if (rpm <= low):
b = 0.0
m = 0.000611108
elif (rpm <= mid):
b = -0.135849
m = 0.00196552
else:
b = -0.0273616
m = 0.0016922
return (m*rpm + b)
#--------------------------------------------------------------------------
[docs] def xferU2N(self, speed:float)->float:
"""
Transfer function from vehicle Speed to propeller RPM.
Parameters
----------
speed:
Vehicle speed (m/s).
Returns
-------
rpm:
Number of propller Rotations Per Minute.
Notes
-----
Based on a simple linear speed test in a zero ocean current
environment. The function domain is split into three regions with a
linear approximation for each.
"""
# Domain Divisions (Upper Boundary)
low = 0.0605 # low: 0-0.06
mid = 0.6383 # mid: 0.06-0.64
# high: 0.64-2.55
# Y-intercept and Slope
if (speed <= low):
b = 0.0
m = 1636.37
elif (speed <= mid):
b = 69.2435
m = 508.411
else:
b = 16.1836
m = 590.939
return (m*speed + b)
#--------------------------------------------------------------------------
[docs] def collectSensorData(self, i:int, ocean:env.Ocean)->None:
"""
Update vehicle environmental state from installed sensors.
Main sensor data collection method called once per simulation iteration.
Reads installed sensors and updates vehicle attributes with
environmental measurements. This is the primary interface between the
simulation environment and the vehicle's internal state representation.
Parameters
----------
i : int
Current simulation iteration counter. Used for time-dependent
sensing, logging, and sensor dynamics. Typically ranges from 0 to
N-1 where N is total simulation iterations.
ocean : env.Ocean
Ocean environment object containing:
- current : env.OceanCurrent or None
Ocean current model with speed and direction
- floor : env.OceanFloor or None
Ocean floor depth model (bathymetry)
- Additional environment features as available
Notes
-----
**Side Effects:**
Updates the following vehicle attributes based on sensor readings:
- drift_c : float
Ocean current speed in m/s (from 'current' sensor if installed)
- set_c : float
Ocean current direction in radians (from 'current' sensor if
installed)
- seabed_z : float
Ocean floor depth in meters (from 'depth' sensor if installed)
**Implementation:**
Calls `readAllSensors` with the current environment kwargs, then maps
the returned data dictionary to the corresponding vehicle attributes.
Only sensors that are installed and return data will update vehicle
state. The default sensors and their state mappings are:
- 'current' -> `self.drift_c`, `self.set_c`
- 'depth' -> `self.seabed_z`
**Extensibility:**
To add additional sensors and state updates:
1. Define a sensor that inherits `nav.Sensor`
2. Install sensor via `addSensor()`
3. Add a corresponding attribute update below
**Simulator Integration:**
This method is called by the Simulator during each iteration of the
simulation loop, specifically in simulate(), simulateMuNet(), and
simulateAquaNet() methods. The call occurs after communication updates
but before GNC computations.
See Also
--------
addSensor : Add single sensor
addSensors : Add multiple sensors at once from dictionary
removeSensor : Remove sensor by name
removeSelectedSensors : Remove multiple sensors by name list
removeAllSensors : Remove all sensors at
readSensor : Read data from specific sensor
readSelectedSensors : Read multiple sensors by name list
readAllSensors : Read data from all installed sensors
updateSensorInfo : Helper that updates info dictionary
"""
# Read All Installed Sensors
if (ocean is not None):
data = self.readAllSensors(i=i, ocean=ocean, eta=self.eta)
if ('current' in data):
self.drift_c, self.set_c = data['current']
if ('depth' in data):
self.seabed_z = data['depth']
#--------------------------------------------------------------------------
[docs] def loadPathFollowing(self)->None:
"""
Configure vehicle for waypoint-based path following guidance system.
Sets up a complete autonomous navigation system for following a
predefined sequence of waypoints. Configures all necessary guidance,
navigation, and control components for integrated path-following
behavior.
Notes
-----
**Side Effects:**
- Assigns function handles to the following vehicle attributes:
- **Guidance System:**
- self.GuidSystem : callable
Main guidance system to coordinate components at a high-level.
- self.GuidLaw : callable
Core guidance algorithm implementing specific approach for path
generation.
- **Navigation Observers:**
- self.DepthObs : callable
Depth command filter and state estimator.
- self.HeadingObs:
Heading state estimator and reference generator.
- **Control Autopilots:**
- self.DepthAP : callable
Depth autopilot implementing control commands for stern plane
deflection.
- self.HeadingAP : callable
Heading autopilot implementing control commands for rudder
defleciton.
- Updates the vehicle's info dictionary with configuration details for
display and logging.
**Configuration Options:**
Currently there is only one set of configuration options available for
the path following guidance system, the Adaptive Line-of-Sight (ALOS)
guidance law. See guidance.ALOSlaw for specific details.
Attributes used by the assigned components can be customized by setting
them manually. For example:
### Modify Guidance System Parameters:
>>> auv.loadPathFollowing()
>>> auv.Delta = 15.0 # Larger look-ahead distance
>>> auv.R_switch = 5.0 # Tighter waypoint acceptance distance
See each assigned function for more details and additional configuration
options.
See Also
--------
guidance.pathFollow : Main path-following guidance function
guidance.ALOSlaw : Adaptive line-of-sight guidance law
navigation.depthFilter : Depth measurement filter
navigation.headingFilterLOS : LOS angle observer
control.depthPID : Depth autopilot with cascade control
control.headingPID : Heading autopilot
guidance.Waypoint : Waypoint database class
loadTargetTracking : Alternative guidance for swarm following
loadConstantProp : Propeller command configuration
References
----------
[1] Fossen, T. I., "An Adaptive Line-of-Sight (ALOS) Guidance Law for
Path Following of Aircraft and Marine Craft," in IEEE Transactions on
Control Systems Technology, 31(6), 2887-2894, Nov. 2023, doi:
10.1109/TCST.2023.3259819.
[2] Fossen, T.I. (2021). Handbook of Marine Craft Hydrodynamics and Motion
Control. 2nd Edition, Wiley.
Examples
--------
### Basic path following setup:
>>> auv = Remus100s()
>>> auv.wpt = guid.Waypoint( # Define waypoint path
... [0, 100, 200, 300], # x coordinates (East)
... [0, 0, 100, 100], # y coordinates (North)
... [10, 10, 10, 10] # z coordinates (Down / Depth)
... )
>>> auv.loadPathFollowing() # Configure path following
>>> print(auv.info['Guidance System'])
'Path Following, ALOS'
"""
# Load Vehicle Modules
self.GuidSystem = guid.pathFollow
self.GuidLaw = guid.ALOSlaw # Adaptive Line of Sight
self.DepthObs = nav.depthFilter
self.HeadingObs = nav.headingFilterLOS # LOS Observer
self.DepthAP = ctrl.depthPID
self.HeadingAP = ctrl.headingPID
# Generate Information Parameters
self.info.update([("Guidance System", "Path Following, ALOS"),
("Waypoints", f"{len(self.wpt)}")])
#--------------------------------------------------------------------------
[docs] def loadTargetTracking(self,
target:Vehicle,
law:str='APF',
mission:str='targetFeedforward',
formation:str='targetNormPoly',
survival:str='groupNormPoly',
)->None:
"""
Configure vehicle for swarm target tracking.
Sets up autonomous target-following behavior for swarm coordination
using specified guidance law.
Parameters
----------
target : Vehicle
Target vehicle to track and follow. The target's position and
veloctiy are tracked for coordinated motion.
law : str
Guidance law selection:
- 'APF': Artificial Potential Field guidance
- 'CB': Constant Bearing guidance
mission : str
APF mission control component:
- 'targetFeedforward': Feed-forward target vehicle velocity vector.
formation : str
APF Formation keeping component:
- 'targetNormPoly': Radial attraction and repulsion to target.
survival : str
APF Collision avoidance component:
- 'groupNormPoly': Repulsion from all swarm group vehicles.
Notes
-----
**Side Effects:**
- Assigns function handles to the following vehicle attributes:
- **Guidance System:**
- self.GuidSystem : callable
Main guidance system to coordinate components at a high-level.
- self.GuidLaw : callable
Core guidance algorithm implementing specific approach for path
generation.
- **APF Component Functions (if law='APF'):**
- self.GuidLaw.mission : callable
Contributes to mission-based control velocity vector.
- self.GuidLaw.formation : callable
Artificial potential that contributes to formation keeping.
- self.GuidLaw.survival : callable
Artificial potential that contributes to collision avoidance.
- **Control Autopilots:**
- self.DepthAP : callable
Depth autopilot implementing control commands for stern plane
deflection.
- self.HeadingAP : callable
Heading autopilot implementing control commands for rudder
defleciton.
See assigned functions for details and configuration options.
- Updates the vehicles target attribute. Uses the passed Vehicle
instance if no CommNetwork, otherwise creates a Model instance (a
lightweight representation for tracking data).
- Updates the vehicles groupId attribute, inheriting from the target
vehicle.
- Updates the vehicle's info dictionary with configuration details for
display and logging.
**Swarm Coordination Parameters:**
The following vehicle attributes control swarm behavior:
- r_safe : float
Minimum safe separation distance, triggers strong repulsion
- r_avoid : float
Avoidance radius, repulsion active within this range
- r_inner : float
Inner radius of neutral zone around preferred following distance
- r_follow : float
Preferred following distance from target
Tune these parameters based on:
- Vehicle size and maneuverability
- Communication latency and update rate
- Environmental conditions (currents, obstacles)
- Desired formation tightness
**Depth Control Difference:**
Unlike path following which uses cascade depth control, target
tracking uses direct pitch control. This provides:
- Faster depth response for dynamic target following
- Direct 3D tracking without waypoint constraints
- More aggressive maneuvering capability
See Also
--------
guidance.targetTrack : Main target tracking guidance function
guidance.velocitySubsumptionCascadeAPF : APF guidance law
guidance.velCB : Constant bearing velocity guidance law
guidance.missionTargetFeedForwardAPF : Target velociy feed-forward
guidance.formationTargetNormPolyAPF : Target attraction & repulsion
guidance.survivalGroupNormPolyAPF : Group member repulsion
control.pitchPID : Direct pitch controller
control.headingPID : Heading autopilot
loadPathFollowing : Alternative guidance for waypoint following
loadMuNetFdmaLF : Communication setup for swarm coordination
Examples
--------
### Basic leader-follower setup with default APF:
>>> leader = Remus100s(groupId='A', isLeader=True)
>>> leader.wpt = guid.Waypoint([0, 100], [0, 0], [0, 20])
>>> leader.loadPathFollowing()
>>> follwer = Remus100s()
>>> follwer.loadTargetTracking(leader)
>>> print(follower.info['Target'])
A01-LEADER
"""
# Load Vehicle Modules
self.GuidSystem = guid.targetTrack
self.DepthAP = ctrl.pitchPID
self.HeadingAP = ctrl.headingPID
# Load Guidance Law
## Artificial Potential Field
if (law == 'APF'):
self.GuidLaw = guid.velocitySubsumptionCascadeAPF
if (mission == 'targetFeedforward'):
self.GuidLaw.mission = guid.missionTargetFeedForwardAPF
if (formation == 'targetNormPoly'):
self.GuidLaw.formation = guid.formationTargetNormPolyAPF
if (survival == 'groupNormPoly'):
self.GuidLaw.survival = guid.survivalGroupNormPolyAPF
## Constant Bearing
elif (law == 'CB'):
self.GuidLaw = guid.velCB
# Swarm Group Data Acquisition Method
## Using Direct-Access
if (self.CommNetwork is None):
self.target = target
## Using Network Communication
else:
self.target = Model(target)
# Set Parameters
self.groupId = target.groupId
# Generate Information Parameters
self.info.update([("Guidance System", f"Target Tracking, {law}")])
if (law == 'APF'):
self.info.update([("Mission Function", f"{mission}"),
("Formation Function", f"{formation}"),
("Survival Function", f"{survival}")]),
self.info.update([("Target", f"{self.target.callSign}")])
#--------------------------------------------------------------------------
[docs] def loadConstantProp(self,n_setpt:float=1200)->None:
"""
Configure constant propeller RPM command.
Sets up a simple propeller control that maintains constant RPM
throughout simulation. Useful for basic testing and scenarios
with fixed thrust.
Parameters
----------
n_setpt : float
Propeller RPM setpoint (default: 1200 rpm, ~2.0 m/s forward speed).
Notes
-----
Assigns ctrl.constProp to self.PropCmd and stores n_setpt.
Does not implement thrust control or speed regulation.
"""
# Assign Function as Vehicle Method
self.PropCmd = ctrl.constProp
# Load Parameters
self.n_setpt = n_setpt
# Generate Information Parameter
self.info.update([('Propeller Command', f"Constant, {n_setpt} rpm")])
#--------------------------------------------------------------------------
[docs] def loadAquaNetTdmaLF(self,
epDur:float=10,
bcDur:float=1,
rpDur:float=1,
)->None:
"""
Configure TDMA communication via AquaNet for leader-follower swarm.
Parameters
----------
epDur : float
Episode cycle duration in seconds (full comm cycle period).
bcDur : float
Broadcast/request frame duration in seconds (leader transmit time).
rpDur : float
Response frame duration in seconds (follower slot time).
Notes
-----
- Sets up the Episode structure, which is the full set of transmission
frames.
- The Episode starts with the Leader Broadcast / Request (BCRQ) frame,
followed by a Response (RSPN) frame for each follower.
- Configures network addresses, message schedules, and communication
manager.
"""
# AquaNet Manager Parameters
self.txAddr = 255 if self.isLeader else self.target.nodeAddr
self.nodeAddr = self.id
self.aqBaseDir = os.path.dirname(inspect.getfile(aqnet))
# Load Vehicle Modules
commNetwork = aqnet.AquaNetManager(self.nodeAddr,self.aqBaseDir)
commNetwork.start = commNetwork.initAquaNet
commNetwork.monitor = comm.recvMsg # Incoming Msg Monitor
self.CommNetwork = commNetwork
self.CommSched = comm.schedulerTdmaLF
# Message Handling Parameters
self.epsdDur = epDur # Episode Cycle Duration (s)
self.bcrqDur = bcDur # BCST/RQST Duration (s)
if (self.group is not None):
self.rspnSched = [v.id for v in self.group] # using same as id list
self.rspnFrame = 0 # RSPN Frame Start Time (s)
self.rspnSlot = 0 # RSPN Slot Start Time (s)
self.rspnDur = rpDur # RSPN Time Slot Duration (s)
if (not self.isLeader):
self.rspnRqst = False # RSPN REQUEST Received (Follower)
else:
self.nextEta, self.nextVel = guid.predictNextEtaVel(self)
# Generate Information Parameters
self.info.update([("Communication Network", "AquaNet"),
("Access Mode", f"TDMA "+
f"({self.epsdDur}|{self.bcrqDur}-{self.rspnDur})s"),
("Node Address", f"{self.nodeAddr}")])
#--------------------------------------------------------------------------
[docs] def loadMuNetLF(self, network:comm.MuNet, **kwargs)->None:
"""
Configure communication via MuNet for leader-follower swarm.
Parameters
----------
network : comm.MuNet
MuNet network object to register vehicle with
**kwargs : dict
Configuration options:
- accessMode : str
Network access mode ('fdma', 'tdma', 'tdma_lf')
- epDur : float
Episode Duration (s). Vehicle message transmission interval
- bcDur : float
TDMA - Broadcast-Request frame duration (s). (Leader)
- rpDur : float
TDMA - Response frame duration (s). (Followers)
- txOset : float
FDMA - Transmit offset (s). Time between each vehicle reporting.
Notes
-----
- Supports FDMA and TDMA access modes for flexible swarm communication.
- Registers vehicle with network and configures message timing parameters.
"""
# Default Parameters
defaults = {
'accessMode': network.accessMode,
'epDur': 5.0,
'bcDur': 1.0,
'rpDur': 1.0,
'txOset': 0.5,
}
config = {**defaults, **kwargs} # kwargs overwrite defaults
# Communication Schedule
accessMode = config['accessMode'].lower()
accessStrategies = {
'fdma': comm.schedulerFdmaLF,
'tdma': comm.schedulerTdmaLF,
'tdma_lf': comm.schedulerTdmaLF,
}
self.CommSched = accessStrategies[accessMode]
# Message Timing Parameters
self.epsdDur = config['epDur'] # TX interval (s)
if (accessMode == 'fdma'):
txOffset = config['txOset'] # TX offset between members (s)
'''Dependency on id is a problem here, need to build *much* better
parameter handling'''
self.rspnSlot = txOffset * self.id # Next TX time (s)
details = f"({self.epsdDur}|{txOffset})s"
elif (accessMode == 'tdma' or accessMode == 'tdma_lf'):
self.bcrqDur = config['bcDur'] # BCST/RQST Duration (s)
self.rspnDur = config['rpDur'] # RSPN Slot Start Time (s)
self.rspnFrame = 0 # RSPN Frame Start Time (s)
self.rspnSlot = 0 # RSPN Slot Start Time (s)
if (not self.isLeader):
self.rspnRqst = False # REQUEST Received (Follower)
if (self.group is not None):
self.rspnSched = [m.id for m in self.group] # RSPN order
details = f"({self.epsdDur}|{self.bcrqDur}-{self.rspnDur})s"
# Generate Information Parameter
# note: TX/RX channels info updated from munet after assignments
self.nodeAddr = self.id
self.info.update([('Communication Network', 'muNet'),
('Access Mode', f"{accessMode.upper()} {details}"),
('Node Address', f"{self.nodeAddr}"),
('TX Channel IDs', '-'),
('RX Channel IDs', '-'),])
# Network Access Point
self.CommNetwork = network.register(self)
'''Use single TX for now. Node stores as set for future extension.'''
self.txAddr = next(iter(self.CommNetwork.tx_channels), None)
# Prime initial state parameters -> move this somwhere else, guid?
if (self.isLeader):
self.nextEta, self.nextVel = guid.predictNextEtaVel(self)
###############################################################################
[docs]def buildGroup(num:int,
gid:str,
hasLeader:bool=True,
vehType:Vehicle=Remus100s,
**kwargs)->List[Vehicle]:
"""
Create a list of vehicle instances for swarm simulation.
Parameters
----------
num : int
Number of vehicles in the group.
gid : str
Group identifier, single character (e.g., "A").
hasLeader : bool
If True, first vehicle in list is generated with isLeader=True.
vehType : type
Vehicle class to instantiate (default: Remus100s).
**kwargs : dict
Optional keyword arguments to pass to vehicle constructor.
Returns
-------
group : list of Vehicle
List of initialized vehicle objects with consistent group ID.
If hasLeader=True, group[0].isLeader is True.
Notes
-----
**Parameter Inheritance:**
All vehicles in the group receive the same groupId and any keyword
attributes passed in kwargs. The hasLeader parameter will only set
isLeader=True for the first vehicle in the returned list.
**Vehicle Numbering:**
Vehicle IDs are assigned sequentially based on the class counter to avoid
any duplicate vehicle IDs in a single session.
Examples
--------
### Basic group with default parameters:
>>> swarm = buildGroup(5, "A", True)
>>> for auv in swarm:
... print(auv.callSign)
...
A01-LEADER
A02
A03
A04
A05
### Group with custom vehicle parameters:
>>> swarm = buildGroup(
... num=3,
... gid="B",
... hasLeader=True,
... vehType=Remus100s,
... r_safe=15.0,
... r_avoid=50.0
... )
>>> print(swarm[0].sampleTime)
0.01
>>> print(swarm[1].r_safe)
15.0
### Group without leader:
>>> swarm = buildGroup(5, "C", hasLeader=False)
>>> print(swarm[0].isLeader)
False
"""
group = []
if (hasLeader):
leader = vehType(groupId=gid, isLeader=True, **kwargs)
group.append(leader)
num -= 1
for _ in range(num):
group.append(vehType(groupId=gid, **kwargs))
return group
###############################################################################